06 How Stream Can Improve the Efficiency of Traversing Collections

06 How Stream can improve the efficiency of traversing collections #

Hello, I am Liu Chao.

In the previous lesson, I talked about the List collection class, so I think you must also be familiar with the top-level interface of collections, Collection. In Java 8, Collection introduced two new methods, namely Stream() and parallelStream().

It is not difficult to guess from their English names that these two methods are definitely related to Stream. Furthermore, can we speculate that they are also related to our familiar InputStream and OutputStream? What are the new Stream methods in the collection class used for? Today, let’s delve deeper into understanding Streams.

What is Stream? #

Many large-scale systems today use sharding and partitioning.

For example, in an e-commerce system, the order table often uses the Hash value of the user ID to achieve sharding and partitioning. This is done to reduce the amount of data in a single table and optimize the speed of querying orders for users.

However, when backend administrators review orders, they need to query data from various sources and then perform merge operations at the application layer.

For example, when we need to query all orders under certain filtering conditions and sort them according to a certain condition, the data retrieved from a single data source can be sorted according to a certain condition. However, the data retrieved and sorted from multiple data sources does not necessarily represent the correct sorting after merge. Therefore, we need to re-sort the merged data collection at the application layer.

Before Java 8, we typically used for loops or Iterator iterations to re-sort and merge data, or redefine the Comparator method of Collections.sorts to achieve this. These two methods are not very efficient for large-scale systems.

Java 8 added a new interface class called Stream, which is different from the byte stream concept we previously encountered. The Stream in Java 8 collections is equivalent to an advanced version of Iterator. It can perform various convenient and efficient aggregation operations (Aggregate Operation) or bulk data operations (Bulk Data Operation) on collections using Lambda expressions.

Stream’s aggregation operations, such as sorted, filter, and map, are similar to the aggregate operations in database SQL. We can efficiently implement aggregate operations similar to database SQL at the application layer. In terms of data operations, Stream can not only perform data operations in a serial manner, but also process large amounts of data in a parallel manner, improving the efficiency of data processing.

Next, let’s experience the conciseness and power of Stream with a simple example.

The requirement of this demo is to filter male and female students in a certain high school whose height is above 160cm. Let’s first implement it using traditional iteration, the code is as follows:

Map<String, List<Student>> stuMap = new HashMap<String, List<Student>>();
for (Student stu: studentsList) {
    if (stu.getHeight() > 160) { // If height is greater than 160
        if (stuMap.get(stu.getSex()) == null) { // The gender is not categorized yet
            List<Student> list = new ArrayList<Student>(); // Create a new list for this gender
            list.add(stu); // Add the student to the list
            stuMap.put(stu.getSex(), list); // Put the list into the map
        } else { // The gender category already exists
            stuMap.get(stu.getSex()).add(stu); // The gender category already exists, just add it directly
        }
    }
}

Now let’s implement it using the Stream API in Java 8:

  1. Serial implementation
Map<String, List<Student>> stuMap = stuList.stream().filter((Student s) -> s.getHeight() > 160).collect(Collectors.groupingBy(Student::getSex));
  1. Parallel implementation
Map<String, List<Student>> stuMap = stuList.parallelStream().filter((Student s) -> s.getHeight() > 160).collect(Collectors.groupingBy(Student::getSex));

From the above two simple examples, we can see that Stream combined with Lambda expressions makes it very concise and convenient to implement traversal and filtering functions.

How to optimize Stream traversal? #

Above, we have a preliminary understanding of the Stream API in Java 8. But how does Stream optimize iteration? And how is parallel processing implemented? Below, we will analyze the implementation principle of Stream through the Stream source code.

1. Stream operation classification #

Before understanding the implementation principle of Stream, let’s first understand the classification of Stream operations, because the classification of operations is actually one of the important reasons for efficient iteration of large data sets. You will understand why after analyzing it.

The official categorizes the operations in Stream into two categories: intermediate operations and terminal operations. Intermediate operations only record the operations and only return a Stream without performing any calculations, while terminal operations actually perform the calculations.

Intermediate operations can be further classified into stateless operations and stateful operations. Stateless operations refer to operations where the processing of elements is not affected by the previous elements, while stateful operations can only continue after all elements are obtained.

Terminal operations can be further classified into short-circuiting operations and non-short-circuiting operations. Short-circuiting operations refer to operations where the final result can be obtained when certain elements that meet the conditions are encountered, while non-short-circuiting operations require all elements to be processed before the final result can be obtained. The details of the operation classification are shown in the following figure:

img

We usually also call intermediate operations lazy operations. It is precisely through this lazy operation combined with terminal operations and data sources that the processing pipeline (Pipeline) of Stream is efficient.

2. Stream source code implementation #

Before we understand how Stream works, let’s first understand which main structural classes compose the Stream package and what are the responsibilities of each class. Refer to the following figure:

img

BaseStream and Stream are the top-level interface classes. BaseStream primarily defines the basic interface methods of the stream, such as spliterator, isParallel, etc.; Stream defines some commonly used operation methods of the stream, such as map, filter, etc.

ReferencePipeline is a structural class that assembles various operation streams by defining inner classes. It defines three inner classes: Head, StatelessOp, and StatefulOp, and implements the interface methods of BaseStream and Stream.

The Sink interface defines the protocol for the relationship between each Stream operation, which includes four methods: begin(), end(), cancellationRequested(), and accept(). ReferencePipeline will ultimately assemble the entire Stream operation into a call chain, and the relationship between the Stream operations on this call chain is defined by the Sink interface protocol.

3. Stream operation stacking #

As we know, the various operations of a Stream are assembled by the processing pipeline and collectively complete the data processing. In the JDK, each interruption operation is named using a stage name.

The pipeline structure is usually implemented by the ReferencePipeline class. When I explained the Stream package structure earlier, I mentioned that the ReferencePipeline class includes three inner classes: Head, StatelessOp, and StatefulOp.

The Head class is mainly used to define data source operations. When we call the names.stream() method for the first time, the Head object will be loaded for the first time, and this is the loading of the data source operation; followed by the loading of intermediate operations, namely, the stateless intermediate operation StatelessOp object and the stateful operation StatefulOp object. At this time, the Stage is not executed, but a linked list of intermediate operation stages is created through AbstractPipeline; when we call the terminal operation, a final Stage is generated, and the previous intermediate operations are triggered through this Stage, starting from the last Stage, recursively generating a Sink chain. As shown in the figure below:

img

Now let’s use an example to feel how the operation classification of Stream achieves efficient iteration of large data sets.

List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");
 
String maxLenStartWithZ = names.stream()
    	            .filter(name -> name.startsWith("张"))
    	            .mapToInt(String::length)
    	            .max()
    	            .toString();

The requirement of this example is to find the longest name starting with “张”. From the perspective of the code, you might think that the operation flow is like this: first iterate through the collection to get all names starting with “张”; then iterate through the filtered collection to convert the names to numbers representing their lengths; finally, find the longest name from the length collection and return it.

I want to tell you very clearly that the actual situation is not like this. Let’s analyze how all the operations in this method are executed step by step.

First, because names is an ArrayList, the names.stream() method will call the Stream method of the base interface Collection:

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

Then, the Stream method will call the Stream method of the StreamSupport class, and an internal class Head of ReferencePipeline will be initialized in the method:

 public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

Then, the filter and map methods are called. These two methods are both stateless intermediate operations, so when filter and map operations are performed, no actual operation is performed. Instead, a Stage is created to represent each operation of the user.

Usually, the operation of Stream requires a callback function, so a complete Stage is represented by a triple composed of data source, operation, and callback function. The following figure shows the filter and map methods of ReferencePipeline:

 @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }
 
                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
   @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {    
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

The new StatelessOp will call the constructor of the parent class AbstractPipeline, which connects the previous and next stages to generate a Stage linked list:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
            if (previousStage.linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            previousStage.linkedOrConsumed = true;
            previousStage.nextStage = this; // Set the next pointer of the current stage to the previous stage
     
            this.previousStage = previousStage; // Assign the current stage as the global variable previousStage
            this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
            this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
            this.sourceStage = previousStage.sourceStage;
            if (opIsStateful())
                sourceStage.sourceAnyStateful = true;
            this.depth = previousStage.depth + 1;
        }

When creating each stage, the opWrapSink() method is included. This method wraps a specific implementation of an operation in the Sink class, which uses a (handle -> forward) pattern to stack operations.

When the max method is called, the max method of ReferencePipeline is called. At this time, since the max method is a terminal operation, a TerminalOp operation will be created, and a ReducingSink and the operation will be wrapped in the Sink class.

Finally, the wrapSink method of AbstractPipeline is called. This method calls opWrapSink to generate a Sink linked list, where each Sink in the Sink linked list wraps the specific implementation of an operation.

After the Sink linked list is generated, the Stream starts to execute. It iterates over the collection through the spliterator and executes the specific operations in the Sink linked list.

4. Stream Parallel Processing #

There are two ways in which Stream processes data: serial processing and parallel processing. To achieve parallel processing, we need to add a Parallel() method to the code in the example:

List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");

String maxLenStartWithZ = names.stream()
                .parallel()
                .filter(name -> name.startsWith("张"))
                .mapToInt(String::length)
                .max()
                .toString();

The parallel processing of Stream is the same as the serial processing implementation before calling the terminal operation. After calling the terminal method, the implementation is a bit different. The evaluateParallel method of TerminalOp is called to perform parallel processing.

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
            assert getOutputShape() == terminalOp.inputShape();
            if (linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            linkedOrConsumed = true;
     
            return isParallel()
                   ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
                   : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
        }

In parallel processing, Stream is combined with the ForkJoin framework to divide the Stream processing into segments. The estimateSize method in Spliterator estimates the size of the segments.

I will not go into detail about the ForkJoin framework and the estimation algorithm here. If you are interested, you can analyze the implementation of this algorithm in more detail.

The minimum processing unit threshold is obtained based on the estimated data size. If the current segment size is greater than the threshold, the collection will continue to be divided. Each segment will generate a Sink linked list. After all the segment operations are completed, the ForkJoin framework will merge the results of the segments.

Using Stream properly #

By now, you should have a clear understanding of how the Stream API optimizes collection traversal. Using the Stream API is concise and can be processed in parallel, which may improve system performance. However, can using the Stream API really make the system perform better? Let’s find out through a series of tests.

We will compare the performance of regular iteration, Stream serial iteration, and Stream parallel iteration, with data filtering and grouping operations performed during the iteration loop. We will perform the following tests:

  • Comparing the performance of an integer array with a length of 100 on a multi-core CPU server configuration.
  • Comparing the performance of an integer array with a length of 1.00E+8 (100 million) on a multi-core CPU server configuration.
  • Comparing the performance of filtering and grouping an object array with a length of 1.00E+8 on a multi-core CPU server configuration.
  • Comparing the performance of filtering and grouping an object array with a length of 1.00E+8 on a single-core CPU server configuration.

Due to limited space, I will directly provide the statistical results here. You can also verify the specific test code on Github. Based on the tests, the time taken for each iteration method is as follows:

  • Regular iteration
  • Stream parallel iteration < Regular iteration
  • Stream parallel iteration < Regular iteration
  • Regular iteration

From the above test results, we can see that in cases where the number of loop iterations is small, the regular iteration method actually performs better. In a single-core CPU server configuration, the regular iteration method also has an advantage. However, when iterating over large amounts of data and the server has a multi-core CPU, Stream’s parallel iteration performs significantly better. Therefore, when dealing with large data sets in our daily work, we should try to deploy the application in a multi-core CPU environment and use Stream’s parallel iteration for processing.

Let’s look at the facts. As we can see, using Stream does not necessarily result in better system performance. We still need to choose based on the specific application scenario, which means using Stream properly.

Summary #

Looking at the design and implementation of Stream, it is worth learning from. In terms of the overall design direction, Stream decomposes the entire operation into a chain structure, which not only simplifies traversal operations but also lays the foundation for parallel computing.

In terms of the smaller categorization direction, Stream divides the operations of traversing elements and calculating elements into intermediate operations and terminal operations. The intermediate operations are further divided into stateful and stateless operations based on whether there is interference between elements, which implements different stages in the chain structure.

In serial processing operations, Stream does not actually process data in each intermediate operation, but rather concatenates these intermediate operations together. The final data processing chain is triggered by the terminal operation and is processed through the Spliterator iterator in Java 8. In this case, each iteration performs data processing for all stateless intermediate operations, while stateful intermediate operations require iterating through all the data before performing the processing operation. Lastly, the terminal operation is performed for data processing.

In parallel processing operations, Stream is mostly the same as serial processing in terms of intermediate operations. However, in the terminal operation, Stream combines the use of the ForkJoin framework to slice the collection. The ForkJoin framework merges the processing results of each slice together. Lastly, it is important to pay attention to the use cases of Stream.

Thought Question #

Here is a simple case of parallel processing. Please identify the issues with it.

// Use a container to store 100 numbers, and use Stream for parallel processing to transfer odd numbers from the container to the parallelList container
List<Integer> integerList = new ArrayList<Integer>();

for (int i = 0; i < 100; i++) {
    integerList.add(i);
}

List<Integer> parallelList = new ArrayList<Integer>();
integerList.stream()
           .parallel()
           .filter(i -> i%2 == 1)
           .forEach(i -> parallelList.add(i));

Answer: The code above has a potential problem with thread safety because multiple threads are modifying the same parallelList at the same time during the parallel processing. ArrayList is not thread-safe, and this can lead to race conditions and incorrect results. To fix this, you can use a thread-safe collection like CopyOnWriteArrayList or use proper synchronization mechanisms.