20 Unveiling Python Coroutines #

Hello, I am Jingxiao.

At the end of the previous lesson, we left a small cliffhanger: generators played an important role in implementing Python coroutines in Python 2.

So first, you need to understand what coroutines are.

Coroutines are a way of achieving concurrent programming. When we talk about concurrency, you may immediately think of the multi-threading/multi-processing model, and yes, multi-threading/multi-processing is one of the classic models for solving concurrency problems. In the early days of the Internet, multi-threading/multi-processing played a significant role in server concurrency.

With the rapid development of the Internet, the C10K bottleneck gradually appeared, meaning ten thousand clients connected to a single server at the same time. Many services crashed under it: process context switching consumed a large amount of resources, and threads simply could not withstand that kind of pressure. At this point, NGINX arrived with its event loop and saved the day.

If we compare multi-processing/multi-threading to the regional warlords of the Tang Dynasty, the event loop is the strengthened centralization of the Song Dynasty. The event loop starts a unified scheduler and lets that scheduler decide which task runs at any given moment, which eliminates the various overheads of starting threads, managing threads, and synchronizing locks in the multi-threading model. NGINX, born in that era, maintains low resource consumption and high performance under high concurrency, and supports more concurrent connections than Apache.

Later on, the notorious term “callback hell” appeared; anyone who has hand-written JavaScript knows exactly what I am talking about. Then people were pleasantly surprised to find that coroutines perfectly inherited the advantages of the event loop while also providing the async/await syntactic sugar, resolving the tension between execution efficiency and readability. As a result, more and more people discovered and came to favor coroutines, and more and more people started doing backend development with Node.js. (Let me tell you a joke: JavaScript is a programming language.)

Back to our Python. In the Python 2 era, the old way to implement coroutines was with generators. Python 3.7 provides a new approach based on asyncio and async/await. In this lesson, keeping up with the times, we will skip the old generator-based method, which is hard to understand and hard to write, and discuss the new approach directly.

We will start with a web crawler example, using clear explanations and practical examples to help you understand this concept, which is not particularly easy to grasp. Afterwards, we will gradually delve deeper into the core of coroutines.

Starting with a Web Crawler #

A web crawler, also known as a spider, came into existence along with search engines. Every second, web crawlers crawl a large number of web pages, extract key information, and store it in a database for future analysis. Web crawlers can be implemented with as little as ten lines of Python code, or they can be as complex as Google’s global distributed crawlers, consisting of millions of lines of code distributed across thousands of servers, sniffing information from all over the world.

Without further ado, let’s start with a simple example of a web crawler:

import time

def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    time.sleep(sleep_time)
    print('OK {}'.format(url))

def main(urls):
    for url in urls:
        crawl_page(url)

%time main(['url_1', 'url_2', 'url_3', 'url_4'])

########## Output ##########

crawling url_1
OK url_1
crawling url_2
OK url_2
crawling url_3
OK url_3
crawling url_4
OK url_4
Wall time: 10 s

(Note: The main purpose of this section is to introduce the basic concept of coroutines, so we simplify the crawl_page function of the web crawler to simulate waiting for a few seconds. The sleep time depends on the last digit of the URL.)

This is a very simple web crawler. When the main() function is executed, it calls the crawl_page() function for network communication, waits for a few seconds, receives the results, and then moves on to the next one.

It seems simple, but if you calculate carefully, it actually takes a lot of time. Each page takes between 1 and 4 seconds, adding up to a total of 10 seconds. This is obviously inefficient, so how can we optimize it?

So a simple idea comes up: let these crawl operations run concurrently instead of one after another. Let's see how to write it using coroutines.

import asyncio

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

async def main(urls):
    for url in urls:
        await crawl_page(url)

%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))

########## Output ##########

crawling url_1
OK url_1
crawling url_2
OK url_2
crawling url_3
OK url_3
crawling url_4
OK url_4
Wall time: 10 s

Looking at this code, you should have noticed that writing asynchronous programs using coroutines is very simple in Python 3.7 and above.

First, let’s look at import asyncio, which contains most of the magic tools we need to implement coroutines.

The async keyword declares an asynchronous function, so crawl_page and main here become asynchronous functions. Calling an asynchronous function gives us a coroutine object.

For example, if you print(crawl_page('')), it will output <coroutine object crawl_page at 0x000002BEDF141148>, indicating that it is a Python coroutine object, but the function itself is not actually executed.
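
If you want to check this yourself, here is a tiny standalone sketch of my own (not from the lesson's code) that inspects the object returned by calling an async function:

import asyncio

async def crawl_page(url):
    ...  # body omitted; we only care about what calling the function returns

coro = crawl_page('url_1')        # nothing runs yet; we just get a coroutine object
print(type(coro))                 # <class 'coroutine'>
print(asyncio.iscoroutine(coro))  # True

coro.close()  # close it explicitly so Python does not warn that it was never awaited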

Now let’s talk about how coroutines are executed. There are several methods to execute coroutines, and I will introduce the three most common ones here.

First, we can await a coroutine. The effect of await is the same as ordinary, sequential execution: the program blocks here, enters the awaited coroutine, and continues once it returns, which is exactly the literal meaning of “await”. In the code, await asyncio.sleep(sleep_time) sleeps for a few seconds, and await crawl_page(url) executes the crawl_page() function.

Second, we can create tasks with asyncio.create_task(). We will go into detail about tasks in the next lesson; for now, a basic understanding is enough.

Finally, we need asyncio.run to trigger execution. asyncio.run, introduced in Python 3.7, simplifies Python's coroutine interface: you no longer need to care about how to define and drive an event loop (we will cover that later). A good programming practice is to use asyncio.run(main()) as the entry point of the program and to call asyncio.run only once during the program's lifetime.
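
For reference, here is a rough sketch of the pre-3.7 boilerplate that asyncio.run(main()) replaces. Treat it as an approximation only: asyncio.run also handles cleanup details (cancelling leftover tasks, shutting the loop down safely) that are glossed over here.

import asyncio

# Pre-3.7 style: drive the event loop by hand instead of calling asyncio.run(main()).
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()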

In this way, you should have a general understanding of how coroutines work. You can try running the code now. Oh, why is it still taking 10 seconds?

That’s right. Remember what was said above: await is a synchronous call, so the next crawl_page(url) is not triggered until the current one completes. The effect of this code is therefore the same as before; we have merely written synchronous code behind an asynchronous interface.

So what can we do now?

Actually, it’s quite simple, and it’s also an important concept in coroutines that I’m going to talk about next: tasks. As usual, let’s take a look at the code.

import asyncio

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    for task in tasks:
        await task

%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))

########## Output ##########

crawling url_1
crawling url_2
crawling url_3
crawling url_4
OK url_1
OK url_2
OK url_3
OK url_4
Wall time: 3.99 s

As you can see, once we have coroutine objects, we can create tasks with asyncio.create_task. A task is scheduled to run very soon after it is created, so creating the tasks does not block our code here. We then wait for all of them to complete with for task in tasks: await task.

This time, you can see the effect. The result shows that the total runtime is equal to the runtime of the longest crawler.

Of course, you can also think about how to write this using multithreading. And what if there are tens of thousands of pages to crawl? If you compare it with the coroutine approach, the clearer and cleaner solution should be obvious.
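
For comparison, here is one possible multithreaded version. This is a sketch of my own using concurrent.futures rather than anything from the lesson; it works fine for four URLs, but the pool size becomes something you must manage yourself as the number of pages grows.

import time
from concurrent.futures import ThreadPoolExecutor

def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    time.sleep(sleep_time)
    print('OK {}'.format(url))

def main(urls):
    # One worker thread per URL; each blocking sleep now runs in its own thread.
    with ThreadPoolExecutor(max_workers=len(urls)) as executor:
        executor.map(crawl_page, urls)

%time main(['url_1', 'url_2', 'url_3', 'url_4'])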

In fact, there is another way to execute tasks:

import asyncio

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    await asyncio.gather(*tasks)

%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))

########## Output ##########

crawling url_1
crawling url_2
crawling url_3
crawling url_4
OK url_1
OK url_2
OK url_3
OK url_4
Wall time: 4.01 s

This code is also easy to understand. The only thing to note is that *tasks unpacks the list, turning its elements into positional arguments; correspondingly, **dict unpacks a dictionary into keyword arguments.
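
As a quick refresher, here is a tiny standalone example (not part of the crawler) showing what the two unpacking operators do:

def func(a, b, c):
    print(a, b, c)

args = [1, 2, 3]
kwargs = {'a': 1, 'b': 2, 'c': 3}

func(*args)     # unpacks the list into positional arguments: func(1, 2, 3)
func(**kwargs)  # unpacks the dict into keyword arguments: func(a=1, b=2, c=3)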

In addition, asyncio.create_task and asyncio.run are functions introduced in Python 3.7 and above, and they are easier to understand and read compared to the old interface.

Demystifying How Coroutines Run #

After discussing so much, now let’s take a look at the code in depth. With the knowledge we’ve gained so far, you should have no trouble understanding these two code snippets.

import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    print('before await')
    await worker_1()
    print('awaited worker_1')
    await worker_2()
    print('awaited worker_2')

%time asyncio.run(main())

########## Output ##########

before await
worker_1 start
worker_1 done
awaited worker_1
worker_2 start
worker_2 done
awaited worker_2
Wall time: 3 s

import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await task1
    print('awaited worker_1')
    await task2
    print('awaited worker_2')

%time asyncio.run(main())

########## Output ##########

before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2
Wall time: 2.01 s

So what exactly happens in the second snippet? To give you a detailed picture of how coroutines differ from threads, I have broken the whole process down. There are quite a few steps, so take your time and let's go through them one by one:

  1. asyncio.run(main()): The program enters the main() function and the event loop starts.
  2. task1 and task2 are created and enter the event loop, waiting to run.
  3. The print statement is reached and 'before await' is printed.
  4. await task1 is executed; the main task chooses to yield control, and the event scheduler starts running worker_1.
  5. worker_1 starts running, prints 'worker_1 start', and then reaches await asyncio.sleep(1), which suspends the current task; the event scheduler then starts running worker_2 (a coroutine yields control only at such await points; see the short sketch after this list).
  6. worker_2 starts running, prints 'worker_2 start', and then reaches await asyncio.sleep(2), which also suspends the current task.
  7. Everything up to this point takes somewhere between 1 ms and 10 ms, or even less. The event scheduler now has nothing to run and stops scheduling.
  8. After one second, the sleep in worker_1 completes, and the event scheduler hands control back to task1, which prints 'worker_1 done', finishes, and exits the event loop.
  9. await task1 completes, and the event scheduler passes control back to the main task, which prints 'awaited worker_1' and then waits at await task2.
  10. After two seconds, the sleep in worker_2 completes, and the event scheduler hands control back to task2, which prints 'worker_2 done', finishes, and exits the event loop.
  11. The main task prints 'awaited worker_2'; all coroutine tasks are done and the event loop ends.
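
Here is a small sketch of my own that makes this yielding behavior visible with await asyncio.sleep(0), which hands control back to the event loop without pausing for any measurable time:

import asyncio

async def busy_worker():
    print('busy_worker: step 1')
    # Awaiting sleep(0) does not pause, but it still yields control
    # to the event loop, so other ready tasks get a chance to run.
    await asyncio.sleep(0)
    print('busy_worker: step 2')

async def other_worker():
    print('other_worker: runs while busy_worker is suspended')

async def main():
    task1 = asyncio.create_task(busy_worker())
    task2 = asyncio.create_task(other_worker())
    await asyncio.gather(task1, task2)

asyncio.run(main())

# Expected order (assuming the standard scheduler):
# busy_worker: step 1
# other_worker: runs while busy_worker is suspended
# busy_worker: step 2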

Now, let’s take it a step further. What if we want to set a time limit for certain coroutine tasks, and cancel them if the time exceeds the limit? And what if we want to handle errors that occur during the runtime of certain coroutine tasks? Let’s take a look at the code:

import asyncio

async def worker_1():
    await asyncio.sleep(1)
    return 1

async def worker_2():
    await asyncio.sleep(2)
    return 2 / 0

async def worker_3():
    await asyncio.sleep(3)
    return 3

async def main():
    task_1 = asyncio.create_task(worker_1())
    task_2 = asyncio.create_task(worker_2())
    task_3 = asyncio.create_task(worker_3())

    await asyncio.sleep(2)
    task_3.cancel()

    res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
    print(res)

%time asyncio.run(main())

########## Output ##########

[1, ZeroDivisionError('division by zero'), CancelledError()]
Wall time: 2 s

As you can see, worker_1 runs normally, worker_2 encounters an error during its execution, and worker_3 is canceled because it takes too long. All this information is reflected in the final return result res.

However, pay attention to the line return_exceptions=True. If this argument is not set, the error is thrown all the way up to this execution layer and has to be caught with try/except, which in this example also means the tasks that have not yet finished end up being cancelled. To avoid this, we set return_exceptions to True so that errors are returned in the result list instead.
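
The example above enforces the time limit by sleeping and then calling cancel() by hand. A common alternative, shown here as a sketch of my own rather than part of the lesson, is asyncio.wait_for, which cancels the awaited work for you once the timeout expires:

import asyncio

async def worker_3():
    await asyncio.sleep(3)
    return 3

async def main():
    try:
        # wait_for raises asyncio.TimeoutError (and cancels the work)
        # if it does not finish within the given number of seconds.
        res = await asyncio.wait_for(worker_3(), timeout=2)
        print(res)
    except asyncio.TimeoutError:
        print('worker_3 timed out')

asyncio.run(main())  # prints 'worker_3 timed out' after about 2 seconds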

Now, have you noticed? Anything that can be done with threads can also be done with coroutines. So let’s review these key points and use coroutines to implement a classic producer-consumer model.

import asyncio
import random

async def consumer(queue, id):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def producer(queue, id):
    for i in range(5):
        val = random.randint(1, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def main():
    queue = asyncio.Queue()

    consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))

    producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
    producer_2 = asyncio.create_task(producer(queue, 'producer_2'))

    await asyncio.sleep(10)
    consumer_1.cancel()
    consumer_2.cancel()
    
    await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)

%time asyncio.run(main())

########## Output ##########

producer_1 put a val: 5
producer_2 put a val: 3
consumer_1 get a val: 5
consumer_2 get a val: 3
producer_1 put a val: 1
producer_2 put a val: 3
consumer_2 get a val: 1
consumer_1 get a val: 3
producer_1 put a val: 6
producer_2 put a val: 10
consumer_1 get a val: 6
consumer_2 get a val: 10
producer_1 put a val: 4
producer_2 put a val: 5
consumer_2 get a val: 4
consumer_1 get a val: 5
producer_1 put a val: 2
producer_2 put a val: 8
consumer_1 get a val: 2
consumer_2 get a val: 8
Wall time: 10 s
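
In this example the consumers are stopped by sleeping for ten seconds and then cancelling them. A common variant, sketched below as my own addition rather than part of the lesson, lets the queue itself track completion with task_done() and join(), so the program ends as soon as all produced items have been consumed:

import asyncio
import random

async def consumer(queue, id):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(id, val))
        await asyncio.sleep(1)
        queue.task_done()  # mark this item as fully processed

async def producer(queue, id):
    for i in range(5):
        val = random.randint(1, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def main():
    queue = asyncio.Queue()

    consumers = [asyncio.create_task(consumer(queue, 'consumer_{}'.format(i))) for i in (1, 2)]
    producers = [asyncio.create_task(producer(queue, 'producer_{}'.format(i))) for i in (1, 2)]

    await asyncio.gather(*producers)  # wait until both producers have put all their items
    await queue.join()                # wait until every queued item has been marked done
    for c in consumers:               # consumers loop forever, so cancel them explicitly
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)

asyncio.run(main())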

Practical Exercise: Douban Movie Spider #

Finally, let's move on to today's practical exercise: implementing a complete coroutine-based spider.

Task description: Can you use Python to get the names, release dates, and posters of movies recently released in Beijing from this webpage: https://movie.douban.com/cinema/later/beijing/? The posters on this webpage are miniaturized, and I hope you can scrape the posters from the specific movie description page.

Doesn’t sound too difficult, right? I have provided the code for both the synchronous version and the coroutine version below. By comparing the running time and code structure, I hope you can have a deeper understanding of coroutines. (Note: To highlight the key points and simplify the code, I have omitted error handling here.)

However, before referring to the code I provided, can you try writing and running it on your own first?

import requests
from bs4 import BeautifulSoup

def main():
    url = "https://movie.douban.com/cinema/later/beijing/"
    init_page = requests.get(url).content
    init_soup = BeautifulSoup(init_page, 'lxml')

    all_movies = init_soup.find('div', id="showing-soon")
    for each_movie in all_movies.find_all('div', class_="item"):
        all_a_tag = each_movie.find_all('a')
        all_li_tag = each_movie.find_all('li')

        movie_name = all_a_tag[1].text
        url_to_fetch = all_a_tag[1]['href']
        movie_date = all_li_tag[0].text

        response_item = requests.get(url_to_fetch).content
        soup_item = BeautifulSoup(response_item, 'lxml')
        img_tag = soup_item.find('img')

        print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))

%time main()

########## Output ##########

阿拉丁 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2553992741.jpg
龙珠超布罗利 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2557371503.jpg
五月天人生无限公司 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2554324453.jpg
... ...
直播攻略 06月04日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2555957974.jpg
Wall time: 56.6 s



import asyncio
import aiohttp

from bs4 import BeautifulSoup

HEADERS = {'User-Agent': 'Mozilla/5.0'}  # minimal request headers; many sites, including Douban, reject requests without a User-Agent

async def fetch_content(url):
    async with aiohttp.ClientSession(
        headers=HEADERS, connector=aiohttp.TCPConnector(ssl=False)
    ) as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    url = "https://movie.douban.com/cinema/later/beijing/"
    init_page = await fetch_content(url)
    init_soup = BeautifulSoup(init_page, 'lxml')

    movie_names, urls_to_fetch, movie_dates = [], [], []

    all_movies = init_soup.find('div', id="showing-soon")
    for each_movie in all_movies.find_all('div', class_="item"):
        all_a_tag = each_movie.find_all('a')
        all_li_tag = each_movie.find_all('li')

        movie_names.append(all_a_tag[1].text)
        urls_to_fetch.append(all_a_tag[1]['href'])
        movie_dates.append(all_li_tag[0].text)

    tasks = [fetch_content(url) for url in urls_to_fetch]
    pages = await asyncio.gather(*tasks)

    for movie_name, movie_date, page in zip(movie_names, movie_dates, pages):
        soup_item = BeautifulSoup(page, 'lxml')
        img_tag = soup_item.find('img')

        print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))

%time asyncio.run(main())

########## Output ##########

阿拉丁 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2553992741.jpg
龙珠超布罗利 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2557371503.jpg
五月天人生无限公司 05月24日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2554324453.jpg
... ...
直播攻略 06月04日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2555957974.jpg
Wall time: 4.98 s

Summary #

That’s the end of today’s main content. Today, I went into great detail, starting from a simple web crawler and ending with a real web crawler, with explanations of the latest concepts and usage of Python coroutines interspersed in between. Let me briefly review with you here.

  • The main differences between coroutines and multithreading are: first, coroutines are single-threaded; second, the programmer decides when to yield control and switch to the next task.
  • The syntax for writing coroutines is more concise and clear. Combining the async/await syntax and create_task is sufficient for moderate-level concurrency requirements.
  • When writing coroutine programs, you need to have a clear concept of event loops in your mind, knowing when the program needs to pause and wait for I/O, and when it needs to execute all the way through.

Lastly, please don’t show off your skills too easily. The multithreading model certainly has its advantages. A truly capable programmer should know when to use the appropriate model to achieve the optimal engineering results, instead of blindly assuming that a certain technique is very powerful and should be used in all projects. Technology is part of engineering, and engineering involves trade-offs between time, resources, manpower, and other complex factors.

Thought-provoking Question #

Finally, here’s a thought-provoking question for you. How can coroutines implement callback functions? Please feel free to leave a message and discuss with me. You are also welcome to share this article with your colleagues and friends. Let’s exchange ideas and progress together.
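
As a starting hint, here is a small sketch of my own: every task created with asyncio.create_task is a Task object, and a Task, like a Future, accepts callbacks through add_done_callback.

import asyncio

async def crawl_page(url):
    await asyncio.sleep(1)
    return 'OK {}'.format(url)

def on_done(task):
    # Called by the event loop once the task has finished.
    print('callback got:', task.result())

async def main():
    task = asyncio.create_task(crawl_page('url_1'))
    task.add_done_callback(on_done)
    await task

asyncio.run(main())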