37 Kafka & ZMQ for Automated Trading Workflow #

Hello, I am Jingxiao.

Before we start this lesson, let’s review what we have learned in the previous three lessons.

In Lesson 34, we introduced how to place orders on an exchange through RESTful API. In Lesson 35, we explained how to obtain order book data from an exchange through Websocket. In Lesson 36, we introduced how to implement a strategy and how to backtest the strategy.

In fact, at this stage, a simple and functioning quantitative trading system has been formed. You can repeatedly modify the strategy and expect a decent PnL (Profit and Loss). However, for a comprehensive quantitative trading system, a basic framework is not enough.

In large-scale quantitative trading companies, systems are generally run in a distributed manner, with different modules running independently on different machines and being connected to each other. Even for personal trading systems, when implementing algorithms like high-frequency arbitrage, the execution layer needs to be deployed on machines close to the exchange.

Therefore, starting from today’s lesson, we will return to the Python technology stack and explain to you how to implement complex collaboration between distributed systems from the perspective of a quantitative trading system.

Middleware #

Let’s start by introducing the concept of middleware. Middleware is a component that connects the underlying technical tools with the application layer. Its main purpose is to allow us, as engineers who need to use services, to not have to worry about the specific implementation details of the underlying layer. We only need to use the interface provided by the middleware.

This concept is not difficult to understand. Let’s use databases as an example to make it clearer. There are many different types of databases at the underlying layer, from relational databases like MySQL to non-relational (NoSQL) databases like MongoDB, and from distributed databases like Spanner to in-memory databases like Redis. Different databases have different use cases, pros and cons, and also different ways of calling them. So what is the role of middleware?

Middleware adds a layer of logic on top of these different databases, which is specifically used to interact with the databases. However, it only exposes the same interface externally. This means that when programmers in the upper layer call the middleware interface, they only need to specify the database in the middleware, while other parameters remain the same. This greatly facilitates the development in the upper layer. At the same time, when the lower layer technology stack is updated or replaced, it can be completely separated from the upper layer without affecting the programmers’ use.
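As a rough illustration of this idea, here is a minimal sketch in Python. All class names (`Backend`, `DictBackend`, `ListBackend`, `StorageMiddleware`) are hypothetical, and plain in-memory structures stand in for real databases. The only point is that the caller names a backend and otherwise uses the same two methods.

```python
from abc import ABC, abstractmethod


class Backend(ABC):
    """Hypothetical common interface that every storage backend implements."""

    @abstractmethod
    def put(self, key, value): ...

    @abstractmethod
    def get(self, key): ...


class DictBackend(Backend):
    """Stand-in for one database; a plain dict holds the data."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)


class ListBackend(Backend):
    """Stand-in for a different database with a different internal layout."""

    def __init__(self):
        self._rows = []

    def put(self, key, value):
        # Drop any existing row for this key, then append the new one.
        self._rows = [(k, v) for k, v in self._rows if k != key]
        self._rows.append((key, value))

    def get(self, key):
        for k, v in self._rows:
            if k == key:
                return v
        return None


class StorageMiddleware:
    """Exposes one interface; the caller only names the backend to use."""

    def __init__(self):
        self._backends = {'dict': DictBackend(), 'list': ListBackend()}

    def put(self, backend, key, value):
        self._backends[backend].put(key, value)

    def get(self, backend, key):
        return self._backends[backend].get(key)
```

Swapping `ListBackend` for another implementation would not change any code that calls `StorageMiddleware`, which is the decoupling described above.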

You can refer to the diagram I drew below for the logical relationship between them. I jokingly refer to the role of middleware as: there is nothing that can’t be solved by adding another layer; if there is, then add two layers.

Of course, this is just one example and one form of middleware. In fact, at Alibaba, middleware mainly comes in three forms: distributed relational database (DRDS), message queue, and distributed service. Today, we will primarily focus on message queues, as they are very suitable for the application scenarios of quantitative trading systems, which use an event-driven model.

Message Queue #

So, what is a message queue? As its name suggests, a message is an individual unit of information transmitted over a network, while a queue is a data structure that you should be familiar with if you have studied algorithms and data structures. (If your algorithm foundation is not strong, I recommend the “Data Structures and Algorithms” column by Geek Time’s Wang Zheng. Lesson 09 covers queues.)

In short, a message queue is a temporary container for storing messages. Some people push messages into the message queue, while others listen to the message queue and take away any new messages they find. As we just explained about middleware, it is clear that a message queue is also a type of middleware.

Currently, the most commonly used message queues in the market include RabbitMQ, Kafka, RocketMQ, and ZMQ. However, today I will only introduce the most frequently used ones: ZMQ and Kafka.

Let’s first think about the characteristics of a message queue as middleware.

First, there is strict temporal order. As mentioned earlier, a queue is a data structure that follows the first-in-first-out (FIFO) rule. If you throw “1, 2, 3” into it and then someone takes data out of it, they will definitely get “1, 2, 3” out, strictly ensuring that the data that goes in first comes out first, and the data that goes in later comes out later. Obviously, this is also a necessary requirement in the message mechanism. Otherwise, the result will definitely not be what we want.

Speaking of queue characteristics, let me briefly mention that, in contrast to “first-in-first-out”, there is the stack data structure, which is first-in-last-out. If you throw “1, 2, 3” into it and then take them out, you will get “3, 2, 1”. You must distinguish between these two.
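The difference between the two is easy to check in Python. The sketch below uses `collections.deque` as the queue and a plain list as the stack; the helper names `drain_queue` and `drain_stack` are made up for this illustration.

```python
from collections import deque


def drain_queue(items):
    """Put items into a FIFO queue, then take them all out."""
    queue = deque(items)
    out = []
    while queue:
        out.append(queue.popleft())  # oldest item leaves first
    return out


def drain_stack(items):
    """Put items onto a stack, then take them all off."""
    stack = list(items)
    out = []
    while stack:
        out.append(stack.pop())  # newest item leaves first
    return out


print(drain_queue([1, 2, 3]))  # [1, 2, 3]
print(drain_stack([1, 2, 3]))  # [3, 2, 1]
```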

Second, a message queue has to deal with the problems common to distributed network systems. How do we ensure that messages are not lost? How do we ensure that messages are not duplicated? The designers of message queues have already considered these questions, so for now you can simply use the queue without delving too deep into them.

However, an important point is how the message queue reduces system complexity and acts as middleware to decouple components. Let’s look at the following diagram.

The message queue operates in publish-subscribe mode, where one or more message publishers can publish messages, and one or more message receivers can subscribe to messages. In the diagram, you can see that there is no direct coupling between the message publisher and the message receiver. Specifically:

  • After a message publisher sends a message to the distributed message queue, it finishes handling the message.
  • After a message receiver obtains the message from the distributed message queue, it can proceed with subsequent processing without investigating where this message came from.

As for the issue of adding new businesses, as long as you are interested in this type of message, you can subscribe to it without any impact on the original system and business, thus achieving business scalability design.
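To make the decoupling concrete, here is a toy in-process sketch of the publish-subscribe idea. `ToyBroker` and its methods are hypothetical names invented for this example, and the class is not how ZMQ or Kafka work internally. It only shows that publishers and subscribers talk to the queue and never to each other.

```python
from collections import defaultdict, deque


class ToyBroker:
    """A toy message queue: one FIFO inbox per (topic, subscriber)."""

    def __init__(self):
        # topic -> {subscriber name -> queue of pending messages}
        self._inboxes = defaultdict(dict)

    def subscribe(self, topic, name):
        self._inboxes[topic][name] = deque()

    def publish(self, topic, message):
        # The publisher is done once the message is handed to the broker.
        for inbox in self._inboxes[topic].values():
            inbox.append(message)

    def poll(self, topic, name):
        # The subscriber does not know or care who published the message.
        inbox = self._inboxes[topic][name]
        return inbox.popleft() if inbox else None


broker = ToyBroker()
broker.subscribe('orderbook', 'strategy')
broker.subscribe('orderbook', 'recorder')  # a new business just subscribes
broker.publish('orderbook', 'tick 1')
print(broker.poll('orderbook', 'strategy'))  # tick 1
print(broker.poll('orderbook', 'recorder'))  # tick 1
```

Adding the `recorder` subscriber required no change to the publisher or to the `strategy` subscriber.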

After discussing so many conceptual things, I’m sure you can’t wait to see the specific code. Next, let’s take a look at the implementation of ZMQ.

ZMQ #

First, let’s take a look at ZMQ, which is a very lightweight message queue implementation.

The author, Pieter Hintjens, was a remarkable person whose life story is legendary. In 2010, he was diagnosed with bile duct cancer, which was successfully removed by surgery. But in April 2016, he learned that the cancer had spread extensively to his lungs and was incurable. The last communication pattern he wrote about was a protocol for dying. Afterward, he chose euthanasia in Belgium.

ZMQ is a simple and easy-to-use transport layer that has three usage modes:

  • Request-Reply mode
  • Publish-Subscribe mode
  • Parallel Pipeline mode

The first mode is simple: the client sends a message to the server, and the server processes it and returns a reply to the client, completing one interaction. You should be very familiar with this scenario, as it is similar to how HTTP works, so I won’t focus on it here. The third mode has nothing to do with today’s content, so I won’t discuss it in detail either.

We need to take a detailed look at the second mode, the “PubSub” mode. Here is its specific implementation, and the code is very clear, so you should be able to understand it easily:

# Subscriber 1
import zmq


def run():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:6666')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    print('client 1')
    while True:
        msg = socket.recv()
        print("msg: %s" % msg)


if __name__ == '__main__':
    run()

########## Output ##########

client 1
msg: b'server cnt 1'
msg: b'server cnt 2'
msg: b'server cnt 3'
msg: b'server cnt 4'
msg: b'server cnt 5'



# Subscriber 2
import zmq


def run():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect('tcp://127.0.0.1:6666')
    socket.setsockopt_string(zmq.SUBSCRIBE, '')

    print('client 2')
    while True:
        msg = socket.recv()
        print("msg: %s" % msg)

if __name__ == '__main__':
    run()



# Publisher
import time
import zmq

def run():
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind('tcp://*:6666')

    cnt = 1

    while True:
        time.sleep(1)
        socket.send_string('server cnt {}'.format(cnt))
        print('send {}'.format(cnt))
        cnt += 1

if __name__ == '__main__':
    run()

Here is a brief explanation:

For the subscriber, what we need to do is to create a zmq Context and connect the socket to the specified port. The function setsockopt_string() is used to filter specific messages, and the code below:

socket.setsockopt_string(zmq.SUBSCRIBE, '') 

subscribes with an empty prefix, which means that no messages are filtered out. Finally, we call socket.recv() to receive the message. This statement will block until there is a new message.
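A SUB socket filters by prefix: a message is delivered if it starts with any of the prefixes the socket has subscribed to. The sketch below is only an illustration under assumptions. `is_delivered` is a made-up pure-Python helper that emulates this rule for string messages, and `make_subscriber` is a hypothetical convenience function showing how several prefixes could be registered with the same `setsockopt_string` call used above.

```python
def is_delivered(subscriptions, message):
    """Emulate the SUB-side rule: deliver if any subscription is a prefix."""
    return any(message.startswith(prefix) for prefix in subscriptions)


def make_subscriber(address, prefixes):
    """Create a SUB socket subscribed to each given prefix (requires pyzmq)."""
    import zmq  # imported here so is_delivered() works without pyzmq

    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect(address)
    for prefix in prefixes:
        socket.setsockopt_string(zmq.SUBSCRIBE, prefix)
    return socket


print(is_delivered([''], 'server cnt 1'))        # True: '' matches everything
print(is_delivered(['server'], 'server cnt 1'))  # True: prefix matches
print(is_delivered(['client'], 'server cnt 1'))  # False: no prefix matches
```

For example, `make_subscriber('tcp://127.0.0.1:6666', ['server'])` would receive the messages sent by the publisher above, since they all start with `server`.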

For the publisher, we also need to create a zmq Context and bind the socket to the specified port. Please note that we use bind instead of connect here, because only one bind can exist for a given address and port, while there can be many connects to it. After initialization, we can call socket.send_string to send the content we want to ZMQ.

Of course, there are several things to note here. First, because send_string sends a string, we can pass almost any data structure we want by serializing it to JSON first, and the data flow structure here is already clear.
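Here is a minimal sketch of that JSON round trip. The helper names `encode_message` and `decode_message` and the fields of the sample snapshot are assumptions made for illustration, not a format defined by any exchange.

```python
import json


def encode_message(payload):
    """Serialize a Python object to a compact JSON string for send_string()."""
    return json.dumps(payload, separators=(',', ':'))


def decode_message(raw):
    """Turn what recv() returns (bytes) or a str back into a Python object."""
    if isinstance(raw, bytes):
        raw = raw.decode('utf-8')
    return json.loads(raw)


# Hypothetical orderbook snapshot; the field names are made up.
snapshot = {'symbol': 'BTCUSD', 'bids': [[8000.5, 1.2]], 'asks': [[8001.0, 0.7]]}

wire = encode_message(snapshot)
# Publisher side:  socket.send_string(wire)
# Subscriber side: data = decode_message(socket.recv())
print(decode_message(wire.encode('utf-8')) == snapshot)  # True
```

pyzmq sockets also provide send_json() and recv_json(), which wrap the same idea.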

Also, strictly speaking, moving the publisher’s time.sleep(1) to the end of the while loop should not affect the result. Try it as an experiment and see what actually happens.

You can also think about another question, what should ZMQ do if there are multiple publishers here?

Kafka #

Next, let’s take a look at Kafka.

ZMQ’s main strengths are that it is lightweight, open source, and easy to use, but in industrial applications most people tend to turn to mature tools like Kafka.

Kafka supports both the point-to-point and publish-subscribe models, which are also the most widely used message queue models. Like ZMQ, Kafka is completely open source, so you can get full support from the open source community.

The code implementation of Kafka is similar to ZMQ, so I won’t explain it in detail here. For more information about Kafka, Geek Time has a dedicated column called “Kafka Core Technologies and Practices”, in which Instructor Hu Xi explains Kafka’s practices and internals in detail. If you are interested, you can search for it on the Geek Time platform.
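For readers who want a starting point, here is a rough sketch of the same publisher and subscriber written against Kafka. It assumes the third-party kafka-python package and a broker listening on localhost:9092. The topic name `orderbook` and the function names are made up for this example, and this is not code from the original lesson.

```python
import json

BROKER = 'localhost:9092'  # assumed broker address
TOPIC = 'orderbook'        # hypothetical topic name


def serialize(value):
    """Encode a Python object as UTF-8 JSON bytes."""
    return json.dumps(value).encode('utf-8')


def deserialize(raw):
    """Decode UTF-8 JSON bytes back into a Python object."""
    return json.loads(raw.decode('utf-8'))


def publish(messages):
    """Send each message to the topic (requires kafka-python and a broker)."""
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=BROKER,
                             value_serializer=serialize)
    for message in messages:
        producer.send(TOPIC, message)
    producer.flush()  # block until buffered messages have been sent


def consume():
    """Print messages from the topic as they arrive; blocks indefinitely."""
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(TOPIC,
                             bootstrap_servers=BROKER,
                             value_deserializer=deserialize,
                             auto_offset_reset='earliest')
    for record in consumer:
        print('msg:', record.value)
```

With a broker running, one process could call `publish([{'cnt': 1}, {'cnt': 2}])` while another calls `consume()`, mirroring the ZMQ publisher and subscriber above.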

Orderbook Data Streaming Based on Message Queue #

Let’s go back to our quantitative trading system.

In a quantitative trading system, the orderbook data generally serves two purposes: the strategy side uses it as real-time data for decision-making, and a copy is backed up in files or databases for future use by the strategy and backtesting systems.

If we listen to the exchange messages directly on a single machine, the risk will be huge. In distributed systems, this is called a single point of failure. Once this machine fails or the network connection is suddenly interrupted, our trading system will be immediately exposed to risk.

Therefore, a natural idea is to place different machines in different regions, connect them to the exchange using different networks, and then aggregate and deduplicate the information collected from these machines to generate accurate data that we need. The corresponding topology diagram is as follows:

Of course, this approach also has obvious drawbacks: the strategy server has to wait for data from multiple data servers, and the message queue adds its own processing and network latency. For the strategy server, this may add tens to hundreds of milliseconds of delay. For a high-frequency or slippage-sensitive strategy, this approach needs to be considered carefully.

For low-frequency strategies and arbitrage strategies, however, the system stability and architectural decoupling gained in exchange for this delay are well worth it. You still need to be aware that in this case the message queue server itself may become a bottleneck and the single point of failure mentioned earlier. Once it is disconnected, the system will still be exposed to risk.

In fact, we can use some mature systems, such as Alibaba’s Message Queue, AWS’s Simple Queue Service, etc. By using these very mature message queue systems, this risk can be greatly reduced.
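The aggregate-and-deduplicate step described in this section can be sketched as follows. This is only an illustration under a loud assumption: every update carries a `symbol` and a monotonically increasing `seq` number, so the same update arriving from two data servers can be recognized. The field names and the `Deduplicator` and `merge` helpers are hypothetical.

```python
class Deduplicator:
    """Forward each update at most once, using its (symbol, seq) pair."""

    def __init__(self):
        self._last_seq = {}  # symbol -> highest sequence number seen so far

    def accept(self, update):
        """Return True if the update is new, False if it is a duplicate."""
        symbol, seq = update['symbol'], update['seq']
        if seq <= self._last_seq.get(symbol, -1):
            return False  # already seen from another server, or stale
        self._last_seq[symbol] = seq
        return True


def merge(updates):
    """Filter updates, given in arrival order from all data servers."""
    dedup = Deduplicator()
    return [u for u in updates if dedup.accept(u)]


# Two data servers deliver overlapping copies of the same stream.
arrivals = [
    {'symbol': 'BTCUSD', 'seq': 1, 'src': 'server-a'},
    {'symbol': 'BTCUSD', 'seq': 1, 'src': 'server-b'},  # duplicate
    {'symbol': 'BTCUSD', 'seq': 2, 'src': 'server-b'},
    {'symbol': 'BTCUSD', 'seq': 2, 'src': 'server-a'},  # duplicate
]
print([(u['seq'], u['src']) for u in merge(arrivals)])
# [(1, 'server-a'), (2, 'server-b')]
```

Whichever server delivers an update first wins, so the strategy side sees each update once even if one server lags or drops out. Note that this simple rule also discards updates that arrive out of order.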

Summary #

In this lesson, we analyzed the middleware system in the field of modern software engineering, as well as its main application - message queues. We explained the basic patterns of message queues, including the point-to-point model, the publish-subscribe model, and some other models supported by message queues.

In real project design, we need to choose different models according to our own product requirements. At the same time, in programming practice, we should deepen our understanding of different skill points and decouple the complexity of the system. This is the necessary path to design high-quality systems.

Thought Questions #

I have already mentioned today’s thought questions in the text, but I will list them here again for emphasis. In the part about ZMQ, I raised two questions:

  • What will happen if you try to put time.sleep(1) of the publisher at the end of the while loop? Why?
  • If there are multiple publishers, what should ZMQ do?

Please leave a comment with your thoughts and doubts, and feel free to share this article with more people to learn together.