21 Python Concurrent Programming With Futures

21 Python Concurrent Programming with Futures #

Hello, I am Jingxiao.

Concurrency programming is a commonly used and important skill in any programming language. For example, the web crawlers we discussed in the previous lesson are widely used in various industries. A large portion of the news information we obtain daily from websites and apps is obtained through concurrent programming-based crawlers.

Proper and reasonable use of concurrent programming undoubtedly brings significant performance improvements to our programs. In today’s lesson, I will guide you to learn and understand how to use concurrency programming in Python with Futures.

Distinguishing Concurrency and Parallelism #

When we learn concurrent programming, we often hear the terms concurrency and parallelism used together, leading many people to think that they have the same meaning, but in fact they do not.

First of all, you need to distinguish a misconception. In Python, concurrency does not mean that multiple operations (threads, tasks) happen at the same time. Instead, at any given moment, only one operation is allowed to occur. However, the threads/tasks can switch back and forth until they are completed. Let’s look at the following diagram:

The diagram shows different ways of switching between threads and tasks, which correspond to the two forms of concurrency in Python—threading and asyncio.

For threading, the operating system knows all the information about each thread, so it will take charge and switch threads at the appropriate time. Obviously, the advantage of this approach is that the code is easy to write because the programmer does not need to handle any switching operations. However, thread switches can also occur during the execution of a single statement (such as x += 1), which can easily lead to race conditions.

In the case of asyncio, when the main program wants to switch tasks, it must receive a notification that the task can be switched. This way, it can avoid the race condition mentioned earlier.

As for parallelism, it refers to things happening at the same time. This is the meaning of multi-processing in Python. For multi-processing, you can simply understand it like this: if your computer has a 6-core processor, when running the program, you can force Python to open 6 processes to execute simultaneously in order to speed up the execution. The schematic diagram below illustrates the principle:

In comparison,

  • Concurrency is usually applied in scenarios where I/O operations are frequent, such as downloading multiple files from a website, where the time for I/O operations may be much longer than the time for CPU processing.
  • Parallelism, on the other hand, is more suitable for CPU-heavy scenarios, such as parallel computing in MapReduce. To speed up the execution time, multiple machines or processors are generally used.

Futures in Concurrent Programming #

Performance Comparison Between Single-threaded and Multi-threaded #

Next, let’s understand Futures in concurrent programming and compare its performance with single-threaded approach through specific examples from a code perspective.

Suppose we have a task to download content from multiple websites and print it. If we use a single-threaded approach, the code will look like this (for the sake of simplicity and to focus on the topic, I have omitted exception handling here):

import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))
    
def download_all(sites):
    for site in sites:
        download_one(site)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
    
if __name__ == '__main__':
    main()

# Output
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 107637 from https://en.wikipedia.org/wiki/Portal:Biography
Read 151021 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 157811 from https://en.wikipedia.org/wiki/Portal:Technology
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 93347 from https://en.wikipedia.org/wiki/Portal:Science
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 468461 from https://en.wikipedia.org/wiki/PHP
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 2.464231112999869 seconds

This approach is the most straightforward and simplest:

  • First, iterate over a list of websites.
  • Then, perform the download operation on the current website.
  • Wait for the completion of the current operation before moving on to the next website.

We can see that it takes about 2.4 seconds in total. The advantage of single-threaded approach is its simplicity, but it is clearly inefficient because most of the time in the above program is wasted in I/O waiting. The program has to wait for the previous website to finish downloading before starting with the next one. If we have to download thousands of websites in a real production environment, it is obvious that this approach won’t work.

Now, let’s take a look at the code for the multi-threaded version:

import concurrent.futures
import requests
import threading
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))


def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

## Output
Read 151021 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 107637 from https://en.wikipedia.org/wiki/Portal:Biography
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 157811 from https://en.wikipedia.org/wiki/Portal:Technology
Read 91533 from https://en.wikipedia.org/wiki/Portal:Science
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 468461 from https://en.wikipedia.org/wiki/PHP
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 0.19936635800002023 seconds

It’s obvious that the total time is around 0.2s, which is more than 10 times faster.

Now let’s take a closer look at this code, which is the main difference between the multi-threaded version and the single-threaded version:

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)

Here, we are creating a thread pool with a total of 5 threads that can be allocated. executor.map() is similar to the built-in map() function in Python, which concurrently calls the function download_one() for each element in sites.

By the way, in the download_one() function, the requests.get() method we use is thread-safe, so it can be safely used in a multi-threaded environment without causing race conditions.

Although the number of threads can be defined by yourself, more threads are not necessarily better, because creating, maintaining, and deleting threads also has some overhead. So if you set it too large, it may even slow down the speed. We often need to do some testing based on actual needs to find the optimal number of threads.

Of course, we can also improve the efficiency of the program by parallelizing it. You just need to make the following changes in the download_all() function:

    with futures.ThreadPoolExecutor(workers) as executor
    =>
    with futures.ProcessPoolExecutor() as executor:

In the part of the code that needs to be modified, the function ProcessPoolExecutor() means creating a process pool and using multiple processes to execute the program in parallel. However, here we usually omit the workers parameter because the system automatically returns the number of CPUs as the number of processes that can be called.

I just mentioned that parallelization is generally used in CPU-intensive scenarios, because for I/O-intensive operations, most of the time is spent waiting, and using multiple processes instead of multiple threads does not improve efficiency. On the contrary, many times, due to the limitation of the number of CPUs, the execution efficiency is not as good as the multi-threaded version.

What exactly are Futures? #

The Futures module in Python, located in concurrent.futures and asyncio, represents operations with a delay. Futures wrap pending operations and put them into a queue, and the status of these operations can be queried at any time. Of course, their results or exceptions can also be obtained after the operations are completed.

Usually, as users, we don’t need to worry about how to create Futures, as they are handled for us at the lower level. What we need to do is actually schedule the execution of these Futures.

For example, in the Futures module, when we execute executor.submit(func), it will arrange for the func() function inside to be executed and return a created future instance for you to query and call later.

Here are some commonly used functions. The done() method in Futures indicates whether the corresponding operation is completed-True means completed and False means not completed. However, note that done() is non-blocking and returns the result immediately. The corresponding add_done_callback(fn) function indicates that when the Futures are completed, the parameter function fn will be notified and executed.

Futures also has an important function called result(), which indicates that when the future is completed, it returns its corresponding result or exception. as_completed(fs) is used to iterate over the given future iterator fs and return the completed iterator after completion.

So, the above example can also be written in the following form:

import concurrent.futures
import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)
import concurrent.futures
import time

def download_one(site):
    # 模拟下载网页内容
    time.sleep(1)
    print("Read {} from {}".format(len(site), site))

def download_all(sites):
    to_do = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for site in sites:
            future = executor.submit(download_one, site)
            to_do.append(future)
                
        for future in concurrent.futures.as_completed(to_do):
            future.result()

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
    
if __name__ == '__main__':
    main()

Here, we first call executor.submit() to put the content of each website into the future queue to_do for execution. Then we use the as_completed() function to output the result after each future is completed.

However, it is important to note that the order in which each future in the future list is completed may not exactly match the order in the list. Which future is completed first and which one is completed later depends on the system’s scheduling and the execution time of each future.

Why can only one thread be executed at a time in multithreading? #

As I mentioned earlier, at any given time, only one thread is allowed to be executed in the Python main program, so Python’s concurrency is achieved through the switching of multiple threads. You may wonder why this is the case.

Here I briefly mention the concept of the Global Interpreter Lock (GIL). The Python interpreter is not thread-safe itself. To solve the race condition and other problems caused by this, Python introduces the GIL, which allows only one thread to be executed at a time. However, when performing I/O operations, if one thread is blocked, the GIL will be released, allowing another thread to continue execution.

Summary #

In this lesson, we first learned about the concepts and differences between concurrency and parallelism in Python.

  • Concurrency is achieved by switching between threads and tasks, but at any given time, only one thread or task is allowed to execute.
  • Parallelism, on the other hand, refers to multiple processes executing simultaneously.

Concurrency is usually used in scenarios with frequent I/O operations, while parallelism is suitable for CPU-heavy scenarios.

Next, we compared the performance differences between a single-threaded version and a multi-threaded version using Futures, through an example of downloading website content. It is evident that using multi-threading in a rational manner can greatly improve program execution efficiency.

We also learned about the principles of Futures and introduced the usage of some commonly used functions such as done(), result(), as_completed(), and so on, with examples to facilitate understanding.

It is important to note that Python only allows one thread to run at a time due to the Global Interpreter Lock (GIL). However, for I/O operations, when they are blocked, the GIL will be released, allowing other threads to continue executing.

Reflection Question #

Finally, here’s a reflection question for you. Can you make the program more stable and robust by adding reasonable exception handling to the example of downloading website content that we discussed today? Please feel free to write down your thoughts and answers in the comments section. You are also welcome to share today’s content with your colleagues and friends. Let’s communicate and improve together.