Celery is a distributed task queue framework that delegates tasks across multiple worker processes or threads, which makes parallel processing possible. This article assumes you already know the basics of Celery and focuses only on its parallel task execution.
Celery's parallel capacity depends on how many worker instances you run and how many child processes each worker instance is configured to spawn.
$ celery -A celery_app worker --loglevel=INFO
 -------------- celery@roelzkie v5.3.6 (emerald-rush)
--- ***** -----
-- ******* ---- macOS-11.0.1-arm64-arm-64bit 2024-01-05 23:32:13
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x10651dcd0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
Notice that this single worker has 8 child processes by default; see concurrency: 8 (prefork) in the output above.

This means that my machine can process 8 tasks at the same time. The number may be different in your case, since the default concurrency is based on the number of CPU cores.
Here's how you check the CPU details on a macOS-based machine.
$ sysctl -n hw.physicalcpu
For a Linux-based OS, see this Stack Overflow post.
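If Python is already at hand, a quick cross-platform check works too. Note that os.cpu_count() reports logical CPUs, which may be higher than the physical-core count printed by sysctl -n hw.physicalcpu.

import os

# Number of logical CPUs visible to the OS (may exceed physical cores).
print(os.cpu_count())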
You can also control the concurrency level by providing the --concurrency or -c option when starting a Celery worker.
$ celery -A celery_app worker --loglevel=INFO -c 2
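If you prefer to keep this in code rather than on the command line, the same limit can be set through the app configuration. Here is a minimal sketch using Celery's worker_concurrency setting with the broker URL from the example above; an explicit -c on the command line still overrides it.

from celery import Celery

app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")

# Limit the prefork pool to 2 child processes, equivalent to -c 2.
app.conf.worker_concurrency = 2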
You may also check the pool details of the celery_app worker via:
$ celery -A celery_app inspect stats
In the pool information, check the processes field:
"pool": {
...
"processes": [
15491,
15492
]
}
The worker is now limited to processing 2 tasks simultaneously, handled by processes 15491 and 15492.
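The same information can also be fetched from Python through the inspect API. A minimal sketch, assuming the app above lives in celery_app.py and the worker is running against the same broker:

from celery_app import app

# Ask running workers for their stats; returns a dict keyed by worker name,
# or None if no worker replies within the timeout.
stats = app.control.inspect(timeout=5).stats()

if stats:
    for worker_name, info in stats.items():
        print(worker_name, info["pool"]["processes"])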
Now let's explore Celery's parallel task execution with the worker setup above, which has a concurrency of only 2.
Imagine you have a function cpu_intensive_func that processes something that could take a few seconds to complete.
import os
import time

from celery import Celery, group

app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")


@app.task
def cpu_intensive_func(id: int, duration: int, intensity: int) -> str:
    """Consumes CPU resources for the specified duration and intensity."""
    message = f"Execution ID: {id} / Process ID: {os.getpid()}"

    start_time = time.time()
    while time.time() - start_time < duration:
        for _ in range(intensity):
            # Intensive floating-point calculation
            x = 1.00000001 ** 10000000

    return message


# Roll out 10 cpu_intensive_func tasks; each will take 3 seconds.
group(cpu_intensive_func.s(i, 3, 100000) for i in range(10))()
[2024-01-05 22:53:41,455: INFO/ForkPoolWorker-2] Task celery_app.cpu_intensive_func[06cdfc41-120e-44c5-a177-7631139a212d] succeeded in 3.0003304999991087s: 'Execution ID: 0 / Process ID: 15492'
[2024-01-05 22:53:41,455: INFO/ForkPoolWorker-1] Task celery_app.cpu_intensive_func[c4d97ca2-1a65-4b64-9e7e-04190ae36d7c] succeeded in 3.001020208001137s: 'Execution ID: 1 / Process ID: 15491'
[2024-01-05 22:53:44,456: INFO/ForkPoolWorker-2] Task celery_app.cpu_intensive_func[116af0cd-8cab-4a77-86ea-9b3e6df503e1] succeeded in 3.0006894170001033s: 'Execution ID: 2 / Process ID: 15492'
[2024-01-05 22:53:44,456: INFO/ForkPoolWorker-1] Task celery_app.cpu_intensive_func[e2bc101e-4d4b-418f-aa17-3b9e1109c84e] succeeded in 3.0005129589990247s: 'Execution ID: 3 / Process ID: 15491'
[2024-01-05 22:53:47,457: INFO/ForkPoolWorker-2] Task celery_app.cpu_intensive_func[3433c6ef-9bed-458d-9b27-ab7a269433ad] succeeded in 3.000148374998389s: 'Execution ID: 4 / Process ID: 15492'
[2024-01-05 22:53:47,458: INFO/ForkPoolWorker-1] Task celery_app.cpu_intensive_func[aa78ed42-73c0-4406-a178-ebffd33d7e13] succeeded in 3.0006364170003508s: 'Execution ID: 5 / Process ID: 15491'
[2024-01-05 22:53:50,458: INFO/ForkPoolWorker-2] Task celery_app.cpu_intensive_func[2778838e-cadd-458a-be7e-d4acd2afdc67] succeeded in 3.000157250000484s: 'Execution ID: 6 / Process ID: 15492'
[2024-01-05 22:53:50,459: INFO/ForkPoolWorker-1] Task celery_app.cpu_intensive_func[f05b6c9d-40a1-4fac-b409-583bc904ff09] succeeded in 3.001206042001286s: 'Execution ID: 7 / Process ID: 15491'
[2024-01-05 22:53:53,459: INFO/ForkPoolWorker-2] Task celery_app.cpu_intensive_func[73a93c04-9c78-4b53-8dfb-119b854a9a01] succeeded in 3.0006820839989814s: 'Execution ID: 8 / Process ID: 15492'
[2024-01-05 22:53:53,461: INFO/ForkPoolWorker-1] Task celery_app.cpu_intensive_func[dec8de30-0b71-40da-b4d6-ae9c88321550] succeeded in 3.000751625000703s: 'Execution ID: 9 / Process ID: 15491'
The image above shows two CPU cores, 3 and 7, at or near 100% utilization.
Notice the Process ID: [PID] in each log line. Each cpu_intensive_func task is delegated to a separate worker process, and the whole batch took 12-15 seconds to complete. If you check the timestamps, tasks were executed in pairs at almost exactly the same time.
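To observe that roughly 15-second wall time from the caller's side, you can block on the group result. This is a minimal sketch, assuming the code above lives in celery_app.py and that a result backend is configured for both the worker and the caller (for example result_backend="rpc://"); the worker banner above shows results disabled, so it would need to be enabled first.

import time

from celery import group

# Assumes the task above is importable from celery_app.py and a result backend is enabled.
from celery_app import cpu_intensive_func

start = time.time()
result = group(cpu_intensive_func.s(i, 3, 100000) for i in range(10))()

messages = result.get()  # blocks until all 10 tasks have finished
print(f"{len(messages)} tasks completed in {time.time() - start:.1f}s")
# With a concurrency of 2 and 3-second tasks, expect roughly 15 seconds;
# with a concurrency of 1, roughly 30 seconds.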
If we instead run a single worker with a concurrency of 1:
$ celery -A celery_app worker --loglevel=INFO -c 1
It will execute the tasks one at a time, and all 10 tasks will take roughly 30 seconds to complete (10 tasks at 3 seconds each, with no overlap).
Conclusion
So Celery's parallelism is determined by the concurrency setting, and increasing the concurrency speeds up overall task completion. But make sure to set the concurrency to at most the number of your CPU cores; otherwise the worker processes will compete for CPU time, and the constant context switching is resource-consuming and may affect other applications running on the same machine.