pipes-bytestring-1.0.0 is complete, providing pipes utilities for reading and writing effectful streams of ByteStrings. Most people who have been waiting on this library were mainly interested in the following four pipes:
stdin :: MonadIO m => Producer' ByteString m ()
stdout :: MonadIO m => Consumer' ByteString m ()
fromHandle :: MonadIO m => Handle -> Producer' ByteString m ()
toHandle :: MonadIO m => Handle -> Consumer' ByteString m r
However, I delayed releasing pipes-bytestring for so long because there was a separate problem I wanted to solve first, which I like to call the Perfect Streaming Problem.
The Problem
Here is the statement of the Perfect Streaming Problem:
How do you partition an effectful stream into logical units while strictly enforcing an upper bound on memory use?
This problem will resonate with anybody who has ever built a streaming web service: how do you group your stream of bytes into logical units like files or messages? If you naively load each file or message into memory then a malicious input that is one extremely long file or message will take down your server by exhausting all available memory. Most web services work around this by imposing artificial limits on file or message length, or by writing brittle and non-compositional code or protocols that are difficult to understand, debug, and maintain.
However, we don't have to invoke web services to illustrate this problem. Just consider this simpler problem: how do you implement the Unix head utility in Haskell so that it runs in constant space on any input, including infinite inputs without any line boundaries, like /dev/zero:
$ head < /dev/zero > /dev/null # This must run in constant space
Let's keep it simple and assume that our head utility only needs to forward exactly 10 lines from stdin to stdout.
Before pipes-bytestring, the only way to do this was using lazy IO, like this:
takeLines :: Int -> String -> String
takeLines n = unlines . take n . lines
main = do
str <- getContents
putStr (takeLines 10 str)
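Laziness is what makes this run in constant space: `lines`, `take`, and `unlines` all produce their output incrementally, so even an input consisting of one infinite line is forwarded piece by piece. A minimal pure sketch of that behavior (the names here are just for illustration):

```haskell
-- The same pure pipeline as above.
takeLines :: Int -> String -> String
takeLines n = unlines . take n . lines

-- Even on an infinite input with no newlines, we can demand any finite
-- prefix of the output without forcing the whole (infinite) first line:
firstTen :: String
firstTen = take 10 (takeLines 1 (cycle "ab"))
-- firstTen == "ababababab"
```

The catch, of course, is that this only works because evaluation order and effect order are entangled, which is exactly what streaming libraries set out to fix.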
However, pipes and other streaming libraries were built to replace lazy IO so that Haskell programmers could easily reason about the order of effects and decouple them from Haskell's evaluation order. Yet this simple head challenge remains a perennial problem for streaming libraries. To illustrate why, let's consider how conduit does this, using Data.Conduit.Text.lines:
lines :: Monad m => Conduit Text m Text
This conduit receives a stream of Text chunks of arbitrary size and outputs Text chunks one line long. This sounds reasonable until you feed your program /dev/zero. The lines conduit would then attempt to load an infinitely large chunk of zeroes as the first output and exhaust all memory.
Michael Snoyman already realized this problem and added the linesBounded conduit as a temporary workaround:
linesBounded :: MonadThrow m => Int -> Conduit Text m Text
linesBounded throws an exception when given a line that exceeds the specified length. This is the same workaround that web services use: artificially limit your input stream. I wasn't satisfied with this solution, though. How can I claim that pipes is a mature replacement for lazy IO if lazy IO soundly beats pipes on something as simple as head?
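The idea behind `linesBounded` can be sketched as a pure function over `String`s (an illustrative analog, not conduit's actual implementation): split the input into lines, but fail as soon as any single line exceeds the bound, checking the bound incrementally so that even an unboundedly long line is rejected in constant space:

```haskell
-- Illustrative pure analog of the linesBounded idea (not conduit's code):
-- split into lines, but fail on any line longer than n characters.
linesBounded :: Int -> String -> Either String [String]
linesBounded n = go
  where
    go "" = Right []
    go s  = case break (== '\n') s of
      (line, rest)
        -- `take (n + 1)` keeps the length check incremental, so an
        -- infinitely long line is rejected without being forced in full
        | length (take (n + 1) line) > n ->
            Left ("line exceeds " ++ show n ++ " characters")
        | otherwise -> (line :) <$> go (drop 1 rest)
```

Because the check only ever forces `n + 1` characters of a line, `linesBounded 3 (cycle "x")` returns a `Left` in finite time instead of diverging, which is precisely the "artificially limit your input" workaround.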
The Solution
I will introduce the solution by beginning from the final code:
Note: There was a last-minute bug which I introduced in the lines function before release. Use pipes-bytestring-1.0.1, which fixes this bug, to run this example.
-- head.hs
import Pipes
import Pipes.ByteString
import Pipes.Parse (takeFree)
import Prelude hiding (lines, unlines)
takeLines
:: (Monad m)
=> Int
-> Producer ByteString m () -> Producer ByteString m ()
takeLines n = unlines . takeFree n . lines
main = runEffect $ takeLines 10 stdin >-> stdout
Compile and run this to verify that it takes the first ten lines of input for any file:
$ ./head < head.hs
-- head.hs
import Pipes
import Pipes.ByteString
import Pipes.Parse (takeFree)
import Prelude hiding (lines, unlines)
takeLines
:: (Monad m)
=> Int
... while still handling infinitely long lines in constant space:
$ ./head < /dev/zero >/dev/null # Runs forever in constant space
To see why this works, first take a look at the type of Pipes.ByteString.lines:
lines
:: (Monad m)
=> Producer ByteString m ()
-> FreeT (Producer ByteString m) m ()
Now compare that to the type of Data.ByteString.Lazy.Char8.lines:
lines :: ByteString -> [ByteString]
pipes treats a Producer of ByteStrings as the effectful analog of a lazy ByteString:
-- '~' means "is analogous to"
Producer ByteString m () ~ ByteString
Similarly, pipes also treats a FreeT of Producers as the effectful analog of a list of lazy ByteStrings:
FreeT (Producer ByteString m) m () ~ [ByteString]
You can think of FreeT as a "linked list" of zero or more Producers, where each Producer's return value contains either the next Producer or the final return value (() in this case). So if a Producer is analogous to a lazy ByteString, then a FreeT-based "linked list" of Producers is analogous to a true linked list of lazy ByteStrings.
Each layer of our FreeT is a Producer that streams one line's worth of chunks. This differs from a single chunk one line long because it's still in Producer form, so we haven't actually read anything from the file yet. Also, FreeT is smart and statically enforces that we cannot read lines from the next Producer (i.e. the next line) until we finish the first line.
FreeT has a very important property which other solutions do not have: we can use FreeT to sub-divide the Producer into logical units while still keeping the original chunking scheme. Once we have these nice logical boundaries we can manipulate the FreeT using high-level list-like functions such as Pipes.Parse.takeFree:
takeFree
:: (Functor f, Monad m)
=> Int -> FreeT f m () -> FreeT f m ()
takeFree is the FreeT analog of Prelude.take. We keep the first n layers of f in the FreeT and discard the rest. In this case our f is (Producer ByteString m), so if each Producer represents one line then we can take a fixed number of lines.
This works because FreeT (Producer ByteString m) m () is just an ordinary Haskell data type. This data type doesn't contain any actual chunks. Instead, it is just a description of how we might traverse our input stream. When we call takeFree on it we are just saying: "Now that I think about it, I don't intend to traverse as much of the input as I had originally planned."
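To make the "linked list of Producers" intuition concrete, here is a self-contained sketch using simplified versions of FreeT (modeled on the `free` package) and takeFree. Instantiating the functor with ordinary pairs instead of Producers turns FreeT into a literal linked list, and takeFree into Prelude.take:

```haskell
import Data.Functor.Identity (Identity(..))

-- Simplified version of FreeT (modeled on the `free` package): each step
-- is either the final return value or one more layer of the functor `f`.
data FreeF f r x = Pure r | Free (f x)
newtype FreeT f m r = FreeT { runFreeT :: m (FreeF f r (FreeT f m r)) }

-- With f = ((,) a) and m = Identity, FreeT ((,) a) Identity () is a
-- literal linked list of `a`s:
fromList :: [a] -> FreeT ((,) a) Identity ()
fromList = foldr (\x t -> FreeT (Identity (Free (x, t))))
                 (FreeT (Identity (Pure ())))

toList :: FreeT ((,) a) Identity () -> [a]
toList (FreeT (Identity step)) = case step of
  Pure ()     -> []
  Free (x, t) -> x : toList t

-- A simplified takeFree: keep the first n layers of `f`, discard the
-- rest.  Note that it never runs the effects past the n-th layer.
takeFree :: (Functor f, Monad m) => Int -> FreeT f m () -> FreeT f m ()
takeFree n (FreeT m)
  | n <= 0    = FreeT (return (Pure ()))
  | otherwise = FreeT $ do
      step <- m
      return $ case step of
        Pure r -> Pure r
        Free f -> Free (fmap (takeFree (n - 1)) f)
```

On the pair instantiation, `toList (takeFree 2 (fromList "abcde"))` yields `"ab"`, exactly like Prelude.take. The real Pipes.Parse.takeFree does the same thing one Producer layer, i.e. one line, at a time.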
unlines completes the analogy, collapsing our logical units back down into an unannotated stream of bytes:
unlines
:: (Monad m)
=> FreeT (Producer ByteString m) m r
-> Producer ByteString m r
We can then combine lines, takeFree, and unlines to mirror the lazy IO approach:
-- Lazy IO:
takeLines n = unlines . take n . lines
-- pipes:
takeLines n = unlines . takeFree n . lines
The main difference is that the intermediate types are larger because we're moving more information into the type system and, more importantly, moving that same information out of implicit magic.
Conclusions
The next pipes library in development is pipes-text. The main development bottleneck (besides my own graduation) is that the text library does not export functions to partially decode incomplete ByteString fragments as much as possible without throwing errors. If somebody were to write that up, it would speed up the release of pipes-text significantly.
As always, you can follow or contribute to pipes
development or just ask questions by joining the haskell-pipes mailing list.
Comments
Gabriel, wouldn't the `FreeT` trick (which is great, by the way, I love the idea) work just as well for any other streaming framework, like Conduit or io-streams?
For `conduit`, yes. For `io-streams`, you would need to modify input streams to return `Either r a` (where `r` is analogous to the `conduit`/`pipes` return value) instead of `Maybe a`, and make them functors over the `r` so that they would work with `FreeT`.
Could you make a type family that turns the type signature of a lazy IO function into the type signature of a pipes function?
You mean like an automated way that new users can use to find out what the equivalent pipe type would be?
Yes, and perhaps to give simplified type signatures to functions.
I'm generally reluctant to add type synonyms or type families unless they are "sort of opaque" (i.e. the user can meaningfully interact with them without understanding what the type synonym expands to).
DeleteIn some trivial use cases this works (i.e. when consuming simple functions already written for you this way, such as `lines`, `takeFree`, and `unlines`), but in other cases if you don't know that it is using `FreeT` or how that works then you are missing out on a lot of functionality, such as traversing the data type by hand if there is not a `FreeT` recursion scheme for what you have in mind.
Also, I would like people to grow more comfortable with using `FreeT` directly as I think it is a rather fundamental type, right up there with lists. For me, I would prefer to hide things that are ugly, but there is nothing ugly about `FreeT` in my mind.
"Perfect streaming" is inseparable from "perfect parsing", it seems. Or at least from "perfect regex matching": splitting input at fixed separators, at fixed lengths, or on encountering different sorts of input symbols.
By the way, imagine I have a string producer which produces a potentially infinite string. I want to turn it into a "key-value pairs" producer, by treating each line as a key-value pair in "key: value" format. So how would a "Producer KeyString" and a "Producer ValueString" be combined to express this?
So I generally provide a two-tiered approach. For every function on producers I usually provide a lower-level parser that you can use to build it up step by step. The exception is `pipes-bytestring`, where I left out most of the parsers because I wanted to get it out, but I plan on adding them later.
I can use the example you gave: the way I would break down the problem is to first provide two parsers, one for keys and one for values, with these types:
parseKey :: (Monad m) => StateT (Producer ByteString m r) m Key
parseValue :: (Monad m) => StateT (Producer ByteString m r) m Value
Then you could combine those into a parser for a key-value pair:
parseKeyVal :: (Monad m) => StateT (Producer ByteString m r) m (Key, Value)
parseKeyVal = (,) <$> parseKey <*> parseValue
Then I would provide a high-level function that applied that parser repeatedly for user convenience:
keyVals :: (Monad m) => Producer ByteString m r -> Producer (Key, Value) m r
The idea is that the parsers are more reusable but the functions are more convenient, so the pipes ecosystem usually provides both. For an example of this, see the `pipes-binary` package, which provides the `decode` parser (to decode one element from the stream) and the `decodeMany` function (to decode a stream of elements).
Don't you need to parse the ":" separately? (And throw it away?)
Yes, you would. For more sophisticated cases like this you can use `pipes-attoparsec`, which turns any `attoparsec` parser into the equivalent `pipes-parse` parser (using the `parse` function). So in practice you could actually do the entire example within `pipes-attoparsec` instead of the way I just suggested.