15 Practical Solution: How to Implement Tracing with C++ #

Hello, I am Xu Changlong.

In the previous lessons, we covered the key principles of MySQL and several distributed retrieval systems, and saw how they implement distributed data storage and retrieval. You should now have a clear picture of the main optimization ideas for write-heavy, read-light systems: aggregating logs through distributed queues, caching newly written data in memory, writing to disk sequentially, sharding across multiple servers, and splitting indexes so that they can be queried in a distributed way.

However, you may feel that these concepts are a bit detached from day-to-day business logic. In this lesson, I will share how I implemented a distributed tracing system in C++.

By analyzing the main ideas and key details of this system implementation, you will not only learn practical techniques for business scenarios but also, more importantly, connect technical understanding with business implementation, gaining a deeper understanding of write-heavy, read-light systems.

Case Background #

In 2016, I was working at Weibo, where there were many important but complex internal systems. Because these systems depended on one another and we could not log in to the servers of the public cluster, troubleshooting was always painful.

Many problems could only be narrowed down by repeatedly adding logs and retrying, which usually took about three days. To troubleshoot online failures more efficiently, we needed better tooling, so I worked with a few colleagues to develop a distributed tracing system.

At that time, I only had two servers, each with 4 cores and 8 GB of memory, so hardware resources were limited. For that reason, the storage and computation parts of the distributed tracing system were implemented in C++11. The biggest challenge of the project was recording the full processing of every request and monitoring online failures in real time, all with these limited resources.

To build such a system, there are several key functionalities: log collection, log transmission, log storage, log querying, real-time performance statistics display, and failure clue collection. After discussions, we determined the specific implementation approach for the project, as shown in the following diagram:

Image

The first step of distributed tracing is log collection. After reviewing the available material on distributed tracing, I decided to follow its standard design: generate a TraceID at the entry point of each request, so that all the logs belonging to a single request can be gathered by that ID.

But what specific logs should be collected to help with troubleshooting? If the distributed tracing only records the performance of interfaces, it can only assist us in analyzing performance issues, without much significance for troubleshooting logic problems.

After further discussion, we decided to attach the TraceID to both leveled logs and exception logs, which tells us much more about the state of the business process. We also added the TraceID and RPCID to the request headers of calls to other services, and recorded the parameters, response content, and performance data of API and SQL requests. Combined, these give a comprehensive logging, monitoring, and tracing system that helps us troubleshoot both performance issues and logic defects.

Next, let’s take a look at how the main functionalities here are implemented.

Data Capture, Collection, and Transmission #

How do we implement log collection in our system?

You can probably guess the approach. When an interface is called, it reads the TraceID and RPCID passed in by the caller. If no TraceID was passed, it generates one (a UUID works) to identify all the logs produced during the request.

Image
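The entry-point logic above can be sketched roughly as follows. This is a minimal illustration, not the SDK used at Weibo: the header names `X-Trace-Id` and `X-Rpc-Id`, the dotted RPCID scheme, and all helper names are assumptions, and a random 32-character hex string stands in for a real UUID.

```cpp
#include <cassert>
#include <iomanip>
#include <map>
#include <random>
#include <sstream>
#include <string>

// Per-request tracing context carried through the whole call chain.
struct TraceContext {
    std::string trace_id;  // identifies every log line of one request
    std::string rpc_id;    // position of this call in the call tree (assumed format, e.g. "0.1")
};

using Headers = std::map<std::string, std::string>;

// Generate a random 32-character hex string (UUID-like, not RFC 4122 compliant).
inline std::string generate_trace_id() {
    static thread_local std::mt19937_64 rng{std::random_device{}()};
    std::ostringstream out;
    out << std::hex << std::setfill('0')
        << std::setw(16) << rng()
        << std::setw(16) << rng();
    return out.str();
}

// Reuse the caller's IDs when present; otherwise start a new trace.
inline TraceContext context_from_headers(const Headers& headers) {
    TraceContext ctx;
    auto t = headers.find("X-Trace-Id");  // hypothetical header name
    ctx.trace_id = (t != headers.end() && !t->second.empty())
                       ? t->second : generate_trace_id();
    auto r = headers.find("X-Rpc-Id");    // hypothetical header name
    ctx.rpc_id = (r != headers.end() && !r->second.empty())
                     ? r->second : std::string("0");
    return ctx;
}

// Headers to attach to the n-th downstream call made while handling this request.
inline Headers outgoing_headers(const TraceContext& ctx, int call_index) {
    assert(call_index >= 0);
    return {{"X-Trace-Id", ctx.trace_id},
            {"X-Rpc-Id", ctx.rpc_id + "." + std::to_string(call_index)}};
}
```

A handler would call `context_from_headers` once per incoming request and `outgoing_headers(ctx, n)` for its n-th downstream call, so that every log line and every downstream service sees the same TraceID.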

When a service is called, it is recommended to record an access log for the invocation. Specifically, we can record the request parameters, the result, the HTTP status code, and the response time of the interface. This log makes it easier to analyze service performance and failures.

Image

Business logs and error logs produced during the request, as well as logs of calls to other resources, all need to be recorded in detail: for example SQL queries and API requests, together with their parameters, responses, and response times.

Image

Whether we want to do distributed tracing or monitor the status of system services, we need to intercept database operations and API requests with AOP-style (aspect-oriented) interception, capturing data before and after each operation. To help you understand better, I can provide you with an implementation example of AOP, which I used in a previous production environment.
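C++ has no built-in AOP, but the same before-and-after capture can be approximated with a wrapper around the call. The sketch below is an assumption-laden illustration, not the production code mentioned above: `CallRecord`, `trace_log`, and `traced` are hypothetical names, operations are simplified to string-in, string-out functions, and records go to an in-memory vector rather than a log collector.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// One captured call: what was invoked, with what input, what came back, and how long it took.
struct CallRecord {
    std::string trace_id;
    std::string operation;   // e.g. "sql" or "http"
    std::string request;     // serialized parameters
    std::string response;    // serialized result
    long long elapsed_us;    // wall-clock duration in microseconds
};

// Hypothetical sink; a real SDK would ship records to the log collection service.
inline std::vector<CallRecord>& trace_log() {
    static std::vector<CallRecord> records;
    return records;
}

using Operation = std::function<std::string(const std::string&)>;

// Wrap an operation so that every call is timed and recorded,
// without modifying the wrapped function itself.
inline Operation traced(std::string trace_id, std::string operation, Operation fn) {
    assert(static_cast<bool>(fn));
    return [trace_id = std::move(trace_id), operation = std::move(operation),
            fn = std::move(fn)](const std::string& request) {
        const auto start = std::chrono::steady_clock::now();   // "before" advice
        std::string response = fn(request);                    // the real call
        const auto elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
                                 std::chrono::steady_clock::now() - start).count();
        trace_log().push_back({trace_id, operation, request, response, elapsed});  // "after" advice
        return response;
    };
}
```

Wrapping a database client once, for example `auto query = traced(ctx_trace_id, "sql", run_sql);`, means every later `query(...)` call is recorded automatically. Note that this sketch does not record calls that throw; a fuller version would capture exceptions as well.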

Once the SDK had recorded the resources each request depends on, I used two methods to ship the generated logs. The first pushes logs to our log collection service over a persistent connection that speaks the memcache protocol. If a push takes longer than 200 ms, the log is discarded so that interface performance is not affected.

The second saves logs to the local disk and uses Filebeat to collect and push them in real time, consolidating log collection. The second method is more stable, but because of login restrictions on our public server cluster, some systems can only use the first.

As mentioned earlier, because we are tracing core systems with complex business logic, we record the entire request process, including the parameters and the returned content.

This approach predictably generates a large volume of logs with very high write concurrency. During service peaks, the payment system sometimes writes logs at 100 MB/s. Log writing and transmission therefore need strong performance, while still ensuring that logs are not lost.

For this reason, we chose Kafka to transmit the logs. Kafka achieves load balancing and dynamic scaling by spreading the partitions of a topic across servers. When traffic exceeds capacity, we can scale out by adding servers to the cluster and gain throughput. In practice, real-time, high-throughput transmission and synchronization between systems is very often built on Kafka.

Dynamically Scalable Group Consumption #

So how does Kafka help businesses dynamically scale consumption performance?

Image

Data in Kafka is consumed through consumer groups. Group consumption lets multiple processes share the work of consuming one set of data. For example, we can start two processes to consume a topic with 20 partitions, with each process responsible for 10 partitions.

If the consumption capacity of the services is not sufficient during operation and message backlog occurs, we can start two additional consumer processes on two new servers. At this point, Kafka will automatically reschedule (rebalance) the consumer processes, evenly distributing the 20 partitions among the four consumer processes, so that each consumer process consumes data from 5 partitions.
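The partition arithmetic in this example can be checked with a small simulation. The function below is only a model of an even assignment; it is not Kafka's assignor code, and `assign_partitions` is a hypothetical name.

```cpp
#include <cassert>
#include <vector>

// Spread partitions [0, partition_count) over the consumers as evenly as possible.
// Returns, for each consumer, the list of partition ids it owns.
inline std::vector<std::vector<int>> assign_partitions(int partition_count,
                                                       int consumer_count) {
    assert(partition_count >= 0 && consumer_count > 0);
    std::vector<std::vector<int>> owned(consumer_count);
    const int base = partition_count / consumer_count;   // every consumer gets at least this many
    const int extra = partition_count % consumer_count;  // the first `extra` consumers get one more
    int next = 0;
    for (int c = 0; c < consumer_count; ++c) {
        const int share = base + (c < extra ? 1 : 0);
        for (int i = 0; i < share; ++i) {
            owned[c].push_back(next++);
        }
    }
    return owned;
}
```

`assign_partitions(20, 2)` gives each of the two consumers 10 partitions, and `assign_partitions(20, 4)` gives each of the four consumers 5, matching the numbers above. Because a partition is owned by only one consumer in a group at a time, adding more consumers than partitions would leave the extra consumers idle in this model.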

With this functionality, we can dynamically scale the capacity of the consumer servers, such as increasing the number of consumer processes at any time to improve consumption capacity, and even restart some consumer services whenever necessary.

This feature makes dynamic scaling easier, which is very helpful for data stream transmission or synchronization services with high write concurrency. Almost all eventually consistent data services are implemented using distributed queues. Many data synchronization processes between systems within Weibo have been switched to using Kafka for synchronization.

Based on Kafka’s grouping feature, we have divided the services into two groups: one for data statistics and one for storage. This way, we can separate storage from real-time statistics services.

RocksDB for Write-Heavy, Read-Light Workloads #

Next, let’s look at how distributed storage is handled, which is the most distinctive part of this self-built system. The computing part is similar to what we covered in Lesson 13, which you can refer back to.

Since there were only two storage servers, I needed a log storage service with good write performance that also supported retrieval. After searching and comparing, I finally chose RocksDB.

RocksDB began as an experimental project at Facebook and has since been continuously improved and widely adopted. It offers better write performance than LevelDB and can store data on flash, disk, and HDFS. It also takes advantage of multi-core CPUs and SSDs to provide high-performance storage under heavy load.

Since RocksDB is embedded, there is no network overhead between our implemented service and the storage engine, resulting in better performance. When combined with Kafka group consumption, we can achieve a distributed storage system without replicas.

First, I was drawn to the write performance of the RocksDB engine. Recall what we learned in Lesson 10: RocksDB uses memory as a cache and relies on the strong sequential write performance of disks. A single machine can sustain close to 300 MB/s of writes, so in the ideal case two storage servers provide 600 MB/s. With Kafka absorbing write peaks, this design already meets most business needs.

Secondly, RocksDB is very easy to integrate. After adding its library to a project, you only need to make sure that a database is opened for writing by a single instance; other readers can open the same database as a read-only Secondary instance.

In addition, RocksDB automatically manages hot and cold data between memory and disk, supports data compression, can store terabytes of data in a single database, and allows a single value of up to 3 GB. This suits a distributed tracing system that needs to store and look up large volumes of text logs.

The next problem to solve is how to allocate and manage our Trace logs in RocksDB.

In order to improve query efficiency and keep only 7 days of logs, we chose to save logs on a daily basis, with one RocksDB database for each day. Expired databases can be deleted or archived to HDFS.
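One way to picture the per-day layout is to derive the database name from the log timestamp and to treat any database outside the 7-day window as expired. The naming scheme `trace-YYYYMMDD`, the use of UTC days, and both helper names are assumptions for illustration; the text above only states that one database is kept per day.

```cpp
#include <cassert>
#include <cstdint>
#include <ctime>
#include <string>

constexpr std::int64_t kSecondsPerDay = 86400;
constexpr int kRetentionDays = 7;  // keep today plus the previous six days

// Directory name of the database holding logs for the UTC day that contains `epoch_seconds`.
inline std::string db_name_for(std::int64_t epoch_seconds) {
    assert(epoch_seconds >= 0);
    const std::time_t t = static_cast<std::time_t>(epoch_seconds);
    const std::tm* utc = std::gmtime(&t);  // not thread-safe; acceptable for a sketch
    assert(utc != nullptr);
    char buf[32];
    std::strftime(buf, sizeof buf, "trace-%Y%m%d", utc);
    return std::string(buf);
}

// True when the database for the day containing `db_epoch_seconds`
// has fallen out of the retention window as of `now_epoch_seconds`.
inline bool is_expired(std::int64_t db_epoch_seconds, std::int64_t now_epoch_seconds) {
    const std::int64_t db_day = db_epoch_seconds / kSecondsPerDay;
    const std::int64_t today = now_epoch_seconds / kSecondsPerDay;
    return today - db_day >= kRetentionDays;
}
```

A daily maintenance task could walk the existing database directories, call `is_expired` for each, and then delete or archive the ones that return true. Dropping a whole database this way avoids deleting individual keys.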

When summarizing and saving logs, we utilized two features of RocksDB. On one hand, we stored logs by using the TraceID of the Trace logs as the key, so we can directly retrieve all related logs by TraceID.

On the other hand, we used the Merge operation to append to the value stored under a key. Merge is a rarely mentioned feature of RocksDB, but it works well: it lets us efficiently append all the logs of a trace to a single key. You can obtain the official demo code for the Merge operation from here. If you are interested in the implementation principle, you can also refer to rocksdb-merge-operators.
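To make the storage layout concrete, here is a small in-memory model of the append semantics. It deliberately does not use the RocksDB API: `TraceStore` and its methods are hypothetical stand-ins, and the newline delimiter follows the line-break-separated format this lesson describes for stored log entries.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>

// In-memory stand-in for one day's key-value store (not the RocksDB API).
class TraceStore {
public:
    // Append one log line to the value stored under `trace_id`,
    // mirroring what an append-style merge does on the storage side.
    void merge(const std::string& trace_id, const std::string& log_line) {
        assert(!log_line.empty());
        assert(log_line.find('\n') == std::string::npos);  // '\n' is reserved as the delimiter
        std::string& value = data_[trace_id];
        if (!value.empty()) {
            value += '\n';
        }
        value += log_line;
    }

    // Return every log line recorded for `trace_id`, in append order.
    std::vector<std::string> get(const std::string& trace_id) const {
        std::vector<std::string> lines;
        const auto it = data_.find(trace_id);
        if (it == data_.end()) {
            return lines;
        }
        std::istringstream in(it->second);
        for (std::string line; std::getline(in, line);) {
            lines.push_back(line);
        }
        return lines;
    }

private:
    std::unordered_map<std::string, std::string> data_;  // TraceID -> newline-joined logs
};
```

The point of the model is that the writer never reads the existing value before appending. In RocksDB the same effect comes from registering a merge operator and calling Merge instead of a read-modify-write cycle, which keeps the write path sequential.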

Distributed Query and Computation #

After the data is stored, how do we query it?

In fact, it’s quite simple. For each interface, our Trace SDK includes the TraceID in a response header. When debugging, you take the returned TraceID and query with it. The query interface sends requests to all storage nodes, retrieves from RocksDB all the log entries (separated by line breaks) stored under that TraceID, and then aggregates and sorts them.

In addition, the log storage service integrates Libevent, which implements query interfaces for the HTTP and Memcache protocols. Since this part is more complex and has multiple modes, I won’t go into detail here. If you want to know how to implement a simple HTTP service using epoll and Socket, you can take a look at the demo I wrote in my spare time.

Let me add that we also need to know how to query data from multiple nodes. Since read operations are rare, we can asynchronously send queries to multiple storage instances and then aggregate and sort the results at the coordinating node.
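A rough sketch of this scatter-gather step is shown below. Each storage node is modelled as a callable so that the example stays self-contained; `NodeQuery`, `LogEntry`, and `query_all_nodes` are hypothetical names, and sorting by a per-entry timestamp is an assumption about how the merged logs are ordered.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <future>
#include <string>
#include <vector>

struct LogEntry {
    long long timestamp_us;  // assumed to be recorded with every log line
    std::string line;
};

// A storage node is modelled as "given a TraceID, return its log entries".
// In a real deployment this would be an HTTP or memcache-protocol request.
using NodeQuery = std::function<std::vector<LogEntry>(const std::string&)>;

// Query every node concurrently, then merge and sort on the coordinating node.
inline std::vector<LogEntry> query_all_nodes(const std::vector<NodeQuery>& nodes,
                                             const std::string& trace_id) {
    assert(!trace_id.empty());
    std::vector<std::future<std::vector<LogEntry>>> pending;
    pending.reserve(nodes.size());
    for (const auto& node : nodes) {
        // Scatter: one asynchronous task per storage node.
        pending.push_back(std::async(std::launch::async, node, trace_id));
    }
    std::vector<LogEntry> merged;
    for (auto& result : pending) {
        // Gather: block until each node has answered.
        std::vector<LogEntry> part = result.get();
        merged.insert(merged.end(), part.begin(), part.end());
    }
    std::stable_sort(merged.begin(), merged.end(),
                     [](const LogEntry& a, const LogEntry& b) {
                         return a.timestamp_us < b.timestamp_us;
                     });
    return merged;
}
```

Because the logs of one TraceID may be spread over several nodes, the coordinator has to ask all of them. This is affordable here only because reads are rare compared with writes.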

Now that we’ve talked about data querying, let’s move on to distributed computation.

The key to computing server status statistics is again Kafka’s group consumption: start a separate group of services that consume the log stream and compute summaries in memory.
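As an illustration of what such an in-memory summary might look like, the sketch below keeps one set of counters per minute and endpoint. The chosen metrics (request count, error count, average latency), the one-minute bucket, the rule that status codes of 500 and above count as errors, and the names `EndpointStats` and `StatsAggregator` are all assumptions, not details from the original system.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

struct EndpointStats {
    std::uint64_t requests = 0;
    std::uint64_t errors = 0;            // assumed rule: HTTP status >= 500
    std::uint64_t total_latency_ms = 0;

    double average_latency_ms() const {
        return requests == 0 ? 0.0
                             : static_cast<double>(total_latency_ms) / requests;
    }
};

// Keeps one counter set per (minute, endpoint); fed by a consumer in the statistics group.
class StatsAggregator {
public:
    void record(std::int64_t epoch_seconds, const std::string& endpoint,
                int http_code, std::uint64_t latency_ms) {
        assert(epoch_seconds >= 0);
        EndpointStats& s = buckets_[{epoch_seconds / 60, endpoint}];
        ++s.requests;
        if (http_code >= 500) {
            ++s.errors;
        }
        s.total_latency_ms += latency_ms;
    }

    // Counters for one minute (epoch_seconds / 60) and endpoint; zeros if nothing was seen.
    EndpointStats get(std::int64_t minute, const std::string& endpoint) const {
        const auto it = buckets_.find({minute, endpoint});
        return it == buckets_.end() ? EndpointStats{} : it->second;
    }

private:
    std::map<std::pair<std::int64_t, std::string>, EndpointStats> buckets_;
};
```

Each consumer process would call `record` for every access log it reads from its partitions and periodically publish the finished minutes. Since each consumer sees only its own partitions, the per-consumer results still need to be summed before display.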

If you want to sample the request status of servers, you can periodically pick 1,000 TraceIDs at random and write them into a time-block index in RocksDB. When you need to display them, fetch the sampled TraceIDs and aggregate their logs. I have already discussed the algorithms and strategies for real-time calculation in Lesson 13, which you can review.

That covers the overall ideas behind this self-built system. You may be wondering whether it is still worth learning these things now that hardware resources are abundant.
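The lesson does not say how the random selection is made. One common option is reservoir sampling, which keeps a uniform sample of fixed size from a stream whose length is not known in advance. The sketch below uses that technique as an assumption; `TraceSampler` is a hypothetical name, and writing the flushed sample into the RocksDB time-block index is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <string>
#include <vector>

// Keeps a uniform random sample of at most `capacity` TraceIDs
// from a stream of unknown length (reservoir sampling, Algorithm R).
class TraceSampler {
public:
    explicit TraceSampler(std::size_t capacity, unsigned seed = std::random_device{}())
        : capacity_(capacity), rng_(seed) {
        assert(capacity > 0);
    }

    void offer(const std::string& trace_id) {
        ++seen_;
        if (sample_.size() < capacity_) {
            sample_.push_back(trace_id);  // fill the reservoir first
            return;
        }
        // Keep the new item with probability capacity / seen,
        // replacing a uniformly chosen existing slot.
        std::uniform_int_distribution<std::size_t> pick(0, seen_ - 1);
        const std::size_t slot = pick(rng_);
        if (slot < capacity_) {
            sample_[slot] = trace_id;
        }
    }

    // Hand over the sample for the finished time block and start a new one.
    std::vector<std::string> flush() {
        std::vector<std::string> out;
        out.swap(sample_);
        seen_ = 0;
        return out;
    }

private:
    std::size_t capacity_;
    std::mt19937 rng_;
    std::size_t seen_ = 0;
    std::vector<std::string> sample_;
};
```

A statistics consumer could call `offer` for every TraceID it sees and call `flush` at the end of each time block, for example with `TraceSampler sampler(1000);`, storing the returned IDs under a key derived from the block's start time.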

In fact, even in the era of abundant hardware resources, we still need to consider costs. Let’s do a calculation. For example, if the performance of 2000 servers is doubled, we can save 1000 servers. If each server costs 10,000 yuan in maintenance per year, then it means we can save 10 million yuan per year. Besides solving business problems, most of the time architects are thinking about how to reduce costs while ensuring stable services.

Furthermore, I want to give some suggestions on selecting open source projects. Since many open source projects are co-developed and some are personal projects that have not been validated in production environments, we should try to choose those that have been validated in production environments and have active community support.

Although I implemented the tracing system in C++ back then, technology moves quickly. Today I would not recommend building this service the same way. In practice, you can consider languages such as Java, Go, or Rust, which I believe will save you a lot of time.

Summary #

In this lesson, I shared my practical solution for implementing distributed tracing in C++. You can refer to the technical points in the diagram below.

Image

Write-heavy, read-light systems often use a distributed queue service (such as Kafka) to collect data, and then consume and process it with multiple servers or shards. This architecture helps absorb floods of incoming data.

In this chapter, we analyzed several solutions for write-heavy, read-light systems in detail, and you will find that each has its own advantages. To make comparison easier, I included MySQL as a reference.

You can also refer to the following table to list out the key points of technical implementation (such as data transmission, writing, sharding, scalability, querying, etc.). By doing so, you can quickly analyze which technical implementation is more suitable for your project’s business needs.

Image

Thought Questions #

Today, I have prepared two thought questions for you.

Question 1: How would you handle the occasional out-of-order messages and rare duplicate consumption in Kafka?

Question 2: This one is a bit more challenging; give it a try if you are interested. A server built on epoll can be structured in several ways: a single-threaded Reactor, a single Reactor with multiple threads, or a multi-threaded Reactor. Which do you think is more suitable for a storage service?

Feel free to discuss and exchange ideas with me in the comments section. See you in the next class!