We frequently encounter the need to update client data triggered by events on the server, in a one-way manner.
In most projects involving customer interaction, it’s crucial to promptly notify users about events such as successful order processes or receiving likes on a post.
If we conduct brief investigation into the optimal approach for implementing this feature, we will find several potential methods to implement this feature:
1. Web Sockets:
WebSocket is a communication protocol facilitating full-duplex, two-way communication channels through a persistent TCP connection. It enables real-time interaction between a web browser and a server.
WebSocket is suitable for bidirectional communication; however, it necessitates the implementation of a push-notification feature. Moreover, managing a multitude of WebSocket connections can consume extra server resources, posing potential challenges in resource-intensive scenarios.
2. Long Polling:
Long Polling involves the client periodically querying the server for new data. This type of connection is better suited for computationally intensive tasks where event generation requires substantial time.
3. Server-Sent Events (SSE):
SSE, a web development technology, establishes a one-way communication channel between a web server and a browser. It facilitates server-initiated real-time updates over a single HTTP connection. SSE is commonly employed for delivering live notifications, continuous updates, or data streams to a web page without requiring constant polling from the client.
SSE provides unidirectional real-time communication and consumes fewer resources compared to WebSockets. It appears that SSE fits well with all our requirements and criteria.
Hence, I have opted for SSE and intend to demonstrate how to set up SSE in FastAPI.
We will develop a basic FastAPI service to implement push-notification functionality.
The objectives of this service are as follows:
- Authenticate users for SSE requests: User authentication will be carried out by sending requests to an authentication server.
- Capture events from a source: The service will capture events from a source using Redis publish/subscribe functionality.
- Dispatch notifications based on recipients: Event forwarding to recipients will be determined by recipient_id information within the event.
GitHub source code for this article
Dependencies
pyproject.toml
python = "^3.10"
redis = "^4.6.0"
fastapi = {extras = ["all"], version = "^0.100.1"}
sse-starlette = "^1.6.1"
sqlalchemy = "^2.0.19"
aiohttp = "^3.8.5"
Authenticate users for SSE requests
In order to authenticate the user, we must validate its token. To accomplish this, we extract the token from the request header and send a request to the authentication server to verify the token’s validity.
auth_service.py
import aiohttp
from starlette import status
from starlette.datastructures import Headers
from src.config import Config
from src.exceptions import HeadersValidationError, AuthenticationError
# Retrieving the authorization token from the incoming request header
def get_headers_token(headers: Headers) -> str | None:
authorization_header = headers.get("authorization")
if authorization_header is None:
raise HeadersValidationError(msg="Credentials are not provided.", status=status.HTTP_403_FORBIDDEN)
if authorization_header.startswith("Bearer "):
return authorization_header
raise HeadersValidationError(msg="Incorrect token type.", status=status.HTTP_403_FORBIDDEN)
# Check authorization token's validity
async def is_user_authenticated(token: str) -> str | None:
async with aiohttp.ClientSession() as session:
try:
response = await session.get(
f"{Config.MAIN_HOST}/api/v1/is_user_auth/",
headers={"Authorization": token}
)
except aiohttp.ClientError:
raise AuthenticationError(msg="Authentication API connection error.", status=status.HTTP_504_GATEWAY_TIMEOUT)
if response.ok:
return res.get("user_id")
raise AuthenticationError(msg=res, status=status.HTTP_401_UNAUTHORIZED)
💡 Speed up your blog creation with DifferAI.
Available for free exclusively on the free and open blogging platform, Differ.
Capture events from a source/Dispatch notifications based on recipients
For event capture we will use Redis Publish/Subscribe
mechanism, which allows for real-time message broadcasting and communication between different parts of an application.
Our event source server will be a publisher and FastAPI app will be subscriber. Publishers send messages to channels, while subscribers listen to specific channels for messages.
redis_service.py
import redis
from typing import Callable
from redis import asyncio as aioredis
from redis.asyncio.client import Redis
from src.config import Config
# Create Redis conection client
def redis_client():
try:
return aioredis.from_url(
f"redis://{Config.REDIS_HOST}:{Config.REDIS_PORT}",
encoding="utf8", decode_responses=True
)
except redis.exceptions.ConnectionError as e:
print("Connection error:", e)
except Exception as e:
print("An unexpected error occurred:", e)
SSE connection has media type text/event-stream
therefore we should use a generator
to generate the data by “chunks”.
async def listen_to_channel(filter_func: Callable, user_id: str, redis: Redis):
# Create message listener and subscribe on the event source channel
async with redis.pubsub() as listener:
await listener.subscribe(Config.PUSH_NOTIFICATIONS_CHANNEL)
# Create a generator that will 'yield' our data into opened TLS connection
while True:
message = await listener.get_message()
if message is None:
continue
if message.get("type") == "message":
message = json.loads(message["data"])
# Checking, if the user that opened this SSE conection
# is recipient of the message or not.
# The message obj has field recipient_id to compare.
if filter_func(user_id, message):
yield {"data": message}
Connect all together
main.py
import uvicorn
from fastapi import FastAPI
from fastapi.params import Depends
from redis.asyncio.client import Redis
from sse_starlette.sse import EventSourceResponse
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from src.auth_service import get_headers_token, is_user_authenticated
from src.exceptions import create_response_for_exception, HeadersValidationError, AuthenticationError
from src.redis_service import listen_to_channel, redis_client
from src.utils import is_user_recipient
app = FastAPI(title="notification_service")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# SSE implementation
@app.get("/notify")
async def notification(request: Request, redis: Redis = Depends(redis_client)):
# Get and check request headers
try:
authorization_header = get_headers_token(request.headers)
except HeadersValidationError as e:
return create_response_for_exception(msg=e.msg, status=e.status)
# Authenticate the user
try:
user_id = await is_user_authenticated(authorization_header)
# EventSourceResponse function allows to send python generators as server-sent events.
return EventSourceResponse(listen_to_channel(is_user_recipient, user_id, redis))
except AuthenticationError as e:
return create_response_for_exception(msg=e.msg, status=e.status)
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8001, reload=True)
Summary
SSE is often overshadowed but it is a good tool for certain tasks and requirements. For example: updating dynamic content, sending push notifications, and streaming data in real-time and more.
For more example of SSE on Python I would recommend to visit: