32 Data Processing How to Efficiently Process Data Generated by Applications #

Hello, I’m Kong Lingfei. Today, let’s talk about how to better perform asynchronous data processing.

For troubleshooting and operations, a large-scale application stores some request data in a storage system for future use. For example, an application saves request logs to Elasticsearch for easy troubleshooting, and a gateway saves API request counts, request message bodies, and other data in a database for console querying and display.

To meet these requirements, we need to perform data collection. Data collection is common in large-scale applications, but I found that many data collection services designed by developers often have the following problems:

  • The collection service is developed only for specific collection requirements. If the collection requirements change, the main code logic needs to be modified. Code changes inevitably introduce potential bugs and increase the workload of development and testing.
  • The collection service increases the latency of existing service requests.
  • Data collection performs poorly, and collecting a batch of data takes a long time.
  • When starting or stopping the service, collected data may be lost.

In this lecture, I will teach you in detail how to design and implement a data collection service to solve these problems.

Classification of Data Collection Methods #

First of all, you need to know the current methods of data collection in order to better understand the solution for asynchronous data processing.

Currently, there are primarily two methods of data collection: synchronous collection and asynchronous collection. The concept, advantages, and disadvantages of each are shown in the table below:

Image

Modern applications have increasingly high performance requirements, and asynchronous collection has less impact on the performance of the application. Therefore, asynchronous collection is more popular among developers and has been widely applied. The IAM Pump Server service that I am going to introduce next also adopts the asynchronous collection method.

Design of Data Collection System #

In this lecture, I will use a combination of theory and practical examples to demonstrate how to design a data collection service. First, let me introduce some theoretical knowledge about data collection, and then we will have specific practical examples.

In previous project development, I found that the data collection functionality many developers added seriously affected the performance of the entire application, because of problems such as synchronous reporting, single-threaded reporting, and flawed reporting logic. So, how can we minimize the impact on program performance during collection?

The answer lies in modeling the data collection. Through modeling, the designed collection system can be more versatile and meet many similar future requirements, eliminating the need to develop the same system repeatedly.

Today, I will give you a detailed introduction on how to model the data collection functionality and how this model solves the various problems mentioned earlier.

Core issues to be addressed when designing data collection systems #

To begin with, a collection system needs a data source, which can be one or more inputs. The data in the input comes from application reporting. The collected data usually needs to be processed, such as formatting, adding or deleting fields, filtering out unneeded data, etc., and then stored in downstream systems (outputs), as shown in the following diagram:

Image

Here, we need to address these three core issues:

  • Adding a data reporting step in the normal process for data collection will inevitably affect the performance of the program. So, how can we minimize the performance loss of the program?
  • What should we do if the speed at which input produces data exceeds the consumption capability of output, resulting in data accumulation?
  • After data collection, it needs to be stored in downstream systems. Before storage, we need to perform different processing on the data and may need to store it in different downstream systems. How can we meet these variable requirements?

To minimize the performance loss of the program, the best method is asynchronous reporting. If it is asynchronous, we need to first cache the data in memory and then asynchronously report it to the target system. Of course, to improve the reporting efficiency, batch reporting can be used.

For the problem of data accumulation, a good solution is to first report the collected data to some intermediate components with high throughput and the ability to store a large amount of data, such as Kafka or Redis. This approach is also an industry-standard handling method.

To address the issue of diverse collection requirements, we can make the collection program plug-in-based and extensible to meet the variable requirements.

Solving these three issues comes down to designing two functional points of the data collection system: the data reporting functionality and the data collection functionality. Next, let’s look at how to design each of them.

Design of Data Reporting Functionality #

To improve the throughput of asynchronous reporting, you can cache the data in memory (in Go, you can use a buffered channel) and use multiple workers to consume the data in memory. Using multiple workers can fully utilize the multi-core capability of the CPU. In addition, when reporting to downstream systems, you can also use batch reporting.
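
Before looking at the real code, here is a minimal sketch of this design, assuming illustrative names (Reporter, flush, batch); the actual implementation in iam-authz-server appears later in this lecture:

package reporter

import "sync"

// Reporter caches records in a buffered channel and flushes them to the
// downstream system in batches. All names here are illustrative.
type Reporter struct {
	records chan []byte    // in-memory cache (buffered channel)
	batch   int            // number of records per batch delivery
	flush   func([][]byte) // delivers one batch to the downstream system
	wg      sync.WaitGroup
}

// Start launches `workers` goroutines so consumption uses multiple CPU cores.
func (r *Reporter) Start(workers int) {
	for i := 0; i < workers; i++ {
		r.wg.Add(1)
		go r.worker()
	}
}

func (r *Reporter) worker() {
	defer r.wg.Done()
	buf := make([][]byte, 0, r.batch)
	for rec := range r.records { // exits when the channel is closed
		buf = append(buf, rec)
		if len(buf) == r.batch { // batch delivery
			r.flush(buf)
			buf = buf[:0]
		}
	}
	if len(buf) > 0 {
		r.flush(buf) // flush whatever is left on shutdown
	}
}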

Design of Data Collection Functionality #

Modern applications pay more and more attention to plugin-based design and extensibility. When designing a collection system, future needs should also be taken into account. For example, in the future you may need to switch from reporting data to MongoDB to reporting it to HBase, or report data to both MongoDB and HBase simultaneously. Therefore, the logic for reporting to downstream systems should be pluggable, with the required plugins selectable through configuration.

In order to improve program performance, the data is cached in memory. However, there is a drawback to this approach: when the program is shut down, the data in memory will be lost. Therefore, before the program ends, we need to ensure that the data in memory can be successfully reported, meaning the collection program needs to implement graceful shutdown functionality. Graceful shutdown not only ensures that the data in the cache is successfully reported, but also ensures that the data being processed is successfully reported.

Of course, since this is data collection, the collection frequency should also be configurable. Finally, because collection programs are usually not API services, a dedicated API needs to be exposed to report the health status of the collection program.

Data Collection Application Model #

Based on the above analysis and design, we can draw the following collection model:

Image

Asynchronous reporting requires additional asynchronous logic, which increases development effort and program complexity. Therefore, for scenarios where the speed at which the input produces data is less than the consumption speed of the output, and the output has high throughput and low latency characteristics, synchronous reporting, such as synchronous reporting to Redis, can also be used.

Data Collection System Implementation Project: iam-authz-server + iam-pump #

Above, I introduced the architecture of the data collection system, but only the model and theory are definitely not enough to meet your development needs for the data collection program. So, next, I will introduce how to implement the data collection architecture mentioned above. The entire architecture consists of two parts, each implemented by different services:

  • iam-authz-server: Implements data reporting functionality.
  • iam-pump: Implements data collection functionality.

The architecture of the entire collection system is completely consistent with the data collection architecture described above, so I won’t repeat it here.

iam-authz-server: Data Reporting #

The biggest challenge of data reporting is how to minimize the impact on application performance. To address this, our main solution is to perform data reporting asynchronously.

Next, I will introduce the data reporting design of iam-authz-server. This design is very mature and widely adopted in the projects I have developed and worked on. Some projects can handle billions of requests per day. Through this design, we will explore the specific methods of asynchronous data reporting, as well as the factors to consider during the reporting process.

The data reporting architecture of iam-authz-server is shown in the following diagram:

Image

The data reporting feature of iam-authz-server can be selectively enabled. The code for enabling it can be found in internal/authzserver/server.go, as shown below:

if s.analyticsOptions.Enable {
    analyticsStore := storage.RedisCluster{KeyPrefix: RedisKeyPrefix}
    analyticsIns := analytics.NewAnalytics(s.analyticsOptions, &analyticsStore)
    analyticsIns.Start()
    s.gs.AddShutdownCallback(shutdown.ShutdownFunc(func(string) error {
        analyticsIns.Stop()

        return nil
    }))
}

In the above code, when s.analyticsOptions.Enable is true, the data reporting feature is enabled. Because data reporting can affect the performance of the program, and there may be scenarios in the future where the data reporting feature needs to be disabled, we have made the data reporting feature configurable when designing iam-authz-server. In other words, the data reporting feature can be enabled or disabled through a configuration file. The configuration is simple: set the analytics.enable field in iam-authz-server.yaml to true to enable data reporting, and set it to false to disable data reporting.

Here, I suggest that you consider future possible variables when designing your program and make these variables configurable. This way, if there are changes in requirements in the future, we can meet them by modifying the configuration file instead of modifying the code. This approach can limit the changes in the application program to the configuration file, greatly reducing the probability of failures in the production environment, and allowing us to shorten the release cycle by only modifying the configuration file.

In the above code, a data reporting service is created using NewAnalytics, as shown below:

func NewAnalytics(options *AnalyticsOptions, store storage.AnalyticsHandler) *Analytics {                              
    ps := options.PoolSize                                                                                             
    recordsBufferSize := options.RecordsBufferSize                                                                     
    workerBufferSize := recordsBufferSize / uint64(ps)                                                                 
    log.Debug("Analytics pool worker buffer size", log.Uint64("workerBufferSize", workerBufferSize))                   
                                                                                                                       
    recordsChan := make(chan *AnalyticsRecord, recordsBufferSize)                                                      
                                                                                                                       
    return &Analytics{                                                                                                 
        store:                      store,                                                                             
        poolSize:                   ps,                                                                                
        recordsChan:                recordsChan,                                                                       
        workerBufferSize:           workerBufferSize,                                                                  
        recordsBufferFlushInterval: options.FlushInterval,                                                             
    }                                                                                                                  
}      

In this code, an Analytics variable is created and returned based on the provided parameters. There are 5 fields in the variable that you need to pay attention to:

  • store: A storage.AnalyticsHandler interface that provides the Connect() bool and AppendToSetPipelined(string, [][]byte) methods, used to connect to the storage and report data to it. iam-authz-server uses Redis as the storage.
  • recordsChan: A buffered channel that caches the authorization logs; its length can be set through the analytics.records-buffer-size option in the iam-authz-server.yaml configuration file.
  • poolSize: The number of workers to start, that is, the number of goroutines consuming messages from recordsChan.
  • workerBufferSize: The number of messages delivered to the downstream system in one batch. Batch delivery further improves consumption throughput and reduces CPU usage.
  • recordsBufferFlushInterval: The maximum interval between two deliveries, that is, the delivery timeout.

I suggest configuring analytics.records-buffer-size and analytics.pool-size according to the CPU and memory of the machine where the service is deployed, and determining suitable values through stress and load testing before the application goes live.

Analytics provides 3 methods:

  • Start(): Used to start the data reporting service.
  • Stop(): Used to stop the data reporting service. After receiving a termination command from the system, the main program calls the Stop method to gracefully shut down the data reporting service and ensure that all data in the cache is successfully reported.
  • RecordHit(record *AnalyticsRecord) error: Used to record data for AnalyticsRecord.

NewXxx (here, NewAnalytics) returns a struct of type Xxx (here, Analytics), and the Xxx type exposes several methods, as follows:

func NewAnalytics(options *AnalyticsOptions, store storage.AnalyticsHandler) *Analytics {
    ...
}

func (r *Analytics) Start() {
    ...
}
func (r *Analytics) Stop() {
    ...
}
func (r *Analytics) RecordHit(record *AnalyticsRecord) error {
    ...
}

In fact, the above code snippet is a common Go coding style/design pattern. You will encounter this design pattern frequently in your future development career. There are two benefits to using this code design pattern:

  • Modularity: The data reporting function is encapsulated into a service module, and data and methods revolve around the Xxx structure. This is similar to classes in C++, Java, and Python. Xxx can be understood as a class, NewXxx initializes a class instance, and Start, Stop, RecordHit are methods provided by the class. Modularity makes the program logic clearer, the function more independent and easier to maintain, and can also be used by other applications.
  • Convenient data transfer: Data can be stored in the fields of the Xxx struct and shared among its methods. If methods run concurrently and share data, remember to use concurrency-safe types (such as the recordsChan channel) or protect non-concurrency-safe types with locks.

Next, I will introduce three core code sections related to data reporting in the iam-authz-server service: starting the data reporting service, asynchronously reporting authorization logs, and gracefully shutting down the data reporting.

Starting the Service: Starting the Data Reporting Service #

When the service starts, the data reporting module needs to be started first. We do this by calling analyticsIns.Start() to start the data reporting service. The Start code is as follows:

func (r *Analytics) Start() {
    analytics = r
    r.store.Connect()

    // start worker pool
    atomic.SwapUint32(&r.shouldStop, 0)
    for i := 0; i < r.poolSize; i++ {
        r.poolWg.Add(1)
        go r.recordWorker()
    }

    // stop analytics workers
    go r.Stop()
}

Here, there is one point you need to pay attention to. Both data reporting and data collection rely heavily on goroutines to execute operations concurrently. To catch potential concurrency bugs caused by concurrent reads and writes, it is recommended to add the -race flag when building your test binary, for example: go build -race cmd/iam-authz-server/authzserver.go. Then, during testing, watch the program output for data-race reports.

In the Start function, a total of poolSize worker goroutines are started, which jointly consume messages from recordsChan. The consumption logic is implemented in the recordWorker() function, as shown below:

func (r *Analytics) recordWorker() {
    defer r.poolWg.Done()

    // this is the buffer for one pipelined command to Redis;
    // use r.workerBufferSize as cap to reduce slice re-allocations
    recordsBuffer := make([][]byte, 0, r.workerBufferSize)

    // read records from channel and process
    lastSentTS := time.Now()
    for {
        readyToSend := false
        select {
        case record, ok := <-r.recordsChan:
            // check if channel was closed and it is time to exit from worker
            if !ok {
                // send what is left in buffer
                r.store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
                return
            }

            // we have new record - prepare it and add to buffer

            if encoded, err := msgpack.Marshal(record); err != nil {
                log.Errorf("Error encoding analytics data: %s", err.Error())
            } else {
                recordsBuffer = append(recordsBuffer, encoded)
            }

            // identify that buffer is ready to be sent
            readyToSend = uint64(len(recordsBuffer)) == r.workerBufferSize

        case <-time.After(r.recordsBufferFlushInterval):
            // nothing was received for that period of time
            // anyways send whatever we have, don't hold data too long in buffer
            readyToSend = true
        }

        // send data to Redis and reset buffer
        if len(recordsBuffer) > 0 && (readyToSend || time.Since(lastSentTS) >= recordsBufferForcedFlushInterval) {
            r.store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
            recordsBuffer = recordsBuffer[:0]
            lastSentTS = time.Now()
        }
    }
}

The recordWorker() function saves the received authorization logs in the recordsBuffer slice. When the number of elements in the slice reaches workerBufferSize, or the time since the last delivery exceeds recordsBufferFlushInterval, the data in recordsBuffer is reported to the target system (the input of the collection model, which here is Redis). There are some design tricks in recordWorker() that are worth your reference:

  • Using msgpack to serialize messages: msgpack is an efficient binary serialization format. Like JSON, it allows you to exchange data between different languages. But it is faster and smaller than JSON.
  • Supporting Batch Windows: When the number of messages from the worker reaches a specified threshold, the messages will be batch-delivered to Redis. The threshold is determined by the code readyToSend = uint64(len(recordsBuffer)) == r.workerBufferSize.
  • Timeout delivery: In order to avoid the situation where the message generation is too slow, and the message cannot be delivered due to not reaching the Batch Windows, the delivery logic also supports timeout delivery, which is implemented by the code segment case <-time.After(r.recordsBufferFlushInterval).
  • Supporting graceful shutdown: When recordsChan is closed, the pending messages in recordsBuffer will be batch-delivered to Redis, and then the worker goroutine will exit.

Here’s one thing to note: After the delivery is completed, you need to reset recordsBuffer and the timer, otherwise the data may be repeatedly delivered:

recordsBuffer = recordsBuffer[:0]
lastSentTS = time.Now()

A maximum timeout recordsBufferForcedFlushInterval is also set to ensure that the messages are delivered within the maximum interval. In other words, iam-authz-server requires that the maximum delivery interval is recordsBufferForcedFlushInterval seconds, which is to prevent the configuration file from setting recordsBufferFlushInterval too large.

Running the Service: Asynchronous Reporting of Authorization Logs #

After the data reporting service is started, when authorization logs are generated, the program will automatically report the data. Next, I will explain in detail how to efficiently report the data.

iam-authz-server calls the LogGrantedAccessRequest function when the authorization is successful, and calls the LogRejectedAccessRequest function when the authorization fails. And in these two functions, the RecordHit function is called to record the authorization logs.

By calling the RecordHit(record *AnalyticsRecord) error function, iam-authz-server asynchronously caches the authorization logs. After calling RecordHit, the messages of type AnalyticsRecord are stored in the recordsChan buffered channel.

Note one thing: before caching, RecordHit checks whether the reporting service is shutting down gracefully; if it is, the message is discarded:

if atomic.LoadUint32(&r.shouldStop) > 0 {
    return nil
}

By storing the authorization logs in the recordsChan buffered channel, the LogGrantedAccessRequest and LogRejectedAccessRequest functions can return without waiting for the successful reporting of the authorization logs, resulting in almost zero performance overhead for the entire authorization request.
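
Putting these pieces together, a RecordHit implementation consistent with the description above looks roughly like this (a sketch built from the fields already introduced, not necessarily the verbatim source):

func (r *Analytics) RecordHit(record *AnalyticsRecord) error {
	// refuse new records while the service is shutting down
	if atomic.LoadUint32(&r.shouldStop) > 0 {
		return nil
	}

	// hand the record to the worker pool and return immediately;
	// all encoding and Redis I/O happens in the workers
	r.recordsChan <- record

	return nil
}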

Stopping the Service: Graceful Shutdown of Data Reporting #

Having covered data reporting while the service runs, the next step is to stop it gracefully. To ensure that both the cached data and the data currently being delivered reach Redis when the application stops, iam-authz-server implements graceful shutdown of data reporting. The code is as follows:

gs.AddShutdownCallback(shutdown.ShutdownFunc(func(string) error {
    analyticsIns.Stop()
    return nil
}))

When receiving the os.Interrupt and syscall.SIGTERM system signals, analyticsIns.Stop() is called to stop the data reporting service. The Stop function stops receiving new authorization logs and waits for the ongoing data reporting to complete.
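
Based on that behavior, a Stop implementation would look roughly like the following sketch (the order matters: set the flag, close the channel, then wait for the workers, which is what guarantees no cached data is lost):

func (r *Analytics) Stop() {
	// flag RecordHit to stop accepting new authorization logs
	atomic.SwapUint32(&r.shouldStop, 1)

	// closing the channel makes every worker flush its remaining buffer and exit
	close(r.recordsChan)

	// wait until all workers have finished their last delivery
	r.poolWg.Wait()
}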

I have explained the design of the data reporting part. Next, I will introduce the design of the data collection part.

iam-pump: Data Collection #

The iam-authz-server reports data to Redis, and iam-pump consumes the data from Redis and saves it in MongoDB for persistent storage.

The key design points for iam-pump are plugin-based and configurable data processing from Redis to downstream systems, as well as graceful shutdown functionality. These are also the key points and challenges in designing a data collection program. Now, let’s take a look at how iam-pump implements a plugin-based data collection program. The design of this data collection program has been validated in the development of large-scale enterprise applications, so you can use it with confidence.

The architecture of the iam-pump data collection is shown in the following diagram:

Image

When the iam-pump service starts, the data collection functionality needs to be started. The startup code can be found in internal/pump/server.go.

Next, I will introduce the 5 core parts of the iam-pump service:

  • Data collection plugin definition.
  • Initialization of data collection plugin.
  • Health check.
  • Starting the Loop for periodic consumption of data from Redis.
  • Gracefully stopping the data collection service.

Initialization: Data Collection Plugin Definition #

The core of the data collection component’s design is plugins. Here, I abstract each downstream system that data is reported to as a pump. So, how do we define the pump interface? The interface definition needs to reflect the actual collection requirements; generally, it needs at least the following functions:

  • New: Create a pump.
  • Init: Initialize a pump, such as creating a network connection to the downstream system.
  • WriteData: Write data to the downstream system. For better performance, it is preferable to support batch writing.
  • SetFilters: Set whether to filter certain data, which is a very common requirement because not all data is needed.
  • SetTimeout: Set the timeout. In my development process, I encountered a pitfall of timeouts when connecting to Kafka, which caused the entire collection program to timeout. So, there needs to be a timeout handling mechanism here to ensure the normal operation of the collection framework.

I previously developed a public cloud gateway service that stored gateway request data in MongoDB, and it ran into a big problem: some users uploaded very large files (hundreds of megabytes) through the gateway, and storing this data in MongoDB quickly exhausted the 500 GB of MongoDB storage space. To avoid this kind of problem, some detailed data has to be filtered out before storage. That is why iam-pump adds the SetOmitDetailedRecording function to filter out detailed data.

Therefore, the final plugin interface definition of iam-pump is in internal/pump/pumps/pump.go:

type Pump interface {
	GetName() string
	New() Pump
	Init(interface{}) error
	WriteData(context.Context, []interface{}) error
	SetFilters(analytics.AnalyticsFilters)
	GetFilters() analytics.AnalyticsFilters
	SetTimeout(timeout int)
	GetTimeout() int
	SetOmitDetailedRecording(bool)
	GetOmitDetailedRecording() bool
}

If you have more requirements in actual development, you can continue to add the necessary processing functions to the Pump interface definition.
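
For example, a minimal, hypothetical pump that only logs every record it receives could implement the interface like this (an illustrative plugin written against the interface above, not one shipped with iam-pump):

// LogPump is a hypothetical pump that simply logs every record it receives.
type LogPump struct {
	filters               analytics.AnalyticsFilters
	timeout               int
	omitDetailedRecording bool
}

func (p *LogPump) GetName() string { return "Log Pump" }

func (p *LogPump) New() Pump { return &LogPump{} }

func (p *LogPump) Init(conf interface{}) error {
	// a real pump would decode conf (the Meta section of its configuration)
	// and establish a connection to the downstream system here
	return nil
}

func (p *LogPump) WriteData(ctx context.Context, data []interface{}) error {
	for _, v := range data {
		log.Infof("analytics record: %v", v)
	}

	return nil
}

func (p *LogPump) SetFilters(filters analytics.AnalyticsFilters) { p.filters = filters }
func (p *LogPump) GetFilters() analytics.AnalyticsFilters        { return p.filters }
func (p *LogPump) SetTimeout(timeout int)                        { p.timeout = timeout }
func (p *LogPump) GetTimeout() int                               { return p.timeout }
func (p *LogPump) SetOmitDetailedRecording(omit bool)            { p.omitDetailedRecording = omit }
func (p *LogPump) GetOmitDetailedRecording() bool                { return p.omitDetailedRecording }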

Initialization: Initializing Data Collection Plugins #

Once the plugins are defined, they need to be initialized. In the initialize function, the pumps are initialized:

func (s *pumpServer) initialize() {
	pmps = make([]pumps.Pump, len(s.pumps))
	i := 0
	for key, pmp := range s.pumps {
		pumpTypeName := pmp.Type
		if pumpTypeName == "" {
			pumpTypeName = key
		}

		pmpType, err := pumps.GetPumpByName(pumpTypeName)
		if err != nil {
			log.Errorf("Pump load error (skipping): %s", err.Error())
		} else {
			pmpIns := pmpType.New()
			initErr := pmpIns.Init(pmp.Meta)
			if initErr != nil {
				log.Errorf("Pump init error (skipping): %s", initErr.Error())
			} else {
				log.Infof("Init Pump: %s", pmpIns.GetName())
				pmpIns.SetFilters(pmp.Filters)
				pmpIns.SetTimeout(pmp.Timeout)
				pmpIns.SetOmitDetailedRecording(pmp.OmitDetailedRecording)
				pmps[i] = pmpIns
			}
		}
		i++
	}
}

In the initialize function, pumps are created, initialized, and the SetFilters, SetTimeout, and SetOmitDetailedRecording functions are called to set these values for the pumps. The information about Filters, Timeout, OmitDetailedRecording, etc. is specified in the pump’s configuration file.

Here’s a tip you can also take note of: the pump configuration file supports both common and custom configurations, with the configuration structure as PumpConfig:

type PumpConfig struct {
	Type                  string
	Filters               analytics.AnalyticsFilters
	Timeout               int
	OmitDetailedRecording bool
	Meta                  map[string]interface{}
}

Custom pump configurations can be stored in the Meta variable, which is a map type. Common configurations can be shared, reducing development and maintenance effort, while custom configurations can be adapted to the different configurations of each pump.
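
As an illustration, a configuration entry for a hypothetical MongoDB pump could be expressed as the following PumpConfig value (in practice it would come from the YAML configuration file; the Meta keys are invented for this example):

cfg := PumpConfig{
	Type:                  "mongo", // which pump implementation to load
	Timeout:               5,       // common configuration shared by all pumps (seconds)
	OmitDetailedRecording: true,    // drop detailed fields before writing
	Meta: map[string]interface{}{
		// pump-specific configuration lives in Meta; these keys are illustrative
		"collection_name": "iam_analytics",
		"mongo_url":       "mongodb://127.0.0.1:27017/iam_analytics",
	},
}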

Initialization: Health Check #

Since iam-pump is a non-API service, a health check interface is also set up to monitor its running status. The iam-pump component starts an HTTP service by calling the server.ServeHealthCheck function. Here’s the code for the ServeHealthCheck function:

func ServeHealthCheck(healthPath string, healthAddress string) {
	http.HandleFunc("/"+healthPath, func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"status": "ok"}`))
	})

	if err := http.ListenAndServe(healthAddress, nil); err != nil {
		log.Fatalf("Error serving health check endpoint: %s", err.Error())
	}
}

This function starts an HTTP server that listens to the address configured by health-check-address (https://github.com/marmotedu/iam/blob/v1.0.6/configs/iam-pump.yaml#L7) and the health check path configured by health-check-path (https://github.com/marmotedu/iam/blob/v1.0.6/configs/iam-pump.yaml#L6). If the request http://<health-check-address>/<health-check-path> returns {"status": "ok"}, it means that iam-pump is working correctly.

In this case, the health check simply returns a string. In actual development, you can encapsulate more complex logic. For example, checking if the process can successfully ping the database or if the worker process is in a working state.
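
For instance, a slightly richer handler might probe a downstream dependency before reporting healthy (checkMongo is a hypothetical helper, not part of iam-pump):

http.HandleFunc("/"+healthPath, func(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-type", "application/json")

	// checkMongo is a hypothetical probe, e.g. a ping with a short timeout
	if err := checkMongo(r.Context()); err != nil {
		w.WriteHeader(http.StatusServiceUnavailable)
		_, _ = w.Write([]byte(`{"status": "unhealthy"}`))

		return
	}

	w.WriteHeader(http.StatusOK)
	_, _ = w.Write([]byte(`{"status": "ok"}`))
})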

The default health check request address for iam-pump is http://127.0.0.1:7070/healthz.

Running the Service: Start Loop to Periodically Consume Redis Data #

After initializing the pumps, you can use the Run function to start the consumption logic. In the Run function, data is periodically retrieved from Redis (at the polling interval set by the purge-delay configuration), decoded with msgpack.Unmarshal, and passed to writeToPumps for processing:

func (s preparedPumpServer) Run(stopCh <-chan struct{}) error {
	ticker := time.NewTicker(time.Duration(s.secInterval) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			analyticsValues := s.analyticsStore.GetAndDeleteSet(storage.AnalyticsKeyName)
			if len(analyticsValues) > 0 {
				// Convert to something clean
				keys := make([]interface{}, len(analyticsValues))

				for i, v := range analyticsValues {
					decoded := analytics.AnalyticsRecord{}
					err := msgpack.Unmarshal([]byte(v.(string)), &decoded)
					log.Debugf("Decoded Record: %v", decoded)
					if err != nil {
						log.Errorf("Couldn't unmarshal analytics data: %s", err.Error())
					} else {
						if s.omitDetails {
							decoded.Policies = ""
							decoded.Deciders = ""
						}
						keys[i] = interface{}(decoded)
					}
				}

				// Send to pumps
				writeToPumps(keys, s.secInterval)
			}
		// exit consumption cycle when receive SIGINT and SIGTERM signal
		case <-stopCh:
			log.Info("stop purge loop")

			return nil
		}
	}
}

The writeToPumps function asynchronously calls the WriteData function of the pump by invoking execPumpWriting. There are a couple of design considerations in the execPumpWriting function that you might find interesting:

  • Some common processing, such as Filters, Timeout, and OmitDetailedRecording, is handled outside the pump to reduce code duplication within the pump.
  • Graceful shutdown is implemented by the following code:
select {
case <-stopCh:
    log.Info("stop purge loop")
    return
default:
}

This code needs to be placed after writeToPumps to ensure that all data is successfully written to the pumps before stopping the collection logic.
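
A simplified version of the per-pump write, with the common processing and the timeout handled outside the pump, might look like the following sketch (the signature and the filterData helper are illustrative, not the actual iam-pump code):

func execPumpWriting(ctx context.Context, wg *sync.WaitGroup, pmp pumps.Pump, keys []interface{}) {
	defer wg.Done()

	// common processing (filters, omit-detailed-recording) is applied here,
	// outside the pump, so every pump gets it for free;
	// filterData is a placeholder for the real filtering helper
	filtered := filterData(pmp, keys)

	// the pump's timeout becomes a context deadline around WriteData
	if t := pmp.GetTimeout(); t > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, time.Duration(t)*time.Second)
		defer cancel()
	}

	if err := pmp.WriteData(ctx, filtered); err != nil {
		log.Errorf("Error writing to %s pump: %s", pmp.GetName(), err.Error())
	}
}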

Stopping the Service: Graceful Shutdown of Data Collection Service #

To ensure that the data being processed is successfully stored when stopping the service, a graceful shutdown feature is required. In iam-pump, SIGINT and SIGTERM signals are passed through channels. When the consumption logic receives either of these signals, it exits the consumption loop, as seen in the Run function. The code is as follows:

func (s preparedPumpServer) Run(stopCh <-chan struct{}) error {
	ticker := time.NewTicker(time.Duration(s.secInterval) * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Consumption logic
			...
		// exit consumption cycle when receive SIGINT and SIGTERM signal
		case <-stopCh:
			log.Info("stop purge loop")

			return nil
		}
	}
}

Summary #

In this lecture, I primarily introduced how to transform data collection requirements into a data collection model. From this model, I designed a scalable and high-performance data collection service, and implemented it using the iam-pump component.

Lastly, I would like to give you a suggestion: during development, you can also abstract certain functions into general models and implement basic frameworks (engines) for these models. Then, you can plug in customized parts as modules. Through this approach, you can design a highly extensible service that not only meets current needs but also future ones.

Homework Exercise #

  1. Take a moment to think about how to design a data reporting and data collection application. What points should be paid attention to during the design process?
  2. Practice by starting the iam-authz-server and iam-pump services, and validate the entire process.

Feel free to discuss and communicate with me in the comment section. See you in the next class.