Message Protocol — Celery 5.5.0rc1 documentation

This document describes the current stable version of Celery (5.5). For development docs, go here.

Task messages

Version 2

Definition

    properties = {
        'correlation_id': uuid task_id,
        'content_type': string mimetype,
        'content_encoding': string encoding,

        # optional
        'reply_to': string queue_or_url,
    }
    headers = {
        'lang': string 'py',
        'task': string task,
        'id': uuid task_id,
        'root_id': uuid root_id,
        'parent_id': uuid parent_id,
        'group': uuid group_id,

        # optional
        'meth': string method_name,
        'shadow': string alias_name,
        'eta': iso8601 ETA,
        'expires': iso8601 expires,
        'retries': int retries,
        'timelimit': (soft, hard),
        'argsrepr': str repr(args),
        'kwargsrepr': str repr(kwargs),
        'origin': str nodename,
        'replaced_task_nesting': int,
    }

    body = (
        object[] args,
        Mapping kwargs,
        Mapping embed {
            'callbacks': Signature[] callbacks,
            'errbacks': Signature[] errbacks,
            'chain': Signature[] chain,
            'chord': Signature chord_callback,
        }
    )
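As a rough illustration of the layout above, the following sketch assembles the three parts of a version 2 message with the standard library only. The helper name build_task_message_v2 is hypothetical and not part of Celery. Optional headers are left out, and the sketch assumes a standalone task with no root, parent, or group.

```python
import json
import uuid


def build_task_message_v2(task, args=(), kwargs=None, reply_to=None):
    """Return (properties, headers, body) for a version 2 task message."""
    kwargs = kwargs or {}
    task_id = str(uuid.uuid4())

    properties = {
        'correlation_id': task_id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }
    if reply_to is not None:
        properties['reply_to'] = reply_to  # optional property

    headers = {
        'lang': 'py',
        'task': task,
        'id': task_id,
        # Assumption: a standalone task, so these three are left empty.
        'root_id': None,
        'parent_id': None,
        'group': None,
    }

    embed = {'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None}
    # The body is the (args, kwargs, embed) triple, serialized as JSON here.
    body = json.dumps((list(args), kwargs, embed)).encode('utf-8')
    return properties, headers, body


properties, headers, body = build_task_message_v2('proj.tasks.add', (2, 2))
```

Note that the task id appears twice: as the correlation_id property and as the id header.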

Example

This example sends a task message using version 2 of the protocol:

    # chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8

    import json
    import os
    import socket
    from uuid import uuid4

    task_id = str(uuid4())
    args = (2, 2)
    kwargs = {}
    # basic_publish stands in for the publish call of your AMQP client.
    basic_publish(
        message=json.dumps((args, kwargs, None)),
        application_headers={
            'lang': 'py',
            'task': 'proj.tasks.add',
            'argsrepr': repr(args),
            'kwargsrepr': repr(kwargs),
            # os.getpid() returns an int, so convert it before joining.
            'origin': '@'.join([str(os.getpid()), socket.gethostname()]),
        },
        properties={
            'correlation_id': task_id,
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        },
    )
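On the receiving side, the body sent in the example above can be split back into its three parts. This is a minimal sketch assuming a JSON-encoded body; the function name unpack_task_body is hypothetical and not a Celery API.

```python
import json


def unpack_task_body(raw_body, content_encoding='utf-8'):
    """Split a JSON-encoded version 2 body into (args, kwargs, embed)."""
    args, kwargs, embed = json.loads(raw_body.decode(content_encoding))
    # The example sends None in the third slot; normalize it to a dict.
    return tuple(args), kwargs, embed or {}


raw = json.dumps(((2, 2), {}, None)).encode('utf-8')
args, kwargs, embed = unpack_task_body(raw)
```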

Changes from version 1

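    from celery import Task
    from celery.utils.imports import qualname

    class PickleTask(Task):
        # Split the positional arguments into the callable and its arguments.
        def unpack_args(self, fun, args=()):
            return fun, args

        def apply_async(self, args, kwargs, **options):
            fun, real_args = self.unpack_args(*args)
            return super().apply_async(
                (fun, real_args, kwargs), shadow=qualname(fun), **options
            )

    @app.task(base=PickleTask)
    def call(fun, args, kwargs):
        return fun(*args, **kwargs)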

Version 1

In version 1 of the protocol all fields are stored in the message body, meaning workers and intermediate consumers must deserialize the payload to read the fields.
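The practical difference can be sketched as follows. The function task_name is hypothetical, and using the presence of a task header to tell the two versions apart is an assumption made for this illustration.

```python
import json


def task_name(headers, raw_body):
    """Return the task name from a v2 header, falling back to a v1 body."""
    if 'task' in headers:
        # Version 2: the name is available without touching the body.
        return headers['task']
    # Version 1: the body must be deserialized before any field can be read.
    return json.loads(raw_body)['task']


v2_name = task_name({'task': 'proj.tasks.add'}, b'[[2, 2], {}, null]')
v1_name = task_name({}, b'{"task": "proj.tasks.add", "args": [2, 2]}')
```

An intermediate consumer that only routes on the task name never needs to decode a version 2 body.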

Message body

Example message

This is an example invocation of a celery.task.ping task in JSON format:

{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77", "task": "celery.task.PingTask", "args": [], "kwargs": {}, "retries": 0, "eta": "2009-11-17T12:30:56.527191"}
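A short sketch of how a consumer might read this message with the standard library. The eta value is an ISO 8601 string, so it has to be parsed before it can be compared with the current time.

```python
import json
from datetime import datetime

raw = (
    '{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77", '
    '"task": "celery.task.PingTask", "args": [], "kwargs": {}, '
    '"retries": 0, "eta": "2009-11-17T12:30:56.527191"}'
)

message = json.loads(raw)
# The example timestamp carries no UTC offset, so the result is naive.
eta = datetime.fromisoformat(message['eta'])
```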

Task Serialization

Several serialization formats are supported, selected by the content_type message header.

The MIME-types supported by default are shown in the following table.

Scheme     MIME Type
json       application/json
yaml       application/x-yaml
pickle     application/x-python-serialize
msgpack    application/x-msgpack
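One way a consumer could act on the content_type header is a lookup table of decoders. This is only a sketch: decode_body and its accept parameter are hypothetical names, and yaml and msgpack are left out because they need third-party packages. Unpickling untrusted data can execute arbitrary code, so the sketch refuses pickle unless the caller opts in.

```python
import json
import pickle

# Decoders for the MIME types that the standard library can handle.
DECODERS = {
    'application/json': lambda data: json.loads(data.decode('utf-8')),
    'application/x-python-serialize': pickle.loads,
}


def decode_body(data, content_type, accept=('application/json',)):
    """Decode a message body, rejecting content types the caller did not accept."""
    if content_type not in accept:
        raise ValueError(f'refusing content type {content_type!r}')
    return DECODERS[content_type](data)


decoded = decode_body(b'[[2, 2], {}, null]', 'application/json')
```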

Event Messages

Event messages are always JSON serialized and can contain arbitrary message body fields.

Since version 4.0, the body can consist of either a single mapping (one event) or a list of mappings (multiple events).
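A consumer therefore has to handle both shapes. The sketch below normalizes either form into a stream of single events; iter_events is a hypothetical helper name.

```python
from collections.abc import Mapping


def iter_events(body):
    """Yield individual events from a decoded event message body."""
    if isinstance(body, Mapping):
        yield body           # a single event
    else:
        yield from body      # a list of events


single = list(iter_events({'type': 'worker-heartbeat'}))
batch = list(iter_events([{'type': 'task-sent'}, {'type': 'task-received'}]))
```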

There are also standard fields that must always be present in an event message:

Standard body fields

Standard event types

For a list of standard event types and their fields see the Event Reference.

Example message

These are the message fields for a task-succeeded event:

    properties = {
        'routing_key': 'task.succeeded',
        'exchange': 'celeryev',
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
        'delivery_mode': 1,
    }
    headers = {
        'hostname': 'worker1@george.vandelay.com',
    }
    body = {
        'type': 'task-succeeded',
        'hostname': 'worker1@george.vandelay.com',
        'pid': 6335,
        'clock': 393912923921,
        'timestamp': 1401717709.101747,
        'utcoffset': -1,
        'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
        'retval': '4',
        'runtime': 0.0003212,
    }
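In this example the routing key mirrors the event type, with the hyphen replaced by a dot. The sketch below assumes that convention and serializes a trimmed version of the body as JSON; event_routing_key is a hypothetical helper name.

```python
import json


def event_routing_key(event_type):
    """Derive a routing key from an event type, e.g. for topic routing."""
    return event_type.replace('-', '.')


body = {
    'type': 'task-succeeded',
    'hostname': 'worker1@george.vandelay.com',
    'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
    'retval': '4',
    'runtime': 0.0003212,
}

routing_key = event_routing_key(body['type'])
payload = json.dumps(body).encode('utf-8')  # event messages are JSON serialized
```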