21 How Java Consumers Manage Tcp Connections

21 How Java Consumers Manage TCP Connections #

Hello, I am Hu Xi. Today I want to share with you a topic: how Java consumers in Kafka manage TCP connections.

In Lesson 13 of this series, we specifically discussed how Java producers manage TCP connection resources. You should still remember, right? Today’s topic is a sister topic to that one, where we will delve into how Java consumers in Kafka manage TCP or socket resources. Only after completing today’s discussion will we have a comprehensive understanding of Kafka client’s TCP connection management mechanism.

Just like before, today I will use the terms TCP and socket interchangeably. After all, in the world of Kafka, whether it’s ServerSocket or SocketChannel, they both implement the TCP protocol. Or in other words, Kafka’s network transport is based on the TCP protocol, not the UDP protocol. Therefore, when I mention TCP connections or socket resources today, I am referring to the same thing.

When is the TCP connection created? #

Let’s start by discussing the creation of a TCP connection by the consumer. The main entry point for the consumer is the KafkaConsumer class. Unlike the producer, when you create an instance of KafkaConsumer, no TCP connection is created. This means that after executing the new KafkaConsumer(properties) statement, you will find that no socket connection has been created. This is different from the Java producer, where the KafkaProducer entry class silently starts a Sender thread in the background, responsible for creating the socket connection.

From this perspective, I personally think that the design of KafkaConsumer is better than KafkaProducer. As I mentioned in Lecture 13, starting a thread in a Java constructor can cause the “this” pointer to escape, which is always a risk.

If the socket is not created in the constructor, is it created in the KafkaConsumer.subscribe or KafkaConsumer.assign methods? Strictly speaking, it is not. Let me give you the answer directly: the TCP connection is created when the KafkaConsumer.poll method is called. To be more precise, there are three moments within the poll method where the TCP connection can be created.

  1. When sending a FindCoordinator request.

Do you remember the coordinator component on the consumer side? It resides in the memory of the broker and is responsible for managing the consumer group members and offset commits of individual consumers. When the consumer program calls the poll method for the first time after startup, it needs to send a FindCoordinator request to the Kafka cluster, hoping that the cluster will tell it which broker is its coordinator.

However, which broker should the consumer send this type of request to? In theory, any broker can answer this question, which means that the consumer can send the FindCoordinator request to any server in the cluster. On this issue, the community has made a small optimization: the consumer program sends the request to the broker with the lowest current load in the cluster. How is the load evaluated? It’s actually quite simple: it’s determined by which broker among all the brokers connected by the consumer has the fewest pending requests to send. Of course, this evaluation is clearly a one-way evaluation from the consumer side and does not necessarily result in the optimal solution globally. However, this does not affect our discussion. In summary, at this step, the consumer will create a socket connection.

  1. When connecting to the coordinator.

After the broker has handled the FindCoordinator request sent in the previous step, it will return the corresponding response to explicitly inform the consumer of which broker is the real coordinator. Therefore, at this step, once the consumer knows the actual coordinator, it will create a socket connection to that broker. Only when connected to the coordinator can the coordinator perform normal group coordination operations, such as joining the group, waiting for the group assignment plan, handling heartbeat requests, fetching offsets, and committing offsets.

  1. When consuming data.

The consumer will create a TCP connection to the broker hosting the leader replica of each partition it wants to consume. For example, suppose the consumer wants to consume data from 5 partitions, and the leader replicas of these 5 partitions are distributed among 4 brokers. In that case, the consumer will create a socket connection to each of these 4 brokers when consuming data. 在此日志中,消费者程序创建了多个TCP连接。为了总结上述内容的主要点,我们可以得出以下结论:

  • 消费者程序创建的第一个TCP连接(nodeId = -1)用于发送FindCoordinator请求,以获取协调者的信息。
  • 一旦获取到协调者的信息,消费者程序会复用此连接发送其他请求,如获取集群元数据。
  • 消费者程序创建第二个TCP连接(nodeId为协调者的ID)以连接到协调者,并执行组管理操作。
  • 最后,消费者程序创建多个TCP连接以执行实际的消息获取操作。每个连接对应一个要消费的分区的领导者副本所在的Broker。

关于这些TCP连接的生命周期和何时被关闭,日志中并没有明确提及。一般而言,TCP连接在完成请求后会被关闭,以释放资源并维持较少的连接数。消费者程序可能会自动管理这些连接的打开和关闭,以提高性能和效率。具体的实现取决于消费者程序的设计和配置。

因此,我们无法从这段日志中确定TCP连接的确切生命周期和关闭时间。如果您需要进一步了解或验证TCP连接的生命周期,建议参考相关的Kafka文档、源代码或与该程序的开发者交流。

When to Close TCP Connections? #

Similar to producers, closing TCP connections for consumers can be done actively or automatically by Kafka. Active closing means that you explicitly call the consumer API method to close the consumer, which can be done by manually calling the KafkaConsumer.close() method or executing a Kill command, whether it’s Kill -2 or Kill -9. On the other hand, automatic closing is controlled by the consumer-side parameter connection.max.idle.ms, which has a default value of 9 minutes. This means that if there is no request passing through a socket connection for 9 consecutive minutes, the consumer will forcibly terminate this socket connection.

However, unlike producers, if you use a loop to call the poll method for message consumption in your consumer program, all the mentioned requests will be sent to the broker regularly. This ensures that there are always requests being sent on these socket connections, thus achieving the effect of a “long connection”.

Regarding the three types of TCP connections mentioned above, it is important to note that once the third type of TCP connection is successfully established, the consumer program will discard the first type of TCP connection. Afterwards, when requesting metadata periodically, it will switch to using the third type of TCP connection. In other words, you will eventually find that the first type of TCP connection is silently closed in the background. For a consumer program that has been running for a while, only the latter two types of TCP connections will exist.

Possible issues #

In theory, I have already explained the mechanism of Kafka Java consumer managing TCP resources. However, upon careful consideration of the design principles, some problems can still be found.

As we just mentioned, the first type of TCP connection is only created to obtain metadata for the first time and will be discarded later. The fundamental reason is that the consumer does not know the information of the Kafka cluster when it starts. It can only use a “fake” ID to register. Even if the consumer obtains the real Broker ID, it still cannot differentiate which broker corresponds to this “fake” ID and therefore cannot reuse this socket connection. Instead, a new connection needs to be created.

Why does this situation occur? It is mainly because currently Kafka only uses the ID as the only dimension of data to represent socket connection information. This information is clearly insufficient to determine which broker the connection is associated with. Perhaps in the future, the community should consider using the triplet of <hostname, port, ID> to locate socket resources. This may reduce the number of TCP connections the consumer program needs to create.

You may ask, since Kafka has a mechanism to close connections after a certain period of time, how big of a deal is this? In fact, in practical scenarios, I have seen many cases where connection.max.idle.ms is set to -1, which disables the mechanism for closing connections. If this is the case, these TCP connections will not be cleared regularly and will only become permanent “zombie” connections. Due to this reason, the community should consider better solutions.

Conclusion #

Alright, today we completed the missing piece of the puzzle for managing TCP connections in the Kafka Java client. We not only described in detail how Java consumers create and close TCP connections, but also provided some of our own thoughts on the current design. We hope that you can apply this knowledge to your own business scenarios in the future and have a clear understanding of Socket management in actual production environments.

Open Discussion #

Suppose there is a Kafka cluster consisting of 2 brokers, and a topic with 5 partitions. When a consumer program that consumes this topic starts, how many socket connections do you think this program will create? Why?

Feel free to share your thoughts and answers, and let’s discuss together. If you find this article helpful, please feel free to share it with your friends.