I repeatedly get questions about how to implement a Handle-like API similar to io-streams using pipes, even though I outlined how to do this about a year ago. I don't blame anyone for asking: that tutorial uses the out-of-date (and uglier) pipes-3.2 API, and it takes an approach that I felt was a bit inconsistent and missed the mark. This post is a rewrite of the original, both to upgrade the code to the latest pipes-4.1 API and to change some key idioms to improve conceptual consistency.
I usually find that the best way to introduce this
Handle-like intuition for
pipes is to make an analogy to
io-streams, so this post assumes that you are familiar with both
pipes and
io-streams. If not, then you can read the
pipes tutorial or the
io-streams tutorial, both of which I wrote! I want to really emphasize that I mainly discuss
io-streams as a stepping stone for understanding the equivalent
pipes abstractions, but the purpose is not to reimplement
io-streams in
pipes. Rather I want to introduce a more general
IO-free abstraction that just happens to overlap a bit with
io-streams.
Types
I'll begin with the correspondence between
io-streams and
pipes types:
import Pipes
type InputStream a = Effect IO (Maybe a)
type OutputStream a = Maybe a -> Effect IO ()
makeInputStream :: IO (Maybe a) -> InputStream a
makeInputStream = lift
makeOutputStream :: (Maybe a -> IO ()) -> OutputStream a
makeOutputStream f = lift . f
For example, the
pipes analogs of
stdin and
stdout from
io-streams would be:
import qualified Data.ByteString as B
import qualified System.IO as IO
import qualified Data.Foldable as F
stdin :: InputStream B.ByteString
stdin = makeInputStream $ do
    bs <- B.hGetSome IO.stdin 4096
    return (if B.null bs then Nothing else Just bs)
stdout :: OutputStream B.ByteString
stdout = makeOutputStream (F.mapM_ B.putStr)
Notice how this does
not convert the read and write actions to repetitive streams. Instead, you just trivially wrap them using
lift, promoting them to
Effect. However, for the rest of this post I will use
Effect instead of
InputStream and
OutputStream to avoid confusion between the two libraries.
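To make the "one read per call" behaviour concrete, here is a minimal sketch that is not taken from either library. The helper name fromList is hypothetical, and it assumes that an IORef-backed list is an acceptable stand-in for a real handle. Allocating the IORef needs IO, but wrapping the read action is still just lift:

```haskell
import Control.Monad (replicateM)
import Data.IORef (atomicModifyIORef', newIORef)
import Pipes

-- Hypothetical helper (not part of pipes or io-streams): an input stream
-- that pops one element per read from a list stored in an IORef.
fromList :: [a] -> IO (Effect IO (Maybe a))
fromList xs = do
    ref <- newIORef xs
    -- The read action is a plain IO action promoted to Effect with lift
    return $ lift $ atomicModifyIORef' ref $ \ys -> case ys of
        []       -> ([], Nothing)
        (z : zs) -> (zs, Just z)

main :: IO ()
main = do
    input <- fromList "ab"
    -- Each runEffect performs exactly one read
    values <- replicateM 3 (runEffect input)
    print values
```

Running the same Effect three times steps through the list and then reports Nothing, which is the behaviour you would expect from a handle rather than from a stream that is consumed in one go.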
Also, the
pipes implementations of
makeInputStream and
makeOutputStream are pure, whereas their
io-streams counterparts require
IO. In fact,
io-streams fudges a little bit and implements
stdin and
stdout in terms of
unsafePerformIO, otherwise they would have these types:
stdin :: IO (InputStream ByteString)
stdout :: IO (OutputStream ByteString)
io-streams is tightly integrated with
IO (thus the name), but I believe we can do better.
Reading/Writing
The
pipes analogs of the
io-streams read and
write commands are
await and
yield:
import Prelude hiding (read)
read :: Monad m => Consumer (Maybe a) m (Maybe a)
read = await
write :: Monad m => Maybe a -> Producer (Maybe a) m ()
write = yield
Here are examples of how you might use them:
import Control.Applicative
import Data.Monoid
-- `Consumer (Maybe a) m (Maybe b)` is the analog of
-- `InputStream a -> IO (InputStream b)`
combine
    :: Monad m
    => Consumer (Maybe B.ByteString) m (Maybe B.ByteString)
combine = do
    mstr1 <- read
    mstr2 <- read
    return $ liftA2 (<>) mstr1 mstr2
-- `Maybe a -> Producer (Maybe b) m ()` is the analog of
-- `OutputStream b -> IO (OutputStream a)`
duplicate :: Monad m => Maybe a -> Producer (Maybe a) m ()
duplicate ma = do
    write ma
    write ma
Compare to
io-streams:
combine :: InputStream B.ByteString -> IO (Maybe B.ByteString)
combine is = do
    mbs1 <- read is
    mbs2 <- read is
    return $ liftA2 (<>) mbs1 mbs2
duplicate :: OutputStream a -> Maybe a -> IO ()
duplicate os ma = do
    write ma os
    write ma os
Unlike
io-streams,
pipes does not need to specify the upstream source for
read commands. This is because we can supply
read with an input stream using
(>~):
>>> runEffect $ stdin >~ read
Test<Enter>
Just "Test\n"
>>> runEffect $ stdin >~ combine
Hello<Enter>
world!<Enter>
Just "Hello\nworld!\n"
In fact, the
read command in the first example is totally optional, because
read is just a synonym for
await, and one of the
pipes laws states that:
f >~ await = f
Therefore, we could instead just write:
>>> runEffect stdin
Test<Enter>
Just "Test\n"
Dually, we don't need to specify the downstream sink for
write. This is because you connect
write with an output stream using
(~>):
>>> :set -XOverloadedStrings
>>> runEffect $ (write ~> stdout) (Just "Test\n")
Test
>>> runEffect $ (duplicate ~> stdout) (Just "Test\n")
Test
Test
Again, the
write in the first example is optional, because
write is just a synonym for
yield and one of the
pipes laws states that:
yield ~> f = f
Therefore, we can simplify it to:
>>> runEffect $ stdout (Just "Test")
Test
Mixed reads and writes
Like
io-streams, we do not need to specify the entire streaming computation up front. We can step these input and output streams one value at a time using
runEffect instead of consuming them all in one go. However, you lose no power and gain a lot of elegance by writing everything entirely within
pipes.
For example, just like
io-streams, you can freely mix
reads and
writes from multiple diverse sources:
inputStream1 :: Effect IO (Maybe A)
inputStream2 :: Effect IO (Maybe B)
mixRead :: Effect IO (Maybe A, Maybe B)
mixRead = do
    ma <- inputStream1 >~ read
    mb <- inputStream2 >~ read
    return (ma, mb)
outputStream1 :: Maybe a -> Effect IO ()
outputStream2 :: Maybe a -> Effect IO ()
mixWrite :: Maybe a -> Effect IO ()
mixWrite ma = do
    (write ~> outputStream1) ma
    (write ~> outputStream2) ma
Oh, wait! Didn't we just get through saying that you can simplify these?
-- Reminder:
-- f >~ read = f
-- write ~> f = f
mixRead :: Effect IO (Maybe A, Maybe B)
mixRead = do
    ma <- inputStream1
    mb <- inputStream2
    return (ma, mb)
mixWrite :: Maybe a -> Effect IO ()
mixWrite ma = do
    outputStream1 ma
    outputStream2 ma
In other words, if we stay within
pipes we can read and write from input and output streams just by directly calling them. Unlike
io-streams, we don't need to use
read and
write when we select a specific stream. We only require
read and
write if we wish to inject the input or output stream later on.
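As an illustration of injecting the input stream later on, the following sketch defines a reader that never names its source and then connects it to two different input streams. The names pair, counter, and constant are hypothetical, the Maybe wrapper is omitted for brevity, and the sketch assumes the transformers package is available for State:

```haskell
import Control.Monad.Trans.State.Strict (State, evalState, get, put)
import Pipes

-- A reader that does not name its source: it just reads twice
pair :: Monad m => Consumer a m (a, a)
pair = do
    x <- await
    y <- await
    return (x, y)

-- Hypothetical input stream: returns the counter, then increments it
counter :: Effect (State Int) Int
counter = lift $ do
    n <- get
    put (n + 1)
    return n

-- Hypothetical input stream: always returns the same value
constant :: Monad m => a -> Effect m a
constant = return

main :: IO ()
main = do
    -- The same reader, supplied with two different sources after the fact
    print (evalState (runEffect (counter >~ pair)) (0 :: Int))
    print (evalState (runEffect (constant 'x' >~ pair)) (0 :: Int))
```

The reader is written once against await, and the choice of source is deferred to the point where (>~) is applied.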
Transformations
What's neat about the
pipes idioms is that you use the exact same API to implement and connect transformations on streams. For example, here's how you implement the
map function from
io-streams using
pipes:
map :: Monad m => (a -> b) -> Consumer (Maybe a) m (Maybe b)
map f = do
    ma <- read
    return (fmap f ma)
Compare that to the
io-streams implementation of
map:
map :: (a -> b) -> InputStream a -> IO (InputStream b)
map f is = makeInputStream $ do
    ma <- read is
    return (fmap f ma)
Some differences stand out:
- The pipes version does not need to explicitly thread the input stream
- The pipes version does not require makeInputStream
- The pipes transformation is pure (meaning no IO)
Now watch how we apply this transformation:
>>> runEffect $ stdin >~ map (<> "!")
Test<Enter>
Just "Test\n!"
The syntax for implementing and connecting
map is completely indistinguishable from the syntax we used in the previous section to implement and connect
combine. And why should there be a difference? Both of them read from a source and return a derived value. Yet,
io-streams distinguishes between things that transform input streams and things that read from input streams, both in the types and implementation:
iReadFromAnInputStream :: InputStream a -> IO b
iReadFromAnInputStream is = do
    ma <- read is
    ...
iTransformAnInputStream :: InputStream a -> IO (InputStream b)
iTransformAnInputStream is = makeInputStream $ do
    ma <- read is
    ...
This means that
io-streams code cannot reuse readers as input stream transformations or, vice versa, reuse input stream transformations as readers.
As you might have guessed, the dual property holds for writers and output stream transformations. In
pipes, we model both of these using a
Producer Kleisli arrow:
contramap
    :: Monad m => (a -> b) -> Maybe a -> Producer (Maybe b) m ()
contramap f ma = yield (fmap f ma)
... and we connect them the same way:
>>> runEffect $ (contramap (<> "!\n") ~> stdout) (Just "Test")
Test!
... whereas
io-streams distinguishes these two concepts:
iWriteToAnOutputStream :: OutputStream b -> Maybe a -> IO ()
iWriteToAnOutputStream os ma = do
    write ma os
    ...
iTransformAnOutputStream :: OutputStream b -> IO (OutputStream a)
iTransformAnOutputStream os = makeOutputStream $ \ma -> do
    write ma os
    ...
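To illustrate that a single Producer Kleisli arrow can serve as both a writer and an output stream transformation in pipes, here is a small sketch under stated assumptions. The output stream logTo is hypothetical and uses Writer from the transformers package in place of a real handle, and duplicate is written for any element type, so Maybe values work too:

```haskell
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes

-- Writes its argument twice to whatever output stream is connected later
duplicate :: Monad m => a -> Producer a m ()
duplicate x = do
    yield x
    yield x

-- Hypothetical pure output stream: appends each written value to a log
logTo :: a -> Effect (Writer [a]) ()
logTo x = lift (tell [x])

main :: IO ()
main = do
    -- Used as a writer: connected directly to an output stream
    print (execWriter (runEffect ((duplicate ~> logTo) (1 :: Int))))
    -- Used as a transformation: composed with itself before connecting
    print (execWriter (runEffect ((duplicate ~> duplicate ~> logTo) (1 :: Int))))
```

The first call logs the value twice and the second logs it four times, with no change to duplicate itself.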
End of input
None of this machinery is specific to the
Maybe type. I only used
Maybe for analogy with
io-streams, which uses
Maybe to signal end of input. For the rest of this post I will just drop the
Maybes entirely to simplify the type signatures, meaning that I will model the relevant types as:
inputStream :: Effect m a
outputStream :: a -> Effect m ()
inputStreamTransformation :: Consumer a m b
outputStreamTransformation :: a -> Producer b m ()
This is also the approach that my upcoming
pipes-streams library will take, because
pipes does not use
Maybe to signal end of input. Instead, if you have a finite input you represent that input as a
Producer:
finiteInput :: Producer a m ()
What's great is that you can still connect this to an output stream, using
for:
for finiteInput outputStream :: Effect m ()
Dually, you model a finite output as a
Consumer:
finiteOutput :: Consumer a m ()
... and you can connect that to an input stream using
(>~):
inputStream >~ finiteOutput :: Effect m ()
These provide a convenient bridge between classic
pipes abstractions and handle-like streams.
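Both bridges can be sketched in a few lines. The streams below are hypothetical stand-ins: addToTotal is an output stream that adds each value to a running total held in State, and twos is an input stream that always returns 2. The sketch assumes the transformers package for State:

```haskell
import Control.Monad (replicateM_)
import Control.Monad.Trans.State.Strict (State, execState, modify)
import Pipes

-- Hypothetical output stream: add each written value to a running total
addToTotal :: Int -> Effect (State Int) ()
addToTotal x = lift (modify (+ x))

-- Hypothetical input stream: every read returns 2
twos :: Effect (State Int) Int
twos = return 2

-- A finite input, modelled as a Producer
finiteInput :: Monad m => Producer Int m ()
finiteInput = each [1, 2, 3]

-- A finite output, modelled as a Consumer that reads exactly three values
finiteOutput :: Consumer Int (State Int) ()
finiteOutput = replicateM_ 3 $ do
    x <- await
    lift (modify (+ x))

main :: IO ()
main = do
    -- Finite input connected to an output stream using for
    print (execState (runEffect (for finiteInput addToTotal)) 0)
    -- Input stream connected to a finite output using (>~)
    print (execState (runEffect (twos >~ finiteOutput)) 0)
```

In both cases the finite side decides when the computation ends, so no end-of-input marker is needed.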
Polymorphism
Interestingly, we can use the same
(>~) operator to:
- connect input streams to input stream transformations, and:
- connect input stream transformations to each other.
This is because
(>~) has a highly polymorphic type which you can specialize down to two separate
types:
-- Connect an input stream to an input stream transformation,
-- returning a new input stream
(>~) :: Monad m
     => Effect m a
     -> Consumer a m b
     -> Effect m b
-- Connect two input stream transformations together,
-- returning a new input stream transformation
(>~) :: Monad m
     => Consumer a m b
     -> Consumer b m c
     -> Consumer a m c
Dually, we can use the same
(~>) operator to:
- connect output stream transformations to output streams, and:
- connect output stream transformations to each other.
Again, this is because
(~>) has a highly polymorphic type which you can specialize down to two separate types:
-- Connect an output stream transformation to an output stream,
-- returning a new output stream
(~>) :: Monad m
     => (a -> Producer b m ())
     -> (b -> Effect m ())
     -> (a -> Effect m ())
-- Connect two output stream transformations together,
-- returning a new output stream transformation
(~>) :: Monad m
     => (a -> Producer b m ())
     -> (b -> Producer c m ())
     -> (a -> Producer c m ())
In fact, you will see these specialized type signatures in the documentation for
(>~) and
(~>). Hopefully, the analogy to input streams and output streams will help you see those types in a new light.
Categories
People familiar with
pipes know that
await/read and
(>~) form a category, so we get three equations for free from the three category laws:
-- Reading something = calling it
f >~ read = f
-- `read` is the identity input stream transformation
read >~ f = f
-- It doesn't matter how you group sinks/transformations
(f >~ g) >~ h = f >~ (g >~ h)
Dually,
yield/write and
(~>) form a category, so we get another three equations for free:
-- Writing something = calling it
write ~> f = f
-- `write` is the identity output stream transformation
f ~> write = f
-- It doesn't matter how you group sources/transformations
(f ~> g) ~> h = f ~> (g ~> h)
Those are really nice properties to use when equationally reasoning about handles and, as we've already seen, they allow you to simplify your code in unexpected ways.
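The laws for the read side can be spot-checked on a concrete example. This is only a sanity check on one input, not a proof, and the names counter, double, sumTwo, and run are hypothetical. It assumes the transformers package for State:

```haskell
import Control.Monad.Trans.State.Strict (State, evalState, get, put)
import Pipes

-- Hypothetical pure input stream: returns the counter, then increments it
counter :: Effect (State Int) Int
counter = lift $ do
    n <- get
    put (n + 1)
    return n

-- Two hypothetical input stream transformations
double :: Monad m => Consumer Int m Int
double = fmap (* 2) await

sumTwo :: Monad m => Consumer Int m Int
sumTwo = (+) <$> await <*> await

-- Run an input stream once, starting the counter at 0
run :: Effect (State Int) a -> a
run e = evalState (runEffect e) 0

main :: IO ()
main = do
    -- Identity laws: inserting await on either side changes nothing
    print (run (counter >~ await), run counter)
    print (run (counter >~ (await >~ sumTwo)), run (counter >~ sumTwo))
    -- Associativity: the grouping does not matter
    print (run ((counter >~ double) >~ sumTwo), run (counter >~ (double >~ sumTwo)))
```

If the laws hold, the two components of each printed pair should be equal.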
Bidirectionality
But wait, there's more!
pipes actually has a fully bidirectional core in the
Pipes.Core module. You can generalize everything above to use this bidirectional interface with surprising results.
For example, in the bidirectional version of
pipes-based streams, you can have streams that both accept input and produce output upon each step:
inputOutputStream :: a -> Effect m b
You also get the following generalized versions of all commands and operators:
- request generalizes read, accepting an argument for upstream
- respond generalizes write, returning a value from downstream
- (\>\) generalizes (>~)
- (/>/) generalizes (~>)
I will leave that as a teaser for now and I will discuss this in greater detail once I finish writing a new tutorial for the underlying bidirectional implementation.
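For a small taste in the meantime, here is a hedged sketch that uses only request and (\>\) from Pipes.Core. The names doubler and client are hypothetical, and this is just one way to read the bidirectional types as streams, not a description of any planned library:

```haskell
import Data.Functor.Identity (runIdentity)
import Pipes.Core (Client, Effect, request, runEffect, (\>\))

-- Hypothetical input/output stream: accepts an argument on each step
-- and returns its double
doubler :: Monad m => Int -> Effect m Int
doubler n = return (n * 2)

-- A reader that sends an argument upstream with every read
client :: Monad m => Int -> Client Int Int m (Int, Int)
client start = do
    a <- request start
    b <- request a
    return (a, b)

main :: IO ()
main =
    -- (\>\) plays the role of (>~): every request is answered by doubler
    print (runIdentity (runEffect ((doubler \>\ client) 1)))
```

Each request passes its argument to doubler and resumes with the result, so the second request sees the value that the first one returned.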
Conclusion
pipes provides an elegant, compact way to model
Handle-like abstractions that requires no new extensions to the API. I will soon provide a
pipes-streams library that implements utilities along these lines to further solidify these concepts.
One thing I hope people take away from this is that a
Handle-based API need not be deeply intertwined with
IO. All the commands and connectives from
pipes-based handles are completely independent of the base monad and you can freely generalize the
pipes notion of handles to pure monads.