Streaming the Redis replication stream
Back in 2019, I wrote a post in which I constructed an Attoparsec parser for the Redis protocol. This protocol is used in all communication between Redis clients and servers, and also in the Redis replication protocol, since registering as a replication client is just another command to be executed from the perspective of the Redis server. In the previous post I constructed a simple function to read from the replication socket and iteratively apply the parser, but there are simpler and more idiomatic ways to achieve the same goal with Conduits. In this post I'll implement this simpler approach and also show off a nicer way to initialize the replication stream with PSYNC that does not rely on pulling in the entire Redis contents first.
Skipping the contents of the master Redis
In the previous article we used the SYNC command to start replication. This works, but will also send over the entire contents of the Redis instance you are trying to connect to. This is not always what you want, since there can be a significant amount of data in there. Luckily, if you are just interested in the replication stream, there is also the PSYNC command. This command is intended for resuming replication at a certain point in the stream, for example if the connection dropped only momentarily. In our case, we can also use it to skip the transmission of the contents and just get the replication stream. It takes two arguments: the replication id of the Redis instance to follow and an offset in the replication stream from which to start. The replication id and the current offset of the master Redis can be pulled from the output of the INFO REPLICATION command:
127.0.0.1:6379> INFO REPLICATION
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=0,state=online,offset=0,lag=909
master_replid:8bf049fa0df8a8e0b9c79a60bc5890a821540c8a
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:1274
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:1274
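For completeness, pulling those two fields out of the INFO REPLICATION reply is a simple matter of line-by-line prefix matching. A minimal sketch might look like this; `infoField` is a hypothetical helper name, not part of any Redis library:

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main where

import qualified Data.ByteString.Char8 as B

-- Look up a "key:value" field in an INFO REPLICATION reply.
-- infoField is a hypothetical helper, not from any library.
infoField :: B.ByteString -> B.ByteString -> Maybe B.ByteString
infoField key reply = go (B.lines reply)
  where
    prefix = key <> ":"
    go [] = Nothing
    go (line : rest) =
      case B.stripPrefix prefix line of
        -- INFO lines are terminated with \r\n; drop the trailing \r.
        Just value -> Just (B.takeWhile (/= '\r') value)
        Nothing    -> go rest

main :: IO ()
main = do
  let reply = "# Replication\r\nrole:master\r\nmaster_replid:8bf049fa0df8a8e0b9c79a60bc5890a821540c8a\r\nmaster_repl_offset:1274\r\n"
  print (infoField "master_replid" reply)
  print (infoField "master_repl_offset" reply)
```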
In this particular case, the replication id is 8bf049fa0df8a8e0b9c79a60bc5890a821540c8a and the corresponding offset is 1274. Submitting a PSYNC with these parameters results in a reply consisting of a Redis “simple string” containing the word CONTINUE, followed by the replication stream starting at the specified offset:
+CONTINUE
*1
$4
PING
*1
$4
PING
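On the wire, the PSYNC command itself is sent in the same Redis protocol: an array of bulk strings. Here is a minimal sketch of that encoding; `encodeCommand` and `psyncCommand` are hypothetical helper names, not part of any library:

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main where

import qualified Data.ByteString.Char8 as B

-- Encode a command as a Redis protocol array of bulk strings.
-- A hypothetical helper, not from any Redis library.
encodeCommand :: [B.ByteString] -> B.ByteString
encodeCommand args =
  B.concat $
    ("*" <> B.pack (show (length args)) <> "\r\n") : map bulk args
  where
    bulk s = "$" <> B.pack (show (B.length s)) <> "\r\n" <> s <> "\r\n"

-- PSYNC with a replication id and an offset in the stream.
psyncCommand :: B.ByteString -> Integer -> B.ByteString
psyncCommand replId offset =
  encodeCommand ["PSYNC", replId, B.pack (show offset)]

main :: IO ()
main = B.putStr (psyncCommand "8bf049fa0df8a8e0b9c79a60bc5890a821540c8a" 1274)
```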
This technique can be very useful for starting replication without having to work through the contents of the database first. However, Redis only retains part of the replication stream, to prevent excessive memory use; the amount retained is configurable. Because we use INFO REPLICATION and use the offset obtained immediately afterwards, the offset will still be available on all but the busiest Redis instances. If the replication offset falls outside the retained part, Redis falls back to the normal behavior, as if you had used SYNC instead of PSYNC.
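The retained window is the replication backlog, which can be sized in redis.conf. For example, to keep a larger window around for PSYNC to resume from:

```conf
# Size of the replication backlog that PSYNC can resume from
repl-backlog-size 64mb
# Keep the backlog around for an hour after the last replica disconnects
repl-backlog-ttl 3600
```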
Combining Attoparsec and Conduit
In the previous article we wrote a small function to read data from the socket and iteratively apply the Redis protocol parser to it. This function looked like this:
streamingParser :: Socket -> B.ByteString -> IO ()
streamingParser sock leftovers = do
  result <- parseWith (recv sock 1024) parseRedisValue leftovers
  case result of
    Done rest value -> do
      print value
      streamingParser sock rest
    Fail _ _ msg -> error msg
This function uses the parseWith function, which can be supplied with an IO action to fetch more input; here that action is recv sock 1024, which reads chunks of at most 1024 bytes from the socket. It then prints any parsed values to STDOUT, or simply errors out if the message could not be parsed. This is not really a problem in practice, since Redis only emits valid messages in its own protocol. While this function performs its task well enough, it is not very extensible and not very reusable with respect to the wider Haskell ecosystem. We also need to do some fiddling with leftover values that I would prefer not to do manually. As the reddit user Solonarv remarked on the previous article, the conduit-extra package already contains several functions for using sockets as sources for conduits and for applying parsers to them. This sounds exactly like what we need! After rewriting the function using conduits, it becomes much shorter:
streamingConduitParser :: Socket -> IO ()
streamingConduitParser sock = runConduit $ sourceSocket sock .|
    conduitParser parseRedisValue .|
    Conduit.map snd .|
    Conduit.print
Much better! Not only is the manual leftover management gone, we can also easily swap out the final stage of our conduit for something else. There are many, many options available that provide conduits of every shape and size. The only part I am not super happy with is having to map snd over the output of the parser, since it outputs (PositionRange, RedisValue) tuples that also carry the current position of the parser. Oh well.
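To see the leftover handling in action without a live Redis instance, the same pipeline can be run over a pure source instead of a socket. The sketch below uses a toy parser for Redis integer replies in place of parseRedisValue (which lives in the previous post), with the input deliberately split across chunk boundaries:

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main where

import Conduit (mapC, runConduit, sinkList, yieldMany, (.|))
import Data.Attoparsec.ByteString.Char8 (Parser, char, decimal, string)
import Data.ByteString (ByteString)
import Data.Conduit.Attoparsec (conduitParser)

-- A toy stand-in for parseRedisValue: a Redis integer reply like ":42\r\n".
integerReply :: Parser Int
integerReply = char ':' *> decimal <* string "\r\n"

main :: IO ()
main = do
  -- The input is split awkwardly across chunk boundaries;
  -- conduitParser carries the leftovers between chunks for us.
  values <- runConduit $
    yieldMany ([":4", "2\r\n:7\r", "\n"] :: [ByteString])
      .| conduitParser integerReply
      .| mapC snd
      .| sinkList
  print values -- [42,7]
```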
Conclusion
Functional programming is a very good way to deal with streaming data, and the many excellent streaming libraries available in Haskell show this off. Apart from the readability benefits of parser combinators, their functional nature also shines when fitting them into conduit-like pipelines. Overall, I think this problem (following and parsing streaming binary data) is extremely well matched with the strong points of Haskell. To be honest, I still haven’t found a proper application of following along with the Redis replication yet, but at least it has been a fun topic for blog posts!