AsyncOpenAI - Calling OpenAI APIs Asynchronously with Pandas: An Experience Sharing

A guide on calling OpenAI APIs asynchronously with pandas.

ā€¢

TheĀ AsyncOpenAIĀ class is a Python wrapper for the OpenAI API that allows users to perform asynchronous requests to the API. The class inherits from the OpenAI class and overrides some of its methods to use the asyncio library for concurrency. The AsyncOpenAI class provides the following benefits:

  • Faster performance: Users can send multiple requests to the API in parallel and wait for the results without blocking the main thread. This can improve the efficiency and responsiveness of the application.
  • Easier error handling: Users can use theĀ try/exceptĀ syntax to catch and handle any exceptions raised by the API requests. The class also provides aĀ cancelĀ method to cancel any pending requests.
  • Simpler code: Users can use theĀ async/awaitĀ syntax to write concise and readable code that works with the API. The class also supports theĀ async withĀ context manager to automatically close the session when done.

Here is the brief documentation from the README.md from OpenAIā€™s official GitHub repository openai-pythonĀ (openai/openai-python: The official Python library for the OpenAI API (github.com)

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    # defaults to os.environ.get("OPENAI_API_KEY")
    api_key="My API Key",
)


async def main() -> None:
    chat_completion = await client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": "Say this is a test",
            }
        ],
        model="gpt-3.5-turbo",
    )


asyncio.run(main())

Integrating with Pandas

Pandas DataFramesĀ are a staple for data manipulation and analysis. However, when it comes to making API calls for each row in a DataFrame, things can get a bit tricky. Traditional methods of looping through a DataFrame and making synchronous API calls can be time-consuming, especially when dealing with large datasets. This is whereĀ asynchronous operationsĀ come into play.

Asynchronous operations allow multiple tasks to be executed concurrently, rather than sequentially. This means that while one task is waiting for a response (such as an API call), other tasks can continue to execute. This can significantly reduce the overall time required to process large datasets.

The Wrong Way of UsingĀ AsyncOpenAIĀ Class

I ought to admit, I rarely use async (e.g., asyncio) in my code, so Iā€™m have no clear idea how to make it works with Pandas methods such as .map() and .apply() , thus naturally I was starting with the code below, where I use .map() to attempt to map every single value stored in the text column to the function. Iā€™m quite sure this is not rare since when I was trying to search for the solution, I found a post on StackOverflow where someone was also posting the following and asking for the proper way to use the AsyncOpenAIclass with Pandas.

After digging into how async works, here is something I was confident that it will work:

# Async (Standalone from above)
async def get_embedding(text):
    response = await client.embeddings.create(input=text, model=EMBEDDING_ENGINE)
    return response.data[0].embedding


def apply_async_get_embedding(dfi):
    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(get_embedding(row[''text''])) for _, row in dfi.iterrows()]
    return loop.run_until_complete(asyncio.gather(*tasks))


results = apply_async_get_embedding(df)
df[''embedding''] = results

Butā€¦ here we go again. I got another error, showing ā€œThis event loop is already runningā€.

Error Message from the Async Code:

The error ā€œRuntimeError: This event loop is already runningā€ typically occurs when we try to nest the event loop. This means that the code is trying to create a new event loop while an existing one is already present. This can cause conflicts and lead to the error.

A common solution is to use theĀ nest_asyncioĀ package, which allows nested use of asyncio.run and loop.run_until_complete1. You can install it using pip and apply it in your code as follows:

Below is the description lifted from theĀ nest_asyncioĀ GitHub repo:

By design asyncioĀ does not allowĀ its event loop to be nested. This presents a practical problem: When in an environment where the event loop is already running itā€™s impossible to run tasks and wait for the result. Trying to do so will give the error ā€œRuntimeError: This event loop is already running".The issue pops up in various environments, such as web servers, GUI applications and inĀ Jupyter notebooks.This module patches asyncio to allow nested use ofĀ asyncio.runĀ andĀ loop.run_until_complete.

All we need to do is add the following two lines in our Jupyter Notebook, and our previous code will work:

import asyncio

nest_asyncio.apply()

One Correct Way of Using AsyncOpenAI Class with Pandas

So now letā€™s sum up all these into the working code.

1. Install the packages

!pip install openai
!pip install nest_asyncio

2. Import the packages

import pandas as pd
import seaborn as sns

from openai import AsyncOpenAI

import asyncio
import nest_asyncio

3. Import and Preview the Dataset

4. Set up the ā€œAsyncOpenAIā€ client object

client = AsyncOpenAI(api_key="Your API Key")

5. [CORE] Define the Async Functions

nest_asyncio.apply()

EMBEDDING_ENGINE = ''text-embedding-ada-002''

async def get_embedding(text):
    response = await client.embeddings.create(input=text, model=EMBEDDING_ENGINE)
    return response.data[0].embedding


def apply_async_get_embedding(dfi):
    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(get_embedding(row[''Review''])) for _, row in dfi.iterrows()]
    return loop.run_until_complete(asyncio.gather(*tasks))

6. Apply the Async Function to the DataFrame (df)

Now we can see the returned embeddings are stored in the last columnĀ embeddingĀ of the DataFrameĀ df.

Here is theĀ link to the full notebook.


Thatā€™s it for this article. Hope to share this so someone who is looking for the solution doesnā€™t have to go through the hassle of trials and errors to make theĀ AsyncOpenAIĀ classĀ to work with the values stored in aĀ Pandas DataFrame.

Continue Learning

Discover more articles on similar topics