Server-Sent Event Feature in Django Rest Framework

How to implement a server-sent event feature in Django.

Django Logo

SSE, a web development technology, establishes a one-way communication channel between a web server and a browser. It facilitates server-initiated real-time updates over a single HTTP connection that consumes fewer resources compared to WebSockets. SSE is commonly employed for delivering live notifications, continuous updates, or data streams to a web page without requiring constant polling from the client.

As you can see this type of request can be useful in some specific cases, so let’s implement this in Django 4.2.

Install and configure ASGI server

Usually, classic Django app uses WSGI server to run the Python code. In our case WSGI server is not a good choice because it was designed for short-lived requests and started SSE streaming responses on it will tie a worker process for the entire duration of the response. This may result in poor performance.

To overcome this we need to run our app via ASGI server.
This opens up the possibility to implement long-lived requests for streaming content and implementing patterns such as long-polling and server-sent events.
I’ll use Daphne ASGI server that also are used in Django Channels.

pip install daphne

In your settings.py add the Daphne as an installed apps and specify entry point for it:

INSTALLED_APPS = (
    "daphne",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.sites",
    ...
)
ASGI_APPLICATION = "myproject.asgi.application"

Capture events from a source

For event capture we will use Redis Publish/Subscribe mechanism, which allows for real-time message broadcasting and communication between different parts of an application.
Our Django app will be a subscriber. Publishers send messages to channels, while subscribers listen to specific channels for messages.

pip install redis

Create Redis connection function:

from django.conf import settings
from redis import asyncio as aioredis


# Create Redis conection client
def get_async_redis_client():
    try:
        return aioredis.from_url(
            f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}",
            encoding="utf8", decode_responses=True
        )
    except redis.exceptions.ConnectionError as e:
        print("Connection error:", e)
    except Exception as e:
        print("An unexpected error occurred:", e)

To retrieve messages from the channel, it’s necessary to establish a Redis subscriber.

This function will return asynchronous generator that allows it to yield messages one by one in streaming response as they meet the filtering criteria.

async def listen_to_channel(filter_func: Callable, user_id: int) -> AsyncGenerator:
    # Create message listener and subscribe on the event source channel
    async with get_async_redis_client().pubsub() as listener:
        await listener.subscribe(settings.PUSH_NOTIFICATIONS_CHANNEL)
         # Create a generator that will 'yield' our data into opened connection
        while True:
            message = await listener.get_message(
                timeout=settings.PUSH_NOTIFICATIONS_DELAY_SECONDS,
                ignore_subscribe_messages=True
            )
            # Send on connect message
            if message is None and is_on_connect:
                yield ""
                is_on_connect = False
                continue
            # Send heartbeat message
            if message is None and not is_on_connect:
                message = {"ping": datetime.now()}
                yield f"data: {json.dumps(message, default=str)}\n\n"
                continue
            message = json.loads(message["data"])
            # Check if the authorized user is a recipient of the notification
            if filter_func(user_id, message):
                yield f"data: {json.dumps(message)}\n\n"

Example of filter_func that allow to sent message only specified in the message recipient.

def is_user_recipient(user_id: int, message: dict[str, Any]) -> bool:
    return str(user_id) == message.get("recipient_id")

Make SSE controller

To ensure the event-stream is rendered correctly, it is necessary to add the appropriate media_type. This can be achieved by inheriting from the DRF BaseRenderer and redefining the necessary components
https://www.django-rest-framework.org/api-guide/renderers/

# sse_render.py

from rest_framework.renderers import BaseRenderer

class ServerSentEventRenderer(BaseRenderer):
    media_type = "text/event-stream"
    format = "txt"

    def render(self, data, accepted_media_type=None, renderer_context=None):
        return data

Also we should specify a content_typeas text/event-stream and pass our async generator as streaming_content argument.

# views.py

from django.http import StreamingHttpResponse
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView
from rest_framework.renderers import JsonRenderer
from sse_render import ServerSentEventRenderer


class Notify(APIView):
    permission_classes = [IsAuthenticated]
    renderer_classes = [JsonRenderer, ServerSentEventRenderer]

    def get(self, request):
        generator = listen_to_channel(is_user_recipient, request.user.pk)
        response = StreamingHttpResponse(streaming_content=generator, content_type="text/event-stream")
        response["X-Accel-Buffering"] = "no"  # Disable buffering in nginx
        response["Cache-Control"] = "no-cache"  # Ensure clients don't cache the data
        return response

For more example of SSE on Django I would recommend to visit:

Enjoyed this article?

Share it with your network to help others discover it

Continue Learning

Discover more articles on similar topics