
Asynchronous Magic: PyNest and SQLAlchemy 2.0 Drive a 25% Improvement in Python Apps Performance

Exploring the Synergy of PyNest and SQLAlchemy 2.0 in Enhancing Asynchronous Operations and Database Interactions

Overview

In the rapidly evolving web development landscape, asynchronous programming has become a cornerstone for building efficient and scalable applications. PyNest, a new Python framework that is built on top of FastAPI and follows the modular architecture of NestJS, has recently introduced a feature that integrates SQLAlchemy 2.0, allowing developers to use asynchronous database operations. This article walks through the practicalities of this feature, guiding you through setting up and using async database connections in PyNest, and then compares the throughput of an async service with that of a sync one.

Asynchronous Programming In PyNest

Asynchronous programming is a paradigm that facilitates non-blocking operations, making it particularly suitable for I/O-bound tasks such as database interactions. PyNest’s integration with SQLAlchemy 2.0 lets the application keep serving other requests while a query is in flight, instead of waiting idle as it would with traditional synchronous processing. This improves throughput as well as the scalability and responsiveness of applications.
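
To make the non-blocking idea concrete, here is a minimal sketch that uses only the standard library. The two handlers are hypothetical stand-ins: time.sleep plays the role of a blocking call made inside a coroutine, and asyncio.sleep plays the role of awaiting an async database driver. It is an illustration of the event loop's behavior, not a model of how PyNest or FastAPI schedule requests.

```python
import asyncio
import time


async def blocking_handler(delay):
    # time.sleep holds the event loop, so concurrent "requests" run one after another
    time.sleep(delay)


async def non_blocking_handler(delay):
    # asyncio.sleep yields control, so concurrent "requests" overlap while waiting
    await asyncio.sleep(delay)


async def timed(handler, n_requests=5, delay=0.05):
    """Run n_requests copies of the handler concurrently and return the elapsed time."""
    started = time.perf_counter()
    await asyncio.gather(*(handler(delay) for _ in range(n_requests)))
    return time.perf_counter() - started


async def compare():
    blocking = await timed(blocking_handler)
    non_blocking = await timed(non_blocking_handler)
    return blocking, non_blocking


if __name__ == "__main__":
    blocking, non_blocking = asyncio.run(compare())
    print(f"blocking: {blocking:.2f}s, non-blocking: {non_blocking:.2f}s")
```

With five simulated requests, the blocking version needs roughly five times the delay, while the non-blocking version finishes in about one delay because the waits overlap.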

Prerequisites

Before diving into the implementation, ensure you have the following prerequisites:

  • Python 3.9 or higher
  • PyNest version 0.1.2 or newer
  • SQLAlchemy version 2.0 or newer
  • An appropriate async driver for your database (e.g., asyncpg for PostgreSQL)
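
If you want to verify these prerequisites from Python, the following sketch may help. It uses only the standard library; the helper names (parse_version, meets_minimum, check_environment) are made up for this example, and the distribution names are assumed to match the ones used with pip.

```python
import sys
from importlib import metadata


def parse_version(text):
    """Turn '2.0.23' into (2, 0, 23), ignoring suffixes such as 'rc1'."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)


def meets_minimum(installed, minimum):
    return parse_version(installed) >= parse_version(minimum)


# Minimum versions from the list above (assumed PyPI distribution names)
REQUIREMENTS = {"pynest-api": "0.1.2", "SQLAlchemy": "2.0"}


def check_environment():
    """Return a list of human-readable problems; an empty list means all good."""
    problems = []
    if sys.version_info < (3, 9):
        problems.append("Python 3.9 or higher is required")
    for name, minimum in REQUIREMENTS.items():
        try:
            installed = metadata.version(name)
        except metadata.PackageNotFoundError:
            problems.append(f"{name} is not installed")
            continue
        if not meets_minimum(installed, minimum):
            problems.append(f"{name} {installed} is older than {minimum}")
    return problems


if __name__ == "__main__":
    for problem in check_environment():
        print(problem)
```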

Setting Up Your Async PyNest Project

Installation

First, install PyNest and the necessary dependencies:

pip install pynest-api
pip install asyncpg  # For PostgreSQL

Project Initialization

Create a new PyNest project with async capabilities:

pynest create-nest-app -n MyAppName -db postgresql --is-async

This command scaffolds a project with an organized structure, including configuration files essential for async operations.

This is the structure of your project —

├── app.py
├── config.py
├── main.py
├── README.md
├── requirements.txt
├── .gitignore
├── src
│    ├── __init__.py

Let’s dive into each file in your project —

# config.py

from nest.core.database.orm_provider import AsyncOrmProvider
import os
from dotenv import load_dotenv

load_dotenv()

config = AsyncOrmProvider(
    db_type="postgresql",
    config_params={
        "host": os.getenv("POSTGRESQL_HOST"),
        "db_name": os.getenv("POSTGRESQL_DB_NAME"),
        "user": os.getenv("POSTGRESQL_USER"),
        "password": os.getenv("POSTGRESQL_PASSWORD"),
        # Fall back to the PostgreSQL default port so int() never receives None
        "port": int(os.getenv("POSTGRESQL_PORT", "5432")),
    },
)

PyNest provides the “AsyncOrmProvider”, which is an object for initializing and managing an Asynchronous ORM connection.
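
The source does not show what the provider does with these parameters. As a rough sketch, assuming they end up in a SQLAlchemy connection URL for the asyncpg driver, the URL could be assembled as follows. The function name build_async_postgres_url is hypothetical and is not part of PyNest.

```python
from urllib.parse import quote_plus


def build_async_postgres_url(config_params):
    """Assemble a SQLAlchemy URL for PostgreSQL with the asyncpg driver.

    Assumption: the keys match the config_params dictionary shown above.
    """
    # Escape characters such as '@' or '/' that would otherwise break the URL
    user = quote_plus(config_params["user"])
    password = quote_plus(config_params["password"])
    host = config_params["host"]
    port = config_params["port"]
    db_name = config_params["db_name"]
    return f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{db_name}"


if __name__ == "__main__":
    example = {
        "host": "localhost",
        "db_name": "books",
        "user": "app",
        "password": "p@ss/word",
        "port": 5432,
    }
    print(build_async_postgres_url(example))
```

The "+asyncpg" suffix in the scheme is what tells SQLAlchemy to use the async driver rather than a synchronous one.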

# app.py

from config import config
from nest.core.app import App
from src.book.book_module import BookModule

app = App(
    description="PyNest service",
    modules=[BookModule],
    title="Book Store Management Application",
)


@app.on_event("startup")
async def startup():
    await config.create_all()

This file contains the code for initializing the FastAPI application. Note that the App object takes a “modules” argument: a list of modules, each of which groups the routes related to one domain of the application. (The BookModule is inserted automatically once we create the module, as you will see below.) The startup hook calls config.create_all() so that the tables are created when the application starts.

# main.py
import uvicorn

if __name__ == '__main__':
    uvicorn.run(
        'app:app',
        host="0.0.0.0",
        port=8000,
        reload=True
    )

This main file is our way to start the application. It is equivalent to running the following command:

uvicorn "app:app" --host "0.0.0.0" --port "8000" --reload

Adding Modules

After we create the infrastructure of our application, we want to add content to our API and make it useful. In this case, we will create a BookStore Application that will have one module: A book module that will handle all the operations that are related to books.

We will start with creating the book module. PyNest has a command for creating a whole module (similar to generating resources in NestJS).

pynest g module -n book

This command creates a new module under the “src” folder. This is the new structure:

├── app.py
├── config.py
├── main.py
├── README.md
├── requirements.txt
├── src
│    ├── __init__.py
│    ├── book
│    │    ├── __init__.py
│    │    ├── book_controller.py
│    │    ├── book_service.py
│    │    ├── book_model.py
│    │    ├── book_entity.py
│    │    ├── book_module.py

Every generated file contains boilerplate code. Below, I show each file after the additions made for this application.


# book_entity.py

from config import config
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship


class Author(config.Base):
    __tablename__ = "authors"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String, unique=True)

    books = relationship("Book", back_populates="author")


class Book(config.Base):
    __tablename__ = "books"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    book_name: Mapped[str] = mapped_column(String, unique=True)
    author_id: Mapped[int] = mapped_column(Integer, ForeignKey("authors.id"))
    price: Mapped[int] = mapped_column(Integer)
    description: Mapped[str] = mapped_column(String)
    year_of_publication: Mapped[int] = mapped_column(Integer)
    genre: Mapped[str] = mapped_column(String)
    language: Mapped[str] = mapped_column(String)
    pages: Mapped[int] = mapped_column(Integer)

    author: Mapped[Author] = relationship("Author", back_populates="books")

We use the config object (which is the AsyncOrmProvider object) to initialize and create the tables. SQLAlchemy 2.0 introduces a slightly different declarative syntax. Notably, the mapped_column function and the Mapped type are used to leverage Python's typing system.


# book_model.py

from pydantic import BaseModel
from typing import Optional


class Book(BaseModel):
    book_name: str
    author_id: int
    price: int
    description: str
    year_of_publication: int
    genre: str
    language: str
    pages: int

    class Config:
        orm_mode = True
        schema_extra = {
            "example": {
                "book_name": "The Alchemist",
                "author_id": 1,
                "price": 200,
                "description": "The Alchemist follows the journey of an Andalus",
                "year_of_publication": 1988,
                "genre": "Fiction",
                "language": "English",
                "pages": 200,
            }
        }


class BookUpdate(BaseModel):
    book_name: Optional[str] = None
    author_id: Optional[int] = None
    price: Optional[int] = None
    description: Optional[str] = None
    year_of_publication: Optional[int] = None
    genre: Optional[str] = None
    language: Optional[str] = None
    pages: Optional[int] = None

The model file includes the Pydantic classes that our FastAPI application needs. FastAPI uses Pydantic for data validation and typing. It’s worth noting that Sebastián Ramírez Montaño, the creator of FastAPI, has developed a library named SQLModel that aims to bridge Pydantic and SQLAlchemy. PyNest plans to support it once its async feature is incorporated.


# book_service.py

from .book_model import Book, BookUpdate
from .book_entity import Book as BookEntity
from nest.core.decorators import async_db_request_handler

from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession


class BookService:

    @async_db_request_handler
    async def add_book(self, book: Book, session: AsyncSession):
        new_book = BookEntity(**book.dict())
        session.add(new_book)
        await session.commit()
        return new_book.id

    @async_db_request_handler
    async def get_books(self, session: AsyncSession):
        query = select(BookEntity)
        result = await session.execute(query)
        return result.scalars().all()

    @async_db_request_handler
    async def get_book(self, book_id: int, session: AsyncSession):
        query = select(BookEntity).where(BookEntity.id == book_id)
        result = await session.execute(query)
        book = result.scalars().first()
        if not book:
            raise Exception(f"Book {book_id} not found")
        return book

    @async_db_request_handler
    async def update_book(self, book_id: int, book: BookUpdate, session: AsyncSession):
        book_entity = await self.get_book(book_id, session)
        for key, value in book.dict().items():
            if value is not None:
                setattr(book_entity, key, value)
        await session.commit()
        return {"message": f"Book {book_id} updated successfully"}

    @async_db_request_handler
    async def delete_book(self, book_id: int, session: AsyncSession):
        book_entity = await self.get_book(book_id, session)
        await session.delete(book_entity)
        await session.commit()
        return {"message": f"Book {book_id} deleted successfully"}

    async def extensive_query(self, session: AsyncSession):
        stmt = text("SELECT pg_sleep(5)")
        await session.execute(stmt)
        return {"message": "Extensive query executed successfully"}

This service is responsible for data storage and retrieval and is designed to be used by the BookController.

Key Responsibilities

  1. CRUD Operations: The class provides methods for adding (add_book), retrieving (get_books and get_book), updating (update_book), and deleting (delete_book) book records. These methods encapsulate the logic for interacting with the database.
  2. Asynchronous Session Management: The class methods receive an AsyncSession instance from the controller, which is part of SQLAlchemy's asynchronous extension. This session is used for executing database queries and transactions, ensuring that database interactions are performed within a session's scope, providing consistency and integrity. Note that PyNest provides two ways to interact asynchronously with your database.
  3. Error Handling: The @async_db_request_handler decorator wraps each method and is responsible for handling exceptions and collecting metadata about the function.
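
The internals of @async_db_request_handler are not shown here. To give a feel for what a decorator of this kind could do, here is a hypothetical sketch that rolls back the session on failure and logs how long the call took. The names async_db_handler_sketch, FakeSession, and DemoService are invented for this example, and it should not be read as PyNest's actual implementation.

```python
import asyncio
import functools
import logging
import time

logger = logging.getLogger("db")


def async_db_handler_sketch(func):
    """Hypothetical stand-in for a decorator like async_db_request_handler."""

    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        # Assumption: the session arrives as a keyword or as the last positional argument
        session = kwargs.get("session") or (args[-1] if args else None)
        started = time.perf_counter()
        try:
            return await func(self, *args, **kwargs)
        except Exception:
            # Undo any half-finished transaction, then let the error propagate
            if session is not None:
                await session.rollback()
            logger.exception("%s failed", func.__name__)
            raise
        finally:
            # Metadata about the call: here, simply its duration
            logger.info("%s took %.3fs", func.__name__, time.perf_counter() - started)

    return wrapper


class FakeSession:
    """Minimal stand-in for AsyncSession so the sketch runs without a database."""

    def __init__(self):
        self.rolled_back = False

    async def rollback(self):
        self.rolled_back = True


class DemoService:
    @async_db_handler_sketch
    async def ok(self, session):
        return "done"

    @async_db_handler_sketch
    async def broken(self, session):
        raise ValueError("simulated database error")


async def demo():
    session = FakeSession()
    service = DemoService()
    result = await service.ok(session)
    try:
        await service.broken(session)
    except ValueError:
        pass
    return result, session.rolled_back


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping this logic in a decorator means the service methods contain only the query logic, while rollback and logging live in one place.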

# book_controller.py

from nest.core import Controller, Get, Post, Depends, Put, Delete
from sqlalchemy.ext.asyncio import AsyncSession
from config import config

from .book_service import BookService
from .book_model import Book, BookUpdate


@Controller("book")
class BookController:

    service: BookService = Depends(BookService)
    session: AsyncSession = Depends(config.get_db)

    @Get("/")
    async def get_books(self):
        return await self.service.get_books(self.session)

    @Post("/")
    async def add_book(
            self, book: Book
    ):
        return await self.service.add_book(book, self.session)

    @Get("/extensive_query")
    async def extensive_query(self):
        return await self.service.extensive_query(self.session)

    @Get("/{book_id}")
    async def get_book_by_id(
            self, book_id: int
    ):
        return await self.service.get_book(book_id, self.session)

    @Put("/{book_id}")
    async def update_book(
            self,
            book_id: int,
            book: BookUpdate,
    ):
        return await self.service.update_book(book_id, book, self.session)

    @Delete("/{book_id}")
    async def delete_book(
            self, book_id: int
    ):
        return await self.service.delete_book(book_id, self.session)

The BookController in book_controller.py serves as the central hub for handling HTTP requests related to books, utilizing Dependency Injection (DI) to seamlessly integrate BookService and AsyncSession for database operations. It defines various routes such as Get, Post, Put, and Delete, enabling operations like retrieving all books, adding a new book, updating, and deleting a book by its ID.

# book_module.py
from .book_service import BookService
from .book_controller import BookController


class BookModule:
    def __init__(self):
        self.providers = [BookService]
        self.controllers = [BookController]

The BookModule class in this application serves as a modular configuration unit, encapsulating all the components related to the 'Book' feature. This approach aligns with the principle of modularity in software design, which emphasizes dividing a program into separate, independent sections.


Amazing! Now that we have all the desired components of our BookStore application (well, only a part of it, but this is only for demonstration), we can run the application:

uvicorn "app:app" --host "0.0.0.0" --port "8000" --reload

Now, let’s visit the OpenAPI docs at — http://0.0.0.0:8000/docs

Figure: CRUD operations on the book resource

We have almost reached the end of this article, but there is one more thing we must discuss: why do you need this?

Comparison — Async App VS Sync App

I created two otherwise identical applications: one using the traditional synchronous SQLAlchemy approach, and the other using the async setup shown above.

To both of them, I added another endpoint that runs an extensive query taking 5 seconds per call. The function looks like this:

# all import from above

class BookService:

    # all methods from above

    async def extensive_query(self, session: AsyncSession):
        stmt = text("SELECT pg_sleep(5)")
        await session.execute(stmt)
        return {"message": "Extensive query executed successfully"}

Now we have two apps, each with an endpoint that takes 5 seconds per request. Let’s make requests from 10 different clients, each performing the set of requests defined in the main program below.

We will run these requests for 5 minutes and calculate the RPM (requests per minute) to see the difference.

First things first: we’ll create a client that is responsible for sending HTTP requests to our PyNest service.

import aiohttp
import random
from src.book.book_model import Book
from faker import Faker


class AsyncBookClient:
    def __init__(self):
        self.url = "http://localhost:8000/book"
        self.endpoints = {
            "get_book": self.url + "/",
            "add_book": self.url + "/",
            "get_book_by_id": self.url + "/{book_id}",
            "update_book": self.url + "/{book_id}",
            "delete_book": self.url + "/{book_id}",
            "extensive_query": self.url + "/extensive_query",
        }
        self.faker = Faker()

    async def get_book(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(self.endpoints["get_book"]) as response:
                return await response.json()

    async def add_book(self):
        book = self.generate_book()
        async with aiohttp.ClientSession() as session:
            async with session.post(self.endpoints["add_book"], json=book.dict()) as response:
                return await response.json()

    async def get_book_by_id(self):
        books = await self.get_book()
        if not books:
            # Nothing to look up yet, e.g. when the database is still empty
            return None
        book_id = random.choice([book["id"] for book in books])
        async with aiohttp.ClientSession() as session:
            async with session.get(self.endpoints["get_book_by_id"].format(book_id=book_id)) as response:
                return await response.json()

    async def extensive_query(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(self.endpoints["extensive_query"]) as response:
                return await response.json()

    def generate_book(self):
        book_name = self.faker.name()
        author_id = 1
        price: int = self.faker.pyint()
        description: str = self.faker.text()
        year_of_publication: int = random.randint(1900, 2021)
        genre: str = random.choice(["fiction", "non-fiction", "biography", "autobiography"])
        language: str = random.choice(["English", "Hindi", "French", "German", "Spanish"])
        pages: int = random.choice([100, 200, 300, 400, 500])
        return Book(
            book_name=book_name,
            author_id=author_id,
            price=price,
            description=description,
            year_of_publication=year_of_publication,
            genre=genre,
            language=language,
            pages=pages,
        )

The client uses aiohttp for sending async requests to our server.

Then we will create a program that initializes 10 clients and sends multiple requests.

import asyncio
from src.book.book_client import AsyncBookClient
import time


async def run_client(client, end_time):
    global request_count  # Shared counter, initialized in main()
    while time.time() < end_time:
        print(await client.get_book())
        print(await client.get_book_by_id())
        print(await client.extensive_query())
        print(await client.add_book())
        print(await client.add_book())
        print(await client.add_book())
        print(await client.get_book())
        print(await client.get_book_by_id())
        print(await client.get_book())
        print(await client.get_book_by_id())
        print(await client.extensive_query())
        request_count += 11  # Count the 11 API calls made in this cycle


async def main():
    global request_count
    request_count = 0
    clients = [AsyncBookClient() for _ in range(10)]
    end_time = time.time() + 300  # Run for 5 minutes

    tasks = [asyncio.create_task(run_client(client, end_time)) for client in clients]
    await asyncio.gather(*tasks)

    rpm = request_count / 5
    print(f"Requests per minute: {rpm}")


asyncio.run(main())

The result is — Requests per minute: 550.0

Now let’s do the same thing, on our non-async PyNest Service and see if there is any difference.

The result is — Requests per minute: 446.6

Conclusion

This means that when the workload includes an extensive query, such as one that takes 5 seconds, and clients invoke it twice within each set of 11 requests, the async service handles approximately 103 more requests per minute than the non-async service (550.0 versus 446.6), an improvement of roughly 23%, or close to a quarter. The gap is likely to grow with a greater number of extensive queries and with more complex queries (such as joins between large tables or massive aggregations), although that was not measured here. For a Python engineer building the backend of a web application or a microservice for a large system, using an async connection to the RDBMS is therefore an important tool for achieving good performance.
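
The arithmetic behind these figures can be checked with a few lines of Python. This is a sketch based only on the numbers reported above: the two RPM results, the 5-minute run, the 10 clients, and the 11 requests counted per completed cycle. The per-cycle time is approximate, since a cycle that starts just before the deadline still runs to completion.

```python
CYCLE_SIZE = 11        # requests counted per completed loop iteration
DURATION_MINUTES = 5   # length of the benchmark run
CLIENTS = 10           # concurrent clients

async_rpm = 550.0      # reported result for the async service
sync_rpm = 446.6       # reported result for the sync service


def completed_cycles(rpm):
    """Recover the number of completed 11-request cycles from the reported RPM."""
    return round(rpm * DURATION_MINUTES / CYCLE_SIZE)


def seconds_per_cycle(cycles):
    """Approximate wall-clock time one client spends on one cycle."""
    return DURATION_MINUTES * 60 * CLIENTS / cycles


async_cycles = completed_cycles(async_rpm)
sync_cycles = completed_cycles(sync_rpm)
improvement = (async_rpm - sync_rpm) / sync_rpm * 100

print(f"async: {async_cycles} cycles, about {seconds_per_cycle(async_cycles):.1f}s each")
print(f"sync:  {sync_cycles} cycles, about {seconds_per_cycle(sync_cycles):.1f}s each")
print(f"improvement: {improvement:.1f}%")
```

Each cycle contains two 5-second queries, so 10 seconds per cycle is the floor for a single client. The async service comes in at about 12 seconds per cycle and the sync service at nearly 15.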
