How to Manage Exceptions When Waiting On Multiple Asyncio Tasks

Handling exceptions in asyncio requires careful management to ensure that code does not become unstable.

Published on

image

This is the second part of a series of articles looking at problems arising when waiting on multiple events in Python Asyncio. The first article explored how to manage the lifecycle of tasks when using asyncio.wait. This article discusses how asyncio.wait handles exceptions and what the programmer needs to do to ensure that results are gathered and tasks terminated in a coordinated manner.

Waiting on a single coroutine

When awaiting a single coroutine exception handling is managed in exactly the same way as non-asyncio code using a try…except…finally, block. As an example, an exception is raised in the foo coroutine which propagates up to the main block where it is captured and the error is printed.

import asyncio

async def foo():
    raise ValueError("Foo value error")
    return("Foo finished")

async def main():
    try:
        result = await foo()
        print(result)
    except ValueError as e:
        print(f"ValueError: {e}")

asyncio.run(main())

However, if the result in main is waiting on several coroutines to complete what should happen if one of those coroutines throws an exception? Should it be immediately propagated to the main coroutine? What about unfinished tasks? Should they be terminated or allowed to continue? To prevent any unwanted side-effects the programmer needs to have a plan of action to handle exceptions when they arise.

It’s the scheduler that calls the coroutine

In the previous example when we run the main and await the foo coroutines they don’t start running immediately. Instead, they are added to a list of coroutines to run at some point in time in the future. This is a major difference compared with regular Python functions and what allows multiple tasks to seemingly run in parallel.

So, when asyncio.run(main()) is called the coroutine main() is placed on the event loop and, as it is the only coroutine on the event loop, it is called by the scheduler and starts executing. When the main coroutine meets the await foo() statement the foo coroutine is added to the event list and execution of the main function is suspended with control yielded back to the scheduler. The scheduler then looks to see if there are any other tasks to execute and seeing foo calls the coroutine until it terminates. Termination of foo sets a flag in main that the call to foo has completed; so on the next trip round the event loop, the scheduler sees that main is able to continue execution and so calls it, with execution continuing from the point where main yielded.

image

We can also consider the case when main is waiting on two tasks to complete. In this case, two tasks are created (a, b). These are added to the event loop, but will not execute until the main loop yields control back to the scheduler and the scheduler is then free to call the newly created tasks.

image

We can now consider the case where the main coroutine has yielded control to the scheduler and the scheduler has called task a. For some reason, task a rases an exception. This exception does not propagate up to main, as main is a peer of a on the event loop and does, instead, propagate up to the scheduler.

The scheduler captures the exception and passes a task object back to main noting that an exception has been raised. Depending upon the parameters in asyncio.wait the scheduler will then determine whether main is callable on the next trip around the event loop, or whether all tasks have to complete before main is called.

image

At this point you will note:

  • No exception has been raised in main at this point. It can only be handled once the scheduler calls the main coroutine — which occurs when the return_when condition is met.
  • The coroutine a has terminated and will be removed from the event loop. All the resources it used will be released.
  • The status of coroutine b is not determined. It might or might not have started execution; and, might or might not have completed. If the return_when condition for main is FIRST_EXCEPTION then main may continue execution and complete before b is executed and terminates.

When waiting on multiple tasks a plan is needed:

  • What behaviour is required when an exception is raised in a task (coroutine)? Do you want to wait for all other tasks to complete before the calling coroutine continues execution or return immediately?
  • If you wait for all tasks to return how will you determine which tasks returned results and which had exceptions? How many tasks can simultaneously return exceptions? This could be zero up to the number of tasks that you are waiting on.
  • If you return when the first exception is raised how do you ensure that all other tasks have completed execution and any resources they use are safely released? Tasks may be in an unknown state and may create unwanted side-effects if they are left to run indefinitely or resources they rely on are destroyed. For instance, if the calling coroutine closes network connections before the called coroutine is executed then the called coroutine may wait indefinitely for a network connection that doesn’t exist.

Capturing exceptions raised in tasks

To demonstrate exception handling with asyncio.wait the following example has two coroutines: foo which immediately raises a ValueError and bar which sleeps for one second and then returns. The main function encapsulates these coroutines in tasks before waiting for both tasks to complete. When the tasks complete then completed tasks are returned in the done list and tasks which have not completed are returned in pending.

import asyncio

async def foo():
    raise ValueError("Foo value error")
    return("Foo finished")

async def bar():
    await asyncio.sleep(1)
    return("Bar finished")

async def main():
    foo_task = asyncio.create_task(foo(), name="Exception task")
    bar_task = asyncio.create_task(bar(), name="Waiting task")
    try:
        done, pending = await asyncio.wait(
            [foo_task, bar_task],
            return_when=asyncio.ALL_COMPLETED
        )
        for task in done:
            name = task.get_name()
            print(f"Done: {name}")
        for task in pending:
            task.cancel()
    except Exception as e:
        print(f"Exception caught: {e}")

asyncio.run(main())

When foo_task and bar_task complete the done list contains both tasks. However, at this point the exception raised in foo is not re-thrown into main. If the code is run then the following output is produced.

Done: Waiting task
Done: Exception task------------ At program termination -------------
Task exception was never retrieved
future: <Task
 finished name='Exception task'
 coro=<foo() done,exception=ValueError('Foo value error')>
>
ValueError: Foo value error

We can clearly see that both tasks completed, and we are able to retrieve the name of the tasks and print them. However, accessing the name of the task did not cause the exception in foo_task to be re-thrown. What is perhaps more concerning is that the foo_task remains on the event loop until the program terminates. If the exception is not handled then tasks can remain on the event loop indefinitely.

Retrieving exceptions for each task

Tasks returned from asyncio.wait have the following methods which are relevant to handling exceptions:

  • get_name() : This is optional, however, giving tasks meaningful names makes creating descriptive exceptions and debugging significantly easier.
  • exception() : Returns the exception object raised by the coroutine, None if no exception as raised.
  • result() : Returns the result of the coroutine, re-throws the exception if the coroutine raised an exception.

Using these methods we can rewrite the main function to print the name of the task; test whether the coroutine threw an exception and print the exception details; and, retrieve the result of the coroutine, using a try…except block to capture any re-thrown exceptions.

import asyncio

async def foo():
    raise ValueError("Foo value error")
    return("Foo finished")

async def bar():
    await asyncio.sleep(1)
    return("Bar finished")

async def main():
    foo_task = asyncio.create_task(foo(), name="Exception_task")
    bar_task = asyncio.create_task(bar(), name="Waiting_task")
    try:
        done, pending = await asyncio.wait(
            [foo_task, bar_task],
            return_when=asyncio.ALL_COMPLETED
        )
        for task in done:
            name = task.get_name()
            print(f"DONE: {name}")
            exception = task.exception()
            if isinstance(exception, Exception):
                print(f"{name} threw {exception}")
            try:
                result = task.result()
                print(f"{name} returned {result}")
            except ValueError as e:
                print(f"ValueError: {e}")
        for task in pending:
            task.cancel()
    except Exception as e:
        print("Outer Exception")

asyncio.run(main())

Running the above code produces the following output. We can clearly see that foo threw a ValueError exception, which was then re-thrown and caught when .result() was called. Note, the ordering of the returned tasks is the order in which they completed; and, using this comprehensive approach to handling exceptions and pending tasks ensures that all tasks are managed and removed from the event loop prior to termination of the calling coroutine (in this case main).

DONE: Exception_task
Exception_task threw Foo value error
ValueError: Foo value error
DONE: Waiting_task
Waiting_task returned Bar finished------------ At program termination -------------

There are a couple of considerations when planning how to handle exceptions and cancel pending tasks:

  • Plan how you will handle pending tasks and exceptions. It is difficult to debug asyncio code when there is no code to explicitly handle these conditions.
  • There is a high probability that tasks that throw exceptions will return earlier than those that return a value result. This means they will appear earlier, if not first, in the iterable list. Therefore, the first task for which result() method is called may throw an exception, in which case you may want to process all other results and cancel pending tasks before re-throwing this error.
  • The ordering of activities following asyncio.wait may be important in your implementation. Work out the most appropriate order for cancelling pending tasks, getting results, testing for and handling exceptions.
  • If more than one task throws an exception you may need to work out how to aggregate them into a single error message that can be re-thrown once all other tasks have been processed.

Exceptions in asyncio.gather

Python provides asyncio.gather as a higher-level alternative to asyncio.wait when waiting for all tasks to complete. It manages the wrapping of coroutines in tasks and ensuring the tasks are removed from the event loop when the function returns. Using the previous foo bar example we can replace asyncio.wait with:

results = await asyncio.gather(foo(), bar(), return_exceptions=True)
print(results)[ValueError('Foo value error'), 'Bar finished']

The key parameter here is return_exceptions. When this parameter is set to True, the function returns a list of both values and exceptions; allowing all tasks to complete before returning. As all tasks have been completed they are removed from the event loop.

If return_exceptions is set to False (the default value) then asyncio.gather returns immediately, throwing the exception into the calling coroutine. Any task which has not been terminated will remain on the event loop.

The following code explains the behaviour more clearly.

import asyncio

async def foo():
    raise ValueError("Foo value error.")
    return("Foo finished.")

async def bar():
    try:
        await asyncio.sleep(1)
        return("Bar finished.")
    except asyncio.CancelledError:
        print("Bar Cancelled.")

async def main():

    try:
        results = await asyncio.gather(
            foo(), bar(),
            return_exceptions=False
        )
        print(results)
    except ValueError as e:
        print("Value Error raised.")

asyncio.run(main())

When this code executes foo immediately throws a ValueError exception. This propagates immediately to main where it is caught and an error message printed. As the gather function exits the await statement, as a result of the exception, any pending tasks are cancelled (task.cancel()). This causes a CancelledError to be raised in bar.

Value Error raised.
Bar Cancelled.

Always Plan for Cancelled Tasks

The previous example draws attention to potential cancellation of tasks mid-execution. When planning how the code executes it is quite possible that coroutines are cancelled when an execution is thrown elsewhere in the code. In the case of gather, this may cause a CancelledError to be thrown into a peer coroutine.

If the coroutine in which a CancelledError is raised is communicating with another system, accessing a database or changing resources it may leave the system in an unstable state. Catching the CancelledError provides an opportunity to unwind transactions minimising the risk of data corruption, inefficiency, or system failure.

Python asyncio is a powerful tool for improving system efficiency using asynchronous execution of code. However, it is very easy to implement code that does not explicitly handle the lifecycle of tasks and exceptions. Planning in advance how tasks are created and cancelled, and how exceptions should be handled, will significantly reduce the amount of time trying to diagnose bugs that may arise.

Enjoyed this article?

Share it with your network to help others discover it

Continue Learning

Discover more articles on similar topics