In a synchronous environment, a program execution follows a set of operations sequentially. The execution flow will start processing a step and wait for it to return a result before proceeding to the next one. With asynchronous programming, we can use the lag time required by the operation to process and return a result to continue processing other tasks.

We have previously discussed the concept of concurrency and its usage in Python. Check it out here, if you like.

In this post, we will further discuss the topic of Asynchronous Programming and introduce the asyncio library in Python.

Single-Threaded Asynchrony

Single-Threaded Asynchrony is a programming model which only uses a single thread to achieve concurrency by interleaving the execution of several tasks. In this programming model, the single thread executed the different tasks sequentially.

To process simultaneously, we would require to initialise an array of threads or processes. Let us assume an array of 2 tasks — [T1, T2]. While executing T1, the main thread can temporarily pause the execution of T1 in the thread and start processing T2 before returning to T1 to complete.

The best use case for Single-Threaded Asynchrony is the execution of I/O bound tasks due to their typically high idle times. We trigger an I/O task and during its idle time, the thread can start the execution of another task.

To manage the execution flow we normally use an event loop. The event loop is responsible for maintaining an event queue, gathering items for the said queue, and handling them. We will show practical examples of this below.

Cooperative Multi-Tasking

Any task running in the event loop must be suspended when the event loop encounters either an I/O task or other long-running operations.

In Python, these tasks automatically suspend themselves and allow other operations to run by yielding control to the event loop. On completion of the I/O operation, the task is resumed back to its original state (before it was paused) by the event loop.

We can achieve this behavior through a coroutine object. In Python, we can do this using the async keyword before the function definition. Execution of this coroutine results in a coroutine object.

async def this_is_a_coroutine_func():
# .....

The Coroutine Function

The coroutine function is an asynchronous function that is able to give up control to its caller without it losing its original state. A coroutine function must be awaited using the await keyword. It can wait on events, return the respective result on completion, or raise an exception.

When Python encounters an await, it will pause the execution of the coroutine at that point and passes control to the event loop. The event loop will pass back control to the coroutine when an event occurs such as a timeout event or a completion event.

For the coroutine object to be executed, it needs to be wrapped in a Future object and passed to a running event loop.

We can execute a coroutine in an event loop as follows:

loop.run_until_complete(some_coroutine_task)

Let us write our first asynchronous program.

import asyncio
# define our coroutine
async def say_hello():
print(‘Hello’)
# asynchronous sleep of 1 second
await asyncio.sleep(1)
print(‘World’)
# we initialise our event loop
loop = asyncio.get_event_loop()
# we run our coroutine in the event loop until it is completed
loop.run_until_complete(say_hello())
# close the event loop
loop.close()

view rawasync_test.py hosted with ❤ by GitHub

In the above program, we define our coroutine object say_hello() as a function that prints Hello, wait 1 second, and then prints World. Note that we are defining the coroutine using the async def keywords. Inside our coroutine, we are also calling another coroutine (asyncio.sleep); thus, we need to await this coroutine.

We define the event loop and execute our coroutine in the event loop until it is completed. When the event loop starts execution, we will see the first print statement is displayed. As the execution hits the sleep step, the event loop pauses the execution and awaits its completion. During this time, the event loop can start processing another task.

On completion of the asyncio.sleep operation, the event loop resumes the task to its original state and it proceeds by executing the last print statement.

In summary:

  • The event loop is initialized
  • Coroutine submitted to the event loop as a Future object
  • The Future object’s state is set to pending
  • The event loop starts execution
  • The coroutine executed the first print and the async sleep
  • The coroutine status is changed to suspended and control is passed to the event loop
  • After 1 second, the coroutine is resumed
  • The next print is executed and the coroutine completes
  • Control is returned to the event loop

Future Objects

A Future is an object that controls a function and returns its results on completion. The Future object enables us to use several control operations to manage coroutines. Some available functionality includes:

  • cancel() — cancels a pending Future
  • done() —returns True if the Future was completed or canceled
  • result() —returns the result of Future
  • exception() — returns any exception that happened during the execution of Future
  • add_done_callback() — executes a callback function on Future completion

Task

A task is a subclass of a Future that is used to manage the execution of a coroutine in an event loop.

A coroutine must be wrapped in a Task before it can be executed in an event loop. We can achieve this using the asyncio.create_task().

We can also wait for a task/set of tasks to complete using the asyncio.wait. This provides us with the functionality to wait for all passed coroutines to finish before proceeding.

import asyncio
# define a coroutine to simulate retrieval of an item
async def get_item(i):
await asyncio.sleep(i)
return f’item {i}’
# define a coroutine which executes several coroutines
async def get_items(num_items):
print(‘getting items’)
# create a list of Tasks
item_coros = [asyncio.create_task(get_item(i)) for i in range(num_items)]
print(‘waiting for tasks to complete’)
# wait for all Tasks to complete
completed, pending = await asyncio.wait(item_coros)
# access the Task results
results = [task.result() for task in completed]
print(f’results: {results}’)
print(pending)
# create an event loop
loop = asyncio.get_event_loop()
try:
# execute the coroutine in the event loop
loop.run_until_complete(get_items(4))
finally:
loop.close()

view rawasync_wait_test.py hosted with ❤ by GitHub

In the above program, we are creating a list of Tasks and then executing them in an event loop using the asyncio.wait function. At any point, we have access to all Tasks which have been completed and those which are still pending.

The wait function also provides a timeout functionality. Using the timeout parameter we can specify how long we will wait for the coroutines to finish. If any coroutine has not been completed after the timeout has elapsed, the event loop will pause it and it will remain pending.

We can access the pending Tasks and call the cancel() function to kill them. Moreover, we can also use the asyncio.wait_for() function which also waits for the coroutines with a timeout, but automatically cancels any pending tasks after the respective timeout has elapsed.

Running in Multi-threads

The same code structure can also be used to execute multiple coroutines with multi-threading.

import concurrent.futures
import asyncio
import time
# we create our blocking target function for multi-threading
def blocking_func(n):
time.sleep(0.5)
return n ** 2
# we define our main coroutine
async def main(loop, executor):
print(‘creating executor tasks’)
# create a list of coroutines and execute in the event loop
blocking_tasks = [loop.run_in_executor(executor, blocking_func, i) for i in range(6)]
print(‘waiting for tasks to complete’)
# group the results of all completed coroutines
results = await asyncio.gather(*blocking_tasks)
print(f’results: {results}’)
if __name__ == ‘__main__’:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop, executor))
finally:
loop.close()

view rawasync_multithread.py hosted with ❤ by GitHub

We create the blocking target function that performs our logic (in this case, we are squaring every number). We create a coroutine object that accepts an event loop and a ThreadPool which executes a number of coroutines using multi-threading.

In this example, instead of asyncio.wait, we are using asyncio.gather. The gather function is more high-level and mainly focuses on grouping the results. The wait function does not give us the results directly, rather, we have to manually collect them using the .results() function.

Conclusion

Asynchronous programming can be intimidating at first, especially to new developers. Nonetheless, Python offers several libraries that facilitate how we build, interact, and integrate async operations in our programs. In this post, we introduced and discussed the basic concepts of asynchronous programming to get you started. There are more advanced concepts and functions around the async and await keywords in Python. I highly encourage you to continue reading up on this topic and practice with your own use-cases.

Did you enjoy this post? If yes, perhaps you might want to consider becoming a member to support me and your other favorite writers.

Original Source

Author

Data Scientist at Gaming Innovation Group | AI Enthusiast and Researcher.