51 How to Use Completable Future to Solve Travel Platform Problems

51 How to Use CompletableFuture to Solve Travel Platform Problems #

In this lesson, we will mainly explain how to use CompletableFuture to implement the “Tourist Platform” problem.

The Tourist Platform Problem #

What is the Tourist Platform problem? If you want to build a tourist platform, there is often a requirement that users want to simultaneously access flight information from multiple airlines. For example, how much is a plane ticket from Beijing to Shanghai? Many airlines have such flight information, so all the flight information, ticket prices, and other information from all airlines should be obtained and then aggregated. Since each airline has its own server, you can make separate requests to their servers, such as requesting Air China, Hainan Airlines, China Eastern Airlines, etc., as shown in the following figure:

img

Serial #

One primitive way to solve this problem is to use a serial approach.

img

For example, if we want to get the price, we have to first visit Air China, which is called website 1 here, and then visit Hainan Airlines website 2, and so on. After each request is sent out and the response is received, we can then request the next website. This is the serial approach.

This approach is very inefficient. For example, if there are many airlines and each airline takes 1 second, the user will certainly not wait, so this approach is not feasible.

Parallel #

Next, let’s improve our thinking. The main idea is to change serial into parallel, as shown in the following figure:

img

We can obtain this flight information in parallel and then aggregate the flight information. This way, the efficiency will be greatly improved.

Although this parallel approach improves efficiency, it also has a downside, which is that it “waits until all requests are returned”. If a website is very slow, you should not be delayed by that website. For example, if a website takes 20 seconds to open, you definitely cannot wait that long, so we need a feature, which is to have a timeout for retrieval.

Parallel Retrieval with Timeout #

Now let’s take a look at the case of parallel retrieval with a timeout.

img

In this case, it belongs to parallel retrieval with a timeout, and we still parallelly request information from various websites. But we set a timeout, such as 3 seconds. If all requests have returned within 3 seconds, that’s great, we can collect them; but if some websites have not returned in time, we can ignore these requests. This way, the user experience will be better, and they need to wait for only a fixed 3 seconds to get the information, although what they get may not be the most complete, it’s still better than waiting indefinitely.

There are several implementation solutions to achieve this goal, let’s take a look at them one by one.

Implementation with Thread Pool #

The first implementation solution is to use a thread pool, let’s take a look at the code.

public class ThreadPoolDemo {

ExecutorService threadPool = Executors.newFixedThreadPool(3);

public static void main(String[] args) throws InterruptedException {

    ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo();

    System.out.println(threadPoolDemo.getPrices());

}

private Set<Integer> getPrices() throws InterruptedException {

    Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());

    threadPool.submit(new Task(123, prices));

    threadPool.submit(new Task(456, prices));

    threadPool.submit(new Task(789, prices));

    Thread.sleep(3000);

    return prices;

}

private class Task implements Runnable {

    Integer productId;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CountDownLatchDemo {

    ExecutorService threadPool = Executors.newFixedThreadPool(3);

    public static void main(String[] args) throws InterruptedException {

        CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo();

        System.out.println(countDownLatchDemo.getPrices());

    }

    private Set<Integer> getPrices() throws InterruptedException {

        Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());

        CountDownLatch countDownLatch = new CountDownLatch(3);

        threadPool.submit(new Task(123, prices, countDownLatch));

        threadPool.submit(new Task(456, prices, countDownLatch));

        threadPool.submit(new Task(789, prices, countDownLatch));

        countDownLatch.await(3, TimeUnit.SECONDS);

In the code, a thread-safe Set named prices is created to store various price information. It is then named Prices. Tasks are placed in the thread pool. The thread pool is created at the beginning of the class and consists of three threads. The description of the Task is provided in the Task class below. In the Task class, there is a run method where we use a random time to simulate the response time of various airline websites, and then return a random price to represent the ticket price. Finally, this price is added to the Set. This is what the run method does.

Returning to the getPrices function, we create three tasks with product IDs 123, 456, and 789 respectively. The product ID is not important here because the prices we return are random. To implement the timeout waiting function, we call the sleep method of the Thread to sleep for 3 seconds. This way, it will wait for 3 seconds here and then directly return prices.

At this point, if the response speed is fast, there will be up to three values in prices, but if each response time is slow, there may not be a single value in prices. Regardless of how many values you have, it will return prices directly after the sleep is over, that is, after executing Thread.sleep, and ultimately print the result in the main function.

Let’s take a look at the possible execution results. One possibility is to have three values, namely [3815, 3609, 3819] (the numbers are random). It could be 1 value [3496] or 2 values [1701, 2730]. If each response speed is very slow, there may not be a single value.

This is the most basic solution implemented using a thread pool.

CountDownLatch #

There is room for improvement here. For example, when the network is very good, each airline company responds very quickly, and you don’t need to wait for three seconds at all. Some airline companies may return in a few hundred milliseconds, so you shouldn’t make the user wait for 3 seconds. So we need to make the following improvement, as shown in the code below:

```java
    return prices;
    
}

private class Task implements Runnable {

    Integer productId;

    Set<Integer> prices;

    CountDownLatch countDownLatch;

    public Task(Integer productId, Set<Integer> prices,

                CountDownLatch countDownLatch) {

        this.productId = productId;

        this.prices = prices;

        this.countDownLatch = countDownLatch;

    }

    @Override

    public void run() {

        int price = 0;

        try {

            Thread.sleep((long) (Math.random() * 4000));

            price = (int) (Math.random() * 4000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        prices.add(price);

        countDownLatch.countDown();

    }

}

}
此代码使用CountDownLatch实现了这个功能,整体思路和之前是一致的,不同点在于我们新增了一个CountDownLatch,并且把它传入到了Task中。在Task中,获取完机票信息并且把它添加到Set之后,会调用countDown方法,相当于把计数减1。

这样一来,在执行countDownLatch.await(3, TimeUnit.SECONDS)这个函数进行等待时,如果三个任务都非常快速地执行完毕了,那么三个线程都已经执行了countDown方法,那么这个await方法就会立刻返回,不需要傻等到3秒钟。

如果有一个请求特别慢,相当于有一个线程没有执行countDown方法,来不及在3秒钟之内执行完毕,那么这个带超时参数的await方法也会在3秒钟到了以后,及时地放弃这一次等待,于是就把prices给返回了。所以这样一来,我们就利用CountDownLatch实现了这个需求,也就是说我们最多等3秒钟,但如果在3秒之内全都返回了,我们也可以快速地去返回,不会傻等,提高了效率。

### CompletableFuture

我们再来看一下用CompletableFuture来实现这个功能的用法,代码如下所示:

```java
public class CompletableFutureDemo {

    public static void main(String[] args)

            throws Exception {

        CompletableFutureDemo completableFutureDemo = new CompletableFutureDemo();

        System.out.println(completableFutureDemo.getPrices());

    }

    private Set<Integer> getPrices() {

        Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());

        CompletableFuture<Void> task1 = CompletableFuture.runAsync(new Task(123, prices));

        CompletableFuture<Void> task2 = CompletableFuture.runAsync(new Task(456, prices));

        CompletableFuture<Void> task3 = CompletableFuture.runAsync(new Task(789, prices));

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);

        try {

            allTasks.get(3, TimeUnit.SECONDS);

        } catch (InterruptedException e) {

        } catch (ExecutionException e) {

        } catch (TimeoutException e) {

        }

        return prices;

    }

    private class Task implements Runnable {

        Integer productId;

        Set<Integer> prices;

        public Task(Integer productId, Set<Integer> prices) {

            this.productId = productId;

            this.prices = prices;

        }

        @Override

        public void run() {

            int price = 0;

            try {

                Thread.sleep((long) (Math.random() * 4000));

                price = (int) (Math.random() * 4000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            prices.add(price);

        }

    }

}

这里我们不再使用线程池了,我们看到getPrices方法,在这个方法中,我们用了CompletableFuture的runAsync方法,这个方法会异步的去执行任务。

我们有三个任务,并且在执行这个代码之后会分别返回一个CompletableFuture对象,我们把它们命名为task 1、task 2、task 3,然后执行CompletableFuture的allOf方法,并且把task 1、task 2、task 3传入。这个方法的作用是把多个task汇总,然后可以根据需要去获取到传入参数的这些task的返回结果,或者等待它们都执行完毕等。我们就把这个返回值叫作allTasks,并且在下面调用它的带超时时间的get方法,同时传入3秒钟的超时参数。

这样一来它的效果就是,如果在3秒钟之内这3个任务都可以顺利返回,也就是这个任务包括的那三个任务,每一个都执行完毕的话,则这个get方法就可以及时正常返回,并且往下执行,相当于执行到return prices。在下面的这个Task的run方法中,该方法如果执行完毕的话,对于CompletableFuture而言就意味着这个任务结束,它是以这个作为标记来判断任务是不是执行完毕的。但是如果有某一个任务没能来得及在3秒钟之内返回,那么这个带超时参数的get方法便会抛出TimeoutException异常,同样会被我们给catch住。这样一来它就实现了这样的效果:会尝试等待所有的任务完成,但是最多只会等3秒钟,在此之间,如及时完成则及时返回。那么所以我们利用CompletableFuture,同样也可以解决旅游平台的问题。它的运行结果也和之前是一样的,有多种可能性。

最后做一下总结。在本课时中,我们先给出了一个旅游平台问题,它需要获取各航空公司的机票信息,随后进行了代码演进,从串行到并行,再到有超时的并行,最后到不仅有超时的并行,而且如果大家速度都很快,那么也不需要一直等到超时时间到,我们进行了这样的一步一步的迭代。

当然除了这几种实现方案之外,还会有其他的实现方案,你能想到哪些实现方案呢?不妨在下方留言告诉我,谢谢。