If you’re a programmer now, there’s one reality you’d best be getting used to. People expect you to know how to deal with big data. The kind of data that will take a while to process. The kind that will crash your program if you try to bring it all into memory at the same time. But you also want to avoid making individual SQL requests to a database to access every single row. That would be awfully slow. So how do you solve this problem? More specifically, how do you solve this problem in Haskell?
This is exactly the type of problem the Data.Conduit library exists for. This article will go through a simple example. We’ll stream some integers using a conduit "source" and add them all together. Luckily, we only ever need to see one number at a time from the source, so we have no need to bring a ton of data into memory. Let’s first take a look at how we create this source.
Getting Our Data
Suppose we are just trying to find the sum of the numbers from 1 up to 10 million. The naive way to do this would be to have our “source” of numbers be a raw list, and then we would take the sum of this list:
    myIntegerSourceBad :: [Integer]
    myIntegerSourceBad = [1..10000000]

    sumFromSource :: [Integer] -> Integer
    sumFromSource lst = sum lst
This method necessitates having the entire list in memory at the same time. Imagine if we were querying a database with tens of millions of entries. That would be problematic. Our first step toward solving this will be to use Data.Conduit to create a “Source” of integers from this list. We can do this with the sourceList function. Note that Identity is the simple Identity monad. That is, the base monad that doesn’t actually do anything:
    import Control.Monad.Identity (Identity)
    import Data.Conduit (Source)
    import Data.Conduit.List (sourceList)

    myIntegerSource :: Source Identity Integer
    myIntegerSource = sourceList [1..10000000]
So now instead of returning one big list of Integer values, we’re actually getting a stream of values we call a Source. Let’s describe the meaning of some of the conduit types so we’ll have a better idea of what’s going on here.
The conduit types are all built on ConduitM, which is the fundamental monad for conduits. This type has four different parameters. We’ll write out its definition like so:
type ConduitM i o m r
You should think of each conduit as a data processing funnel. The i type is the input that you can bring into the funnel. The o type stands for output. These are the values you can push out of the funnel. The m type is the underlying monad. We'll see examples with both the Identity monad and the IO monad.
We can think of the last r type as the “result”. But it’s different from the output. This is what the function itself will return as its value once it’s done. We'll see a return type when we write our sink later. But otherwise, we'll generally just use (). As a result, we can rewrite some types using the Conduit type synonym. This synonym is helpful for ignoring the return type. But it is also annoying because it flips the placement of the m and o types. Don’t let this trip you up.
type Conduit i m o = ConduitM i o m ()
Now we can define our source and sink types. A Source is a conduit that takes no input (so i is the unit type) and produces some output. Conversely, a Sink is a conduit that takes some input but produces no output (so the o type is Void):
    type Source m o = Conduit () m o
    type Sink i m r = ConduitM i Void m r
So in the above example, our function is a “source”. It creates a series of integer values without taking any inputs. We’ll see how we can connect different conduits whose output and input types match.
There are three basic functions in the conduit library: yield, await, and leftover. The yield function is how we pass a value downstream to another conduit. In other words, it is a “production” function, or a source of values. We can only yield values that fit the output type of our conduit function.
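To make yield concrete, here is a minimal sketch of a source written by hand instead of with sourceList. The names countFrom and firstFive are made up for illustration. The sketch also assumes a recent conduit release, where ConduitT is the current name for ConduitM and .| is the current spelling of the fuse operator.

```haskell
import Data.Conduit (ConduitT, runConduitPure, yield, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical hand-written source: yields each integer from lo to hi,
-- one at a time, and then finishes.
countFrom :: Monad m => Integer -> Integer -> ConduitT i Integer m ()
countFrom lo hi
  | lo > hi = return ()
  | otherwise = do
      yield lo                 -- push one value downstream
      countFrom (lo + 1) hi    -- then carry on with the rest

-- Collect everything the source yields into a list.
-- This is only sensible for a small range like this one.
firstFive :: [Integer]
firstFive = runConduitPure (countFrom 1 5 .| CL.consume)
```

Evaluating firstFive in GHCi should show the five values in the order they were yielded.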
So then how do we receive values from upstream? The answer is the await function. This operates as a sink and allows a function to remain inert while it waits to receive a value from upstream. Naturally, this value must be of the input type of the conduit. Now, the actual resulting type of await is a Maybe value. We could receive a value of Nothing. This indicates that our upstream source has terminated and we won't receive any more values. This is usually the circumstance in which we’ll let our function return.
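As a small illustration of the Maybe that await hands back, here is a sketch of a sink that just counts how many values arrive before upstream runs dry. The names countValues and howMany are hypothetical, and the code assumes a recent conduit release (ConduitT and the .| operator).

```haskell
import Data.Conduit (ConduitT, await, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical sink: counts the values it receives from upstream.
countValues :: Monad m => Int -> ConduitT a o m Int
countValues n = do
  next <- await
  case next of
    Nothing -> return n              -- upstream has terminated
    Just _  -> countValues (n + 1)   -- got a value, wait for the next one

-- Stream the characters of a string and count them.
howMany :: Int
howMany = runConduitPure (CL.sourceList "conduit" .| countValues 0)
```

Here howMany evaluates to 7, one for each character streamed from the string.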
The final function is the leftover function. This allows you to take a value that you have already pulled from upstream and put it back upstream. This way, we can consume the value again from a different sink, or a different iteration of this sink. One caveat the docs add is that you should not use leftover with values that you have created yourself. You should ONLY use values you got from upstream.
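A typical use of leftover is peeking at the next value without consuming it. The sketch below is one way to write that. The names peekNext, peekThenConsume, and peekResult are invented for this example, and the code assumes a recent conduit release (ConduitT and the .| operator).

```haskell
import Data.Conduit (ConduitT, await, leftover, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical helper: look at the next value without consuming it.
peekNext :: Monad m => ConduitT a o m (Maybe a)
peekNext = do
  next <- await
  case next of
    Nothing -> return Nothing
    Just x -> do
      leftover x        -- put back the value we just pulled from upstream
      return (Just x)

-- Peek first, then consume the whole stream.
peekThenConsume :: Monad m => ConduitT a o m (Maybe a, [a])
peekThenConsume = do
  firstVal <- peekNext
  rest <- CL.consume
  return (firstVal, rest)

peekResult :: (Maybe Integer, [Integer])
peekResult = runConduitPure (CL.sourceList [1, 2, 3] .| peekThenConsume)
```

Because the peeked value was put back, peekResult is (Just 1, [1,2,3]): the 1 shows up both in the peek and in the consumed list.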
Writing Our Conduits
So above we’ve already written a source conduit for our toy program. It wraps a list of integers so we can stream them into our program one by one. Now we’ll write a sink that will take these values and add them together. This will take the form of a recursive function with an accumulator argument. Sinks often function recursively to receive their next input.
So we’ll start off by awaiting some value from the upstream conduit.
    myIntegerSink :: Integer -> Sink Integer Identity Integer
    myIntegerSink accum = do
      maybeFirstVal <- await
      ...
If that value is Nothing, we’ll know there are no more values coming. We can then proceed to return whatever accumulated value we have.
    myIntegerSink :: Integer -> Sink Integer Identity Integer
    myIntegerSink accum = do
      maybeFirstVal <- await
      case maybeFirstVal of
        Nothing -> return accum
        ...
If that value contains another Integer, we’ll first calculate the new sum by adding them together. Then we’ll make a recursive call to the sink function with the new accumulator argument:
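    myIntegerSink :: Integer -> Sink Integer Identity Integer
    myIntegerSink accum = do
      maybeFirstVal <- await
      case maybeFirstVal of
        -- Upstream is finished, so return the running total
        Nothing -> return accum
        -- Add the new value and recurse with the updated total
        Just val -> do
          let newSum = val + accum
          myIntegerSink newSum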
And that’s it really! Our example is simple so the code ends up being simple as well.
Combining Our Conduits
Now we’ll want to combine our conduits. We do this with the “fuse” operator, written out as =$=. Haskell libraries can be notorious for having strange operators. This library isn’t necessarily an exception. But think of conduits as a tunnel, and this operator looks like it's connecting two parts of a tunnel.
With this operator, the output type of the first conduit needs to match the input type of the second conduit. With how we’ve set up our conduits, this is the case. So now to polish things off, we use runConduitPure with our combined conduit:
    fullConduit :: Integer
    fullConduit = runConduitPure $ myIntegerSource =$= myIntegerSink 0
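As a quick sanity check on a pipeline like this, we can compute the same kind of sum with the library's own fold sink and compare it against the closed form n(n+1)/2. This is only a sketch: foldedSum and closedForm are illustrative names, and the code uses the .| operator, which is the spelling recent conduit releases use for fuse.

```haskell
import Data.Conduit (runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Streamed sum of 1..n, using Data.Conduit.List.fold as the sink.
foldedSum :: Integer -> Integer
foldedSum n = runConduitPure (CL.sourceList [1 .. n] .| CL.fold (+) 0)

-- Closed form for 1 + 2 + ... + n, used only as a cross-check.
closedForm :: Integer -> Integer
closedForm n = n * (n + 1) `div` 2
```

For example, foldedSum 1000 and closedForm 1000 both come out to 500500.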
It’s generally quite easy using fuse to add more conduits. For instance, suppose we wanted even numbers to count double for our purposes. We could accomplish this in our source or sink, but we could also add another conduit. It will take an Integer as input and yield an Integer as output. It will wait for its input integer, check its value, and then double the value if it is even. Then we will yield the resulting value. Once again, if we haven’t seen the end of the conduit, we need to recursively jump back into the conduit.
    myDoublingConduit :: Conduit Integer Identity Integer
    myDoublingConduit = do
      maybeVal <- await
      case maybeVal of
        -- Upstream is finished, so we finish too
        Nothing -> return ()
        Just val -> do
          -- Even numbers count double
          let newVal = if val `mod` 2 == 0 then val * 2 else val
          yield newVal
          myDoublingConduit
Then we can stick this conduit between our other conduits with the fuse operator!
    fullConduit :: Integer
    fullConduit = runConduitPure $
      myIntegerSource =$= myDoublingConduit =$= myIntegerSink 0
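When a middle conduit only transforms each value on its own, the await/yield loop can also be replaced by the library's map conduit. The sketch below shows that alternative. It is not the version used above, the names doubleEvens and doubledSum are made up, and it assumes a recent conduit release with the .| operator.

```haskell
import Data.Conduit (runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Pure version of the doubling rule: even numbers count double.
doubleEvens :: Integer -> Integer
doubleEvens val = if even val then val * 2 else val

-- Source, transformer, and sink fused into one pipeline.
doubledSum :: Integer -> Integer
doubledSum n = runConduitPure $
  CL.sourceList [1 .. n] .| CL.map doubleEvens .| CL.fold (+) 0
```

For n = 4 the stream becomes 1, 4, 3, 8, so doubledSum 4 is 16. Writing the loop by hand, as we did above, is still the way to go when a conduit needs to keep state or yield a different number of values than it receives.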
There are also times when you’ll want to batch up certain transactions. A great example of this is when you want to insert all results from your sink into a database. You don’t want to keep sending individual insert queries down the line. You can instead wait and group a bunch of inputs together and send batch inserts.
For our toy example, we’ll gather our integers in groups of 100000 so that we can log progress updates along the way. This will require changing our conduits to live on top of the IO monad. But once we’ve made this change, we can use the conduitVector function like so:
    fullConduitIO :: IO Integer
    fullConduitIO = runConduit $
      myIntegerSourceIO =$= conduitVector 100000 =$= myIntegerVectorSink 0

    myIntegerSourceIO :: Source IO Integer
    myIntegerSourceIO = sourceList [1..100000000]

    myIntegerVectorSink :: Integer -> Sink (Vector Integer) IO Integer
    myIntegerVectorSink accum = do
      maybeFirstVal <- await
      case maybeFirstVal of
        Nothing -> return accum
        Just vals -> do
          -- Sum the whole batch, then log the running total
          let newSum = (Vec.sum vals) + accum
          lift $ print newSum
          myIntegerVectorSink newSum
Because we’ve vectorized the conduit, we need to change the sink so that it takes Vector Integer as its input type instead of Integer. Then we get a vector from await, so we have to sum those values as well.
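To see what the batching does on a scale small enough to check by hand, here is a sketch that splits ten numbers into batches of four and sums each batch. The name batchSums is hypothetical, and the code assumes a recent conduit release where conduitVector is available from Data.Conduit.Combinators. It runs in IO because conduitVector needs a monad that supports mutable vectors, which Identity does not.

```haskell
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import qualified Data.Vector as V

-- Batch 1..10 into vectors of up to 4 values and sum each batch.
batchSums :: IO [Integer]
batchSums = runConduit $
  CL.sourceList [1 .. 10]
    .| CC.conduitVector 4
    -- The annotation pins the batch type to a boxed Data.Vector.
    .| CL.map (V.sum :: V.Vector Integer -> Integer)
    .| CL.consume
```

The batches are [1,2,3,4], [5,6,7,8], and a final short batch [9,10], so batchSums produces [10,26,19]. The last batch is smaller because ten is not a multiple of four.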
The Data.Conduit library allows you to deal with large amounts of data in sustainable ways. You can use it to stream data from a database or some other source. This is more efficient than bringing a large chunk of information into memory all at once. It also allows you to pass information through “tunnels”, called conduits. You can make these perform many complicated operations. You mainly compose conduit functions from the yield, await, and leftover functions. You merge conduits together into a larger conduit with the “fuse” operator =$=. You can also use the conduitVector function to batch certain operations.
This was a more advanced topic. If you’ve never written Haskell before, it’s not as scary as this article makes it out to be! Check out our Getting Started Checklist to take the first steps on your Haskell journey!
If you’ve done a little bit of Haskell before but need some more practice on the fundamentals, you should download our Recursion Workbook. It has some great material on recursion as well as 10 practice problems!
Next week we’ll keep up with some more advanced topics. We’ll look into the Data.Aeson library and how it allows us to serialize Haskell objects into JSON format! So stay tuned to Monday Morning Haskell!
I mentioned earlier that operators can be a major pain point in Haskell. Unfortunately, so can documentation. This can be especially true when tracking down the right docs for the conduit concepts. So for reference, here are all the imports I used:
    import Conduit (conduitVector, lift)
    import Control.Monad.Identity (Identity)
    import Data.Conduit (Source, Sink, await, runConduitPure, (=$=), Conduit, yield, runConduit)
    import Data.Conduit.List (sourceList)
    import Data.Vector hiding (sum)
    import qualified Data.Vector as Vec
Data.Conduit and Data.Conduit.List come from the conduit library on Hackage. However, the Conduit module actually comes from conduit-combinators. This is very deceptive.