circuit

How to Schedule Recurring Jobs or Tasks Using Celery

Schedule Jobs 2: Schedule recurring jobs/tasks using Celery.


In the last tutorial [1], I have demonstrated how to schedule jobs using Crontab. In this tutorial, I continue to demonstrate how to schedule recurring jobs/tasks using Celery.

Overview

Celery is a popular and powerful (open source) asynchronous task/job queue based on distributed message passing. It supports both scheduling and queuing tasks/jobs. This tutorial focuses on scheduling recurring tasks/jobs. The diagram below illustrates the architecture of a typical application for scheduling recurring tasks/jobs.

Generally speaking, as a Task Queue, Celery consists of

  • a producer, which defines tasks, and is responsible for pushing the task messages into the broker(s);
  • multiple workers, which pull the task messages from the broker, execute the tasks and save the results into the result backend (database)

Celery itself does not include broker and (result) backend. Therefore, the Task Queue itself needs a message broker and a (result) backend. Celery supports a variety of message brokers, such as Redis, RabbitMQ, Amazon SQS, etc., and result backends, such as AMQP, Redis, Memcached, SQLAlchemy (PostgreSQL, MySQL, SQLite), etc.

As a Task Scheduler, we need one more component, celery beat, which pushes tasks into the message queue at regular intervals, such that as soon as any worker is available, the tasks are picked up and executed. Celery beat supports four different ways to define a recurring task.

  • regular (time) interval of the recurring task: e.g. checking the status of a sensor once every 10 seconds
  • crontab schedule: e.g. generating a sales report and sending it to all stakeholders via email every day after midnight
  • solar schedule: e.g. recording the weather-related information at sunset at a specific geographic location every day
  • custom schedule: when the default scheduler does not fit your use case, for example, you want to store your schedules and status information in a specific SQL or NoSQL database of your choice instead of the local shelve database file, you need to define your own (custom) scheduler by subclassing both celery.beat.Scheduler and celery.beat.ScheduleEntry

In this tutorial, we focus on the default scheduler, celery.beat.PersistentScheduler, and demonstrate how to build a task scheduler using Redis as a message broker and PostgreSQL as a result backend.

Step 1: Preparing Broker and Backend

First, we start a Redis server and a PostgreSQL server using docker containers respectively.

1.1 Preparing result backend

demo@localhost ~ % docker run -d --name demo_backend -p 5432:5432 -e POSTGRES_PASSWORD=dbc postgres:latest  
0b205dd31d60ec6c8235219dfdb4088b7f3cf93ace9a40c068562381b49dc9d7  
demo@localhost ~ % docker ps  
CONTAINER ID   IMAGE                                                     COMMAND                  CREATED         STATUS         PORTS                                       NAMES  
0b205dd31d60   postgres:latest                                           "docker-entrypoint.s…"   5 seconds ago   Up 3 seconds   0.0.0.0:5432->5432/tcp, :::5432->5432/tcp   demo_backend  
demo@localhost ~ %  
demo@localhost ~ % psql -h localhost -p 5432 -U postgres  
Password for user postgres:  
psql (14.0)  
Type "help" for help.
postgres=# SELECT datname, dattablespace FROM pg_catalog.pg_database ORDER BY 2,1 DESC;  
  datname  | dattablespace  
-----------+---------------  
 template1 |          1663  
 template0 |          1663  
 postgres  |          1663  
(3 rows)
postgres=# CREATE DATABASE demo;  
CREATE DATABASE  
postgres=# SELECT datname, dattablespace FROM pg_catalog.pg_database ORDER BY 2,1 DESC;  
  datname  | dattablespace  
-----------+---------------  
 template1 |          1663  
 template0 |          1663  
 postgres  |          1663  
 demo      |          1663  
(4 rows)
postgres=#

1.2 Preparing Message Broker

demo@localhost ~ %  docker run -d \  
> -h localhost \  
> -e REDIS_PASSWORD=redis \  
> -p 6379:6379 \  
> --name demo_broker \  
> --restart always \  
> redis:latest /bin/sh -c 'redis-server --appendonly yes --requirepass ${REDIS_PASSWORD}'  
74d75f5ba433f18d466ee089ff96d95b2f2b038acb03d28f3c410fad90259d90  
demo@localhost ~ %  
demo@localhost ~ % docker ps  
CONTAINER ID   IMAGE                                                     
COMMAND                  CREATED          STATUS          PORTS                                       NAMES  
74d75f5ba433   redis:latest                                              "docker-entrypoint.s…"   3 seconds ago    Up 3 seconds    0.0.0.0:6379->6379/tcp, :::6379->6379/tcp   demo_broker  
0b205dd31d60   postgres:latest                                           "docker-entrypoint.s…"   9 minutes ago   Up 9 minutes   0.0.0.0:5432->5432/tcp, :::5432->5432/tcp   demo_backend  
demo@localhost ~ %  
demo@localhost ~ % redis-cli -h localhost -p 6379 
localhost:6379> keys *  
(error) NOAUTH Authentication required.  
localhost:6379> AUTH redis  
OK  
localhost:6379> keys *  
(empty array)  
localhost:6379>

Step 2: Scheduling and Executing Recurring Tasks

In the sample code snippet below, we defined two periodic tasks: current_weather(city) and weather_forecast(zip_code). They are scheduled in two different ways (in the schedule_periodic_tasks() function). The first is scheduled using regular time intervals and will be invoked every 10 seconds, and the second is scheduled using the crontab schedule and will be executed every Monday morning at 7:30 am.

from celery import Celery
from celery.schedules import crontab
import redis, json
import requests as rq
app = Celery("tasks",
backend="db+postgresql://postgres:dbc@localhost:5432/demo",
broker="redis://:redis@localhost:6379/0")
@app.on_after_configure.connect
def schedule_periodic_tasks(sender, **kwargs):
# Checking weather information every 10 seconds
sender.add_periodic_task(10.0, current_weather.s('Los Angeles'))
# Executes every Monday morning at 7:30 am
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
weather_forecast.s('90717'),
)
key = "<MY_API_KEY>"
@app.task(bind=True)
def current_weather(self, city):
url = f'http://api.weatherapi.com/v1/current.json?key={key}&q={city}'
res = json.loads(rq.get(url).text)
weather = {}
weather['last_updated'] = res['current']['last_updated']
weather['condition'] = res['current']['condition']['text']
weather['temp_c'] = res['current']['temp_c']
weather['wind_mph'] = res['current']['wind_mph']
weather['humidity'] = res['current']['humidity']
return weather
@app.task(bind=True)
def weather_forecast(self, zip_code):
url = f'http://api.weatherapi.com/v1/forecast.json?key={key}&q={zip_code}&days=7'
res = json.loads(rq.get(url).text)
weather = {'current':{}, 'forecast':[]}
weather['current']['last_updated'] = res['current']['last_updated']
weather['current']['condition'] = res['current']['text']
weather['current']['temp_c'] = res['current']['temp_c']
weather['current']['wind_mph'] = res['current']['wind_mph']
weather['current']['humidity'] = res['current']['humidity']
for day_res in res['forecast']['forecastday']:
day_weather = {}
day_weather['maxtemp_c'] = day_res['day']['maxtemp_c']
day_weather['mintemp_c'] = day_res['day']['mintemp_c']
day_weather['avgtemp_c'] = day_res['day']['avgtemp_c']
day_weather['maxwind_mph'] = day_res['day']['maxwind_mph']
day_weather['avghumidity'] = day_res['day']['avghumidity']
day_weather['conidition'] = day_res['day']['condition']['text']
weather['forecast'].append(day_weather)
return weather

To start the task scheduler, the following command is executed in the terminal.

demo@localhost ~ % celery -A tasks beat --loglevel=INFO -s ./celerybeat-schedule_11-29-21  
celery beat v5.1.2 (sun-harmonics) is starting.  
__    -    ... __   -        _  
LocalTime -> 2021-11-29 00:51:11  
Configuration ->  
    . broker -> redis://:**@localhost:6379/0  
    . loader -> celery.loaders.app.AppLoader  
    . scheduler -> celery.beat.PersistentScheduler  
    . db -> ./celerybeat-schedule_11-29-21  
    . logfile -> [stderr]@%INFO  
    . maxinterval -> 5.00 minutes (300s)  
[2021-11-29 00:51:11,633: INFO/MainProcess] beat: Starting...  
[2021-11-29 00:51:21,669: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather)  
[2021-11-29 00:51:31,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather)  
[2021-11-29 00:51:41,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather)  
[2021-11-29 00:51:51,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather)  
[2021-11-29 00:52:01,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather)  
[2021-11-29 00:52:11,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather)  
[2021-11-29 00:52:21,657: INFO/MainProcess] Scheduler: Sending due task tasks.current_weather('Los Angeles') (tasks.current_weather)

From the above logs in the terminal, we can see that the task scheduler has started pushing tasks into the message queue (broker), and the schedules and status are stored in the shelve database file: celerybeat-schedule_11-29-21. By checking the messages in the broker, we can confirm it.

localhost:6379> keys *  
1) "celery"  
2) "_kombu.binding.celery"  
localhost:6379> type celery  
list  
localhost:6379> llen celery  
(integer) 11  
localhost:6379> lrange celery 0 1  
1) "{\"body\": \"W1siTG9zIEFuZ2VsZXMiXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"tasks.current_weather\", \"id\": \"d0faec6e-4973-4054-8cda-0d63f17466f3\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"d0faec6e-4973-4054-8cda-0d63f17466f3\", \"parent_id\": null, \"argsrepr\": \"['Los Angeles']\", \"kwargsrepr\": \"{}\", \"origin\": \"gen81468@localhost\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"d0faec6e-4973-4054-8cda-0d63f17466f3\", \"reply_to\": \"3a9f95ed-cbee-37aa-8283-faa7df0fc0b4\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"2cc7d91a-d928-4cad-a618-c0b3187f41e0\"}}"  
2) "{\"body\": \"W1siTG9zIEFuZ2VsZXMiXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"tasks.current_weather\", \"id\": \"bdcad31c-d5d6-43e9-80f5-265e895fc8b8\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"bdcad31c-d5d6-43e9-80f5-265e895fc8b8\", \"parent_id\": null, \"argsrepr\": \"['Los Angeles']\", \"kwargsrepr\": \"{}\", \"origin\": \"gen81468@localhost\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"bdcad31c-d5d6-43e9-80f5-265e895fc8b8\", \"reply_to\": \"3a9f95ed-cbee-37aa-8283-faa7df0fc0b4\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"f4433f4d-88a4-49dc-8344-66c7d30dc7d3\"}}"  
localhost:6379>   
localhost:6379>

So far, the tasks are pushed into the message queue, but there is no worker started yet. Therefore, none of the tasks is executed. This can be confirmed by checking the contents in the result backend database.

postgres=# \c demo  
You are now connected to database "demo" as user "postgres".  
demo=# \dt  
Did not find any relations.  
demo=#

To have the tasks pushed into the queue actually executed, we start working in another terminal window as follows.

demo@localhost ~ % celery -A tasks worker --loglevel=info
celery@localhost v5.1.2 (sun-harmonics)
macOS-11.6-x86_64-i386-64bit 2021-11-29 01:10:18
[config]  
.> app:         tasks:0x1097804f0  
.> transport:   redis://:**@localhost:6379/0  
.> results:     postgresql://postgres:**@localhost:5432/demo  
.> concurrency: 16 (prefork)  
.> task events: OFF (enable -E to monitor tasks in this worker)[queues]  
.> celery           exchange=celery(direct) key=celery[tasks]  
  . tasks.current_weather  
  . tasks.weather_forecast[2021-11-29 01:10:19,592: INFO/MainProcess] Connected to redis://:**@localhost:6379/0  
[2021-11-29 01:10:19,606: INFO/MainProcess] mingle: searching for neighbors  
[2021-11-29 01:10:20,642: INFO/MainProcess] mingle: all alone  
[2021-11-29 01:10:20,675: INFO/MainProcess] celery@localhost ready.  
[2021-11-29 01:10:20,679: INFO/MainProcess] Task tasks.current_weather[2e688dce-7d2b-4c5f-a97e-1b1eefd3d359] received  
[2021-11-29 01:10:20,684: INFO/MainProcess] Task tasks.current_weather[5f4b9b44-5c8b-4393-a64c-e1b511471950] received  
[2021-11-29 01:10:20,688: INFO/MainProcess] Task tasks.current_weather[31d7a1cd-3284-4c37-a105-ca2e595361e8] received  
[2021-11-29 01:10:20,693: INFO/MainProcess] Task tasks.current_weather[c1af82ac-5085-44a8-9ed3-8c8340ad7696] received  
[2021-11-29 01:10:20,697: INFO/MainProcess] Task tasks.current_weather[7fd04e9f-49bc-4bc5-83b2-96d1c0662857] received  
[2021-11-29 01:10:20,701: INFO/MainProcess] Task tasks.current_weather[bdcad31c-d5d6-43e9-80f5-265e895fc8b8] received  
[2021-11-29 01:10:20,706: INFO/MainProcess] Task tasks.current_weather[d0faec6e-4973-4054-8cda-0d63f17466f3] received  
[2021-11-29 01:10:20,976: INFO/MainProcess] Task tasks.current_weather[3ffe5246-6f58-4ceb-9df9-dda3be3aa5fc] received  
[2021-11-29 01:10:20,983: INFO/MainProcess] Task tasks.current_weather[f551e732-d3fb-4263-b479-93ef41256938] received  
[2021-11-29 01:10:20,989: INFO/MainProcess] Task tasks.current_weather[8275a0af-fdfd-42e7-a40b-027c37a6410c] received  
[2021-11-29 01:10:20,994: INFO/MainProcess] Task tasks.current_weather[46c2b209-eaf9-4c22-99f5-eb4e3ff40bf6] received  
[2021-11-29 01:10:21,521: INFO/ForkPoolWorker-16] Task tasks.current_weather[2e688dce-7d2b-4c5f-a97e-1b1eefd3d359] succeeded in 0.8405978789999997s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}  
[2021-11-29 01:10:21,548: INFO/ForkPoolWorker-2] Task tasks.current_weather[31d7a1cd-3284-4c37-a105-ca2e595361e8] succeeded in 0.858004432s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}  
[2021-11-29 01:10:21,944: INFO/ForkPoolWorker-9] Task tasks.current_weather[8275a0af-fdfd-42e7-a40b-027c37a6410c] succeeded in 0.9534888619999999s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}  
[2021-11-29 01:10:21,977: INFO/ForkPoolWorker-1] Task tasks.current_weather[5f4b9b44-5c8b-4393-a64c-e1b511471950] succeeded in 1.2927104649999999s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}  
[2021-11-29 01:10:22,312: INFO/ForkPoolWorker-8] Task tasks.current_weather[f551e732-d3fb-4263-b479-93ef41256938] succeeded in 1.3275789269999998s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}  
[2021-11-29 01:10:22,542: INFO/ForkPoolWorker-3] Task tasks.current_weather[c1af82ac-5085-44a8-9ed3-8c8340ad7696] succeeded in 1.848025523s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}  
[2021-11-29 01:10:22,880: INFO/ForkPoolWorker-10] Task tasks.current_weather[46c2b209-eaf9-4c22-99f5-eb4e3ff40bf6] succeeded in 1.884862826s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}  
[2021-11-29 01:10:23,048: INFO/ForkPoolWorker-7] Task tasks.current_weather[3ffe5246-6f58-4ceb-9df9-dda3be3aa5fc] succeeded in 2.0701256270000004s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}  
[2021-11-29 01:10:23,592: INFO/ForkPoolWorker-6] Task tasks.current_weather[d0faec6e-4973-4054-8cda-0d63f17466f3] succeeded in 2.621252047s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}  
[2021-11-29 01:10:24,050: INFO/ForkPoolWorker-4] Task tasks.current_weather[7fd04e9f-49bc-4bc5-83b2-96d1c0662857] succeeded in 3.3515452580000002s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}  
[2021-11-29 01:10:24,592: INFO/ForkPoolWorker-5] Task tasks.current_weather[bdcad31c-d5d6-43e9-80f5-265e895fc8b8] succeeded in 3.889102834s: {'last_updated': '2021-11-29 00:00', 'condition': 'Fog', 'temp_c': 14.4, 'wind_mph': 3.8, 'humidity': 87}

From the above logs, we can see that the worker has started executing the queued tasks. Checking the result backend (database), we see that the first few results have been saved.

demo=# \dt  
               List of relations  
 Schema |        Name        | Type  |  Owner  
--------+--------------------+-------+----------  
 public | celery_taskmeta    | table | postgres  
 public | celery_tasksetmeta | table | postgres  
(2 rows)demo=# SELECT COUNT(*) FROM celery_taskmeta;  
 count  
-------  
    11  
(1 row)demo=#   
demo=# SELECT task_id, status, result FROM celery_taskmeta;  
               task_id                | status  |                                                                                                                      result  
--------------------------------------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 2e688dce-7d2b-4c5f-a97e-1b1eefd3d359 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
 31d7a1cd-3284-4c37-a105-ca2e595361e8 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
 8275a0af-fdfd-42e7-a40b-027c37a6410c | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
 5f4b9b44-5c8b-4393-a64c-e1b511471950 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
 f551e732-d3fb-4263-b479-93ef41256938 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
 c1af82ac-5085-44a8-9ed3-8c8340ad7696 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
 46c2b209-eaf9-4c22-99f5-eb4e3ff40bf6 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
 3ffe5246-6f58-4ceb-9df9-dda3be3aa5fc | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
 d0faec6e-4973-4054-8cda-0d63f17466f3 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
 7fd04e9f-49bc-4bc5-83b2-96d1c0662857 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
 bdcad31c-d5d6-43e9-80f5-265e895fc8b8 | SUCCESS | \x8005956c000000000000007d94288c0c6c6173745f75706461746564948c10323032312d31312d32392030303a3030948c  
09636f6e646974696f6e948c03466f67948c0674656d705f639447402ccccccccccccd8c0877696e645f6d70689447400e6666666666668c0868756d6964697479944b57752e  
(11 rows)
demo=#

The results stored in the database have been serialized. To see the deserialized results, either pickle or SQLAlchemy can be used (this is demonstrated in [2]).

Conclusion

In this tutorial, I have demonstrated how to schedule recurring tasks using both regular time intervals and crontab schedule, and in the example, we used Redis as broker, and PostgreSQL as result backend. The complete sample code, which is completely deployed using docker-compose, is available in this Github Repo.

Different from scheduling tasks using Cron, celery executes tasks in parallel. In the example, we only started one worker process, but from the logs, we can see that it has 16 threads, which indicates that in our example, we can have up to 16 tasks executed concurrently. If more workers were started, we could have more tasks executed concurrently. However, similar to Cron, Celery beat also has the issue of a single point of failure. According to the documentation,

You have to ensure only a single scheduler is running for a schedule at a time, otherwise you'd end up with duplicate tasks.

To schedule recurring tasks in distributed style, we need to either integrate celery beat with a distributed lock or use a completely different tool, such as Chronos, Quartz, Dkron, etc. In the next tutorial, I am going to show how to integrate celery beat with distributed lock, such that recurring tasks can be scheduled in distributed style.




Continue Learning