Making type-safe internet bots with Haskell
There are basically two types of client applications on the internet:
- Clients that use the request-response model. One request will generally result in one response. Some examples would be web browsers and many API clients.
- Clients that receive a continuous stream of data from a server and may or may not send data back at any time. Examples of this type would include chatbots, automated trading applications and multiplayer video games.
In this article, I will describe a fairly general way to use Haskell for constructing a specific kind of client of the second type: clients that listen to a single network socket and send replies to that same socket, possibly while maintaining state. This article assumes basic proficiency with Haskell and networking.
Conduits
The conduit package is a library that provides primitives for streaming data. You can think of a Conduit as a datatype representing a single element in a pipeline, similar to how you can write Unix pipelines in the shell. In most shells every input and output must be text-based, but since we run this inside a Haskell program we can make use of the type system to enable richer in- and output types of each element. Every element in the pipeline has:
- An input type;
- An output type which may or may not be the same as the input type;
- A monadic environment such as IO or State in which it does its computations;
- A result type to describe any results. For all but the last conduit in a pipeline this will be the unit type ().
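These four properties correspond to the four type parameters of ConduitT i o m r. The following is a minimal sketch of how that looks in type signatures; the stage names (numbers, digitCount, total) are made up for illustration, and it assumes only the conduit package.

```haskell
import Conduit
import Data.Void (Void)

-- Source: needs no input (), yields Int values, result ().
numbers :: Monad m => ConduitT () Int m ()
numbers = yieldMany [1 .. 15]

-- Transformer: Int in, Int out (the number of digits), result ().
digitCount :: Monad m => ConduitT Int Int m ()
digitCount = mapC (length . show)

-- Sink: Int in, no output (Void), result Int (the sum of all inputs).
total :: Monad m => ConduitT Int Void m Int
total = sumC

main :: IO ()
main =
  -- .| connects two stages; the output type on the left must match
  -- the input type on the right. Only the last stage has a non-() result.
  print (runConduitPure (numbers .| digitCount .| total)) -- prints 21
```

Because every stage is polymorphic in m, the same stages can run purely (as here) or inside IO.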
Individual elements can be chained together with the .| operator, which makes sure that the output type of the argument on the left matches the input type of the argument on the right. A very basic pipeline might be something like yieldMany [1..15] .| mapC (length . show) .| printC, which will print the number of digits in the numbers 1 to 15 to standard output. You can run a pipeline with the runConduit function or one of its monadic siblings. This is in itself not something you would need Conduit for; a simple list could do this perfectly well. However, there are several benefits to using it:
- It can maintain constant memory use even when working with very large amounts of data, like big files or long-running streams of network data.
- It will handle the cleaning up of system resources like file handles and sockets as soon as possible, unlike lazy I/O where it is much harder to control when this happens.
- Finally, it is much easier to interleave monadic and pure effects with each other in a controlled fashion. For example: if you want to add logging statements in the middle of your pipeline for debugging or auditing reasons then a pure pipeline of list functions becomes unwieldy quite quickly.
For more information on the rationale behind conduits and how to use them, the conduit readme is an excellent resource.
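As a rough illustration of the points above, here is the digit-counting pipeline as a complete program. It is a sketch that assumes only the conduit package; the name digitCounts and the "saw" log line are illustrative. It runs the pipeline once purely, once in IO, and once with an effect interleaved between pure stages.

```haskell
import Conduit

-- The pipeline from the text, collected into a list instead of printed.
digitCounts :: [Int]
digitCounts =
  runConduitPure $ yieldMany [1 .. 15 :: Int] .| mapC (length . show) .| sinkList

main :: IO ()
main = do
  -- Effectful variant: printC prints each element as it flows through.
  runConduit $ yieldMany [1 .. 15 :: Int] .| mapC (length . show) .| printC
  -- Interleaving an effect between pure stages: one log line per element.
  runConduit $
    yieldMany [1 .. 3 :: Int]
      .| iterMC (\n -> putStrLn ("saw " ++ show n))
      .| mapC (* 2)
      .| printC
  print digitCounts
```

In the third pipeline each element is logged and then printed before the next one is produced, which is the kind of controlled interleaving that is awkward to get with plain list functions.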
Some basic examples
To test these examples, I made an extremely basic server program which will listen on port 8000 and serve an endless stream of JSON values to whatever connects. The values are shaped like this:
$ nc localhost 8000
{"counter":1,"time":1647525924}
{"counter":2,"time":1647525925}
{"counter":3,"time":1647525926}
{"counter":4,"time":1647525927}
{"counter":5,"time":1647525928}
{"counter":6,"time":1647525929}
^C
Every JSON value contains only the counter and time keys. The time key contains the current UNIX timestamp, while the counter key contains the number of values that have been sent so far. Values are sent out at a rate of 1 per second. Every test program in the following section will connect to this server. All the test programs, including the test server, can be found here.
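The author's server is not reproduced in this article. If you want something comparable to experiment with, the following is a minimal sketch of such a server, assuming the conduit, conduit-extra, bytestring and time packages. The helper renderMessage is hypothetical and builds the JSON by hand to avoid a dependency on a JSON library.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Conduit
import Control.Concurrent (threadDelay)
import qualified Data.ByteString.Char8 as BC
import Data.Conduit.Network (appSink, runTCPServer, serverSettings)
import Data.Time.Clock.POSIX (getPOSIXTime)

-- Render one newline-terminated message (hypothetical helper).
renderMessage :: Integer -> Integer -> BC.ByteString
renderMessage n t =
  BC.pack ("{\"counter\":" ++ show n ++ ",\"time\":" ++ show t ++ "}\n")

main :: IO ()
main =
  -- Listen on port 8000 on all interfaces; each client gets its own counter.
  runTCPServer (serverSettings 8000 "*") $ \ad ->
    runConduit $ yieldMany [1 ..] .| mapMC step .| appSink ad
  where
    -- Stamp the counter with the current UNIX time, then wait one second.
    step n = do
      t <- round <$> getPOSIXTime
      threadDelay 1000000
      pure (renderMessage n t)
```

Connecting with nc localhost 8000 should then show one JSON value per second, in the same shape as the transcript above.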
A basic network conduit example
As a first example, let’s make a program that connects to the test server and then simply prints all the received values to the standard output. We can do this with the functions from Data.Conduit.Network from the conduit-extra package. This package provides a runTCPClient function, which does exactly what the name implies. It takes as arguments a ClientSettings and a function with signature (AppData -> IO a). The AppData contains everything you need to know about the connection; you can stream all the received data from it with the appSource function:
main :: IO ()
main = do
  -- The host is a ByteString literal, so this needs OverloadedStrings.
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad .| stdout
Since the appSource conduit outputs ByteString values and the stdout conduit wants ByteStrings as input values, we have to do no further transformations to make the types match up. The output of this program is pretty much what you would expect:
$ cabal run
{"counter":1,"time":1647529231}
{"counter":2,"time":1647529232}
{"counter":3,"time":1647529233}
{"counter":4,"time":1647529234}
^C
So far, we have not done anything we couldn’t just do with netcat, so let’s look at a slightly more advanced example.
A conduit that parses the incoming stream
One of the main benefits of Haskell is that we can leverage the type system to encode extra information about our values. Since the JSON values follow a well defined format, we can use the aeson package to parse them into a data structure:
data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
  deriving stock (Show, Generic)
  deriving anyclass (FromJSON)

main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad
      .| linesUnboundedAscii
      .| mapC (decodeStrict :: ByteString -> Maybe CounterMessage)
      .| printC
By using the typeclass deriving functionality in GHC, we can automatically derive Show and FromJSON instances for our CounterMessage data structure. We can then use the decodeStrict function from the aeson package to decode the received JSON strings into Maybe CounterMessage values with mapC. The mapC conduit is similar to map on lists: it will apply a function to every value passed to it and pass the result downstream.
Since parsing can fail if the JSON value does not have the right structure to fit into a CounterMessage, it will return a value wrapped in Maybe. Since network connections give no guarantees about how much data becomes available at a time, we also have to use the linesUnboundedAscii conduit. It collects all the incoming ByteString values until it encounters a newline and will then release a new ByteString containing the entire line. Finally, we can no longer use the stdout conduit to print to standard output. The mapC decodeStrict conduit will output Maybe CounterMessage values but stdout expects ByteString values as input, so the types do not match up. Instead, we can use the printC conduit, which will print any incoming values to standard output as long as they have a Show instance defined. Since CounterMessage has an automatically derived Show instance and Maybe a also has a Show instance as long as a has one, the combined Maybe CounterMessage will also have a Show instance.
$ cabal run
Just (CounterMessage {counter = 1, time = 1647529721})
Just (CounterMessage {counter = 2, time = 1647529722})
Just (CounterMessage {counter = 3, time = 1647529723})
Just (CounterMessage {counter = 4, time = 1647529724})
^C
Since all the JSON values were parsed successfully, we have only Just values in the output stream.
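To see both the line reassembly and a failed parse without a network connection, the same stages can be run over an in-memory source. This is a sketch under some assumptions: it uses linesUnboundedAsciiC, the name under which the Conduit module exports the line splitter, and the chunks and the parseChunks helper are invented for illustration. The chunks are deliberately split in the middle of a line, and the last line is not valid JSON.

```haskell
{-# LANGUAGE DeriveAnyClass     #-}
{-# LANGUAGE DeriveGeneric      #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE OverloadedStrings  #-}

import Conduit
import Data.Aeson (FromJSON, decodeStrict)
import Data.ByteString (ByteString)
import GHC.Generics (Generic)

data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
  deriving stock (Show, Eq, Generic)
  deriving anyclass (FromJSON)

-- Simulated network reads: chunk boundaries do not line up with newlines.
chunks :: [ByteString]
chunks =
  [ "{\"counter\":1,\"ti"
  , "me\":10}\n{\"coun"
  , "ter\":2,\"time\":11}\nnot json\n"
  ]

-- Reassemble lines first, then decode each complete line.
parseChunks :: [ByteString] -> [Maybe CounterMessage]
parseChunks input =
  runConduitPure $
    yieldMany input .| linesUnboundedAsciiC .| mapC decodeStrict .| sinkList

main :: IO ()
main = mapM_ print (parseChunks chunks)
```

The first two lines decode to Just values even though they arrived in pieces, and the malformed third line becomes Nothing instead of aborting the pipeline.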
A stateful conduit
So far, we have not done anything permanent with the values we receive except printing them to standard output. One way of keeping state and updating it with every new incoming value is the scanlC conduit, which behaves similarly to the scanl function from Data.List: it computes a value like foldl, but also outputs all the intermediate values. You can see how it works in the following example:
data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
  deriving stock (Show, Generic)
  deriving anyclass (FromJSON)

data CounterState = CounterState
  { total :: Integer
  , lastUpdate :: Integer
  } deriving (Show, Eq)

updateCounterState :: CounterState -> CounterMessage -> CounterState
updateCounterState (CounterState total _) (CounterMessage counter newTime)
  = CounterState (total + counter) newTime

main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad
      .| linesUnboundedAscii
      .| mapC (decodeStrict :: C.ByteString -> Maybe CounterMessage)
      .| mapC fromJust  -- note: fromJust crashes on a line that fails to parse
      .| scanlC updateCounterState (CounterState 0 0)
      .| printC
The scanlC conduit has type signature Monad m => (a -> b -> a) -> a -> ConduitT b a m (). Our pipeline ultimately runs in IO (strictly speaking in ResourceT IO, because we run it with runConduitRes), so for the purpose of this explanation we can simplify the type signature to (a -> b -> a) -> a -> ConduitT b a IO (). Since we know we want it to accept input values of type CounterMessage and maintain a state of type CounterState, we can further “fill in” the type signature to be (CounterState -> CounterMessage -> CounterState) -> CounterState -> ConduitT CounterMessage CounterState IO (). In plain language: it takes a function CounterState -> CounterMessage -> CounterState that updates the previous state with new information (very similar to the function you would pass to foldl') and an initial CounterState as arguments, and acts as a conduit with CounterMessage inputs and CounterState outputs. In this case, we define the updateCounterState function to maintain a sum of all the counter values received so far and also to keep the last timestamp received. Every time our scanlC conduit receives a new CounterMessage, it will use the updateCounterState function to update its current CounterState and then emit this new value to the next element in the pipeline. Since CounterState also has a Show instance, we can send it directly into printC to be printed:
$ cabal run
CounterState {total = 0, lastUpdate = 0}
CounterState {total = 1, lastUpdate = 1647532637}
CounterState {total = 3, lastUpdate = 1647532638}
CounterState {total = 6, lastUpdate = 1647532639}
CounterState {total = 10, lastUpdate = 1647532640}
^C
It starts off with the initial state we passed to scanlC, then updates it with each CounterMessage it receives. This update function was very simple, but you can make it (and the state it maintains) as complex as you want. Since the entire conduit pipeline is in IO there is nothing stopping you from doing database queries or even web requests for each update, although you will need to use scanlMC if you wish to have a non-pure update function.
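The difference between scanlC and scanlMC can be tried out without a server. The sketch below feeds a few hand-made messages through both; sampleMessages and updateAndLog are illustrative names, and the log line stands in for whatever effect (a database query, a web request) a real update might perform.

```haskell
import Conduit

data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
  deriving (Show, Eq)

data CounterState = CounterState { total :: Integer, lastUpdate :: Integer }
  deriving (Show, Eq)

updateCounterState :: CounterState -> CounterMessage -> CounterState
updateCounterState (CounterState t _) (CounterMessage c newTime) =
  CounterState (t + c) newTime

-- Hand-made input standing in for the parsed network stream.
sampleMessages :: [CounterMessage]
sampleMessages = [CounterMessage n (100 + n) | n <- [1 .. 4]]

-- Pure update: scanlC over an in-memory source, collected with sinkList.
states :: [CounterState]
states =
  runConduitPure $
    yieldMany sampleMessages
      .| scanlC updateCounterState (CounterState 0 0)
      .| sinkList

-- Effectful update for scanlMC: same logic, plus a log line per message.
updateAndLog :: CounterState -> CounterMessage -> IO CounterState
updateAndLog s msg = do
  putStrLn ("updating state after message " ++ show (counter msg))
  pure (updateCounterState s msg)

main :: IO ()
main = do
  mapM_ print states
  runConduit $
    yieldMany sampleMessages
      .| scanlMC updateAndLog (CounterState 0 0)
      .| printC
```

On a finite input the pure version produces the same list as Data.List.scanl would, starting with the initial state.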
Responding based on the state
Thus far we have only made conduit pipelines that ended up printing their values to standard output, possibly after doing some transformations on the data received. Only receiving data is often not enough; sometimes we wish to send data as well. The AppData structure which we used for the appSource conduit can also be used for an appSink conduit, which takes in ByteString inputs and will send them to the socket that the AppData represents. Hooking it into the pipeline we already have works pretty much as you would expect:
main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad
      .| linesUnboundedAscii
      .| mapC (decodeStrict :: C.ByteString -> Maybe CounterMessage)
      .| mapC fromJust
      .| scanlC updateCounterState (CounterState 0 0)
      .| filterC (odd . total)
      .| mapC (ByteString.Char8.pack . (++ "\n") . show . lastUpdate)
      .| appSink ad
The above example will send the lastUpdate value of the CounterState back over the socket on a new line, but only if the total so far is odd. In this contrived example we simply use the Show instance to convert via String, but you can of course use any function you want to generate the ByteString.
If you do not wish to return values over the same socket but want to send requests to some other service, you can use the mapM_ combinator (exported as mapM_C from the Conduit module) instead. It works just like the mapM_ function from Data.Foldable, except (you guessed it) it works on conduits.
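A small sketch of both options, run against hand-made states so that no socket is needed. The reply stage is factored out as renderReplies (an illustrative name), and notify is a hypothetical stand-in for a call to another service; here it only prints what it would send.

```haskell
import Conduit
import qualified Data.ByteString.Char8 as BC

data CounterState = CounterState { total :: Integer, lastUpdate :: Integer }
  deriving (Show, Eq)

-- Hand-made states standing in for the output of the scanlC stage.
sampleStates :: [CounterState]
sampleStates = zipWith CounterState [0, 1, 3, 6, 10] [0, 100, 101, 102, 103]

-- The reply stage from the text: keep odd totals, render lastUpdate as a line.
renderReplies :: Monad m => ConduitT CounterState BC.ByteString m ()
renderReplies =
  filterC (odd . total) .| mapC (BC.pack . (++ "\n") . show . lastUpdate)

-- What would be handed to appSink, collected into a list for inspection.
replies :: [BC.ByteString]
replies = runConduitPure $ yieldMany sampleStates .| renderReplies .| sinkList

-- Hypothetical stand-in for a request to some other service.
notify :: BC.ByteString -> IO ()
notify line = BC.putStr (BC.pack "would send: " <> line)

main :: IO ()
main = runConduit $ yieldMany sampleStates .| renderReplies .| mapM_C notify
```

Keeping the reply logic in its own conduit means the final stage can be swapped between appSink, mapM_C and sinkList without touching the rest of the pipeline.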
Firming up the code
The above examples only scratch the surface of what is possible with conduit pipelines. The ecosystem around conduit is huge and many library authors have made adaptors to add interoperability with it to their packages. Some interesting directions to explore might be:
- Almost every interesting data stream these days will require a TLS connection. For this, the network-conduit-tls package provides a runTLSClient function, which is a near drop-in replacement for the runTCPClient function used in the examples above (it takes a TLSClientConfig instead of a ClientSettings).
- It is often desirable to add logging at some point in the pipeline. The iterM conduit will apply a monadic action to anything that it receives and then pass on the value unaltered. You can use this for logging like so: iterM (liftIO . print), assuming of course that the value passed in has a Show instance.
- Websockets: websocket clients can be created with the aptly named websockets library.
- The examples used a stream of newline-delimited JSON values, but not every stream is formatted like that. For example, the Redis replication stream uses a custom protocol that is designed to be both human-readable and efficient to parse. For these kinds of streams, you can define a custom attoparsec parser and hook it into the pipeline with conduitParser.
- If you wish to test your application, you can store a pre-recorded stream as a test fixture and then use sourceFile to mimic a normal stream. This will allow you to quickly iterate without having to depend on a (possibly third-party) network service.
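The logging and test-fixture ideas from the list combine naturally. The following sketch writes a tiny fixture file and replays it through the same line-splitting stage used earlier, logging every line to standard error on the way. The file name fixture.jsonl and the helper names are assumptions made for this example, and iterMC and linesUnboundedAsciiC are the names under which the Conduit module exports the corresponding combinators.

```haskell
import Conduit
import qualified Data.ByteString.Char8 as BC
import System.IO (stderr)

-- Hypothetical fixture location, relative to the working directory.
fixturePath :: FilePath
fixturePath = "fixture.jsonl"

-- Store a short pre-recorded stream, one JSON value per line.
writeFixture :: IO ()
writeFixture =
  BC.writeFile fixturePath $
    BC.unlines
      [ BC.pack "{\"counter\":1,\"time\":1647525924}"
      , BC.pack "{\"counter\":2,\"time\":1647525925}"
      ]

-- Replay the fixture in place of appSource, logging each line as it passes.
replayFixture :: IO [BC.ByteString]
replayFixture =
  runConduitRes $
    sourceFile fixturePath
      .| linesUnboundedAsciiC
      .| iterMC (liftIO . BC.hPutStrLn stderr)
      .| sinkList

main :: IO ()
main = do
  writeFixture
  received <- replayFixture
  print (length received)
```

Everything downstream of sourceFile is unchanged from the network version, so a pipeline tested this way should behave the same once appSource is put back.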
The technique described above works well for a client application talking to a single server. This is often enough, but sometimes you need to talk to several servers and make decisions based on information from all of them. In such a case, there are several frameworks built around Functional Reactive Programming (FRP) concepts available, such as Reactive, Reflex, Sodium and many more. FRP is a huge topic though and out of scope for this blog post.
Conclusion
Haskell is a programming language focused on lazy evaluation, and as such it also started out with lazily evaluated I/O. Several decades later, the problems with lazy I/O have proven greater than the benefits and several libraries have been developed that provide a better abstraction. Conduit is one such library and it provides a clean and very composable model for operating on streams of data. This makes it possible to develop and test components of the pipeline in isolation, even by separate teams if necessary. The wide variety of available combinator functions combined with the type system also allows for some very nice ways to express what you wish to accomplish.
The network conduits make it very easy to construct conduit pipelines that interact with a socket. The resulting programs look very different from their counterparts in imperative languages, but they are quite readable to anyone with some basic Haskell knowledge and can be altered easily by adding or removing parts of the pipeline. Next time you find yourself writing a client application for some network protocol, give the conduits a try!