Sunday, January 19, 2014

Shortcut fusion for pipes

Rewrite rules are a powerful tool that you can use to optimize Haskell code without breaking backwards compatibility. This post will illustrate how I use rewrite rules to implement a form of shortcut fusion for my pipes stream programming library. I compare pipes performance before and after adding shortcut fusion and I also compare the peformance of pipes-4.0.2 vs. conduit-1.0.10 and io-streams-1.1.4.0.

This post also includes a small introduction to Haskell's rewrite rule system for those who are interested in optimizing their own libraries.

Edit: This post originally used the term "stream fusion", but Duncan Coutts informed me that the more appropriate term for this is probably "short-cut fusion".

Rule syntax

The following rewrite rule from pipes demonstrates the syntax for defining optimization rules.

{-# RULES "for p yield" forall p . for p yield = p #-}
--        ^^^^^^^^^^^^^        ^   ^^^^^^^^^^^^^^^
--        Label                |   Substitution rule
--                             |
--        `p` can be anything -+

All rewrite rules are substitution rules, meaning that they instruct the compiler to replace anything that matches the left-hand side of the rule with the right-hand side. The above rewrite rule says to always replace for p yield with p, no matter what p is, as long as everything type-checks before and after substitution.

Rewrite rules are typically used to substitute code with equivalent code of greater efficiency. In the above example, for p yield is a for loop that re-yields every element of p. The re-yielded stream is behaviorally indistinguishable from the original stream, p, because all that for p yield does is replace every yield in p with yet another yield. However, while both sides of the equation behave identically their efficiency is not the same; the left-hand side is less efficient because for loops are not free.

Safety

Rewrite rules are not checked for correctness. The only thing the compiler does is verify that the left-hand side and right-hand side of the equation both type-check. The programmer who creates the rewrite rule is responsible for proving that the substitution preserves the original behavior of the program.

In fact, rewrite rules can be used to rewrite terms from other Haskell libraries without limitation. For this reason, modules with rewrite rules are automatically marked unsafe by Safe Haskell and they must be explicitly marked TrustWorthy to be used by code marked Safe.

You can verify a rewrite rule is safe using equational reasoning. If you can reach the right-hand side of the equation from the left-hand side using valid code substitutions, then you can prove (with some caveats) that the two sides of the equation are functionally identical.

pipes includes a complete set of proofs for its rewrite rules for this purpose. For example, the above rewrite rule is proven here, where (//>) is an infix synonym for for and respond is a synonym for yield:

for   = (//>)

yield = respond

This means that equational reasoning is useful for more than just proving program correctness. You can also use derived equations to optimize your program , assuming that you already know which side of the equation is more efficient.

Laws

Rewrite rules have a significant limitation: we cannot possibly anticipate every possible expression that downstream users might build using our libraries. So how can we optimize as much code as possible without an excessive proliferation of rewrite rules?

I anecdotally observe that equations inspired from category theory prove to be highly versatile optimizations that fit within a small number of rules. These equations include:

  • Category laws

  • Functor laws

  • Natural transformation laws (i.e. free theorems)

The first half of the shortcut fusion optimization consists of three monad laws, which are a special case of category laws. For those new to Haskell, the three monad laws are:

-- Left Identity
return x >>= f = f x

-- Right Identity
m >>= return = m

-- Associativity
(m >>= f) >>= g = m >>= (\x -> f x >>= g)

If you take these three laws and replace (>>=)/return with for/yield (and rename m to p, for 'p'ipe), you get the following "for loop laws":

-- Looping over a yield simplifies to function application
for (yield x) f = f x

-- Re-yielding every element returns the original stream
for p yield = p

-- You can transform two passes over a stream into a single pass
for (for p f) g = for p (\x -> for (f x) g)

This analogy to the monad laws is precise because for and yield are actually (>>=) and return for the ListT monad when you newtype them appropriately, and they really form a Monad in the Haskell sense of the word.

What's amazing is that these monad laws also double as shortcut fusion optimizations when we convert them to rewrite rules. We already encountered the second law as our first rewrite rule, but the other two laws are useful rewrite rules, too:

{-# RULES
    "for (yield x) f" forall x f   .
        for (yield x) f = f x
  ; "for (for p f) g" forall p f g .
        for (for p f) g = for p (\x -> for (f x) g)
  #-}

Note that the RULES pragma lets you group multiple rules together, as long as you separate them by semicolons. Also, there is no requirement that the rule label must match the left-hand side of the equation, but I use this convention since I'm bad at naming rewrite rules. This labeling convention also helps when diagnosing which rules fired (see below) without having to consult the original rule definitions.

Free theorems

These three rewrite rules alone do not suffice to optimize most pipes code. The reason why is that most idiomatic pipes code is not written in terms of for loops. For example, consider the map function from Pipes.Prelude:

map :: Monad m => (a -> b) -> Pipe a b m r
map f = for cat (\x -> yield (f x))

The idiomatic way to transform a pipe's output is to compose the map pipe downstream:

p >-> map f

We can't optimize this using our shortcut fusion rewrite rules unless we rewrite the above code to the equivalent for loop:

for p (\y -> yield (f y))

In other words, we require the following theorem:

p >-> map f = for p (\y -> yield (f y))

This is actually a special case of the following "free theorem":

-- Exercise: Derive the previous equation from this one

p1 >-> for p2 (\y -> yield (f y))
    = for (p1 >-> p2) (\y -> yield (f y))

A free theorem is an equation that you can prove solely from studying the types of all terms involved. I will omit the proof of this free theorem for now, but I will discuss how to derive free theorems in detail in a follow-up post. For now, just assume that the above equations are correct, as codified by the following rewrite rule:

{-# RULES
    "p >-> map f" . forall p f .
        p >-> map f = for p (\y -> yield (f y))
  #-}

With this rewrite rule the compiler can begin to implement simple map fusion. To see why, we'll compose two map pipes and then pretend that we are the compiler, applying rewrite rules at every opportunity. Every time we apply a rewrite rule we will refer to the rule by its corresponding string label:

map f >-> map g

-- "p >-> map f" rule fired
= for (map f) (\y -> yield (g y))

-- Definition of `map`
= for (for cat (\x -> yield (f x))) (\y -> yield (g y))

-- "for (for p f) g" rule fired
= for cat (\x -> for (yield (f x)) (\y -> yield (g y)))

-- "for (yield x) f" rule fired
= for cat (\x -> yield (g (f x)))

This is identical to a single map pass, which we can prove by equational reasoning:

for cat (\x -> yield (g (f x)))

-- Definition of `(.)`, in reverse
= for cat (\x -> yield ((g . f) x))

-- Definition of `map`, in reverse
= map (g . f)

So those rewrite rules sufficed to fuse the two map passes into a single pass. You don't have to take my word for it, though. For example, let's say that we want to prove that these rewrite rules fire for the following sample program, which increments, doubles, and then discards every number from 1 to 100000000:

-- map-fusion.hs

import Pipes
import qualified Pipes.Prelude as P

main = runEffect $
    for (each [1..10^8] >-> P.map (+1) >-> P.map (*2)) discard

The -ddump-rule-firings flag will output every rewrite rule that fires during compilation, identifying each rule with the string label accompanying the rule:

$ ghc -O2 -ddump-rule-firings map-fusion.hs
[1 of 1] Compiling Main             ( test.hs, test.o )
...
Rule fired: p >-> map f
Rule fired: for (for p f) g
Rule fired: for (yield x) f
...

I've highlighted the rule firings that correspond to map fusion, although there are many other rewrite rules that fire (including more shortcut fusion rule firings).

Shortcut fusion

We don't have to limit ourselves to just fusing maps. Many pipes in Pipes.Prelude has an associated free theorem that rewrites pipe composition into an equivalent for loop. After these rewrites, the "for loop laws" go to town on the pipeline and fuse it into a single pass.

For example, the filter pipe has a rewrite rule similar to map:

{-# RULES
    "p >-> filter pred" forall p pred .
        p >-> filter pred =
            for p (\y -> when (pred y) (yield y))
  #-}

So if we combine map and filter in a pipeline, they will also fuse into a single pass:

p >-> map f >-> filter pred

-- "p >-> map f" rule fires
for p (\x -> yield (f x)) >-> filter pred

-- "p >-> filter pred" rule fires
for (for p (\x -> yield (f x))) (\y -> when (pred y) (yield y))

-- "for (for p f) g" rule fires
for p (\x -> for (yield (f x)) (\y -> when (pred y) (yield y)))

-- for (yield x) f" rule fires
for p (\x -> let y = f x in when (pred y) (yield y))

This is the kind of single pass loop we might have written by hand if we were pipes experts, but thanks to rewrite rules we can write high-level, composable code and let the library automatically rewrite it into efficient and tight loops.

Note that not all pipes are fusible in this way. For example the take pipe cannot be fused in this way because it cannot be rewritten in terms of a for loop.

Benchmarks

These rewrite rules make fusible pipe stages essentially free. To illustrate this I've set up a criterion benchmark testing running time as a function of the number of map stages in a pipeline:

import Criterion.Main
import Data.Functor.Identity (runIdentity)
import Pipes
import qualified Pipes.Prelude as P

n :: Int
n = 10^6

main = defaultMain
    [ bench' "1 stage " $ \n ->
            each [1..n]
        >-> P.map (+1)
    , bench' "2 stages" $ \n ->
            each [1..n]
        >-> P.map (+1)
        >-> P.map (+1)
    , bench' "3 stages" $ \n ->
            each [1..n]
        >-> P.map (+1)
        >-> P.map (+1)
        >-> P.map (+1)
    , bench' "4 stages" $ \n ->
            each [1..n]
        >-> P.map (+1)
        >-> P.map (+1)
        >-> P.map (+1)
        >-> P.map (+1)
    ]
  where
    bench' label f = bench label $
        whnf (\n -> runIdentity $ runEffect $ for (f n) discard)
             (10^5 :: Int)

Before shortcut fusion (i.e. pipes-4.0.0), the running time scales linearly with the number of map stages:

warming up
estimating clock resolution...
mean is 24.53411 ns (20480001 iterations)
found 80923 outliers among 20479999 samples (0.4%)
  32461 (0.2%) high severe
estimating cost of a clock call...
mean is 23.89897 ns (1 iterations)

benchmarking 1 stage 
mean: 4.480548 ms, lb 4.477734 ms, ub 4.485978 ms, ci 0.950
std dev: 19.42991 us, lb 12.11399 us, ub 35.90046 us, ci 0.950

benchmarking 2 stages
mean: 6.304547 ms, lb 6.301067 ms, ub 6.310991 ms, ci 0.950
std dev: 23.60979 us, lb 14.01610 us, ub 37.63093 us, ci 0.950

benchmarking 3 stages
mean: 10.60818 ms, lb 10.59948 ms, ub 10.62583 ms, ci 0.950
std dev: 61.05200 us, lb 34.79662 us, ub 102.5613 us, ci 0.950

benchmarking 4 stages
mean: 13.74065 ms, lb 13.73252 ms, ub 13.76065 ms, ci 0.950
std dev: 61.13291 us, lb 29.60977 us, ub 123.3071 us, ci 0.950

Stream fusion (added in pipes-4.0.1) makes additional map stages essentially free:

warming up
estimating clock resolution...
mean is 24.99854 ns (20480001 iterations)
found 1864216 outliers among 20479999 samples (9.1%)
  515889 (2.5%) high mild
  1348320 (6.6%) high severe
estimating cost of a clock call...
mean is 23.54777 ns (1 iterations)

benchmarking 1 stage 
mean: 2.427082 ms, lb 2.425264 ms, ub 2.430500 ms, ci 0.950
std dev: 12.43505 us, lb 7.564554 us, ub 20.11641 us, ci 0.950

benchmarking 2 stages
mean: 2.374217 ms, lb 2.373302 ms, ub 2.375435 ms, ci 0.950
std dev: 5.394149 us, lb 4.270983 us, ub 8.407879 us, ci 0.950

benchmarking 3 stages
mean: 2.438948 ms, lb 2.436673 ms, ub 2.443006 ms, ci 0.950
std dev: 15.11984 us, lb 9.602960 us, ub 23.05668 us, ci 0.950

benchmarking 4 stages
mean: 2.372556 ms, lb 2.371644 ms, ub 2.373949 ms, ci 0.950
std dev: 5.684231 us, lb 3.955916 us, ub 9.040744 us, ci 0.950

In fact, once you have just two stages in your pipeline, pipes greatly outperforms conduit and breaks roughly even with io-streams. To show this I've written up a benchmark comparing pipes performance against these libraries for both pure loops and loops that are slightly IO-bound (by writing to /dev/null):

import Criterion.Main
import Data.Functor.Identity (runIdentity)
import qualified System.IO as IO

import Data.Conduit
import qualified Data.Conduit.List as C

import Pipes
import qualified Pipes.Prelude as P

import qualified System.IO.Streams as S

criterion :: Int -> IO ()
criterion n = IO.withFile "/dev/null" IO.WriteMode $ \h ->
    defaultMain
    [ bgroup "pure"
        [ bench "pipes"      $ whnf (runIdentity . pipes) n
        , bench "conduit"    $ whnf (runIdentity . conduit) n
        , bench "io-streams" $ nfIO (iostreams n)
        ]
    , bgroup "io"
        [ bench "pipes"     $ nfIO (pipesIO     h n)
        , bench "conduit"   $ nfIO (conduitIO   h n)
        , bench "iostreams" $ nfIO (iostreamsIO h n)
        ]
    ]

pipes :: Monad m => Int -> m ()
pipes n = runEffect $
    for (each [1..n] >-> P.map (+1) >-> P.filter even) discard

conduit :: Monad m => Int -> m ()
conduit n =
    C.enumFromTo 1 n $= C.map (+1) $= C.filter even $$ C.sinkNull

iostreams :: Int -> IO ()
iostreams n = do
    is0 <- S.fromList [1..n]
    is1 <- S.map (+1) is0
    is2 <- S.filter even is1
    S.skipToEof is2

pipesIO :: IO.Handle -> Int -> IO ()
pipesIO h n = runEffect $
        each [1..n]
    >-> P.map (+1)
    >-> P.filter even
    >-> P.map show
    >-> P.toHandle h

conduitIO :: IO.Handle -> Int -> IO ()
conduitIO h n =
       C.enumFromTo 1 n
    $= C.map (+1)
    $= C.filter even
    $= C.map show
    $$ C.mapM_ (IO.hPutStrLn h)

iostreamsIO :: IO.Handle -> Int -> IO ()
iostreamsIO h n = do
    is0 <- S.fromList [1..n]
    is1 <- S.map (+1) is0
    is2 <- S.filter even is1
    is3 <- S.map show is2
    os  <- S.makeOutputStream $ \ma -> case ma of
        Just str -> IO.hPutStrLn h str
        _        -> return ()
    S.connect is3 os

main = criterion (10^6)

The benchmarks place pipes neck-and-neck with io-streams on pure loops and 10% slower on slightly IO-bound code. Both libraries perform faster than conduit:

warming up
estimating clock resolution...
mean is 24.50726 ns (20480001 iterations)
found 117040 outliers among 20479999 samples (0.6%)
  45158 (0.2%) high severe
estimating cost of a clock call...
mean is 23.89208 ns (1 iterations)

benchmarking pure/pipes
mean: 24.04860 ms, lb 24.02136 ms, ub 24.10872 ms, ci 0.950
std dev: 197.3707 us, lb 91.05894 us, ub 335.2267 us, ci 0.950

benchmarking pure/conduit
mean: 172.8454 ms, lb 172.6317 ms, ub 173.1824 ms, ci 0.950
std dev: 1.361239 ms, lb 952.1500 us, ub 1.976641 ms, ci 0.950

benchmarking pure/io-streams
mean: 24.16426 ms, lb 24.12789 ms, ub 24.22919 ms, ci 0.950
std dev: 242.5173 us, lb 153.9087 us, ub 362.4092 us, ci 0.950

benchmarking io/pipes
mean: 267.7021 ms, lb 267.1789 ms, ub 268.4542 ms, ci 0.950
std dev: 3.189998 ms, lb 2.370387 ms, ub 4.392541 ms, ci 0.950

benchmarking io/conduit
mean: 310.3034 ms, lb 309.8225 ms, ub 310.9444 ms, ci 0.950
std dev: 2.827841 ms, lb 2.194127 ms, ub 3.655390 ms, ci 0.950

benchmarking io/iostreams
mean: 239.6211 ms, lb 239.2072 ms, ub 240.2354 ms, ci 0.950
std dev: 2.564995 ms, lb 1.879984 ms, ub 3.442018 ms, ci 0.950

I hypothesize that pipes performs slightly slower on IO compared to io-streams because of the cost of calling lift, whereas iostreams operates directly within the IO monad at all times.

These benchmarks should be taken with a grain of salt. All three libraries are most frequently used in strongly IO-bound scenarios, where the overhead of each library is pretty much negligible. However, this still illustrates how big of an impact shortcut fusion can have on pure code paths.

Conclusions

pipes is a stream programming library with a strong emphasis on theory and the library's contract with the user is a set of laws inspired by category theory. My original motivation behind proving these laws was to fulfill the contract, but I only later realized that the for loop laws doubled as fortuitous shortcut fusion optimizations. This is a recurring motif in Haskell: thinking mathematically pays large dividends.

For this reason I like to think of Haskell as applied category theory: I find that many topics I learn from category theory directly improve my Haskell code. This post shows one example of this phenomenon, where shortcut fusion naturally falls out of the monad laws for ListT.

4 comments:

  1. Sorry for necroing the comments on this post, but I'm wondering why you use nice (albeit non-standard) combinators for pipes and conduit, but the io-streams examples are written in clunky explicit style. For example, your "iostreams" example could be written as

    iostreams :: Int -> IO ()
    iostreams n = S.fromList [1..n] >>= S.map (+1) >>= S.filter even >>= S.skipToEof

    ReplyDelete
    Replies
    1. I almost never use `(>>=)` in my code, even if it would make code shorter. I prefer the way `do` notation looks.

      Delete
  2. typo in the "p >-> filter pred" rule definition, I think it should be:

    for p (\y -> when (pred y) (yield y))

    thanks for this post! it made me want to work through a few hello worlds for your library.

    ReplyDelete