Skip to content

Celery Integration

The Celery integration provides a task class that can be used to automatically create and update tasks in Task Badger from Celery tasks. Although you can use the basic SDK functions to create and update tasks from Celery tasks, the Celery integration simplifies the usage significantly.

There are two ways you can use the Celery integration:

  1. Use the CelerySystemIntegration to automatically track all Celery tasks.
  2. Use the taskbadger.celery.Task class as the base class for Celery tasks you wish to track.

You can use both mechanisms at the same time since the base class is useful if you want to access to the Task Badger task object within the body of the Celery task.

Celery System Integration

If you want to track all tasks, you can use the CelerySystemIntegration class. By default, this will track every task that is executed by the Celery workers (except the internal Celery tasks), including periodic / scheduled tasks.

import taskbadger
from taskbadger.systems import CelerySystemIntegration

taskbadger.init(
    organization_slug="my-org",
    project_slug="my-project",
    token="***",
    systems=[CelerySystemIntegration()]
)

To track only certain tasks, you can use the includes and excludes parameters which take a list of task names or patterns:

CelerySystemIntegration(
    includes=[
        "myapp.tasks.*",
        "myapp.other_tasks.special_task"
    ],
    excludes=[
        "myapp.tasks.heartbeat",
    ]
)

Exclusions take precedence over inclusions so if a task name matches both an include and an exclude, it will be excluded.

Celery Task Class

To track individual tasks, or if you want access to the Task object within the body of the Celery task, you can use the taskbadger.celery.Task class as the base class for your Celery tasks. This can be used with or without the CelerySystemIntegration.

This custom Celery task class tells Task Badger that the task should be tracked irrespective of any configuration passed to CelerySystemIntegration ie. even if the task matches an exclusion rule it will still be tracked if it is using taskbadger.celery.Task as its base.

The task class also provides convenient access to the Task Badger task object within the body of the Celery task.

Basic Usage

To use the integration simply set the base parameter of your Celery task to taskbadger.celery.Task:

This works the same with the @celery.shared_task decorator.

from celery import Celery
from taskbadger.celery import Task

app = Celery("tasks")


@app.task(base=Task)
def my_task():
    pass


result = my_task.delay()
taskbadger_task_id = result.taskbadger_task_id
taskbadger_task = result.get_taskbadger_task()

Having made this change, a task will now be automatically created in Task Badger when the celery task is published. The Task Badger task will also be updated when the task completes.

Info

Note that Task Badger will only track the Celery task if it is run asynchronously. If the task is run synchronously via .apply(), by calling the function directly, or if task_always_eager has been set, the task will not be tracked.

This also means that the taskbadger_task_id attribute of the result as well as the return value of result.get_taskbadger_task() will be None if the task is not being tracked by Task Badger.

Task Customization

You can pass additional parameters to the Task Badger Task class which will be used when creating the task. This can be done by passing keyword arguments prefixed with taskbadger_ to the .appy_async() function or to the task decorator.

# using the task decorator

@app.task(base=Task, taskbadger_monitor_id="xyz")
def my_task(arg1, arg2):
    ...


# using individual keyword arguments
my_task.apply_async(
    arg1, arg2,
    taskbadger_name="my task",
    taskbadger_value_max=1000,
    taskbadger_data={"custom": "data"},
)

# using a dictionary
my_task.apply_async(arg1, arg2, taskbadger_kwargs={
    "name": "my task",
    "value_max": 1000,
    "data": {"custom": "data"}
})

Order of Precedence

Values passed via apply_async take precedence over values passed in the task decorator.

In both the decorator and apply_async, if individual keyword arguments are used as well as the taskbadger_kwargs dictionary, the individual arguments will take precedence.

Accessing the Task Object

The taskbadger.celery.Task class provides access to the Task Badger task object via the taskbadger_task property of the Celery task. The Celery task instance can be accessed within a task function body by creating a bound task.

@app.task(bind=True, base=taskbadger.celery.Task)
def my_task(self, items):
    # Retrieve the Task Badger task
    task = self.taskbadger_task

    for i, item in enumerate(items):
        do_something(item)

        if i % 100 == 0:
            # Track progress
            task.update(value=i)

    # Mark the task as complete
    # This is normally handled automatically when the task completes but we call it here so that we
    # can also update the `value` property or other task properties.
    task.success(value=len(items))

Note

The taskbadger_task property will be None if the task is not being tracked by Task Badger. This could indicate that the Task Badger API has not been configured, there was an error creating the task, or the task is being run synchronously e.g. via .apply() or calling the task using .map or .starmap, .chunk.