Direct Reply-To | RabbitMQ

Overview

Direct Reply-To lets you implement RPC (request/reply) patterns like those in tutorial 6 without creating a dedicated reply queue.

Motivation

RPC (request/reply) is a common pattern with brokers such as RabbitMQ. Tutorial 6 shows several client implementations. Typically, the requester (RPC client) sends requests that are routed to a long-lived, known request queue. The responder (RPC server) consumes from that queue and sends replies using the queue name supplied in the request message’s reply-to property.

Where does the requester’s reply queue come from? A requester could declare a single-use queue for each request–reply pair, but that’s inefficient: even an unreplicated queue is relatively expensive to create and delete compared to the cost of receiving a reply. In clusters the overhead is higher because all nodes must agree on creation, type, replication parameters, and other metadata.

A better approach is to create a single reply queue per requester and reuse it across requests. The properties of that queue depend on the use case:

Direct Reply-To is a RabbitMQ-specific alternative that completely eliminates the reply queue. That means:

Main benefits:

With Direct Reply-To, on the broker side, the responder’s AMQP 1.0 session or AMQP 0.9.1 channel process delivers the reply directly to the requester’s session/channel process without going through an actual queue.

“Directly” still means via the broker; there is no point-to-point network connection between the two client applications.

When to Use Direct Reply-To

The main use case is scale: many (tens of thousands of) clients performing request/reply.

While clients should prefer long-lived connections, Direct Reply-To also works well with high connection churn, where a client connects for a single RPC and disconnects immediately afterward. Avoiding queue create/delete reduces overhead and latency.

Since Direct Reply-To has at-most-once delivery semantics for replies, use it only when losing a reply is acceptable. For example, if a reply isn’t received within a timeout, the requester is expected to resend the request.
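The timeout-and-resend approach can be sketched in plain Java as follows. This is a minimal illustration using only the standard library, not code from any RabbitMQ client: `RetryingRpc`, `call`, and `simulatedSend` are hypothetical names, and the `send` supplier stands in for whatever actually publishes the request and returns a future for the reply. Because a resent request may be processed more than once by the responder, this pattern fits best when requests are safe to repeat.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of any RabbitMQ client library.
final class RetryingRpc {

    // Sends the request via `send` and waits up to `timeoutMillis` for the reply.
    // If the reply does not arrive in time, the request is sent again,
    // for at most `maxAttempts` sends in total.
    static <T> T call(Supplier<CompletableFuture<T>> send, int maxAttempts, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            CompletableFuture<T> reply = send.get();
            try {
                return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                reply.cancel(true); // give up on this attempt's reply
                if (attempt == maxAttempts) {
                    throw e; // no attempts left: surface the timeout to the caller
                }
            }
        }
    }

    // Simulated transport (an assumption for the demo): the reply to the first
    // request is lost, so its future never completes; later requests are answered.
    static CompletableFuture<String> simulatedSend(AtomicInteger sends) {
        if (sends.incrementAndGet() == 1) {
            return new CompletableFuture<>();
        }
        return CompletableFuture.completedFuture("pong");
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger sends = new AtomicInteger();
        String reply = call(() -> simulatedSend(sends), 3, 50);
        System.out.println(reply + " after " + sends.get() + " sends"); // pong after 2 sends
    }
}
```

With a real client, the supplier would publish the request and return the future that the client library (or your own correlation bookkeeping) completes when the reply arrives.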

When to Avoid Direct Reply-To

Avoid Direct Reply-To if any of the following apply:

For workloads with long-lived connections and multiple RPCs, the benefits of Direct Reply-To are smaller relative to using classic queues. Modern RabbitMQ versions optimize classic queues for low latency and low resource usage, so they can be similarly efficient in these scenarios. Conventional request-reply using explicitly declared classic queues is equally valid and can be preferable for long-running tasks.

Broker Implementation Details

Internally, RabbitMQ implements Direct Reply-To using the rabbit_volatile_queue queue type. “Volatile” describes the semantics: non-durable, zero-buffer, at-most-once, may drop, and not stored in the metadata store.

You will see rabbit_volatile_queue only in a few places. Instances do not appear in the Management UI or in rabbitmqctl list_queues.

One place is in Prometheus metrics, for example:

Usage

Direct Reply-To is supported for AMQP 1.0 and AMQP 0.9.1. It also works across protocols (e.g., AMQP 1.0 requester with AMQP 0.9.1 responder, or vice versa).

Usage in AMQP 1.0

The requester first attaches a link to receive reply messages. The attach (with a source) must set specific fields. If your AMQP 1.0 client library supports Direct Reply-To, it will set them for you (see examples). Otherwise:

RabbitMQ returns a broker-generated pseudo-queue address in the address field of the attach response. It looks like /queues/amq.rabbitmq.reply-to.<opaque-suffix>, where <opaque-suffix> is not meaningful to clients.

Before sending the first request, the requester must grant link credit to this pseudo-queue.

For each request, set the following message properties:

  1. message-id: a globally unique value. (The responder will set the reply’s correlation-id to this value.)
  2. reply-to: the address received in the attach response.
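On the requester side, matching replies to outstanding requests by message-id and correlation-id can be sketched as below. This is a standard-library-only illustration with hypothetical names (`PendingReplies`, `expect`, `onReply`); it does not talk to a broker, and a client library that supports Direct Reply-To may already do this bookkeeping for you.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping class for illustration; not a RabbitMQ client API.
final class PendingReplies {

    // Outstanding requests, keyed by the message-id set on each request.
    private final Map<String, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();

    // Registers a request before it is sent and returns the future for its reply.
    CompletableFuture<byte[]> expect(String messageId) {
        CompletableFuture<byte[]> reply = new CompletableFuture<>();
        if (pending.putIfAbsent(messageId, reply) != null) {
            throw new IllegalStateException("duplicate message-id: " + messageId);
        }
        return reply;
    }

    // Called for every message received on the reply link. Returns false when the
    // correlation-id is missing or does not match an outstanding request
    // (for example, a late reply to a request that was already answered).
    boolean onReply(String correlationId, byte[] body) {
        if (correlationId == null) {
            return false;
        }
        CompletableFuture<byte[]> reply = pending.remove(correlationId);
        return reply != null && reply.complete(body);
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();
        String messageId = UUID.randomUUID().toString(); // globally unique message-id
        CompletableFuture<byte[]> reply = replies.expect(messageId);
        // A real requester would now send the request with message-id and reply-to set.
        // Here the responder's reply is simulated: its correlation-id equals the message-id.
        replies.onReply(messageId, "done".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(reply.join(), StandardCharsets.UTF_8)); // done
    }
}
```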

The responder reads the request’s reply-to and sends the reply to that address via one of two options:

  1. Attach a sending link to the anonymous terminus (null target address) as described in Target Address v2, and set the reply address in the message’s to property. Useful when replying to many different requesters (no per-requester link).
  2. Create a sending link directly to the provided address. In this case, RabbitMQ checks whether the requester is still connected; if not, RabbitMQ refuses the link.

If the responder will perform expensive work, it can proactively check whether the requester is still present by issuing an HTTP GET over AMQP. A 200 status indicates the requester is still connected (see examples).

AMQP 1.0 Caveats and Limitations

If message loss is unacceptable, use classic queues instead of Direct Reply-To.

Examples: AMQP 1.0


String requestQueue = "request-queue";

// create the responder
Responder responder = connection.responderBuilder()
    .requestQueue(requestQueue)
    .handler((ctx, req) -> {
        // check whether the requester is still connected (optional)
        if (ctx.isRequesterAlive(req)) {
            String in = new String(req.body(), UTF_8);
            String out = "*** " + in + " ***";
            return ctx.message(out.getBytes(UTF_8));
        } else {
            return null;
        }
    }).build();

// create the requester, it uses direct reply-to by default
Requester requester = connection.requesterBuilder()
    .requestAddress().queue(requestQueue)
    .requester()
    .build();

// create the request message
Message request = requester.message("hello".getBytes(UTF_8));

// send the request
CompletableFuture<Message> responseFuture = requester.publish(request);

// wait for the response
Message response = responseFuture.get(10, TimeUnit.SECONDS);

Usage in AMQP 0.9.1

To use Direct Reply-To, a requester must:

  1. Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode. There is no need to declare this "queue" first (though the client may).
  2. Set the request message’s reply-to to amq.rabbitmq.reply-to.

When forwarding the request, RabbitMQ transparently rewrites reply-to to amq.rabbitmq.reply-to.<opaque-suffix>, where <opaque-suffix> is not meaningful to clients. The responder then publishes the reply to the default exchange ("") using that value as the routing key.
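The responder’s side of this routing rule can be sketched as a pure function. The example below uses hypothetical value types (`Request`, `Publish`) rather than a real client library’s message classes, and the reply-to suffix in `main` is made up. It also assumes the convention used in the examples on this page, where the reply’s correlation-id is set to the request’s message-id.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical types for illustration; a real client library has its own classes.
final class ReplyRouting {

    // The parts of an incoming request that the responder needs.
    static final class Request {
        final String replyTo;   // rewritten by the broker to amq.rabbitmq.reply-to.<opaque-suffix>
        final String messageId;
        final byte[] body;

        Request(String replyTo, String messageId, byte[] body) {
            this.replyTo = replyTo;
            this.messageId = messageId;
            this.body = body;
        }
    }

    // What the responder hands to its publish call.
    static final class Publish {
        final String exchange;
        final String routingKey;
        final String correlationId;
        final byte[] body;

        Publish(String exchange, String routingKey, String correlationId, byte[] body) {
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Builds the reply: default exchange (""), routing key taken verbatim from the
    // request's reply-to, correlation-id copied from the request's message-id.
    static Publish replyFor(Request request, byte[] replyBody) {
        if (request.replyTo == null || request.replyTo.isEmpty()) {
            throw new IllegalArgumentException("request has no reply-to");
        }
        return new Publish("", request.replyTo, request.messageId, replyBody);
    }

    public static void main(String[] args) {
        Request request = new Request("amq.rabbitmq.reply-to.example-suffix", "rpc-1",
                "hello".getBytes(StandardCharsets.UTF_8));
        Publish reply = replyFor(request, "world".getBytes(StandardCharsets.UTF_8));
        System.out.println("exchange='" + reply.exchange + "' routingKey=" + reply.routingKey
                + " correlationId=" + reply.correlationId);
    }
}
```

The responder treats the reply-to value as opaque: it copies it into the routing key without parsing or modifying the suffix.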

If the responder will perform expensive work, it can check whether the client has gone away by passively declaring the generated reply queue name on a disposable channel. Even with passive=false there is no way to create it; the declare either succeeds (0 ready messages, 1 consumer) or fails.

AMQP 0.9.1 Caveats and Limitations

Examples: AMQP 0.9.1


%% 1. Requester consumes from pseudo-queue in no-ack mode.
amqp_channel:subscribe(RequesterChan,
                       #'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
                                        no_ack = true},
                       self()),
CTagRequester = receive #'basic.consume_ok'{consumer_tag = CTag} -> CTag
                end,

%% 2. Requester sends the request.
amqp_channel:cast(
  RequesterChan,
  #'basic.publish'{routing_key = RequestQueue},
  #amqp_msg{props = #'P_basic'{reply_to = <<"amq.rabbitmq.reply-to">>,
                               message_id = RpcId},
            payload = RequestPayload}),

%% 3. Responder receives the request.
{ReplyTo, RpcId} =
    receive {#'basic.deliver'{consumer_tag = CTagResponder},
             #amqp_msg{payload = RequestPayload,
                       props = #'P_basic'{reply_to = ReplyTo0,
                                          message_id = RpcId0}}} ->
                {ReplyTo0, RpcId0}
    end,

%% 4. Responder replies.
amqp_channel:cast(
  ResponderChan,
  #'basic.publish'{routing_key = ReplyTo},
  #amqp_msg{props = #'P_basic'{correlation_id = RpcId},
            payload = ReplyPayload}),

%% 5. Requester receives the reply
receive {#'basic.deliver'{consumer_tag = CTagRequester},
         #amqp_msg{payload = ReplyPayload,
                   props = #'P_basic'{correlation_id = RpcId}}} ->
            %% process reply here...
            ok
end.