Celery | Sentry for Python (original) (raw)

The Celery integration adds support for the Celery Task Queue System.

Install sentry-sdk from PyPI with the celery extra:

Copied

pip install "sentry-sdk[celery]"

If you have the celery package in your dependencies, the Celery integration will be enabled automatically when you initialize the Sentry SDK.

When using Celery without Django, you'll need to initialize the Sentry SDK in both your application and the Celery worker processes spawned by the Celery daemon.

In addition to capturing errors, you can use Sentry for distributed tracing and profiling. Select what you'd like to install to get the corresponding installation and configuration instructions below.

Error MonitoringTracingProfiling

tasks.py

Copied

from celery import Celery, signals
import sentry_sdk

# Initializing Celery
app = Celery("tasks", broker="...")

# Initialize Sentry SDK on Celery startup
@signals.celeryd_init.connect
def init_sentry(**_kwargs):
    sentry_sdk.init(
        dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
        # Add request headers and IP for users,
        # see https://docs.sentry.io/platforms/python/data-management/data-collected/ for more info
        send_default_pii=True,
        #  performance
        # Set traces_sample_rate to 1.0 to capture 100%
        # of transactions for tracing.
        traces_sample_rate=1.0,
        #  performance
        #  profiling
        # Set profiles_sample_rate to 1.0 to profile 100%
        # of sampled transactions.
        # We recommend adjusting this value in production.
        profiles_sample_rate=1.0,
        #  profiling
    )

# Task definitions go here
@app.task
def add(x, y):
    return x + y

The celeryd_init signal is triggered when the Celery daemon starts, before the worker processes are spawned. If you need to initialize Sentry for each individual worker process, use the worker_init signal instead.

Error MonitoringTracingProfiling

main.py

Copied

from tasks import add
import sentry_sdk

def main():
    # Initializing Sentry SDK in our process
    sentry_sdk.init(
        dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
        # Add data like request headers and IP for users, if applicable;
        # see https://docs.sentry.io/platforms/python/data-management/data-collected/ for more info
        send_default_pii=True,
        #  performance
        # Set traces_sample_rate to 1.0 to capture 100%
        # of transactions for tracing.
        traces_sample_rate=1.0,
        #  performance
        #  profiling
        # Set profiles_sample_rate to 1.0 to profile 100%
        # of sampled transactions.
        # We recommend adjusting this value in production.
        profiles_sample_rate=1.0,
        #  profiling
    )

    # Enqueueing a task to be processed by Celery
    with sentry_sdk.start_transaction(name="calling-a-celery-task"):
        result = add.delay(4, 4)

if __name__ == "__main__":
    main()

If you're using Celery with Django in a typical setup, have initialized the SDK in your settings.py file (as described in the Django integration documentation), and have your Celery configured to use the same settings as config_from_object, there's no need to initialize the Celery SDK separately.

To confirm that your SDK is initialized on worker start, pass debug=True to sentry_sdk.init(). This will add extra output to your Celery logs when the SDK is initialized. If you see the output during worker startup, and not just after a task has started, then it's working correctly.

The snippet below includes an intentional ZeroDivisionError in the Celery task that will be captured by Sentry. To trigger the error call debug_sentry.delay():

tasks.py

Copied

from celery import Celery, signals
import sentry_sdk

app = Celery("tasks", broker="...")

@signals.celeryd_init.connect
def init_sentry(**_kwargs):
    sentry_sdk.init(...)  # same as above

@app.task
def debug_sentry():
    1/0

To set options on CeleryIntegration to change its behavior, add it explicitly to your sentry_sdk.init():

Copied

import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    # same as above
    integrations=[
        CeleryIntegration(
            monitor_beat_tasks=True,
            exclude_beat_tasks=[
                "unimportant-task",
                "payment-check-.*"
            ],
        ),
    ],
)

You can pass the following keyword arguments to CeleryIntegration():

Distributed tracing extends the trace from the code that's running your Celery task so that it includes the code that initiated the task.

You can disable this globally with the propagate_traces parameter, documented above. If you set propagate_traces to False, all Celery tasks will start their own trace.

If you want to have more fine-grained control over trace distribution, you can override the propagate_traces option by passing the sentry-propagate-traces header when starting the Celery task:

Note: The CeleryIntegration does not utilize the traces_sample_rate config option for deciding if a trace should be propagated into a Celery task.

Copied

import sentry_sdk

# Enable global distributed traces (this is the default, just to be explicit)
sentry_sdk.init(
    # same as above
    integrations=[
        CeleryIntegration(
            propagate_traces=True
        ),
    ],
)

# This will propagate the trace:
my_task_a.delay("some parameter")

# This will propagate the trace:
my_task_b.apply_async(
    args=("some_parameter", )
)

# This will NOT propagate the trace. The task will start its own trace:
my_task_b.apply_async(
    args=("some_parameter", ),
    headers={"sentry-propagate-traces": False},
)

# Note: overriding the tracing behaviour using `task_x.delay()` is not possible.

Help improve this content
Our documentation is open source and available on GitHub. Your contributions are welcome, whether fixing a typo (drat!) or suggesting an update ("yeah, this would be better").