13 How Java Producers Manage TCP Connections #

Hello, I’m Hu Xi. Today I want to share with you the topic: How does the Java producer in Kafka manage TCP connections?

Why adopt TCP? #

All communication in Apache Kafka is based on TCP, rather than HTTP or other protocols. This applies to communication between producers, consumers, and brokers. You may wonder why Kafka doesn’t use HTTP as the underlying communication protocol. There are many reasons for this, but the main reason lies in the differences between TCP and HTTP.

From a community perspective, when developing clients, people can leverage some advanced features provided by TCP itself, such as multiplexing requests and the ability to concurrently poll multiple connections.

Request multiplexing refers to merging two or more data streams onto a single underlying physical connection: several virtual connections are created on top of one physical connection, and each virtual connection is responsible for routing its own data stream. Strictly speaking, though, TCP does not actually perform multiplexing itself; it only provides reliable message delivery semantics, such as automatic retransmission of lost packets.

To be more precise, TCP can serve scenarios that require multiplexed connections only if the upper-layer application protocol (such as HTTP) allows multiple messages to be sent over the same connection. However, we won’t go into the details of TCP principles today; you only need to know that this is one of the reasons why the community chose TCP.
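The ability to “concurrently poll multiple connections” that the text mentions corresponds, in the Java client, to the `java.nio.channels.Selector` abstraction, on which Kafka’s network layer is built. The following is a minimal, self-contained sketch of that polling pattern; it uses in-process `Pipe` channels instead of real broker sockets, which is purely an illustrative assumption:

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectorDemo {
    // One Selector watches two channels at once - the same pattern a
    // single Kafka network thread uses to service many broker connections.
    public static int readyChannels() throws Exception {
        Selector selector = Selector.open();
        Pipe p1 = Pipe.open();
        Pipe p2 = Pipe.open();
        p1.source().configureBlocking(false);
        p2.source().configureBlocking(false);
        p1.source().register(selector, SelectionKey.OP_READ);
        p2.source().register(selector, SelectionKey.OP_READ);

        // Make both channels readable.
        p1.sink().write(ByteBuffer.wrap(new byte[]{1}));
        p2.sink().write(ByteBuffer.wrap(new byte[]{1}));

        // A single select() call observes readiness across all
        // registered channels.
        int ready = selector.select(1000);

        selector.close();
        p1.sink().close(); p1.source().close();
        p2.sink().close(); p2.source().close();
        return ready;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ready channels: " + readyChannels());
    }
}
```

The point is that one thread can drive many connections without blocking on any single one, which is hard to achieve with the simple request/response model of most HTTP client libraries.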

In addition to the advanced features provided by TCP that may be used by Kafka client developers, the community has also found that existing HTTP libraries in many programming languages are somewhat limited.

Based on these two reasons, the Kafka community decided to adopt the TCP protocol as the underlying protocol for all request communications.

Overview of Kafka Producer Program #

The main object of Kafka’s Java producer API is the KafkaProducer. Typically, there are four steps to develop a producer.

Step 1: Create the parameter object required to construct the producer object.

Step 2: Create an instance of the KafkaProducer object using the parameter object from step 1.

Step 3: Use the send method of KafkaProducer to send messages.

Step 4: Call the close method of KafkaProducer to close the producer and release system resources.

If the above steps are written in Java code, it would look something like this:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // illustrative values
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// ...
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("my-topic", "key", "value"), callback);
    // ...
}

This code uses the try-with-resources statement introduced in Java 7, so there is no explicit call to producer.close(). Whether close is called explicitly or implicitly, virtually all producer programs follow this same pattern: configure, create, send, close.

Now, the question is, when we develop a producer application, the producer will send messages to the Kafka cluster’s specified topic, which inevitably involves establishing TCP connections with the Kafka Broker. So, how does Kafka’s producer client manage these TCP connections?

When is TCP connection created? #

To answer the above question, we first need to understand when the producer code creates a TCP connection. In the code above, there are two candidate places where a TCP connection could be created: the line Producer<String, String> producer = new KafkaProducer<>(props) and the line producer.send(msg, callback). Which one do you think creates the TCP connection to the broker: the former, the latter, or both? Take 5 seconds to think about it, and then I will give you my answer.

First, when the producer application creates an instance of KafkaProducer, it establishes a TCP connection with the broker. More accurately, when creating the KafkaProducer instance, the producer application will create and start a background thread called “Sender”, which will first establish a connection with the broker. Here is a snippet of log from a test environment to illustrate this:

[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)

[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)

[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)

[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)

[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)

You may wonder how this is possible. If we don’t call the send method, the producer doesn’t know which topic to send the message to, so how does it know which broker to connect to? Does it connect to all brokers specified in the bootstrap.servers parameter? Well, yes, that’s exactly how the Java Producer is designed.

Let me explain the bootstrap.servers parameter a little. It is one of the core parameters for the producer and specifies the list of broker addresses that the producer should connect to when starting up. Please note that “when starting up” means that the producer initiates connections with these brokers when it starts. So, if you provide information for 1000 brokers in this parameter, unfortunately, your producer will establish TCP connections with all 1000 brokers when it starts.

In practice, I don’t recommend configuring all broker information in bootstrap.servers. Usually, specifying 3-4 brokers is sufficient, because once the producer connects to any one broker in the cluster, it can obtain metadata describing every broker in the cluster, so there is no need to list them all in bootstrap.servers.
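A minimal sketch of this recommendation, where the broker hostnames are hypothetical placeholders:

```java
import java.util.Properties;

public class BootstrapConfig {
    // Producer configuration listing only a few bootstrap brokers.
    // The hostnames below are illustrative, not real addresses.
    public static Properties producerProps() {
        Properties props = new Properties();
        // Three entries are enough: once any one of them answers, the
        // producer fetches metadata describing every broker in the cluster.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("bootstrap.servers"));
    }
}
```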

Let’s review the log output above. It shows that the producer application starts creating TCP connections to two brokers as soon as the KafkaProducer instance is created, before any message is sent. In my test environment, I configured “localhost:9092” and “localhost:9093” to simulate different brokers, but this does not affect the discussion later. The last line of the log output is also crucial: it shows the producer sending a METADATA request to one of the brokers to retrieve cluster metadata information - this is the mechanism, mentioned earlier, that lets the producer learn about the entire cluster.

Speaking of this, I have a personal opinion to share. Generally speaking, I don’t assume that the code written or the design decisions made by the community are always right, so “doubts” like the following occasionally cross my mind.

Take today’s example of creating a KafkaProducer instance. The community’s official documentation states that the KafkaProducer class is thread-safe. I have not thoroughly verified this myself, but after a rough browse through the source code, I can conclude the following: the user thread that creates the KafkaProducer instance and the Sender thread mentioned earlier share only one mutable data structure, the RecordAccumulator class. Therefore, by maintaining the thread safety of the RecordAccumulator class, the KafkaProducer class achieves thread safety as well.

You don’t need to understand what the RecordAccumulator class does; you just need to know that its main data structure is a ConcurrentMap<TopicPartition, Deque>. TopicPartition is the Java object Kafka uses to represent a topic partition, and it is itself immutable. Additionally, the places in the RecordAccumulator code where the Deque is accessed are protected by locks, so it can reasonably be assumed that the RecordAccumulator class is thread-safe.
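To make that argument concrete, here is a stripped-down sketch of the same pattern - this is not Kafka’s actual code, just an illustration of a ConcurrentMap of per-partition deques with per-deque locking:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class MiniAccumulator {
    // The ConcurrentMap handles concurrent lookup/insertion of
    // per-partition queues; each Deque is then guarded by its own
    // monitor, mirroring the locking pattern described in the text.
    private final ConcurrentMap<String, Deque<String>> batches = new ConcurrentHashMap<>();

    public void append(String partition, String record) {
        Deque<String> dq = batches.computeIfAbsent(partition, k -> new ArrayDeque<>());
        synchronized (dq) {   // lock only this partition's deque
            dq.addLast(record);
        }
    }

    public int size(String partition) {
        Deque<String> dq = batches.getOrDefault(partition, new ArrayDeque<>());
        synchronized (dq) {
            return dq.size();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniAccumulator acc = new MiniAccumulator();
        Runnable writer = () -> {
            for (int i = 0; i < 1000; i++) acc.append("t-0", "msg" + i);
        };
        Thread a = new Thread(writer), b = new Thread(writer);
        a.start(); b.start();
        a.join(); b.join();
        System.out.println(acc.size("t-0")); // 2000: no appends lost
    }
}
```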

After saying all this, what I actually want to say is that even though KafkaProducer is thread-safe, I do not agree with the practice of starting the Sender thread while the KafkaProducer instance is being constructed. Brian Goetz, author of “Java Concurrency in Practice,” explicitly points out the risk of doing so: starting a thread in a constructor can let the “this” reference escape, so in theory the Sender thread can observe a KafkaProducer instance that has not yet been fully constructed. Creating the thread during object construction is fine; it is starting it there that is best avoided.
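Here is a minimal illustration (again, not Kafka code) of the hazard Goetz describes, together with the fix just suggested: create the thread in the constructor, but start it only from a factory method after construction has finished. All names here are hypothetical:

```java
public class SafeStart {
    private final Thread worker;
    private final String name;

    private SafeStart(String name) {
        this.name = name;
        // Creating the thread here is fine. An UNSAFE variant would also
        // call worker.start() inside this constructor, letting the worker
        // observe a partially constructed object ("this" escape).
        this.worker = new Thread(() -> System.out.println("hello, " + this.name));
    }

    // Factory method: the thread starts only after the constructor has
    // returned, so it can never see a half-built instance.
    public static SafeStart create(String name) {
        SafeStart s = new SafeStart(name);
        s.worker.start();
        return s;
    }

    public String name() { return name; }

    public static void main(String[] args) throws InterruptedException {
        SafeStart s = SafeStart.create("kafka");
        s.worker.join();
    }
}
```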

Well, let’s get back on track. Regarding when TCP connections are created, our current conclusion is: TCP connections are established when creating a KafkaProducer instance. However, we want to ask, does it only get created at this time?

Of course not! TCP connections can also be created in two other places: after metadata update and during message sending. Why do we say “can”? Because TCP connections are not always created in these two places. When the Producer updates the metadata information of the cluster, if it finds that there is no connection to certain Brokers at that time, it will create a TCP connection. Similarly, when sending messages, if the Producer finds that there is no connection to the target Broker yet, it will also create a connection.

Next, let’s look at the two scenarios where the Producer updates the cluster metadata information.

Scenario 1: When the Producer tries to send a message to a non-existent topic, the Broker will inform the Producer that the topic does not exist. At this time, the Producer will send a METADATA request to the Kafka cluster, trying to obtain the latest metadata information.

Scenario 2: The Producer periodically refreshes the metadata, at an interval controlled by the metadata.max.age.ms parameter. The default value of this parameter is 300000, i.e. 5 minutes. This means that, regardless of whether anything in the cluster has changed, the Producer will forcibly refresh the metadata every 5 minutes to ensure it stays up to date.
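The refresh interval from scenario 2 is an ordinary producer property. A hedged sketch of overriding it, where the 60-second value is purely illustrative:

```java
import java.util.Properties;

public class MetadataConfig {
    // Builds producer properties overriding the metadata refresh interval.
    // 300000 ms (5 minutes) is the documented default; any override here
    // is just for illustration.
    public static Properties withRefreshInterval(long ms) {
        Properties props = new Properties();
        props.put("metadata.max.age.ms", Long.toString(ms));
        return props;
    }

    public static void main(String[] args) {
        // Refresh cluster metadata every minute instead of every 5 minutes.
        System.out.println(withRefreshInterval(60_000L).getProperty("metadata.max.age.ms"));
    }
}
```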

Having said all this, we can “challenge” the rationality of this design by the community for the Producer. Currently, a Producer by default creates TCP connections to all Brokers in the cluster, regardless of whether there is actually a need to transmit requests. This is obviously unnecessary. Additionally, Kafka also supports forcibly closing idle TCP connections, which makes this even more redundant.

Imagine in a cluster with 1000 Brokers, your Producer may only communicate with 3 to 5 of them in the long run, but after the Producer starts, it sequentially creates TCP connections to all 1000 Brokers. After a while, approximately 995 TCP connections will be forcibly closed. Isn’t this a waste of resources? Clearly, there is room for improvement and optimization here.

When to close TCP connections? #

After discussing the creation of TCP connections, let’s talk about when they are closed.

There are two ways for the Producer to close a TCP connection: one is actively initiated by the user, and the other is automatically closed by Kafka.

Let’s start with the first one. The active closure mentioned here broadly includes the case where the user runs “kill -9” to forcibly terminate the Producer process. The recommended way, of course, is to call the producer.close() method to close the connections.

The second way is Kafka closing the connection for you, which depends on the value of the connections.max.idle.ms parameter on the Producer side. By default, this parameter is set to 9 minutes, which means that if no requests “pass through” a TCP connection within 9 minutes, Kafka will actively close the connection for you. Users can disable this mechanism by setting connections.max.idle.ms=-1 on the Producer side. Once set to -1, the TCP connection becomes a permanent long connection. However, this is only a software-level “long connection” mechanism. Since the Socket connections created by Kafka have keepalive enabled, the keepalive detection mechanism will still be followed.
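A small sketch of the two settings just described; the values shown are the documented default and the opt-out value:

```java
import java.util.Properties;

public class IdleConfig {
    // Producer-side idle-connection setting. The default is 540000 ms
    // (9 minutes); -1 disables the automatic close entirely.
    public static Properties withIdleTimeout(long ms) {
        Properties props = new Properties();
        props.put("connections.max.idle.ms", Long.toString(ms));
        return props;
    }

    public static void main(String[] args) {
        // Keep the default behaviour explicit, or opt out with -1.
        System.out.println(withIdleTimeout(540_000L).getProperty("connections.max.idle.ms"));
        System.out.println(withIdleTimeout(-1L).getProperty("connections.max.idle.ms"));
    }
}
```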

It is worth noting that in the second case the TCP connection is closed by the Broker, while the connection was initiated by the client. From TCP’s point of view, this is therefore a passive close on the client side. The consequence of a passive close is a large number of CLOSE_WAIT connections, and the Producer - or more generally the client - gets no chance to explicitly observe that the connection has been interrupted.

Summary #

Let’s summarize today’s content briefly. For the latest version of Kafka (2.1.0 at the time of writing), the Java Producer manages TCP connections as follows:

  1. When the KafkaProducer instance is created, the Sender thread is started, which creates TCP connections to all Brokers specified in bootstrap.servers.
  2. After the KafkaProducer instance updates metadata information for the first time, it will create TCP connections to all Brokers in the cluster again.
  3. If the Producer finds that there is no TCP connection to a particular Broker when sending messages, it will create the connection immediately.
  4. If the Producer-side parameter connections.max.idle.ms is set to a value greater than 0, the idle TCP connections created in step 1 will be closed automatically. If it is set to -1, this mechanism cannot close them, and they may linger as “zombie” connections.

Open discussion #

Do you have any ideas for improving the community design that we “challenged” today?

Please write down your thoughts and answers, and let’s discuss together. If you find it rewarding, feel free to share the article with your friends.