22 Concurrent Programming with Asyncio #

Hello, I’m Jingxiao.

In the previous lesson, we learned about one implementation of concurrent programming in Python - multithreading. In today’s lesson, we will continue learning about another implementation of concurrent programming in Python - Asyncio. Unlike the chapter on coroutines, in this lesson, we will focus more on understanding the principles.

From the previous lesson, we learned that when dealing with I/O operations, the efficiency of using multithreading is greatly improved compared to using a single thread. You may wonder, if that’s the case, why do we still need Asyncio?

Indeed, multithreading has many advantages and is widely used, but it also has certain limitations:

  • For example, a thread can be interrupted at almost any point during execution, which can lead to race conditions.
  • Additionally, thread switching itself has some overhead, and the number of threads cannot grow without limit. Therefore, if your I/O workload is heavy, multithreading may not deliver the efficiency you need.

It is precisely to address these issues that Asyncio comes into play.

What is Asyncio #

Sync VS Async #

Let’s first differentiate between Sync and Async.

  • Sync refers to executing operations one by one, where the next operation can only be executed after the previous operation is completed.
  • Async, on the other hand, means that different operations can be interleaved. If one operation is blocked, the program does not wait for it and instead continues with other operations that are able to run.

To illustrate with a simple example, imagine your boss asks you to create a quarterly report and email it to them.

  • If you approach it synchronously, you would first input the data for the quarter into the software, then wait for 5 minutes. After the detailed report is generated, you would write the email and send it.
  • However, if you approach it asynchronously, after inputting the data for the quarter, you would start writing the email. When the detailed report is generated, you would pause the email, review the report, and then continue writing the email until it is sent.
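
The report example can be sketched in code. This is only an illustration under stated assumptions: generate_report() and write_email() are hypothetical stand-ins, and the 5-minute wait is shortened to 0.2 seconds with asyncio.sleep().

```python
import asyncio
import time

async def generate_report():
    # Stand-in for the report generation (shortened to 0.2 s).
    await asyncio.sleep(0.2)
    return "report"

async def write_email():
    # Stand-in for the time spent writing the email.
    await asyncio.sleep(0.2)
    return "email"

async def sync_style():
    # Wait for the report first, then write the email: roughly 0.4 s.
    report = await generate_report()
    email = await write_email()
    return report, email

async def async_style():
    # Let both activities overlap: roughly 0.2 s.
    report, email = await asyncio.gather(generate_report(), write_email())
    return report, email

def timed(coro_func):
    # Run a coroutine function to completion and measure the elapsed time.
    start = time.perf_counter()
    result = asyncio.run(coro_func())
    return result, time.perf_counter() - start

if __name__ == "__main__":
    for style in (sync_style, async_style):
        result, elapsed = timed(style)
        print(f"{style.__name__}: {result} in {elapsed:.2f} seconds")
```

Both versions produce the same result; the asynchronous one simply finishes sooner because the two waits overlap.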

Asyncio Working Principle #

Now that we understand Sync and Async, let’s get back to our main topic - what exactly is Asyncio?

Asyncio, like an ordinary Python program, runs in a single thread. It has only one main thread, but it can work on multiple tasks. Each task is a special kind of future object (asyncio.Task is a subclass of asyncio.Future), and the tasks are controlled by an object called the event loop. You can think of these tasks as the counterpart of the threads in the multithreaded version.

To simplify the explanation, let’s assume that tasks have only two states: ready and waiting. The ready state means that a task is currently idle but ready to run at any time. The waiting state means that a task has already started but is paused, waiting for an external operation such as I/O to complete.

In this scenario, the event loop maintains two task lists corresponding to these two states. It selects a ready task (exactly how it chooses is an implementation detail; CPython’s default event loop runs ready callbacks in first-in, first-out order) and runs it until the task returns control to the event loop.

When a task returns control to the event loop, the event loop puts it into the ready or waiting state list, and then goes through the tasks in the waiting state list to check whether the operation each one is waiting for has completed.

  • If the operation has completed, the task is moved to the ready state list.
  • If the operation has not completed, the task stays in the waiting state list.

The tasks in the ready state list remain in their respective positions because they have not been executed yet.

This cycle continues, with the event loop selecting a task from the ready state list and executing it until all tasks are completed.
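
The cycle described above can be sketched with plain generators. This is a toy model, not how Asyncio is actually implemented: toy_event_loop() and task() are hypothetical names, each yield stands for "I am now waiting on I/O", and the loop simply assumes that every pending operation has finished by the next round.

```python
from collections import deque

def task(name, steps, log):
    # A toy task: each `yield` hands control back to the loop.
    for step in range(steps):
        log.append(f"{name} step {step}")
        yield

def toy_event_loop(tasks):
    ready = deque(tasks)   # tasks that can run right now
    waiting = []           # tasks waiting on (pretend) I/O
    while ready or waiting:
        # Assumption: all pending I/O has completed by now,
        # so every waiting task becomes ready again.
        ready.extend(waiting)
        waiting.clear()
        current = ready.popleft()
        try:
            next(current)            # run until the task yields control
            waiting.append(current)  # it yielded, so it is waiting again
        except StopIteration:
            pass                     # the task has finished; drop it

if __name__ == "__main__":
    log = []
    toy_event_loop([task("A", 2, log), task("B", 1, log)])
    print(log)
```

The point of the sketch is that only one task runs at a time, and the loop regains control only when the running task gives it up.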

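It is worth mentioning that in Asyncio, a task is never interrupted from outside while it is running; it gives up control only at points it chooses itself (the await expressions). Therefore, the race conditions caused by thread preemption do not arise, and you rarely have to worry about thread safety. You still need to keep shared state consistent across an await, because other tasks may run there.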
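
As a small illustration, the sketch below lets ten tasks update a shared counter without any lock. The names add_many() and run_counters() are made up for this example. The update is safe only because there is no await between reading and writing the counter.

```python
import asyncio

counter = 0

async def add_many(times):
    global counter
    for _ in range(times):
        # No await between the read and the write, so no other task
        # can run in the middle of this update.
        current = counter
        counter = current + 1
        # Hand control back to the event loop only after the update is done.
        await asyncio.sleep(0)

async def run_counters(workers=10, times=1000):
    global counter
    counter = 0
    await asyncio.gather(*(add_many(times) for _ in range(workers)))
    return counter

if __name__ == "__main__":
    print(asyncio.run(run_counters()))
```

If the await were moved between the read and the write, another task could run at that point and updates could be lost, so the guarantee only covers code between two await points.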

Asyncio Usage #

Now that we have covered the principles of Asyncio, let’s look at its usage through specific code examples. Continuing with the topic of downloading website content from the previous lesson, I have provided the code below with Asyncio implementation (excluding some exception handling operations). Let’s take a look together:

import asyncio
import aiohttp
import time

async def download_one(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print('Read {} from {}'.format(resp.content_length, url))

async def download_all(sites):
    tasks = [asyncio.create_task(download_one(site)) for site in sites]
    await asyncio.gather(*tasks)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    asyncio.run(download_all(sites))
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

## Output
Read 63153 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 31461 from https://en.wikipedia.org/wiki/Portal:Society
Read 23965 from https://en.wikipedia.org/wiki/Portal:Biography
Read 36312 from https://en.wikipedia.org/wiki/Portal:History
Read 25203 from https://en.wikipedia.org/wiki/Portal:Arts
Read 15160 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 28749 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 29587 from https://en.wikipedia.org/wiki/Portal:Technology
Read 79318 from https://en.wikipedia.org/wiki/PHP
Read 30298 from https://en.wikipedia.org/wiki/Portal:Geography
Read 73914 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 62218 from https://en.wikipedia.org/wiki/Go_(programming_language)
Read 22318 from https://en.wikipedia.org/wiki/Portal:Science
Read 36800 from https://en.wikipedia.org/wiki/Node.js
Read 67028 from https://en.wikipedia.org/wiki/Computer_science
Download 15 sites in 0.062144195078872144 seconds

Here, async and await are the keywords that Asyncio code is built on (they became part of the language in Python 3.5). async def declares a coroutine function, and await marks a point where the coroutine may pause and hand control back to the event loop instead of blocking, which corresponds to the concept of the event loop mentioned earlier. If the execution of a task requires waiting, it is put into the waiting state list, and the tasks in the ready state list continue to execute.
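
A minimal sketch of what these keywords do, using a hypothetical greet() coroutine: calling an async function does not run its body; it only creates a coroutine object, which runs once the event loop drives it.

```python
import asyncio
import inspect

async def greet(name):
    await asyncio.sleep(0)  # give control back to the event loop once
    return f"Hello, {name}"

def demo():
    coro = greet("Asyncio")              # nothing has run yet
    is_coro = inspect.iscoroutine(coro)  # it is just a coroutine object
    result = asyncio.run(coro)           # the event loop runs it to completion
    return is_coro, result

if __name__ == "__main__":
    print(demo())
```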

The asyncio.run(coro) call in the main function is the root call of Asyncio: it creates a new event loop, runs the input coro until it finishes, and finally closes the event loop. asyncio.run() was introduced in Python 3.7. For older versions, you can use the following statements as a rough equivalent:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(coro)
finally:
    loop.close()

As for the Asyncio version of the function download_all(), it is significantly different from the previous multithreaded version:

tasks = [asyncio.create_task(download_one(site)) for site in sites]
await asyncio.gather(*tasks)

Here, asyncio.create_task(coro) creates a task for the input coroutine coro, schedules its execution, and returns the task object. This function was added in Python 3.7. For older versions, you can use asyncio.ensure_future(coro) as an equivalent replacement. As you can see, a corresponding task is created for each website download.
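
To make "schedules its execution" concrete, here is a small sketch with hypothetical worker() and scheduling_demo() coroutines. It records when the task actually starts: creating the task only schedules it, and it begins running as soon as the creating coroutine gives control back to the event loop.

```python
import asyncio

async def worker(name, log):
    log.append(f"{name} started")
    await asyncio.sleep(0)
    log.append(f"{name} finished")

async def scheduling_demo():
    log = []
    task = asyncio.create_task(worker("task", log))  # scheduled, not yet running
    log.append("task created")
    await asyncio.sleep(0)   # yield to the event loop; the task gets to start
    log.append("back in main")
    await task               # wait until the task has finished
    return log

if __name__ == "__main__":
    for entry in asyncio.run(scheduling_demo()):
        print(entry)
```

Note that "task created" is logged before the task has run a single line, even though create_task() has already returned.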

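Furthermore, asyncio.gather(*aws, return_exceptions=False) runs all the awaitables in the aws sequence concurrently in the event loop and returns their results as a list in the same order. (Older versions also accepted a loop argument, which was deprecated in Python 3.8 and removed in Python 3.10.) In addition to the functions used in the example, Asyncio provides many other usages. You can refer to the documentation for more information.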
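
The following sketch shows two properties of asyncio.gather(): results come back in the order of the arguments, not in order of completion, and with return_exceptions=True a failure is returned as a value instead of being raised. fetch_length() is a hypothetical stand-in that makes no network request.

```python
import asyncio

async def fetch_length(name, delay, fail=False):
    # Hypothetical stand-in for a download; no network access involved.
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"cannot fetch {name}")
    return len(name)

async def gather_demo():
    return await asyncio.gather(
        fetch_length("slow", 0.02),
        fetch_length("broken", 0.01, fail=True),
        fetch_length("fast", 0.0),
        return_exceptions=True,  # collect exceptions instead of raising them
    )

if __name__ == "__main__":
    for item in asyncio.run(gather_demo()):
        print(repr(item))
```

With the default return_exceptions=False, the first exception would instead propagate to the code awaiting gather().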

Finally, let’s look at the output: the run took only 0.06 seconds, which is faster than the previous multithreaded version and demonstrates the advantage of Asyncio.

Are there any flaws with Asyncio? #

After learning so much about Asyncio, we realize its power. However, you should be aware that no solution is perfect, and there are limitations to every approach, including Asyncio.

In practical work, to use Asyncio effectively and make the most of its features, you often need support from corresponding Python libraries. You may have noticed that in the previous lesson on multithreaded programming, we used the requests library. But today, we used the aiohttp library instead. The reason is that the requests library is not compatible with Asyncio (its calls block, so the event loop cannot switch to other tasks while a request is in flight), while the aiohttp library is.
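
When no Asyncio-compatible library is available, one common workaround (not the approach used in this lesson) is to hand the blocking call to a thread pool with loop.run_in_executor(). In the sketch below, blocking_fetch() is a hypothetical stand-in for a blocking call such as requests.get(), simulated with time.sleep() so that the example needs no network access.

```python
import asyncio
import time

def blocking_fetch(url):
    # Stand-in for a blocking call; time.sleep blocks the calling thread
    # just as blocking I/O would.
    time.sleep(0.2)
    return f"content of {url}"

async def fetch_all(urls):
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool, so the
    # event loop itself is never blocked by the calls.
    futures = [loop.run_in_executor(None, blocking_fetch, url) for url in urls]
    return await asyncio.gather(*futures)

if __name__ == "__main__":
    start = time.perf_counter()
    print(asyncio.run(fetch_all(["a", "b", "c"])))
    print(f"took {time.perf_counter() - start:.2f} seconds")
```

This brings back thread overhead, so it is a fallback rather than a replacement for a library written for Asyncio.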

Library compatibility with Asyncio was a big problem in the early days of Python 3. However, as the ecosystem matures, this problem is gradually being resolved.

Furthermore, when using Asyncio, because you have greater autonomy in task scheduling, you need to be more careful when writing code to avoid mistakes.

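For example, if you need to await a series of operations and collect their results, you will use asyncio.gather(). If it’s just a single future, you can simply await it, or wrap it in asyncio.wait_for() when you need a timeout; asyncio.wait() is for a collection of tasks when you want to control when to stop waiting. And for your event loop, do you want it to run_until_complete() or run_forever()? These are the considerations you need to think about when facing specific problems.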
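
The difference between these calls can be sketched as follows. work() and wait_demo() are hypothetical names, and the delays are arbitrary values chosen so that one task clearly finishes first.

```python
import asyncio

async def work(delay, value):
    await asyncio.sleep(delay)
    return value

async def wait_demo():
    # A single awaitable: await it, here with a timeout via wait_for().
    single = await asyncio.wait_for(work(0.01, "single"), timeout=1)

    # Several tasks: asyncio.wait() returns (done, pending) sets and
    # lets you decide when to stop waiting.
    tasks = [asyncio.create_task(work(d, d)) for d in (0.01, 0.3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]
    for t in pending:
        t.cancel()  # the slower task is no longer needed
    return single, first, len(pending)

if __name__ == "__main__":
    print(asyncio.run(wait_demo()))
```

Unlike gather(), asyncio.wait() does not return results directly; you read them from the task objects in the done set.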

Multithreading or Asyncio #

Without quite noticing it, we have now learned both approaches to concurrent programming. However, when faced with practical problems, how do we choose between multithreading and Asyncio?

In general, you can follow the pseudocode below:

if io_bound:
    if io_slow:
        print('Use Asyncio')
    else:
        print('Use multithreading')
elif cpu_bound:
    print('Use multiprocessing')

  • If it is I/O bound and the I/O operations are slow, requiring many tasks/threads to collaborate, then Asyncio is more suitable.
  • If it is I/O bound, but the I/O operations are fast and only a limited number of tasks/threads are required, then multithreading is sufficient.
  • If it is CPU bound, multiprocessing should be used to improve program performance.
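
For completeness, the rule of thumb can be written as a small runnable function. choose_concurrency_model() is a hypothetical helper, and the final fallback branch is my own assumption, since the pseudocode does not say what to do when a program is neither I/O bound nor CPU bound.

```python
def choose_concurrency_model(io_bound, io_slow=False, cpu_bound=False):
    if io_bound:
        # Slow I/O with many tasks favors Asyncio; fast I/O with few
        # tasks is handled well enough by threads.
        return "Asyncio" if io_slow else "multithreading"
    if cpu_bound:
        return "multiprocessing"
    # Assumption: not covered by the pseudocode above.
    return "no concurrency needed"

if __name__ == "__main__":
    print(choose_concurrency_model(io_bound=True, io_slow=True))
```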

Summary #

In today’s class, we learned about the principles and usage of Asyncio, and compared the advantages and disadvantages of Asyncio and multithreading.

Unlike multithreading, Asyncio is single-threaded, but its internal event loop mechanism allows it to run multiple different tasks concurrently, and it has more autonomous control than multithreading.

Tasks in Asyncio are not interrupted from outside during execution, so the race conditions caused by thread preemption do not occur. Especially in I/O-intensive scenarios, Asyncio is more efficient than multithreading. This is because the cost of task switching in Asyncio is much smaller than the cost of thread switching, and Asyncio can handle many more tasks than multithreading can handle threads.

However, it should be noted that in many cases, using Asyncio requires the support of specific third-party libraries, such as aiohttp in the previous example. And if the I/O operations are fast and not heavy, using multithreading can also effectively solve the problem.

Thought Exercise #

In the last two lessons, we learned two ways to implement concurrent programming and also mentioned parallel programming, which is suitable for CPU-heavy scenarios.

Now we have the following requirement: given a list, for each element in the list, I want to calculate the sum of squares of all integers from 0 up to (but not including) that element.

I have written a conventional version of the solution below. Could you write a multiprocessing version of it by referring to the documentation and compare the runtime of the two versions?

import time

def cpu_bound(number):
    print(sum(i * i for i in range(number)))

def calculate_sums(numbers):
    for number in numbers:
        cpu_bound(number)

def main():
    start_time = time.perf_counter()  
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))

if __name__ == '__main__':
    main()

Feel free to share your thoughts and answers in the comments, and also feel free to share today’s content with your colleagues and friends. Let’s communicate and progress together.