Parsing infinite streams with attoparsec
In a previous article, we looked at how Redis replication works and obtained a replication stream in our terminal using netcat. However, the data sent over was not very readable due to being encoded with the Redis Serialization Protocol (RESP). Since RESP is not native to most programs, parsing it will be necessary before an application can make use of the data. We’ll write a short parser with Haskell’s attoparsec library to parse the (potentially infinitely long) binary stream into a slightly more useful (but still potentially infinitely long) stream of parsed Redis values.
Parsing the Redis Serialization Protocol
As always when parsing something in Haskell, one of the most important things is deciding what data structure to parse the source text into. Luckily, the RESP documentation is extremely clear and we can pretty much write down the Haskell code straight from the text. A RedisValue can be an Integer, an Error, a SimpleString or a BulkString, or an Array containing zero or more RedisValues:
data RedisValue = RedisInteger Integer
                | RedisError B.ByteString
                | RedisSimpleString B.ByteString
                | RedisBulkString B.ByteString
                | RedisArray [RedisValue]
To make the printing later a little nicer we’ll define a Show instance for this datatype that unwraps the types and just shows their ‘contents’:
instance Show RedisValue where
    show (RedisArray elements) =
        concat ["[", intercalate "," (map show elements), "]"]
    show (RedisBulkString bs) = show bs
    show (RedisInteger num) = show num
    show (RedisError bs) = show bs
    show (RedisSimpleString bs) = show bs
Every type of value has a separate prefixing character that uniquely identifies it. This makes parsing extremely straightforward. For the Integer, SimpleString and Error cases we just need to check whether they start with the correct magic character and then parse in the text (for Error and SimpleString) or number (for Integer). They all end with a newline, so we parse that out as well:
parseRedisInteger = RedisInteger <$> (char ':' *> signed decimal) <* endOfLine
parseRedisSimpleString = RedisSimpleString <$>
    (char '+' *> AB.takeWhile (not . isEndOfLine))
    <* endOfLine
parseRedisError = RedisError <$>
    (char '-' *> AB.takeWhile (not . isEndOfLine))
    <* endOfLine
A BulkString takes a little more effort, since it is prefixed not only by a magic character, but also by the length of the string. The parser is still very straightforward:
parseRedisBulkString = do
    char '$'
    bStringLength <- decimal
    endOfLine
    actualString <- AB.take bStringLength
    endOfLine  -- the payload itself is followed by a closing newline
    return $ RedisBulkString actualString
A parser for a RedisArray can be slightly confusing, since it can contain multiple instances of RedisValue within itself. Luckily, attoparsec has no problem with recursive definitions. The implementation once again flows directly from the documentation:
parseRedisArray = do
    char '*'
    numElements <- decimal
    endOfLine
    elements <- count numElements parseRedisValue
    return $ RedisArray elements
With all these sub-parsers present, the parser for a RedisValue is simply “any of these”:
parseRedisValue :: Parser RedisValue
parseRedisValue = choice [ parseRedisArray
                         , parseRedisBulkString
                         , parseRedisInteger
                         , parseRedisError
                         , parseRedisSimpleString
                         ]
The whole thing is only 35 lines, including some whitespace and the type definitions. Now that we have a functional parser for the Redis data, we can get to the task of actually parsing the replication stream.
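To check the parsers against a concrete input, the sketch below restates them in a compact, self-contained form and runs them over the bytes of a single command. It is an illustration rather than the original code: the local helper names in the where clause and the names sampleCommand and parsedSample are made up for this example, the bulk string parser consumes the newline that follows the payload, and the datatype derives Eq and Show instead of using the custom Show instance so that results can be compared directly.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Attoparsec.ByteString as AB
import           Data.Attoparsec.ByteString.Char8
                   ( Parser, char, choice, count, decimal, endOfLine
                   , isEndOfLine, parseOnly, signed )
import qualified Data.ByteString as B

data RedisValue = RedisInteger Integer
                | RedisError B.ByteString
                | RedisSimpleString B.ByteString
                | RedisBulkString B.ByteString
                | RedisArray [RedisValue]
                deriving (Eq, Show)

-- Compact restatement of the parsers from the text (helper names are
-- hypothetical and local to this sketch).
parseRedisValue :: Parser RedisValue
parseRedisValue = choice [array, bulk, int, err, simple]
  where
    -- Everything up to the end of the line, with the newline consumed.
    line   = AB.takeWhile (not . isEndOfLine) <* endOfLine
    int    = RedisInteger <$> (char ':' *> signed decimal <* endOfLine)
    simple = RedisSimpleString <$> (char '+' *> line)
    err    = RedisError <$> (char '-' *> line)
    bulk   = do
      len <- char '$' *> decimal <* endOfLine
      RedisBulkString <$> AB.take len <* endOfLine
    array  = do
      n <- char '*' *> decimal <* endOfLine
      RedisArray <$> count n parseRedisValue

-- The command "SET a 1" as it appears on the wire: an array of three
-- bulk strings.
sampleCommand :: B.ByteString
sampleCommand = "*3\r\n$3\r\nSET\r\n$1\r\na\r\n$1\r\n1\r\n"

parsedSample :: Either String RedisValue
parsedSample = parseOnly parseRedisValue sampleCommand
```

Loading this into ghci and evaluating parsedSample should give a Right containing a RedisArray of the three bulk strings SET, a and 1. parseOnly is used here because the whole input is already in memory; the streaming case needs a different entry point.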
Applying the parser to the replication stream
As we saw in the previous article, you can get the replication stream by sending a SYNC\n command to a Redis server. In Haskell, we will need to acquire a TCP socket, but otherwise this should be no problem. Parsing all the commands will be more tricky. We could use the many' combinator to parse the stream into a list of RedisValues, but this would rapidly run into problems: many' only returns its list once the parser finally fails, so no value would become available while the stream keeps running, and attoparsec will keep all of the input so far in memory to allow for arbitrary backtracking. So, it makes sense to parse only one value at a time, in order to allow the GC to let go of parts of the replication stream as fast as possible.
To actually accomplish this, we will make heavy use of attoparsec’s parseWith, which can be supplied with a ‘refill’ function as well as a string to be parsed. If the initial string to be parsed only contains a partial RedisValue, attoparsec will call the refill function to obtain more data to parse, until there is enough data to either complete the parse or to decide that the input data does not describe a correct RedisValue. This will be extremely useful for acquiring more input from the replication stream, especially when some of the values that should be parsed could be extremely large. The refill function is any function in some monad (usually IO) that can be called to acquire more input. In our case, we will simply try to get more data from the Socket by using the recv function. Note that we can use the empty string as the initial value, since the empty string is not enough to succeed or fail parsing and so recv will be called immediately:
parseWith (recv sock 1024) parseRedisValue ""
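The refill behaviour can be tried out without a socket. The sketch below is a minimal stand-in, not part of the original program: refillFrom, simpleString and parseFromChunks are hypothetical names, and the refill action hands out chunks from an in-memory queue where the real code calls recv. It starts from the empty string, so the first chunk is requested immediately, and the first value is deliberately split across two chunks.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Attoparsec.ByteString as AB
import           Data.Attoparsec.ByteString.Char8
                   (Parser, char, endOfLine, isEndOfLine)
import qualified Data.ByteString as B
import           Data.IORef (IORef, atomicModifyIORef', newIORef)

-- Hypothetical stand-in for `recv sock 1024`: returns one queued chunk per
-- call, and the empty string once the queue is exhausted (which attoparsec
-- treats as end of input).
refillFrom :: IORef [B.ByteString] -> IO B.ByteString
refillFrom ref = atomicModifyIORef' ref pop
  where
    pop []       = ([], B.empty)
    pop (c : cs) = (cs, c)

-- A single RESP simple string, enough to demonstrate the mechanism.
simpleString :: Parser B.ByteString
simpleString = char '+' *> AB.takeWhile (not . isEndOfLine) <* endOfLine

-- Parses one value from the chunks, returning the value together with the
-- unconsumed remainder, or the error message if parsing fails.
parseFromChunks :: [B.ByteString]
                -> IO (Either String (B.ByteString, B.ByteString))
parseFromChunks chunks = do
  ref    <- newIORef chunks
  result <- AB.parseWith (refillFrom ref) simpleString B.empty
  pure $ case result of
    AB.Done rest value -> Right (value, rest)
    AB.Fail _ _ msg    -> Left msg
    AB.Partial _       -> Left "parseWith returned a partial result"
```

Calling parseFromChunks ["+O", "K\r\n+PO"] pulls both chunks, since the first one ends in the middle of the value, and yields the value OK together with the remainder +PO. With an empty chunk list the refill action reports end of input right away and the result is a Left.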
Since the replication stream we get from the server does not necessarily come in neat 1024-byte long blocks, and since there could be more commands per block, we may have some data left over after parsing a RedisValue. Attoparsec will return both the parsed value and any remainder of the string for further parsing. In this example case we can simply print the value to STDOUT and recurse, feeding the ‘leftover’ string into parseWith as the initial input. If the leftover string does not contain a complete RedisValue, parseWith will automatically call recv again to obtain more input.
streamingParser :: Socket -> B.ByteString -> IO ()
streamingParser sock leftovers = do
    result <- parseWith (recv sock 1024) parseRedisValue leftovers
    case result of
        Done rest value -> do
            print value
            -- Feed the unconsumed input back in as the next initial input.
            streamingParser sock rest
        Fail _ _ msg -> error msg
The streamingParser function will loop infinitely, reading the replication stream from the supplied socket and printing the parsed RedisValues to the standard output. In a more serious application you may want to do additional filtering and/or transforming of the value before passing it on, instead of just printing the value (in addition to avoiding print, since it will convert any ByteStrings into a regular String first, which may lead to performance problems).
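One possible shape for such a variant is sketched below. It is an assumption about how the loop could be generalised, not the code from this article: streamWith, onValue and collectFromChunks are hypothetical names, the refill action and the parser are passed in as arguments (in our program they would be recv sock 1024 and parseRedisValue), and the loop returns the failure message where streamingParser calls error.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Attoparsec.ByteString as AB
import           Data.Attoparsec.ByteString.Char8
                   (Parser, char, endOfLine, isEndOfLine)
import qualified Data.ByteString as B
import           Data.IORef
                   (atomicModifyIORef', modifyIORef', newIORef, readIORef)

-- Parses values one at a time and hands each to `onValue`. Stops at the
-- first failure (including end of input) and returns its message.
streamWith :: IO B.ByteString      -- refill action
           -> Parser a             -- parser for a single value
           -> (a -> IO ())         -- what to do with each parsed value
           -> B.ByteString         -- initial input
           -> IO String
streamWith refill parser onValue = go
  where
    go leftovers = do
      result <- AB.parseWith refill parser leftovers
      case result of
        AB.Done rest value -> onValue value >> go rest
        AB.Fail _ _ msg    -> pure msg
        AB.Partial _       -> pure "unexpected partial result"

-- Small stand-in parser so the sketch runs without the full RESP parser.
simpleString :: Parser B.ByteString
simpleString = char '+' *> AB.takeWhile (not . isEndOfLine) <* endOfLine

-- Runs the loop over in-memory chunks and collects the values in order.
collectFromChunks :: [B.ByteString] -> IO [B.ByteString]
collectFromChunks chunks = do
  queue <- newIORef chunks
  seen  <- newIORef []
  let refill = atomicModifyIORef' queue pop
      pop []       = ([], B.empty)
      pop (c : cs) = (cs, c)
  _ <- streamWith refill simpleString (\v -> modifyIORef' seen (v :)) B.empty
  reverse <$> readIORef seen
```

Because the handler is an argument, the same loop can filter, transform or forward values, and it can be exercised with chunk boundaries that fall in the middle of a value: collectFromChunks ["+A\r\n+", "B\r\n+C", "\r\n"] collects A, B and C.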
Hooking it all up
Now that we have a function which can parse an infinite stream of RedisValues from a socket, all we have to do is to hook it up to a socket and test it. We will just borrow some code from the documentation of Network.Socket for our client:
main = withSocketsDo $ do
    addr <- resolve "127.0.0.1" "6379"
    bracket (open addr) close talk
  where
    resolve host port = do
        let hints = defaultHints { addrSocketType = Stream }
        addr:_ <- getAddrInfo (Just hints) (Just host) (Just port)
        return addr
    open a = do
        sock <- socket (addrFamily a) (addrSocketType a) (addrProtocol a)
        connect sock $ addrAddress a
        return sock
    talk sock = do
        sendAll sock "SYNC\n"
        Done rest (RedisBulkString rdb) <- parseWith (recv sock 1024) parseRDB ""
        putStr $ "Skipped " ++ (show . B.length $ rdb) ++ " bytes of RDB data.\n"
        streamingParser sock rest
For some reason, the initial streaming of the .rdb file by the Redis master almost follows the Redis protocol specification for a RedisBulkString, but it does not include the newline at the end. I wrote a parseRDB parser for it which is exactly the same as the parseRedisBulkString parser except for the newline at the end. I have uploaded the whole thing as a gist if you are interested in running it yourself. The output looks like this (in ghci):
*RedisParsing> main
Skipped 177 bytes of RDB data.
["PING"]
["PING"]
["PING"]
["SELECT","0"]
["setex","a","5","abc"]
["DEL","a"]
["PING"]
In this example, I set up a connection to an empty Redis instance running locally on my machine. It sends over the RDB file and then starts pinging as usual. After some time I use the redis-cli tool to send a SETEX command and, after 5 seconds, when the key expires, the master sends out a DEL for the expired key. This is all as expected.
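The parseRDB parser is not shown in the text, so here is a minimal sketch of what it could look like based on the description alone: a length-prefixed payload with no newline after it. The names parseRDBPayload and splitRDB are hypothetical, and unlike the parseRDB used in main this version returns the raw bytes instead of wrapping them in a RedisBulkString, to keep the example self-contained.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Attoparsec.ByteString as AB
import           Data.Attoparsec.ByteString.Char8
                   (Parser, char, decimal, endOfLine, parseOnly)
import qualified Data.ByteString as B

-- Reads "$<length>" and a newline, then exactly <length> bytes. No
-- trailing newline is consumed after the payload.
parseRDBPayload :: Parser B.ByteString
parseRDBPayload = do
  _   <- char '$'
  len <- decimal
  endOfLine
  AB.take len

-- Splits a buffer into the RDB payload and whatever follows it.
splitRDB :: B.ByteString -> Either String (B.ByteString, B.ByteString)
splitRDB = parseOnly ((,) <$> parseRDBPayload <*> AB.takeByteString)
```

For the input "$5\r\nREDIS*1\r\n$4\r\nPING\r\n", splitRDB returns the five payload bytes REDIS and leaves the following PING command untouched for the streaming parser.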
Conclusion
It was pretty straightforward to define some parsers for the various shapes a RedisValue can have and to apply these to the incoming replication stream. However, operating like this will still not update the INFO section of the master. It also uses the older SYNC method instead of the newer PSYNC. In the next article we will look at ways to address these two shortcomings.