This post assumes that you are familiar with both pipes and io-streams. If not, then you can read the pipes tutorial or the io-streams tutorial, both of which I wrote!
Edit: I also just discovered that when you re-implement io-streams using pipes, then pipes ties io-streams in performance. See this reddit comment for code and benchmarks.
Conversions
I'll begin with the following generic correspondence between io-streams and pipes:
{-# LANGUAGE RankNTypes #-}

import Control.Proxy

type InputStream a =
    forall p . (Proxy p) => () -> Session p IO (Maybe a)

type OutputStream a =
    forall p . (Proxy p) => Maybe a -> Session p IO ()

makeInputStream :: IO (Maybe a) -> InputStream a
makeInputStream m () = runIdentityP $ lift m

makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a
makeOutputStream f a = runIdentityP $ lift (f a)

For example:
stdin :: InputStream String
stdin = makeInputStream (fmap Just getLine)

stdout :: OutputStream String
stdout = makeOutputStream (maybe (return ()) putStrLn)

Notice how this does not convert the read and write actions to repetitive streams. Instead, you lift each action just once into the Proxy monad.
To read or write to these sources you just run the proxy monad:
import Prelude hiding (read)

read :: InputStream a -> IO (Maybe a)
read is = runProxy is

{- This requires `pipes` HEAD, which generalizes the type of `runProxyK`.

   Alternatively, you can hack around the current definition using:

   write ma os = runProxy (\() -> os ma)
-}
write :: Maybe a -> OutputStream a -> IO ()
write ma os = runProxyK os ma

For example:
>>> read stdin
Test<Enter>
Just "Test"
>>> write (Just "Test") stdout
Test

Now consider the following io-streams transformation:
map :: (a -> b) -> InputStream a -> IO (InputStream b)
map f is = makeInputStream $ do
    ma <- read is
    return (fmap f ma)

Here's the naive translation to pipes (later I will show the more elegant version):
import Prelude hiding (map)

map :: (a -> b) -> InputStream a -> InputStream b
map f is () = runIdentityP $ do
    ma <- is ()
    return (fmap f ma)

For example:
>>> read (map (++ "!") stdin)
Test<Enter>
Just "Test!"

The only real difference is that instead of using read is I just use is () directly.
So far I've used absolutely no pipes-specific features to implement any of this. I could have done this just as easily in any other library, including conduit.
Input streams
Now I will add a twist: we can reify transformations to be Consumers instead of functions of an InputStream.
For example, let's transform map to be a Consumer instead of a function:
map :: (Proxy p) => (a -> b) -> () -> Consumer p (Maybe a) IO (Maybe b)
map f () = runIdentityP $ do
    ma <- request ()
    return (fmap f ma)

I've only made two changes:
- map is no longer a function of an InputStream
- Instead of calling the input stream, I use request ()
This gives the type signature a new shape:

() -> Consumer p a m b

An input stream transformation is a fold in disguise! These transformations fold 0 or more values from the old input stream to return a new value.
Notice how this differs from the conventional flow of information in pipes. We're not converting a stream of as into a new stream of bs. Instead, we're folding a stream of as into a single return value of type b.
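To see how a fold can consume more than one upstream value, here is a small sketch of my own (filterInput is a hypothetical name, not an io-streams or pipes function): it keeps requesting until it receives a value satisfying the predicate, treating Nothing as end-of-input (we'll see in a moment how to actually hook folds like this up to stdin):

filterInput
    :: (Proxy p) => (a -> Bool) -> () -> Consumer p (Maybe a) IO (Maybe a)
filterInput pred () = runIdentityP go
  where
    -- Keep requesting upstream values until one satisfies the predicate
    go = do
        ma <- request ()
        case ma of
            Nothing -> return Nothing
            Just a
                | pred a    -> return (Just a)
                | otherwise -> go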
How do we even connect map to stdin, though? They don't have the right types for ordinary pipe composition:
stdin      :: (Proxy p) => () -> Session  p                IO (Maybe String)
map length :: (Proxy p) => () -> Consumer p (Maybe String) IO (Maybe Int   )

We need some way to feed the return value of stdin into the upstream input of map. This means we need to call stdin once each time that map requests a new value and feed that into the request. How can we do that using the existing pipes API?
Fortunately, this is exactly the problem that the (\>\) composition operator solves:
>>> read (stdin \>\ map length)
Test<Enter>
Just 4

Woah, slow down! What just happened?
The "request" category
The definition of (\>\) is actually really simple:
(f \>\ g) replaces all of g's requests with f.

So when we wrote (stdin \>\ map length), we replaced the single request in map with stdin, as if we had written it in by hand:
-- Before
map length () = runIdentityP $ do
    ma <- request ()
    return (fmap length ma)

-- After
(stdin \>\ map length) () = runIdentityP $ do
    ma <- stdin ()
    return (fmap length ma)

In fact, this behavior means that (\>\) and request form a category. (\>\) is the composition operation, request is the identity, and they satisfy the category laws:
-- Replacing each 'request' in 'f' with 'request' gives 'f'
request \>\ f = f

-- Replacing each 'request' in 'request' with 'f' gives 'f'
f \>\ request = f

-- Substitution of 'request's is associative
(f \>\ g) \>\ h = f \>\ (g \>\ h)

Let's take a closer look at the type of (\>\) to understand what is going on:
(\>\) :: (Monad m, ListT p)
      => (b' -> p a' a x' x m b)
      -> (c' -> p b' b x' x m c)
      -> (c' -> p a' a x' x m c)

We can illuminate this type by type-restricting the arguments. First, let's consider the case where the first argument is a Session and the second argument is a Consumer, and see what type the compiler infers for the result:
(\>\) :: (Monad m, ListT p)
      => (() -> Session  p   m b)
      -> (() -> Consumer p b m c)
      -> (() -> Session  p   m c)

The compiler infers that we must get back a readable Session, just like we wanted! This is precisely what happened when we composed stdin with map:
stdin      :: () -> Session  p                IO (Maybe String)
map length :: () -> Consumer p (Maybe String) IO (Maybe Int   )

stdin \>\ map length :: () -> Session p IO (Maybe Int)

read (stdin \>\ map length) :: IO (Maybe Int)

However, there's another way we can specialize the type of (\>\). We can instead restrict both arguments to be Consumers:
(\>\) :: (Monad m, ListT p)
      => (() -> Consumer p a m b)
      -> (() -> Consumer p b m c)
      -> (() -> Consumer p a m c)

The compiler infers that if we compose two folds, we must get a new fold!
This means that we can compose input stream transformations using the same (\>\) operator:
map length :: () -> Consumer p (Maybe String) IO (Maybe Int )
map even   :: () -> Consumer p (Maybe Int   ) IO (Maybe Bool)

map length \>\ map even :: () -> Consumer p (Maybe String) IO (Maybe Bool)

Then we will use the exact same operator to connect our input stream:
stdin \>\ map length \>\ map even :: () -> Session p IO (Maybe Bool)

read (stdin \>\ map length \>\ map even) :: IO (Maybe Bool)

Let's try it!
>>> read (stdin \>\ map length \>\ map even)
Test<Enter>
Just True

Now let's make things more interesting to really drive the point home:
import Control.Monad

twoReqs :: (Proxy p) => () -> Consumer p (Maybe a) IO (Maybe [a])
twoReqs () = runIdentityP $ do
    mas <- replicateM 2 (request ())
    return (sequence mas)
>>> read (stdin \>\ twoReqs \>\ twoReqs) :: IO (Maybe [[String]])
1<Enter>
2<Enter>
3<Enter>
4<Enter>
Just [["1","2"],["3","4"]]
>>> -- We can still read from raw 'stdin'
>>> read stdin
5<Enter>
Just "5"

In other words, the "request" category is actually the category of InputStreams, and pipes already has InputStream support today!
Output streams
Now let's do the exact same thing for OutputStreams! We'll begin with the filterOutput function from io-streams:
filterOutput :: (a -> Bool) -> OutputStream a -> IO (OutputStream a)
filterOutput pred os = makeOutputStream $ \ma -> case ma of
    Nothing -> write ma os
    Just a
        | pred a    -> write ma os
        | otherwise -> return ()

The pipes equivalent is:
filterOutput
    :: (Proxy p) => (a -> Bool) -> Maybe a -> Producer p (Maybe a) IO ()
filterOutput pred ma = runIdentityP $ case ma of
    Nothing -> respond ma
    Just a
        | pred a    -> respond ma
        | otherwise -> return ()

This time we get a new shape for our type signature:
a -> Producer p b m ()

An output stream transformation is an unfold in disguise! These transformations unfold each output destined for the old output stream into 0 or more outputs destined for the new output stream.
Let's hook this transformation up to stdout and start writing values to it ... oh wait. How do we hook it up? Well, let's look at the types of stdout and filterOutput:
filterOutput f :: (Proxy p) => Maybe a -> Producer p (Maybe a) IO ()
stdout         :: (Proxy p) => Maybe a -> Session  p           IO ()

We need some way to feed the stream output of filterOutput into the argument of stdout. This means that we need to call stdout on each output of filterOutput. How can we do that using the existing pipes API?
Why, we use (/>/), the dual of (\>\)!
>>> write (Just "") (filterOutput (not . null) />/ stdout)
>>> write (Just "Test") (filterOutput (not . null) />/ stdout)
Test
The "respond" category
(/>/) also has a simple behavior:
(f />/ g) replaces all of f's responds with g.
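Mirroring the earlier Before/After for (\>\), here is roughly what that substitution looks like for (filterOutput pred />/ stdout), written out by hand (a sketch of my own, not from the original post):

-- Before
filterOutput pred ma = runIdentityP $ case ma of
    Nothing -> respond ma
    Just a
        | pred a    -> respond ma
        | otherwise -> return ()

-- After: each 'respond' replaced with 'stdout'
(filterOutput pred />/ stdout) ma = runIdentityP $ case ma of
    Nothing -> stdout ma
    Just a
        | pred a    -> stdout ma
        | otherwise -> return ()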
This behavior means that (/>/) and respond form a category, too! (/>/) is the composition operation, respond is the identity, and they satisfy the category laws:

-- Replacing each 'respond' in 'f' with 'respond' gives 'f'
f />/ respond = f

-- Replacing each 'respond' in 'respond' with 'f' gives 'f'
respond />/ f = f

-- Substitution of 'respond's is associative
(f />/ g) />/ h = f />/ (g />/ h)

Let's learn more by reviewing the type of (/>/):
(/>/) :: (Monad m, ListT p)
      => (a -> p x' x b' b m a')
      -> (b -> p x' x c' c m b')
      -> (a -> p x' x c' c m a')

We'll specialize this type signature in two ways. First, we'll consider the case where the first argument is a Producer and the second argument is a Session and then see what type the compiler infers for the result:
(/>/) :: (Monad m, ListT p)
      => (a -> Producer p b m ())
      -> (b -> Session  p   m ())
      -> (a -> Session  p   m ())

The compiler infers that we must get back a writeable Session, just like we wanted! This is precisely what happened when we composed filterOutput and stdout:
filterOutput (not . null) :: Maybe String -> Producer p (Maybe String) IO ()
stdout                    :: Maybe String -> Session  p                IO ()

filterOutput (not . null) />/ stdout :: Maybe String -> Session p IO ()

flip write (filterOutput (not . null) />/ stdout) :: Maybe String -> IO ()

However, there's another way we can specialize the type of (/>/). We can instead restrict both arguments to be Producers:
(/>/) :: (Monad m, ListT p)
      => (a -> Producer p b m ())
      -> (b -> Producer p c m ())
      -> (a -> Producer p c m ())

The compiler infers that if we compose two unfolds, we must get a new unfold!
This means that we can compose output stream transformations using (/>/). To show this, let's define the equivalent of io-streams' contramap:
contramap :: (Proxy p) => (a -> b) -> Maybe a -> Producer p (Maybe b) IO ()
contramap f ma = runIdentityP $ respond (fmap f ma)

We can connect contramaps using (/>/):
contramap even :: Maybe Int  -> Producer p (Maybe Bool  ) IO ()
contramap show :: Maybe Bool -> Producer p (Maybe String) IO ()

contramap even />/ contramap show :: Maybe Int -> Producer p (Maybe String) IO ()

... then using the exact same operator we connect them to stdout:
contramap even />/ contramap show />/ stdout :: Maybe Int -> Session p IO ()

flip write (contramap even />/ contramap show />/ stdout) :: Maybe Int -> IO ()

Let's try it!
>>> write (Just 4) (contramap even />/ contramap show />/ stdout)
True

Now let's make things more interesting to really drive the point home:
hiBye :: (Proxy p) => Maybe String -> Producer p (Maybe String) IO ()
hiBye mstr = runIdentityP $ do
    respond $ fmap ("Hello, "   ++) mstr
    respond $ fmap ("Goodbye, " ++) mstr
>>> write (Just "world") (hiBye />/ hiBye />/ stdout)
Hello, Hello, world
Goodbye, Hello, world
Hello, Goodbye, world
Goodbye, Goodbye, world
>>> -- We can still write to raw 'stdout'
>>> write (Just "Haskell") stdout
Haskell

In other words, the "respond" category is actually the category of OutputStreams, and pipes already has OutputStream support today!
Unify input and output streams
We can do much more than just implement the io-streams API, though! We can generalize it in many more powerful ways than io-streams permits.
First off, I'm going to inline the InputStream and OutputStream types into the type signatures of read and write:
read  :: (forall p . (Proxy p) => () -> Session p IO (Maybe a)) -> IO (Maybe a)

write :: Maybe a -> (forall p . (Proxy p) => Maybe a -> Session p IO ()) -> IO ()

Then I will generalize the types to not be Maybe-specific and flip the arguments for write:
read  :: (forall p . (Proxy p) => () -> Session p IO a) -> IO a

write :: (forall p . (Proxy p) => a -> Session p IO ()) -> a -> IO ()

Now, I will generalize both read and write by adding bidirectionality to them. read will now accept an argument parametrizing its request and write can now optionally receive a result.
read :: (forall p . (Proxy p) => a -> Session p IO b) -> a -> IO b
read is = runProxyK is

write :: (forall p . (Proxy p) => a -> Session p IO b) -> a -> IO b
write is = runProxyK is

Well, would you look at that! They both have the same type and implementation! In other words, we can unify read and write into a single function:
once :: (forall p . (Proxy p) => a -> Session p IO b) -> a -> IO b
once is = runProxyK is

Also, once is really polymorphic over the base monad:
once :: (Monad m) => (forall p . (ListT p) => a -> Session p m b) -> a -> m b
once is = runProxyK is

Using this function, we can both read from InputStreams and write to OutputStreams:
>>> once (stdin \>\ map length) ()
Test<Enter>
Just 4
>>> once (contramap show />/ stdout) (Just 1)
1

Let's test the bidirectionality. I'll write a new stdin that accepts a count parameter telling how many lines to get. This time, there will be no Maybes:
stdinN :: (Proxy p) => Int -> Session p IO [String]
stdinN n = runIdentityP $ replicateM n $ lift getLine

First, briefly test it:
>>> once stdinN 3
Test
Apple
123
["Test","Apple","123"]

Similarly, let's define a transformation to automate supplying values to stdinN:
automate :: (Proxy p) => () -> Client p Int [String] IO ()
automate () = runIdentityP $ do
    lift $ putStrLn "First batch:"
    xs <- request 2
    lift $ print xs
    lift $ putStrLn "Second batch:"
    ys <- request 2
    lift $ print ys

Now compose!
>>> once (stdinN \>\ automate) ()
First batch:
1<Enter>
2<Enter>
["1","2"]
Second batch:
3<Enter>
4<Enter>
["3","4"]
>>> -- Go back to using 'stdinN' unfiltered
>>> once stdinN 1
5<Enter>
["5"]

We can similarly unify makeInputStream and makeOutputStream:
make :: (Monad m, Proxy p) => (q -> m r) -> q -> p a' a b' b m r
make f a = runIdentityP (lift (f a))

The only difference between an input stream and an output stream is whether it consumes or produces a value, and bidirectional streams blur the line even further.
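As a quick sketch using only the definitions above, the original stdin and stdout can both be recovered as special cases of make:

-- The old InputStream
stdin :: (Proxy p) => () -> Session p IO (Maybe String)
stdin = make (\() -> fmap Just getLine)

-- The old OutputStream
stdout :: (Proxy p) => Maybe String -> Session p IO ()
stdout = make (maybe (return ()) putStrLn)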
Purity
None of this machinery is specific to the IO monad. Everything I've introduced is polymorphic over the base monad! We only incur IO if any of our stages use IO. This departs greatly from io-streams, where you are trapped in the IO monad to do everything. However, I want to qualify that statement with a caveat: I expect that pipes-parse will require either IO or an ST base monad (your choice) to properly manage pushback like io-streams does. Even still, you only pay this price if you actually need it.
pipes improves on purity in two other ways. First, building input and output streams is pure when you use pipes:
-- pipes
makeInputStream  :: IO (Maybe a)       -> InputStream  a
makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a

-- io-streams
makeInputStream  :: IO (Maybe a)       -> IO (InputStream  a)
makeOutputStream :: (Maybe a -> IO ()) -> IO (OutputStream a)

Consequently, io-streams must use unsafePerformIO for its stdin:
stdin :: InputStream ByteString
stdin = unsafePerformIO $ handleToInputStream IO.stdin >>= lockingInputStream

Second, composing and transforming input streams is pure when you use pipes:
>>> -- pipes
>>> read (stdin \>\ map length \>\ map even)
...
>>> -- io-streams
>>> is <- (map even <=< map length) stdin
>>> read is
...

... and if none of the stages use IO, then the entire operation is pure!
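For instance, here is a small sketch of my own (pureSource and double are hypothetical names, not from any library) where every stage runs in Identity, so running the composite stream is completely pure:

import Data.Functor.Identity

-- A pure source: always produces 21
pureSource :: (Proxy p) => () -> Session p Identity Int
pureSource = make (\() -> return 21)

-- A pure fold: requests one value and doubles it
double :: (Proxy p) => () -> Consumer p Int Identity Int
double () = runIdentityP $ do
    n <- request ()
    return (2 * n)

-- runIdentity (once (pureSource \>\ double) ())  ==  42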
Extra interfaces
You can generalize the type signatures even further. Let's revisit the types of (\>\) and (/>/):
(\>\) :: (Monad m, ListT p)
      => (b' -> p a' a x' x m b)
      -> (c' -> p b' b x' x m c)
      -> (c' -> p a' a x' x m c)

(/>/) :: (Monad m, ListT p)
      => (a -> p x' x b' b m a')
      -> (b -> p x' x c' c m b')
      -> (a -> p x' x c' c m a')

So far we've been ignoring the x'/x interfaces entirely, treating them as closed interfaces. However, there's no reason we have to! If we open up this interface, then every processing stage that we connect can read and write to this streaming interface.
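As a rough sketch of my own (mapTee is not from the post or from pipes), here is the earlier map fold with its x'/x interface left open, so that in addition to folding the upstream input stream it also forwards every raw value it sees out of that extra interface:

mapTee :: (Proxy p) => (a -> b) -> () -> p () (Maybe a) () (Maybe a) IO (Maybe b)
mapTee f () = runIdentityP $ do
    ma <- request ()   -- fold the upstream input stream, as before
    respond ma         -- also emit the raw value on the open x'/x interface
    return (fmap f ma)

Any other stage composed with (\>\) would then share that same open x'/x interface.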
In a future post, I will show you how you can really elegantly use this trick to generalize Hutton-Meijer parsers to permit effects.
ListT
The people who read the pipes-3.2 announcement post know that the (\>\) and (/>/) composition operators correspond to (>=>) for the two pipes ListT monads.
This means that you can take any output stream transformation and assemble more sophisticated behaviors using do notation:
import Control.Applicative ((<$>), (<*>))

pairs :: (ListT p) => () -> Producer p (Maybe (String, String)) IO ()
pairs () = runRespondT $ do
    mstr1 <- RespondT $ hiBye (Just "world")
    mstr2 <- RespondT $ hiBye (Just "Haskell")
    return $ (,) <$> mstr1 <*> mstr2

These behave just like the list monad, non-deterministically selecting all possible paths:
>>> once (pairs />/ contramap show />/ stdout) ()
("Hello, world","Hello, Haskell")
("Hello, world","Goodbye, Haskell")
("Goodbye, world","Hello, Haskell")
("Goodbye, world","Goodbye, Haskell")

The exact same trick works for input stream transformations, too, but it's less useful because we don't send information upstream as frequently as downstream.
Conclusions
One of the most common questions I get is "How do I escape from the pipe monad?", and I wrote this post to demonstrate how to do so. You can use pipes to program in exactly the same style as io-streams. The only thing missing is a standard library of io-streams-like utilities.
So far, I have not touched on the issue of push-back, a feature which io-streams provides. pipes-parse will build on the ideas I've introduced in this post to provide an io-streams-style interface to parsing that includes push-back functionality. This will make it easy to dynamically add or remove processing stages throughout the parsing process, which is necessary for parsing HTTP. In fact, everything I've presented here fell out very naturally from my work on pipes-parse and I only later discovered that I had accidentally reinvented io-streams entirely within pipes.
So I will cautiously state that I believe pipes is "io-streams done right". Everything that Greg has been doing with io-streams matches quite nicely with the pre-existing "request" and "respond" pipes categories that I independently discovered within pipes. This indicates that Greg was really onto something and pipes provides the elegant theoretical foundation for his work.
Gabriel,
Any chance of an article for the Monad Reader, or a submission to the Haskell Symposium perhaps?
Yeah, I'm aiming for a Haskell symposium submission.
Excellent! I shall hope to review it!