Begin with the tutorial if you want to learn how to use the library. This post mainly highlights some features and compares the pipes-concurrency approach to other libraries. Also, I'll provide some bonus examples that are not in the tutorial.
Before I continue, I want to credit Eric Jones, who first began the library as pipes-stm on GitHub. Unfortunately, I lost all contact with him and he didn't include a LICENSE in his repository, so I had to rebuild the library from scratch because I wasn't sure if a fork would constitute copyright infringement. If he reads this, gets in touch with me, and approves of the BSD license, then I will add him to the LICENSE and also add him as a library author.
Reactive programming
Several people are beginning to realize that streaming libraries overlap substantially with reactive programming frameworks. pipes-concurrency provides the basic building blocks necessary to build reactive programming abstractions.
For example, let's say that I want to simulate reactive-banana's Events using pipes-concurrency:
    {-# LANGUAGE RankNTypes #-}

    import Control.Proxy

    -- `pipes` does not need the `t` parameter from reactive-banana
    type Event a = forall p . (Proxy p) => () -> Producer p a IO ()

If you want to take the union of two asynchronous streams, you spawn a mailbox, merge both streams into it using sendD, and then read out the results using recvS:
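    import Control.Monad
    import Control.Concurrent.Async
    import Control.Proxy.Concurrent
    import Control.Concurrent (threadDelay)
    import System.Mem (performGC)

    union :: Event a -> Event a -> Event a
    union e1 e2 () = runIdentityP $ do
        (input, output) <- lift $ spawn Unbounded
        -- Feed each event source into the shared mailbox from its own thread
        as <- lift $ forM [e1, e2] $ \e -> async $ do
            runProxy $ e >-> sendD input
            -- Collect garbage promptly so the mailbox can tell this writer is finished
            performGC
        -- Stream the merged values downstream
        recvS output ()
        lift $ mapM_ wait as

Now we can define two event sources: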
    clock :: Event String
    clock = fromListS (cycle ["Tick", "Tock"]) >-> execD (threadDelay 1000000)

    user :: Event String
    user = stdinS

... and cleanly merge them:
    main = runProxy $ union clock user >-> takeWhileD (/= "quit") >-> stdoutD
    $ ./union
    Tick
    Tock
    test<Enter>
    test
    Tick
    Tock
    quit<Enter>
    $

People often tout spreadsheets as the classic example of functional-reactive programming, so why not simulate that, too? Well, a spreadsheet cell is just a non-empty stream of values:
    {-# LANGUAGE PolymorphicComponents #-}

    import Control.Applicative
    import Control.Concurrent (threadDelay)
    import Control.Concurrent.Async
    import Control.Monad
    import Control.Proxy
    import Control.Proxy.Concurrent

    data Cell a = Cell
        { initial :: a
        , stream  :: forall p . (Proxy p) => () -> Producer p a IO ()
        }

    runCell :: (Proxy p) => Cell a -> () -> Producer p a IO ()
    runCell (Cell a ga) () = runIdentityP $ do
        respond a
        ga ()

Each value in the stream represents an update to the cell's contents, either by the user:
    input :: Cell String
    input = Cell "" stdinS

... or by some data source:
    time :: Cell Int
    time = Cell 0 $ \() -> evalStateP 1 $ forever $ do
        n <- get
        respond n
        lift $ threadDelay 1000000
        put (n + 1)

Spreadsheet cells are both Functors and Applicatives:
    instance Functor Cell where
        fmap f (Cell x gx) = Cell (f x) (gx >-> mapD f)

    instance Applicative Cell where
        pure a = Cell a (runIdentityK return)

        (Cell f0 gf) <*> (Cell x0 gx) = Cell (f0 x0) $ \() -> runIdentityP $ do
            (input, output) <- lift $ spawn Unbounded
            lift $ do
                -- Tag updates from each dependency and feed them into one mailbox
                a1 <- async $ runProxy $ gf >-> mapD Left  >-> sendD input
                a2 <- async $ runProxy $ gx >-> mapD Right >-> sendD input
                link2 a1 a2
                link a1
            (recvS output >-> handler) ()
          where
            -- Remember the latest function and argument, re-emitting on every update
            handler () = evalStateP (f0, x0) $ forever $ do
                e <- request ()
                (f, x) <- get
                case e of
                    Left f' -> do
                        put (f', x)
                        respond (f' x)
                    Right x' -> do
                        put (f, x')
                        respond (f x')

... so we can use Applicative style to combine spreadsheet cells, which will only update when their dependencies update:
    both :: Cell (Int, String)
    both = (,) <$> time <*> input

    main = runProxy $ runCell both >-> printD
    $ ./cell
    (0,"")
    (1,"")
    (2,"")
    test<Enter>
    (2,"test")
    (3,"test")
    apple<Enter>
    (3,"apple")
    (4,"apple")
    (5,"apple")
    ...
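The heart of `<*>` is `handler`, which remembers the latest function and the latest argument and re-emits their application whenever either dependency changes. The following is a minimal pure model of that bookkeeping, written without `pipes` or threads so it runs on its own. The name `applyUpdates` is hypothetical, `Left`/`Right` tag updates as in the instance above, and, like `runCell`, the output starts with the initial value.

```haskell
-- Hypothetical pure model of the `handler` state machine from the
-- Applicative instance: no pipes, no threads, just a list of updates.
-- Left carries a new function, Right carries a new argument.
applyUpdates :: (a -> b) -> a -> [Either (a -> b) a] -> [b]
applyUpdates f0 x0 updates = map apply (scanl step (f0, x0) updates)
  where
    -- Keep the latest function and the latest argument
    step (_, x) (Left f')  = (f', x)
    step (f, _) (Right x') = (f, x')
    apply (f, x) = f x

main :: IO ()
main = mapM_ print (applyUpdates ((,) (0 :: Int)) "" updates)
  where
    -- Interleaved updates: Left from the clock cell, Right from user input
    updates =
      [ Left ((,) 1), Left ((,) 2), Right "test"
      , Left ((,) 3), Right "apple", Left ((,) 4), Left ((,) 5) ]
```

With this fixed interleaving it prints the same sequence as the `./cell` transcript, from `(0,"")` through `(5,"apple")`. In the real instance the interleaving comes from thread scheduling rather than a predetermined list.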
Simple API
pipes-concurrency has a really, really, really simple API, and the three key functions are:
    spawn :: Size -> IO (Input a, Output a)
    send  :: Input a -> a -> STM Bool
    recv  :: Output a -> STM (Maybe a)

The spawn function creates a FIFO channel, send adds messages to the channel, and recv takes messages off the channel. That's it! The rest of the library consists of the following two higher-level pipes, which build on those functions to stream messages into and out of the channel:
    sendD :: (Proxy p) => Input a -> x -> p x a x a IO ()
    recvS :: (Proxy p) => Output a -> () -> Producer p a IO ()

The library only has five functions total, making it very easy to learn.
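To make the contract of those three functions concrete, here is a minimal STM sketch of a mailbox. It assumes that `send` returns `False` once the mailbox is sealed and that `recv` returns `Nothing` only after the mailbox is both sealed and empty; that reading of the `Bool` and `Maybe` results is an assumption. The names `Mailbox`, `newMailbox`, `sendMail`, `recvMail` and `sealMailbox` are hypothetical, and sealing is triggered by hand here, whereas pipes-concurrency ties it to garbage collection.

```haskell
import Control.Concurrent.STM
import Control.Monad (replicateM)

-- Hypothetical model of the spawn/send/recv contract (not the library's code).
-- Sealing is done by hand here; the real library ties it to garbage collection.
data Mailbox a = Mailbox
  { queue  :: TQueue a   -- FIFO of pending messages
  , sealed :: TVar Bool  -- True once no further messages can be exchanged
  }

newMailbox :: IO (Mailbox a)
newMailbox = Mailbox <$> newTQueueIO <*> newTVarIO False

-- Analogue of `send`: refuse (False) once sealed, otherwise enqueue (True)
sendMail :: Mailbox a -> a -> STM Bool
sendMail mb x = do
  isSealed <- readTVar (sealed mb)
  if isSealed
    then return False
    else do
      writeTQueue (queue mb) x
      return True

-- Analogue of `recv`: take the next message; Nothing only when sealed and empty
recvMail :: Mailbox a -> STM (Maybe a)
recvMail mb = (Just <$> readTQueue (queue mb)) `orElse` endOfStream
  where
    endOfStream = do
      isSealed <- readTVar (sealed mb)
      check isSealed  -- block (retry) until the mailbox is sealed
      return Nothing

sealMailbox :: Mailbox a -> STM ()
sealMailbox mb = writeTVar (sealed mb) True

main :: IO ()
main = do
  mb <- newMailbox
  ok1 <- atomically (sendMail mb 'a')
  ok2 <- atomically (sendMail mb 'b')
  atomically (sealMailbox mb)
  ok3 <- atomically (sendMail mb 'c')
  rs <- replicateM 3 (atomically (recvMail mb))
  print (ok1, ok2, ok3)  -- (True,True,False)
  print rs               -- [Just 'a',Just 'b',Nothing]
```

Because `recvMail` tries the queue before checking the seal, messages sent before sealing are still delivered in FIFO order, and only then does the reader see end-of-stream.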
Deadlock safety
What distinguishes this abstraction from traditional STM channels is that send and recv hook into the garbage collection system to automatically detect and avoid deadlocks. If they detect a deadlock, they terminate cleanly instead of blocking forever.
Surprisingly, this works so well that it even correctly handles crazy scenarios like cyclic graphs. For example, the run-time system magically ties the knot in the following example and both pipelines successfully terminate and get garbage collected:
    import Control.Concurrent.Async
    import Control.Proxy
    import Control.Proxy.Concurrent
    import System.Mem (performGC)

    main = do
        (in1, out1) <- spawn Unbounded
        (in2, out2) <- spawn Unbounded
        -- Each pipeline reads from one mailbox and writes into the other,
        -- so the two pipelines form a cycle
        a1 <- async $ do
            runProxy $ recvS out1 >-> sendD in2
            performGC
        a2 <- async $ do
            runProxy $ recvS out2 >-> sendD in1
            performGC
        mapM_ wait [a1, a2]

I don't even know why the above example works, to be completely honest. I really only designed pipes-concurrency to avoid deadlocks for acyclic graphs, and the above was just a really nice emergent behavior that fell out of the implementation. I think this is an excellent tribute to how amazing GHC is, and I want to give a big shout-out to all the people who contribute to it.
I call this "semi-automatic" reference management because you must still call the garbage collector manually after you stop using each reference; otherwise, you cannot guarantee that the reference is released promptly. However, if you forget to do this, the only consequence is that stream termination is delayed until the next garbage collection cycle.
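GHC's runtime performs a GC-driven reachability check of its own: a thread blocked in an STM transaction that waits on variables no other live thread can reach receives a `BlockedIndefinitelyOnSTM` exception when the runtime next garbage-collects. The sketch below uses that runtime feature to show the same idea of turning a detected deadlock into clean termination. This is a swapped-in technique, not pipes-concurrency's own mechanism, and the names `recvOrStop` and `drain` are hypothetical.

```haskell
import Control.Concurrent.STM
import Control.Exception (BlockedIndefinitelyOnSTM (..), handle)

-- Swapped-in technique: rely on GHC's runtime, which throws
-- BlockedIndefinitelyOnSTM to a thread whose transaction waits on
-- variables that no other live thread can reach.
recvOrStop :: TQueue a -> IO (Maybe a)
recvOrStop q =
  handle (\BlockedIndefinitelyOnSTM -> return Nothing)
         (Just <$> atomically (readTQueue q))

-- Read messages until the runtime reports that none can ever arrive.
drain :: TQueue a -> IO [a]
drain q = do
  next <- recvOrStop q
  case next of
    Just x  -> (x :) <$> drain q
    Nothing -> return []

main :: IO ()
main = do
  q <- newTQueueIO
  atomically (mapM_ (writeTQueue q) "abc")
  -- Only this thread references `q`, so once it is empty the blocked read
  -- is detected as a deadlock and `drain` stops cleanly.
  xs <- drain q
  putStrLn xs
```

Detection only happens when a garbage collection actually runs (in the threaded runtime this may wait for an idle GC), which mirrors the advice above to call `performGC` once you stop using a reference.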
Severability
I designed the API so that if any other streaming library wants to use it I can cleanly separate out the pipes-agnostic part, consisting of spawn, send, and recv. If you want to build on this neat deadlock-free abstraction with, say, conduit or io-streams, then just let me know and I will factor those functions out into their own library.
Comparisons
People might wonder how pipes-concurrency compares to the stm-conduit and io-streams approaches for managing concurrency. Before I continue I just want to point out that I contributed some of the concurrency code to io-streams, so I am at fault for some of its current weaknesses. One of the reasons I made the pipes-concurrency functionality severable was so that io-streams could optionally merge in this same feature to fix some of the concurrency issues that I couldn't resolve the first time around.
pipes-concurrency does several things that are unique in the streaming concurrency space, including:
- Dynamic communication graphs with semi-automatic reference management
- Correctly handling multiple readers and writers on the same resource
- Deadlock safety (as mentioned above)
- Exception safety (by virtue of deadlock safety)
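As an illustration of several writers sharing one mailbox, here is a sketch of the `union` pattern from earlier using plain `forkIO` threads and an STM `TQueue` instead of pipes. The names `unionLists` and `nextItem` are hypothetical, and termination is tracked with an explicit counter of finished writers rather than garbage collection, so this is not how pipes-concurrency itself does it.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_, void)

-- Next merged item, or Nothing once the queue is empty and every writer finished
nextItem :: Int -> TQueue a -> TVar Int -> STM (Maybe a)
nextItem writers box done = (Just <$> readTQueue box) `orElse` finished
  where
    finished = do
      n <- readTVar done
      check (n == writers)  -- retry until all writers have signed off
      return Nothing

-- Hypothetical sketch: every source is written into one shared FIFO by its own
-- thread, and a single reader collects items until all writers are done.
unionLists :: [[a]] -> IO [a]
unionLists sources = do
  box  <- newTQueueIO
  done <- newTVarIO 0
  forM_ sources $ \src -> void $ forkIO $ do
    forM_ src $ \x -> atomically (writeTQueue box x)
    atomically (modifyTVar' done (+ 1))
  let loop acc = do
        next <- atomically (nextItem (length sources) box done)
        case next of
          Just x  -> loop (x : acc)
          Nothing -> return (reverse acc)
  loop []

main :: IO ()
main = do
  merged <- unionLists [["Tick", "Tock", "Tick"], ["test", "quit"]]
  -- Interleaving varies between runs; each source keeps its own order.
  mapM_ putStrLn merged
```

Because each writer commits all of its items before incrementing the counter, the reader can only see end-of-stream after every item has been delivered.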
Conclusion
There are still more features that I haven't even mentioned, so I highly recommend you read the tutorial to learn other cool tricks you can do with the library.
For people following the pipes ecosystem, the next library coming up is pipes-parse, which is getting very close to completion, although the version currently on GitHub is stale and doesn't reflect the current state of the project. Expect to see some very cool and unique features when it comes out, which should be within the next two weeks.
I see this pattern used regularly; why isn't it wrapped in some handy shortcut?

    run pipe = async (runProxy pipe >> performGC)
There are two main reasons:
* To avoid an `async` dependency
* There may be more run functions than just `runProxy` (e.g. `runEitherK`, `evalStateK`), especially if using `pipes-safe`.
Hitting it out of the park, once again! Your libraries alone demonstrate Haskell's importance and value for front-line development.