Making type-safe internet bots with Haskell

Posted on May 6, 2022 by wjwh


There are basically two types of client applications on the internet:

In this article, I will describe a fairly general way to use Haskell for constructing a specific kind of client of the second type: clients that listen to a single network socket and send replies to that same socket, possibly while maintaining state. This article assumes basic proficiency with Haskell and networking.

Conduits

The conduit package is a library that provides primitives for streaming data. You can think of a Conduit as a datatype representing a single element in a pipeline, similar to how you can write Unix pipelines in the shell. In most shells every input and output must be text-based, but since we run this inside a Haskell program we can make use of the type system to enable richer in- and output types of each element. Every element in the pipeline has:

Individual elements can be chained together with the .| operator, which makes sure that the output type of the argument on the left matches the input type of the argument on the right. A very basic pipeline might be something like yieldMany [1..15] .| mapC (length . show) .| printC, which will print the number of digits in the numbers 1 to 15 to standard output. You can run a pipeline with the runConduit function or one of its monadic siblings. This is in itself not something you would need Conduit for; a simple list could do this perfectly well. However, there are several benefits to using it:

For more information on the rationale behind conduits and how to use them, the conduit readme is an excellent resource.
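To make the basic pipeline above concrete, here is a minimal sketch of it as a complete program. It assumes the conduit package is installed; the digitCounts name is mine and only there to show that the same pipeline can also run purely with runConduitPure and sinkList instead of printing.

```haskell
import Conduit

-- Pure variant: collect the results in a list instead of printing them.
-- digitCounts is a name chosen for this sketch.
digitCounts :: [Int]
digitCounts =
  runConduitPure $ yieldMany [1 .. 15 :: Int] .| mapC (length . show) .| sinkList

main :: IO ()
main = do
  -- The pipeline from the text: prints 1 nine times, then 2 six times.
  runConduit $ yieldMany [1 .. 15 :: Int] .| mapC (length . show) .| printC
  -- The same data as an ordinary list.
  print digitCounts
```

Swapping printC for sinkList changes only the last element of the pipeline; the source and the transformation stay untouched.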

Some basic examples

To test these examples, I made an extremely basic server program which will listen on port 8000 and serve an endless stream of JSON values to whatever connects. The values are shaped like this:

$ nc localhost 8000
{"counter":1,"time":1647525924}
{"counter":2,"time":1647525925}
{"counter":3,"time":1647525926}
{"counter":4,"time":1647525927}
{"counter":5,"time":1647525928}
{"counter":6,"time":1647525929}
^C

Every JSON value contains only the counter and time keys. The time key contains the current UNIX timestamp, while the counter key contains the number of values that have been sent so far. Values are sent out at a rate of 1 per second. Every test program in the following section will connect to this server. All the test programs, including the test server, can be found here.

A basic network conduit example

As a first example, let’s make a program that connects to the test server and then simply prints all the received values to standard output. We can do this with the functions from Data.Conduit.Network in the conduit-extra package. This module provides a runTCPClient function, which does exactly what the name implies. It takes as arguments a ClientSettings and a function of type AppData -> IO a. The AppData contains everything you need to know about the connection, and you can stream all the received data from it with the appSource function:

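{-# LANGUAGE OverloadedStrings #-}
import Conduit (runConduitRes, (.|))
import Data.Conduit.Combinators (stdout)
import Data.Conduit.Network (appSource, clientSettings, runTCPClient)

main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad .| stdout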

Since the appSource conduit outputs ByteString values and the stdout conduit expects ByteString values as input, no further transformations are needed to make the types match up. The output of this program is pretty much what you would expect:

$ cabal run
{"counter":1,"time":1647529231}
{"counter":2,"time":1647529232}
{"counter":3,"time":1647529233}
{"counter":4,"time":1647529234}
^C

So far, we have not done anything we couldn’t just do with netcat, so let’s look at a slightly more advanced example.

A conduit that parses the incoming stream

One of the main benefits of Haskell is that we can leverage the type system to encode extra information about our values. Since the JSON values follow a well defined format, we can use the aeson package to parse them into a data structure:

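{-# LANGUAGE DeriveAnyClass, DeriveGeneric, DerivingStrategies, OverloadedStrings #-}
import Conduit (mapC, printC, runConduitRes, (.|))
import Data.Aeson (FromJSON, decodeStrict)
import Data.ByteString (ByteString)
import Data.Conduit.Combinators (linesUnboundedAscii)
import Data.Conduit.Network (appSource, clientSettings, runTCPClient)
import GHC.Generics (Generic)

data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
    deriving stock (Show, Generic)
    deriving anyclass (FromJSON)

main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad
                 .| linesUnboundedAscii
                 .| mapC (decodeStrict :: ByteString -> Maybe CounterMessage)
                 .| printC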

By using the typeclass deriving functionality in GHC, we can automatically derive Show and FromJSON instances for our CounterMessage data structure. We can then use the decodeStrict function from the aeson package to decode the received JSON strings into Maybe CounterMessage values with mapC. The mapC conduit is similar to map on lists: it applies a function to every value passed to it and passes the result downstream.

Since parsing can fail if the JSON value does not have the right structure to fit into a CounterMessage, decodeStrict returns a value wrapped in Maybe. Since network connections give no guarantees about how much data becomes available at a time, we also have to use the linesUnboundedAscii conduit. It collects all the incoming ByteString values until it encounters a newline and then releases a new ByteString containing the entire line. Finally, we can no longer use the stdout conduit to print to standard output. The mapC decodeStrict conduit outputs Maybe CounterMessage values but stdout expects ByteString values as input, so the types do not match up. Instead, we can use the printC conduit, which prints any incoming values to standard output as long as they have a Show instance defined. Since CounterMessage has an automatically derived Show instance and Maybe a has a Show instance whenever a has one, the combined Maybe CounterMessage also has a Show instance.

$ cabal run
Just (CounterMessage {counter = 1, time = 1647529721})
Just (CounterMessage {counter = 2, time = 1647529722})
Just (CounterMessage {counter = 3, time = 1647529723})
Just (CounterMessage {counter = 4, time = 1647529724})
^C

Since all the JSON values were parsed successfully, we have only Just values in the output stream.
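To see what happens with awkwardly chunked or malformed input without needing a server, the parsing stages can be run on an in-memory source. The following is a sketch under a few assumptions: the chunks list is made-up data standing in for what a socket might deliver, the names parsed and valid are mine, and concatC is used as one possible way to drop lines that failed to parse.

```haskell
{-# LANGUAGE DeriveAnyClass, DeriveGeneric, DerivingStrategies, OverloadedStrings #-}
import Conduit
import Data.Aeson (FromJSON, decodeStrict)
import Data.ByteString (ByteString)
import Data.Conduit.Combinators (linesUnboundedAscii)
import GHC.Generics (Generic)

data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
    deriving stock (Show, Eq, Generic)
    deriving anyclass (FromJSON)

-- Hypothetical input: two valid messages split at arbitrary byte
-- boundaries, followed by one line that is not JSON at all.
chunks :: [ByteString]
chunks =
  [ "{\"counter\":1,\"ti"
  , "me\":10}\n{\"coun"
  , "ter\":2,\"time\":11}\nnot json\n"
  ]

-- linesUnboundedAscii reassembles complete lines before decoding,
-- so this gives two Just values followed by one Nothing.
parsed :: [Maybe CounterMessage]
parsed = runConduitPure $
  yieldMany chunks .| linesUnboundedAscii .| mapC decodeStrict .| sinkList

-- concatC flattens each Maybe, silently dropping the Nothing.
valid :: [CounterMessage]
valid = runConduitPure $
  yieldMany chunks .| linesUnboundedAscii .| mapC decodeStrict .| concatC .| sinkList

main :: IO ()
main = do
  mapM_ print parsed
  mapM_ print valid
```

Whether dropping unparseable lines silently is acceptable depends on the protocol; logging them or closing the connection may be more appropriate in a real client.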

A stateful conduit

So far, we have not done anything permanent with the values we receive except printing them to standard output. One way of keeping state and updating it with every new incoming value is the scanlC conduit, which behaves similarly to the scanl function from Data.List: it computes a value like foldl, but also outputs all the intermediate values. You can see how it works in the following example:

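{-# LANGUAGE DeriveAnyClass, DeriveGeneric, DerivingStrategies, OverloadedStrings #-}
import Conduit (mapC, printC, runConduitRes, scanlC, (.|))
import Data.Aeson (FromJSON, decodeStrict)
import qualified Data.ByteString.Char8 as C
import Data.Conduit.Combinators (linesUnboundedAscii)
import Data.Conduit.Network (appSource, clientSettings, runTCPClient)
import Data.Maybe (fromJust)
import GHC.Generics (Generic)

data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
    deriving stock (Show, Generic)
    deriving anyclass (FromJSON)

data CounterState = CounterState
  { total :: Integer
  , lastUpdate :: Integer
  } deriving (Show,Eq)

-- Add the new counter to the running total and remember the latest timestamp.
updateCounterState :: CounterState -> CounterMessage -> CounterState
updateCounterState (CounterState total _) (CounterMessage counter newTime)
  = CounterState (total + counter) newTime

main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad
                 .| linesUnboundedAscii
                 .| mapC (decodeStrict :: C.ByteString -> Maybe CounterMessage)
                 .| mapC fromJust  -- partial: throws if a line failed to parse
                 .| scanlC updateCounterState (CounterState 0 0)
                 .| printC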

The scanlC conduit has type signature Monad m => (a -> b -> a) -> a -> ConduitT b a m (). Our pipeline runs on top of IO (strictly speaking, runConduitRes runs it in ResourceT IO, but that makes no difference for this explanation), so we can read m as IO and simplify the type signature to (a -> b -> a) -> a -> ConduitT b a IO (). Since we know we want it to accept input values of type CounterMessage and maintain a state of type CounterState, we can further “fill in” the type signature to be (CounterState -> CounterMessage -> CounterState) -> CounterState -> ConduitT CounterMessage CounterState IO (). In plain language: it takes a function CounterState -> CounterMessage -> CounterState that updates the previous state with new information (very similar to the function you would pass to foldl') and an initial CounterState as arguments, and acts as a conduit with CounterMessage inputs and CounterState outputs.

In this case, we define the updateCounterState function to maintain a sum of all the counter values received so far and also to keep the last timestamp received. Every time our scanlC conduit receives a new CounterMessage, it will use the updateCounterState function to update its current CounterState and then emit this new value to the next element in the pipeline. Since CounterState also has a Show instance, we can send it directly into printC to be printed:

$ cabal run
CounterState {total = 0, lastUpdate = 0}
CounterState {total = 1, lastUpdate = 1647532637}
CounterState {total = 3, lastUpdate = 1647532638}
CounterState {total = 6, lastUpdate = 1647532639}
CounterState {total = 10, lastUpdate = 1647532640}
^C

It starts off with the initial state we passed to scanlC, then updates it with each CounterMessage it receives. This update function was very simple, but you can make it (and the state it maintains) as complex as you want. Since the entire conduit pipeline runs on top of IO, there is nothing stopping you from doing database queries or even web requests for each update, although you will need to use scanlMC if you wish to have a non-pure update function.
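The stateful stage can also be tried without a network connection. The sketch below feeds a hand-made list of messages through scanlC and then through scanlMC with an update function that logs in IO. The messages list and the loggedUpdate function are made up for illustration; the putStrLn call merely stands in for a database query or web request.

```haskell
import Conduit

data CounterMessage = CounterMessage { counter :: Integer, time :: Integer }
    deriving (Show, Eq)

data CounterState = CounterState { total :: Integer, lastUpdate :: Integer }
    deriving (Show, Eq)

updateCounterState :: CounterState -> CounterMessage -> CounterState
updateCounterState (CounterState t _) (CounterMessage c newTime) =
  CounterState (t + c) newTime

-- Hypothetical test data: counters 1 to 4 with made-up timestamps.
messages :: [CounterMessage]
messages = [CounterMessage n (100 + n) | n <- [1 .. 4]]

-- Pure run: the initial state followed by every intermediate state.
states :: [CounterState]
states = runConduitPure $
  yieldMany messages .| scanlC updateCounterState (CounterState 0 0) .| sinkList

-- Effectful update for scanlMC; the log line is a stand-in for real I/O.
loggedUpdate :: CounterState -> CounterMessage -> IO CounterState
loggedUpdate s m = do
  putStrLn ("updating with " ++ show m)
  pure (updateCounterState s m)

main :: IO ()
main = do
  mapM_ print states
  runConduit $ yieldMany messages .| scanlMC loggedUpdate (CounterState 0 0) .| printC
```

The totals in states come out as 0, 1, 3, 6 and 10, the same progression as in the output above.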

Responding based on the state

Thus far we have only made conduit pipelines that ended up printing their values to standard output, possibly after doing some transformations on the data received. Only receiving data is often not enough; sometimes we wish to send data as well. The AppData structure which we used for the appSource conduit can also be used for an appSink conduit, which takes in ByteString inputs and sends them to the socket that the AppData represents. Hooking it into the pipeline we already have works pretty much as you would expect:

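-- In addition to the imports and definitions of the previous example
-- (filterC comes from the Conduit module):
import qualified Data.ByteString.Char8 as ByteString.Char8
import Data.Conduit.Network (appSink)

main :: IO ()
main = do
  let settings = clientSettings 8000 "127.0.0.1"
  runTCPClient settings $ \ad -> do
    runConduitRes $ appSource ad
                 .| linesUnboundedAscii
                 .| mapC (decodeStrict :: C.ByteString -> Maybe CounterMessage)
                 .| mapC fromJust
                 .| scanlC updateCounterState (CounterState 0 0)
                 .| filterC (odd . total)
                 .| mapC (ByteString.Char8.pack . (++ "\n") . show . lastUpdate)
                 .| appSink ad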

The above example will send the lastUpdate value of the CounterState back over the socket on a new line, but only if the total so far is odd. In this contrived example we simply use the Show instance to convert via String, but you can of course use any function you want to generate the ByteString.

If you do not wish to return values over the same socket but want to send requests to some other service, you can use the mapM_C combinator instead (exported as mapM_ from Data.Conduit.Combinators). It works just like the mapM_ function from Data.Foldable, except (you guessed it) it works on conduits.
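Both ways of responding can be sketched offline as well. In the example below, the states list is made-up data standing in for the output of the scanlC stage, replies collects what would have gone into appSink, and notify is a hypothetical placeholder for a request to some other service.

```haskell
import Conduit
import qualified Data.ByteString.Char8 as BC

data CounterState = CounterState { total :: Integer, lastUpdate :: Integer }
    deriving (Show, Eq)

-- Hypothetical states, as the scanlC stage might have produced them.
states :: [CounterState]
states =
  [ CounterState 0 0
  , CounterState 1 101
  , CounterState 3 102
  , CounterState 6 103
  , CounterState 10 104
  ]

-- The filtering and formatting stages from the text, collected in a list
-- instead of being sent to appSink. Only the states with an odd total
-- (1 and 3) produce a reply.
replies :: [BC.ByteString]
replies = runConduitPure $
  yieldMany states
    .| filterC (odd . total)
    .| mapC (BC.pack . (++ "\n") . show . lastUpdate)
    .| sinkList

-- Placeholder for a call to another service.
notify :: CounterState -> IO ()
notify s = putStrLn ("would notify another service about " ++ show s)

main :: IO ()
main = do
  mapM_ BC.putStr replies
  runConduit $ yieldMany states .| filterC (odd . total) .| mapM_C notify
```

Because mapM_C consumes its input and produces no output, it has to be the last element of the pipeline, just like appSink.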

Firming up the code

The above examples only scratch the surface of what is possible with conduit pipelines. The ecosystem around conduit is huge and many library authors have made adaptors to add interoperability with it to their packages. Some interesting directions to explore might be:

The technique described above works well for a client application talking to a single server. This is often enough, but sometimes you need to talk to several servers and make decisions based on information from all of them. In such a case, there are several frameworks built around Functional Reactive Programming (FRP) concepts available, such as Reactive, Reflex, Sodium and many more. FRP is a huge topic though and out of scope for this blog post.

Conclusion

Haskell is a programming language focused on lazy evaluation, and as such it also started out with lazily evaluated I/O. Several decades later, the problems with lazy I/O have proven greater than the benefits and several libraries have been developed that provide a better abstraction. Conduit is one such library and it provides a clean and very composable model for operating with streams of data. This makes it possible to develop and test components of the pipeline in isolation, even by separate teams if necessary. The wide variety of available combinator functions combined with the type system also allows for some very nice ways to express what you wish to accomplish.

The network conduits make it very easy to construct conduit pipelines that interact with a socket. The resulting programs look very different than their counterparts in imperative languages, but they are quite readable to anyone with some basic Haskell knowledge and can be altered easily by adding or removing parts of the pipeline. Next time you find yourself writing a client application for some network protocol, give the conduits a try!