Server-Sent Events (SSE) is a web technology that establishes a one-way communication channel from a web server to a browser. It enables server-initiated real-time updates over a single long-lived HTTP connection, and it is typically simpler and lighter than WebSockets when the client does not need to send data back over the same channel. SSE is commonly used to deliver live notifications, continuous updates, or data streams to a web page without constant polling from the client.
As you can see, this type of request is useful in some specific cases, so let’s implement it in Django 4.2.
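Before moving on, it may help to see what actually travels over an SSE connection: plain text, where each event is a few `field: value` lines followed by a blank line. The helper below is only an illustrative sketch; the name `format_sse` and the choice to JSON-encode the payload are assumptions made for this example, not part of Django or any library.

```python
import json
from typing import Any, Optional


def format_sse(data: Any, event: Optional[str] = None) -> str:
    """Serialize `data` as JSON and wrap it in text/event-stream framing."""
    lines = []
    if event is not None:
        # Optional event name; browsers dispatch it to a matching listener
        lines.append(f"event: {event}")
    # json.dumps without indent produces a single line, so one data field is enough
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"
```

The generator built later in this article yields strings of exactly this `data: ...` shape.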
Install and configure ASGI server
A classic Django app usually runs on a WSGI server. For SSE, a WSGI server is not a good choice: it was designed for short-lived requests, so a streaming SSE response ties up a worker process for the entire duration of the response. This may result in poor performance.
To overcome this, we need to run our app on an ASGI server.
This makes it possible to serve long-lived requests for streaming content and to implement patterns such as long-polling and server-sent events.
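To make the difference concrete, here is a rough sketch of a bare ASGI application that streams a few SSE events. It follows the ASGI HTTP message types (`http.response.start` and `http.response.body` with `more_body`), but the name `sse_app` and the "tick" payload are made up for illustration; in this article Django produces these messages for us.

```python
import asyncio


async def sse_app(scope, receive, send):
    """Minimal ASGI application that streams three SSE events."""
    assert scope["type"] == "http"
    # Send status and headers once, then keep the response open
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"content-type", b"text/event-stream"),
            (b"cache-control", b"no-cache"),
        ],
    })
    for number in range(3):
        # more_body=True tells the server that further chunks will follow
        await send({
            "type": "http.response.body",
            "body": f"data: tick {number}\n\n".encode(),
            "more_body": True,
        })
        # Awaiting here hands control back to the event loop between chunks
        await asyncio.sleep(0)
    # An empty final chunk closes the response
    await send({"type": "http.response.body", "body": b"", "more_body": False})
```

Because every `await` yields control to the event loop, one process can keep many such connections open at once, which is the property SSE needs.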
I’ll use the Daphne ASGI server, which is also used in Django Channels.
pip install daphne
In your settings.py, add Daphne to the installed apps and specify the ASGI entry point:
INSTALLED_APPS = (
    "daphne",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.sites",
    ...
)
ASGI_APPLICATION = "myproject.asgi.application"
Capture events from a source
To capture events we will use the Redis Publish/Subscribe mechanism, which allows real-time message broadcasting and communication between different parts of an application.
Publishers send messages to channels, while subscribers listen to specific channels for messages. Our Django app will be a subscriber.
pip install redis
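The article focuses on the subscriber, so here is a hedged sketch of what the publishing side might look like. The function names and the `text` field are hypothetical. The only assumption about the client is that it exposes `publish(channel, message)`, as redis-py's `Redis` client does. Note that `recipient_id` is serialized as a string, because the recipient filter shown later in this article compares it against `str(user_id)`.

```python
import json
from typing import Any, Protocol


class Publisher(Protocol):
    """Anything with a Redis-style publish method (hypothetical helper type)."""

    def publish(self, channel: str, message: str) -> Any: ...


def build_notification(recipient_id: int, text: str) -> str:
    # recipient_id is stored as a string so a str(user_id) comparison can match it
    return json.dumps({"recipient_id": str(recipient_id), "text": text})


def publish_notification(
    client: Publisher, channel: str, recipient_id: int, text: str
) -> None:
    # Every subscriber currently listening on `channel` receives this payload
    client.publish(channel, build_notification(recipient_id, text))
```

With redis-py, `client` could be a `redis.Redis(...)` instance and `channel` the value of `settings.PUSH_NOTIFICATIONS_CHANNEL`.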
Create a function that returns a Redis client:
import redis
from django.conf import settings
from redis import asyncio as aioredis

# Create the Redis connection client
def get_async_redis_client():
    try:
        return aioredis.from_url(
            f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}",
            encoding="utf8", decode_responses=True
        )
    except redis.exceptions.ConnectionError as e:
        print("Connection error:", e)
        raise  # re-raise so callers never receive None instead of a client
    except Exception as e:
        print("An unexpected error occurred:", e)
        raise
To retrieve messages from the channel, we need a Redis subscriber.
The function below is an asynchronous generator: it yields messages one by one into the streaming response as they pass the filtering criteria.
import json
from datetime import datetime
from typing import AsyncGenerator, Callable

async def listen_to_channel(filter_func: Callable, user_id: int) -> AsyncGenerator:
    # Create a message listener and subscribe to the event source channel
    async with get_async_redis_client().pubsub() as listener:
        await listener.subscribe(settings.PUSH_NOTIFICATIONS_CHANNEL)
        # Tracks whether the on-connect message still has to be sent
        is_on_connect = True
        # Create a generator that will 'yield' our data into the opened connection
        while True:
            message = await listener.get_message(
                timeout=settings.PUSH_NOTIFICATIONS_DELAY_SECONDS,
                ignore_subscribe_messages=True
            )
            # Send on connect message
            if message is None and is_on_connect:
                yield ""
                is_on_connect = False
                continue
            # Send heartbeat message
            if message is None and not is_on_connect:
                message = {"ping": datetime.now()}
                yield f"data: {json.dumps(message, default=str)}\n\n"
                continue
            message = json.loads(message["data"])
            # Check if the authorized user is a recipient of the notification
            if filter_func(user_id, message):
                yield f"data: {json.dumps(message)}\n\n"
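The "wait with a timeout, otherwise send a ping" pattern used above can be tried without Redis. The sketch below swaps the Redis listener for an `asyncio.Queue`, purely as a stand-in for experimenting; `stream_with_heartbeat`, `demo`, and the `max_events` cut-off are hypothetical names added so that the example terminates.

```python
import asyncio
import json
from datetime import datetime
from typing import AsyncGenerator


async def stream_with_heartbeat(
    queue: asyncio.Queue, heartbeat_seconds: float, max_events: int
) -> AsyncGenerator[str, None]:
    """Yield SSE frames from `queue`, emitting a ping when it stays silent."""
    sent = 0
    while sent < max_events:
        try:
            # Wait for the next message, but no longer than the heartbeat interval
            message = await asyncio.wait_for(queue.get(), timeout=heartbeat_seconds)
        except asyncio.TimeoutError:
            # Nothing arrived in time: send a ping so the connection stays alive
            message = {"ping": datetime.now()}
        yield f"data: {json.dumps(message, default=str)}\n\n"
        sent += 1


async def demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put({"recipient_id": "7", "text": "hello"})
    # The first frame is the queued message, the second a heartbeat after the timeout
    return [frame async for frame in stream_with_heartbeat(queue, 0.05, 2)]
```

Running `asyncio.run(demo())` returns two frames: the queued notification, then a ping produced once the queue has been silent for the heartbeat interval.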
Here is an example of a filter_func that lets a message through only to the recipient specified in the message:
from typing import Any

def is_user_recipient(user_id: int, message: dict[str, Any]) -> bool:
    return str(user_id) == message.get("recipient_id")
Make SSE controller
To ensure the event stream is rendered correctly, it is necessary to declare the appropriate media_type. This can be achieved by inheriting from the DRF BaseRenderer and overriding the necessary attributes and methods: https://www.django-rest-framework.org/api-guide/renderers/
# sse_render.py
from rest_framework.renderers import BaseRenderer

class ServerSentEventRenderer(BaseRenderer):
    media_type = "text/event-stream"
    format = "txt"

    def render(self, data, accepted_media_type=None, renderer_context=None):
        return data
We should also set content_type to text/event-stream and pass our async generator as the streaming_content argument.
# views.py
from django.http import StreamingHttpResponse
from rest_framework.permissions import IsAuthenticated
from rest_framework.renderers import JSONRenderer
from rest_framework.views import APIView
from sse_render import ServerSentEventRenderer
# listen_to_channel and is_user_recipient are the functions defined above;
# import them from the module where you placed them.

class Notify(APIView):
    permission_classes = [IsAuthenticated]
    renderer_classes = [JSONRenderer, ServerSentEventRenderer]

    def get(self, request):
        generator = listen_to_channel(is_user_recipient, request.user.pk)
        response = StreamingHttpResponse(streaming_content=generator, content_type="text/event-stream")
        response["X-Accel-Buffering"] = "no"  # Disable buffering in nginx
        response["Cache-Control"] = "no-cache"  # Ensure clients don't cache the data
        return response
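In a browser, the built-in EventSource API consumes this endpoint. For scripts or tests it can be handy to decode the stream by hand, so here is a simplified sketch of a parser for the `data:` frames this view emits. The name `iter_sse_data` is hypothetical, and the parser deliberately ignores the other SSE fields (`event`, `id`, `retry`).

```python
import json
from typing import Any, Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[Any]:
    """Yield the decoded JSON payload of every event in an SSE line stream."""
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            value = line[len("data:"):]
            # A single space after the colon is part of the framing, not the payload
            data_lines.append(value[1:] if value.startswith(" ") else value)
        elif line == "" and data_lines:
            # A blank line ends the event; multiple data lines are joined with newlines
            yield json.loads("\n".join(data_lines))
            data_lines = []
        # Comment lines (starting with ':') and other fields are skipped
```

The input can be any iterable of text lines, for example the lines of an HTTP response read incrementally by the client of your choice.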
For more examples of SSE in Django, I recommend visiting: