Celery — Parallel Tasks

Celery is a distributed task queue framework that delegates tasks across multiple workers or processes, enabling parallel processing. This article assumes you already know the basics of Celery and focuses only on its parallel task execution.

Celery's parallel capacity depends on how many worker instances you run and how many child processes each worker instance is configured with.

                $ celery -A celery_app worker --loglevel=INFO
                
    -------------- celery@roelzkie v5.3.6 (emerald-rush)
--- ***** ----- 
-- ******* ---- macOS-11.0.1-arm64-arm-64bit 2024-01-05 23:32:13
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x10651dcd0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
    -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                
            

Notice that this single worker has 8 child processes by default. See concurrency: 8 (prefork).

This means my machine can run 8 processes at the same time. Your number may differ, since the default is based on the number of your CPU cores.

Here's how to check the CPU core count on a macOS machine:

                $ sysctl -n hw.physicalcpu
            

For Linux-based systems, run nproc or inspect /proc/cpuinfo.
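A quick cross-platform alternative is Python's own os.cpu_count(). Note that it reports logical cores, which can differ from the physical-core count that sysctl -n hw.physicalcpu shows:

```python
import os

# Logical CPU count as seen by Python; prefork's default
# concurrency is derived from the machine's CPU count.
cores = os.cpu_count()
print(cores)
```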

You can also control the concurrency level by passing the --concurrency or -c option when starting a Celery worker.

                $ celery -A celery_app worker --loglevel=INFO -c 2
            
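The same setting can also live in the app configuration instead of the command line. A minimal sketch, reusing the broker URL from this article; the command-line flag takes precedence when both are provided:

```python
from celery import Celery

app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")

# Same effect as starting the worker with -c 2; the -c flag
# overrides this value when both are provided.
app.conf.worker_concurrency = 2
```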

You may also check the pool details of the celery_app via:

                $ celery -A celery_app inspect stats
            

In the pool section of the output, check the processes list.

                
"pool": {
    ... 
    "processes": [
        15491,
        15492
    ]
}
                
            

Now the worker is limited to processing 2 tasks simultaneously, using processes 15491 and 15492.
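Since the stats output is plain JSON, you can also confirm a worker's capacity programmatically. The fragment below is a hand-copied (hypothetical) excerpt of the stats shown above:

```python
import json

# Hand-copied fragment of the `inspect stats` output shown above.
stats = json.loads("""
{
    "pool": {
        "max-concurrency": 2,
        "processes": [15491, 15492]
    }
}
""")

# One entry per child process: this worker can run 2 tasks at once.
print(len(stats["pool"]["processes"]))
```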

Now let's explore parallel task execution with the worker setup above, using a concurrency of 2.

Imagine you have a task cpu_intensive_func that processes something that takes a few seconds to complete.

                
import os
import time

from celery import Celery, group

app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")


@app.task
def cpu_intensive_func(id: int, duration: int, intensity: int) -> str:
    """Consumes CPU resources for the specified duration and intensity."""

    message = f"Execution ID: {id} / Process ID: {os.getpid()}"
    start_time = time.time()
    while time.time() - start_time < duration:
        for _ in range(intensity):
            # Intensive floating-point calculation to keep the CPU busy
            x = 1.00000001 ** 10000000

    return message


# Roll out 10 cpu_intensive_func tasks; each takes 3 seconds.
group(cpu_intensive_func.s(i, 3, 100000) for i in range(10))()
                
[2024-01-05 22:53:41,455: INFO/ForkPoolWorker-2] Task celery_app.cpu_intensive_func[06cdfc41-120e-44c5-a177-7631139a212d] succeeded in 3.0003304999991087s: 'Execution ID: 0 / Process ID: 15492'
[2024-01-05 22:53:41,455: INFO/ForkPoolWorker-1] Task celery_app.cpu_intensive_func[c4d97ca2-1a65-4b64-9e7e-04190ae36d7c] succeeded in 3.001020208001137s: 'Execution ID: 1 / Process ID: 15491'
[2024-01-05 22:53:44,456: INFO/ForkPoolWorker-2] Task celery_app.cpu_intensive_func[116af0cd-8cab-4a77-86ea-9b3e6df503e1] succeeded in 3.0006894170001033s: 'Execution ID: 2 / Process ID: 15492'
[2024-01-05 22:53:44,456: INFO/ForkPoolWorker-1] Task celery_app.cpu_intensive_func[e2bc101e-4d4b-418f-aa17-3b9e1109c84e] succeeded in 3.0005129589990247s: 'Execution ID: 3 / Process ID: 15491'
[2024-01-05 22:53:47,457: INFO/ForkPoolWorker-2] Task celery_app.cpu_intensive_func[3433c6ef-9bed-458d-9b27-ab7a269433ad] succeeded in 3.000148374998389s: 'Execution ID: 4 / Process ID: 15492'
[2024-01-05 22:53:47,458: INFO/ForkPoolWorker-1] Task celery_app.cpu_intensive_func[aa78ed42-73c0-4406-a178-ebffd33d7e13] succeeded in 3.0006364170003508s: 'Execution ID: 5 / Process ID: 15491'
[2024-01-05 22:53:50,458: INFO/ForkPoolWorker-2] Task celery_app.cpu_intensive_func[2778838e-cadd-458a-be7e-d4acd2afdc67] succeeded in 3.000157250000484s: 'Execution ID: 6 / Process ID: 15492'
[2024-01-05 22:53:50,459: INFO/ForkPoolWorker-1] Task celery_app.cpu_intensive_func[f05b6c9d-40a1-4fac-b409-583bc904ff09] succeeded in 3.001206042001286s: 'Execution ID: 7 / Process ID: 15491'
[2024-01-05 22:53:53,459: INFO/ForkPoolWorker-2] Task celery_app.cpu_intensive_func[73a93c04-9c78-4b53-8dfb-119b854a9a01] succeeded in 3.0006820839989814s: 'Execution ID: 8 / Process ID: 15492'
[2024-01-05 22:53:53,461: INFO/ForkPoolWorker-1] Task celery_app.cpu_intensive_func[dec8de30-0b71-40da-b4d6-ae9c88321550] succeeded in 3.000751625000703s: 'Execution ID: 9 / Process ID: 15491'
                
            

htop output: two CPU cores (3 and 7) at or near 100% utilization.

Notice the Process ID: [PID] values. Each cpu_intensive_func call is delegated to one of the two child processes, and the 10 tasks took roughly 15 seconds to complete overall. If you check the timestamps, pairs of tasks finished at almost exactly the same time.

If we instead run a single worker with a concurrency of 1:

                $ celery -A celery_app worker --loglevel=INFO -c 1
            

it will execute the tasks one at a time, and all 10 tasks will take roughly 30 seconds to complete.
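The back-of-the-envelope math behind both numbers: tasks run in waves of size equal to the concurrency, so total wall-clock time is the number of waves times the per-task duration.

```python
import math

def expected_runtime(tasks: int, concurrency: int, task_seconds: float) -> float:
    # Tasks execute in ceil(tasks / concurrency) waves; each wave
    # lasts as long as one task.
    return math.ceil(tasks / concurrency) * task_seconds

print(expected_runtime(10, 2, 3.0))   # 15.0 seconds with -c 2
print(expected_runtime(10, 1, 3.0))   # 30.0 seconds with -c 1
```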

Conclusion

So Celery parallelism is determined by the concurrency level, and increasing the concurrency speeds up overall task completion. But set the concurrency to at most the number of your CPU cores; going beyond that forces the OS to context-switch between processes, which wastes CPU resources and may degrade other applications on the machine.