pipes-bytestring-1.0.0 is complete, providing pipes utilities for reading and writing effectful streams of ByteStrings. Most people who have been waiting on this library were mainly interested in the following four pipes:
stdin :: MonadIO m => Producer' ByteString m ()
stdout :: MonadIO m => Consumer' ByteString m ()
fromHandle :: MonadIO m => Handle -> Producer' ByteString m ()
toHandle :: MonadIO m => Handle -> Consumer' ByteString m r
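For orientation, these four pipes already compose into a working program. Here is a minimal sketch of an effectful cat, assuming the pipes and pipes-bytestring packages are installed:

```haskell
-- cat.hs: stream standard input to standard output in constant space.
-- A minimal sketch; assumes pipes and pipes-bytestring are installed.
import Pipes
import qualified Pipes.ByteString as PB

main :: IO ()
main = runEffect $ PB.stdin >-> PB.stdout
```

Because stdin produces chunks as they arrive and stdout writes them out immediately, no chunk is retained longer than necessary.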
However, I delayed releasing pipes-bytestring for so long because there was a separate problem I wanted to solve first, which I like to call the Perfect Streaming Problem.
The Problem
Here is the statement of the Perfect Streaming Problem:
How do you partition an effectful stream into logical units while strictly enforcing an upper bound on memory use?
This problem will resonate with anybody who has ever built a streaming web service: how do you group your stream of bytes into logical units like files or messages? If you naively load each file or message into memory then a malicious input that is one extremely long file or message will take down your server by exhausting all available memory. Most web services work around this by imposing artificial limits on file or message length, or by writing brittle and non-compositional code or protocols that are difficult to understand, debug, and maintain.
However, we don't have to invoke web services to illustrate this problem. Just consider this simpler problem: how do you implement the Unix head utility in Haskell so that it runs in constant space on any input, including infinite inputs without any line boundaries, like /dev/zero:
$ head < /dev/zero > /dev/null # This must run in constant space
Let's keep it simple and assume that our head utility only needs to forward exactly 10 lines from stdin to stdout.
Before pipes-bytestring, the only way to do this was using lazy IO, like this:
takeLines :: String -> String
takeLines n = unlines . take n . lines
main = do
str <- getContents
putStr (takeLines 10 str)
However, pipes and other streaming libraries were built to replace lazy IO so that Haskell programmers could easily reason about the order of effects and decouple them from Haskell's evaluation order. Yet this simple head challenge remains a perennial problem for streaming libraries. To illustrate why, let's consider how conduit does this, using Data.Conduit.Text.lines:
lines :: Monad m => Conduit Text m Text
This conduit receives a stream of Text chunks of arbitrary size and outputs Text chunks one line long. This sounds reasonable until you feed your program /dev/zero. The lines conduit would then attempt to load an infinitely large chunk of zeroes as the first output and exhaust all memory.
Michael Snoyman already realized this problem and added the linesBounded conduit as a temporary workaround:
linesBounded :: MonadThrow m => Int -> Conduit Text m Text
linesBounded throws an exception when given a line that exceeds the specified length. This is the same workaround that web services use: artificially limit your input stream. I wasn't satisfied with this solution, though. How can I claim that pipes is a mature replacement for lazy IO if lazy IO soundly beats pipes on something as simple as head?
The Solution
I will introduce the solution by beginning from the final code:
Note: There was a last-minute bug which I introduced in the lines function before release. Use pipes-bytestring-1.0.1, which fixes this bug, to run this example.
-- head.hs
import Pipes
import Pipes.ByteString
import Pipes.Parse (takeFree)
import Prelude hiding (lines, unlines)
takeLines
:: (Monad m)
=> Int
-> Producer ByteString m () -> Producer ByteString m ()
takeLines n = unlines . takeFree n . lines
main = runEffect $ takeLines 10 stdin >-> stdout
Compile and run this to verify that it takes the first ten lines of input for any file:
$ ./head < head.hs
-- head.hs
import Pipes
import Pipes.ByteString
import Pipes.Parse (takeFree)
import Prelude hiding (lines, unlines)
takeLines
:: (Monad m)
=> Int
... while still handling infinitely long lines in constant space:
$ ./head < /dev/zero >/dev/null # Runs forever in constant space
To see why this works, first take a look at the type of Pipes.ByteString.lines:
lines
:: (Monad m)
=> Producer ByteString m ()
-> FreeT (Producer ByteString m) m ()
Now compare that to the type of Data.ByteString.Lazy.Char8.lines:
lines :: ByteString -> [ByteString]
pipes treats a Producer of ByteStrings as the effectful analog of a lazy ByteString:
-- '~' means "is analogous to"
Producer ByteString m () ~ ByteString
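This analogy is concrete: pipes-bytestring provides conversions in both directions. The function names below (fromLazy, toLazyM) match this release's API as I understand it, so treat the exact names as an assumption:

```haskell
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy.Char8 as BL
import Pipes
import qualified Pipes.ByteString as PB

-- A lazy ByteString becomes a Producer that yields its chunks...
chunks :: Monad m => Producer ByteString m ()
chunks = PB.fromLazy (BL.pack "hello")

-- ...and a Producer can be drained back into a lazy ByteString.
collect :: Monad m => m BL.ByteString
collect = PB.toLazyM chunks
```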
Similarly, pipes also treats a FreeT of Producers as the effectful analog of a list of lazy ByteStrings:
FreeT (Producer ByteString m) m () ~ [ByteString]
You can think of FreeT as a "linked list" of zero or more Producers, where each Producer's return value contains either the next Producer or the final return value (() in this case). So if a Producer is analogous to a lazy ByteString, then a FreeT-based "linked list" of Producers is analogous to a true linked list of lazy ByteStrings.
Each layer of our FreeT is a Producer that streams one line's worth of chunks. This differs from a single chunk one line long because it is still in Producer form, so we haven't actually read anything from the file yet. Also, FreeT is smart and statically enforces that we cannot read lines from the next Producer (i.e. the next line) until we finish the first line.
FreeT has a very important property which other solutions do not have: we can use FreeT to subdivide the Producer into logical units while still keeping the original chunking scheme. Once we have these logical boundaries, we can manipulate the FreeT using high-level list-like functions such as Pipes.Parse.takeFree:
takeFree
:: (Functor f, Monad m)
=> Int -> FreeT f m () -> FreeT f m ()
takeFree is the FreeT analog of Prelude.take. We keep the first n functor layers of the FreeT and discard the rest. In this case our f is (Producer ByteString m), so if each Producer represents one line then we can take a fixed number of lines.
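To make "discard the rest" precise, here is a self-contained sketch of how a takeFree-like function can be written. The FreeT and FreeF definitions below are simplified re-definitions for illustration only, not the actual code from Pipes.Parse or the free package:

```haskell
-- Simplified FreeT: run an effect in m, then either finish (Pure)
-- or expose one more functor layer wrapping the rest (Free).
newtype FreeT f m a = FreeT { runFreeT :: m (FreeF f a (FreeT f m a)) }
data FreeF f a b = Pure a | Free (f b)

-- Keep the first n functor layers and discard the rest. The layers
-- past n are never demanded, so their effects never run.
takeFree :: (Functor f, Monad m) => Int -> FreeT f m () -> FreeT f m ()
takeFree n (FreeT m)
    | n <= 0    = FreeT (return (Pure ()))  -- stop here; drop the tail
    | otherwise = FreeT $ do
        x <- m
        return $ case x of
            Pure r -> Pure r
            Free f -> Free (fmap (takeFree (n - 1)) f)
```

Note that the tail only ever appears under fmap, so takeFree 10 never forces the eleventh layer; this is exactly why our head never reads past the tenth line.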
This works because FreeT (Producer ByteString m) m () is just an ordinary Haskell data type. This data type doesn't contain any actual chunks. Instead, it is just a description of how we might traverse our input stream. When we call takeFree on it we are just saying: "Now that I think about it, I don't intend to traverse as much of the input as I had originally planned."
unlines completes the analogy, collapsing our logical units back down into an unannotated stream of bytes:
unlines
:: (Monad m)
=> FreeT (Producer ByteString m) m r
-> Producer ByteString m r
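Under the hood, a function of this type just walks the FreeT, forwards each line's Producer, and re-inserts the delimiter between layers. Here is a hedged sketch of that shape (named unlines' to avoid clashing with the library function), using FreeT from the free package; the real Pipes.ByteString.unlines may differ in details:

```haskell
import Control.Monad.Trans.Free (FreeT, FreeF(..), runFreeT)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BC
import Pipes

-- Sketch: collapse a FreeT of line Producers back into one byte
-- stream, yielding a newline after each line's chunks.
unlines'
    :: Monad m
    => FreeT (Producer ByteString m) m r
    -> Producer ByteString m r
unlines' f = do
    x <- lift (runFreeT f)
    case x of
        Pure r -> return r
        Free p -> do
            f' <- p                -- forward one line's chunks
            yield (BC.pack "\n")   -- restore the delimiter
            unlines' f'
```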
We can then combine lines, takeFree, and unlines to mirror the lazy IO approach:
-- Lazy IO:
takeLines n = unlines . take n . lines
-- pipes:
takeLines n = unlines . takeFree n . lines
The main difference is that the intermediate types are larger because we're moving more information into the type system and, more importantly, moving that same information out of implicit magic.
Conclusions
The next pipes library in development is pipes-text. The main development bottleneck (besides my own graduation) is that the text library does not export functions to partially decode incomplete ByteString fragments as much as possible without throwing errors. If somebody were to write that up, it would speed up the release of pipes-text significantly.
As always, you can follow or contribute to pipes development, or just ask questions, by joining the haskell-pipes mailing list.