pubsub package - cloud.google.com/go/pubsub/v2 - Go Packages
Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub messages, hiding the details of the underlying server RPCs. Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders and receivers.
If you are migrating from the v1 library, please read the migration guide: https://github.com/googleapis/google-cloud-go/blob/main/pubsub/MIGRATING.md
More information about Pub/Sub is available at https://cloud.google.com/pubsub/docs.
See https://godoc.org/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.
Publishing ¶
Pub/Sub messages are published to topics via publishers. A Topic may be created like so:
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, "my-project")
    if err != nil {
        // TODO: Handle error.
    }
    topic, err := client.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{
        Name: "projects/my-project/topics/my-topic",
    })
A Publisher client can then be instantiated and used to publish messages.
    publisher := client.Publisher(topic.GetName())
    res := publisher.Publish(ctx, &pubsub.Message{Data: []byte("payload")})
Publisher.Publish queues the message for publishing and returns immediately. When enough messages have accumulated, or enough time has elapsed, the batch of messages is sent to the Pub/Sub service.
Publisher.Publish returns a PublishResult, which behaves like a future: its Get method blocks until the message has been sent to the service.
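The future-like behavior of PublishResult can be pictured with a small stdlib-only sketch. The Result type below is hypothetical and not part of the pubsub API (the real PublishResult.Get also takes a context); it only illustrates the "Get blocks until the outcome is known" semantics:

```go
package main

import "fmt"

// Result is a hypothetical stand-in for pubsub.PublishResult: Get blocks
// until the ready channel is closed, then returns the stored server ID.
type Result struct {
	ready chan struct{}
	id    string
	err   error
}

func newResult() *Result { return &Result{ready: make(chan struct{})} }

// set records the outcome and unblocks all pending Get calls.
func (r *Result) set(id string, err error) {
	r.id, r.err = id, err
	close(r.ready)
}

// Get blocks until the result is ready, mirroring PublishResult.Get.
func (r *Result) Get() (string, error) {
	<-r.ready
	return r.id, r.err
}

func main() {
	r := newResult()
	go r.set("msg-123", nil) // simulate the batch being sent to the server
	id, err := r.Get()       // blocks until set is called
	fmt.Println(id, err)
}
```

The one-shot `close(ready)` is what makes Get safe to call from many goroutines at once, just as the real PublishResult is.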
The first time you call Publisher.Publish on a Publisher, goroutines are started in the background. To clean up these goroutines, call Publisher.Stop:
publisher.Stop()
Receiving ¶
To receive messages published to a topic, clients create a subscription for the topic. There may be more than one subscription per topic; each message that is published to the topic will be delivered to all associated subscriptions.
You then need to create a Subscriber client to pull messages from a subscription.
A subscription may be created like so:
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, "my-project")
    if err != nil {
        // TODO: Handle error.
    }
    subscription, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
        Name:  "projects/my-project/subscriptions/my-sub",
        Topic: "projects/my-project/topics/my-topic",
    })
A Subscriber client can be instantiated like so:
sub := client.Subscriber(subscription.GetName())
You then provide a callback to Subscriber which processes the messages.
    err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        log.Printf("Got message: %s", m.Data)
        m.Ack()
    })
    if err != nil && !errors.Is(err, context.Canceled) {
        // Handle error.
    }
The callback is invoked concurrently by multiple goroutines, maximizing throughput. To terminate a call to Subscriber.Receive, cancel its context.
Once client code has processed the Message, it must call Message.Ack or Message.Nack. If Ack is not called, the Message will eventually be redelivered. Ack/Nack MUST be called within the Subscriber.Receive handler function, and not from a goroutine. Otherwise, flow control (e.g. ReceiveSettings.MaxOutstandingMessages) will not be respected. Additionally, messages can get orphaned when Receive is canceled, resulting in slow redelivery.
If the client cannot or does not want to process the message, it can call Message.Nack to speed redelivery. For more information and configuration options, see Ack Deadlines below.
Note: It is possible for a Message to be redelivered even if Message.Ack has been called unless exactly once delivery is enabled. Applications should be aware of these deliveries.
Note: This uses Pub/Sub's streaming pull feature. This feature has properties that may be surprising. Please refer to https://cloud.google.com/pubsub/docs/pull#streamingpull for more details on how streaming pull behaves.
Emulator ¶
To use an emulator with this library, you can set the PUBSUB_EMULATOR_HOST environment variable to the address at which your emulator is running. This will send requests to that address instead of to Pub/Sub. You can then create and use a client as usual:
    // Set the PUBSUB_EMULATOR_HOST environment variable.
    err := os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:8085")
    if err != nil {
        // TODO: Handle error.
    }
    // Create the client as usual.
    client, err := pubsub.NewClient(ctx, "my-project-id")
    if err != nil {
        // TODO: Handle error.
    }
    defer client.Close()
Ack Deadlines ¶
The default ack deadlines are suitable for most use cases, but may be overridden. This section describes the tradeoffs that should be considered when overriding the defaults.
Behind the scenes, each message returned by the Pub/Sub server has an associated lease, known as an "ack deadline". Unless a message is acknowledged within the ack deadline, or the client requests that the ack deadline be extended, the message will become eligible for redelivery.
As a convenience, the pubsub client will automatically extend deadlines until either:
- Message.Ack or Message.Nack is called, or
- The "MaxExtension" duration elapses from the time the message is fetched from the server. This defaults to 60m.
Ack deadlines are extended periodically by the client. The period between extensions, as well as the length of the extension, automatically adjusts based on the time it takes the subscriber application to ack messages (based on the 99th percentile of ack latency). By default, this extension period is capped at 10m, but this limit can be configured by the Min/MaxDurationPerAckExtension settings. The effect is that subscribers that process messages quickly have their ack deadlines extended by a small amount, whereas subscribers that process messages slowly have their ack deadlines extended by a large amount. The net effect is fewer RPCs sent from the client library.
For example, consider a subscriber that takes 3 minutes to process each message. Since the library has already recorded several 3-minute ack latencies in its percentile distribution, future deadline extensions are sent with a value of 3 minutes, every 3 minutes. Suppose the application crashes 5 seconds after the library sends such an extension: the Pub/Sub server would wait the remaining 2m55s before re-sending the messages to other subscribers.
Please note that the client library does not use the subscription's AckDeadline for the MaxExtension value.
Fine Tuning PubSub Receive Performance ¶
As the PubSub client receives messages from the PubSub server, it stores them in a local buffer and invokes the callback function passed to Receive on each one. The user must Ack or Nack a message in this function. Each invocation of the callback occurs in its own goroutine; that is, messages are processed concurrently.
The buffer holds a maximum of MaxOutstandingMessages messages or MaxOutstandingBytes bytes, and the client stops requesting more messages from the server whenever the buffer is full. Messages in the buffer have an ack deadline; that is, the server keeps a deadline for each outstanding message. When that deadline expires, the server considers the message lost and redelivers the message. Each message in the buffer automatically has its deadline periodically extended. If a message is held beyond its deadline, for example if your program hangs, the message will be redelivered.
This Medium post describes tuning Pub/Sub performance in more detail: https://medium.com/google-cloud/pub-sub-flow-control-batching-9ba9a75bce3b
- Subscription.ReceiveSettings.MaxExtension
This is the maximum amount of time that the client will extend a message's deadline. This value should be set to the maximum expected processing time, plus some buffer. It is fairly safe to set it quite high; the only downside is that it will take longer to recover from hanging programs. The higher the extension allowed, the longer it takes before the server considers messages lost and re-sends them to some other, healthy instance of your application.
- Subscription.ReceiveSettings.MaxDurationPerAckExtension
This is the maximum amount of time to extend each message's deadline per ModifyAckDeadline RPC. Normally, the deadline is determined by the 99th percentile of previous message processing times. However, if normal processing time takes 10 minutes but an error occurs while processing a message within 1 minute, a message will be stuck and held by the client for the remaining 9 minutes. By setting the maximum amount of time to extend a message's deadline on a per-RPC basis, you can decrease the amount of time before message redelivery when errors occur. However, the downside is that more ModifyAckDeadline RPCs will be sent.
- Subscription.ReceiveSettings.MinDurationPerAckExtension
This is the minimum amount of time to extend each message's deadline per ModifyAckDeadline RPC. This is the complement setting of MaxDurationPerAckExtension and represents the lower bound of modack deadlines to send. If processing time is very low, it may be better to issue fewer ModifyAckDeadline RPCs rather than every 10 seconds. Setting both Min/MaxDurationPerAckExtension to the same value effectively removes the automatic derivation of deadlines and fixes it to the value you wish to extend your messages' deadlines by each time.
- Subscription.ReceiveSettings.MaxOutstandingMessages
This is the maximum number of messages that are to be processed by the callback function at a time. Once this limit is reached, the client waits for messages to be acked or nacked by the callback before requesting more messages from the server.
By default, this value is set to a fairly conservative number. We strongly encourage setting it as high as memory allows, since a low setting will artificially rate-limit reception. Setting this value to -1 causes it to be unbounded.
- Subscription.ReceiveSettings.MaxOutstandingBytes
This is the maximum number of bytes (message size) to be processed by the callback function at a time. Once this limit is reached, the client waits for messages to be acked or nacked by the callback before requesting more messages from the server.
Note that there sometimes can be more bytes pulled and being processed than MaxOutstandingBytes allows. This is due to the fact that the server does not consider byte size when tracking server-side flow control. For example, if the client sets MaxOutstandingBytes to 50 KiB, but receives a batch of messages totaling 100 KiB, there will be a temporary overflow of message byte size until messages are acked.
Similar to MaxOutstandingMessages, we recommend setting this higher to maximize processing throughput. Setting this value to -1 causes it to be unbounded.
- Subscription.ReceiveSettings.NumGoroutines
This is the number of goroutines spawned to receive messages from the Pubsub server, where each goroutine opens a StreamingPull stream. This setting affects the rate of message intake from server to local buffer.
Setting this value to 1 is sufficient for many workloads. Each stream can handle about 10 MB/s of messages, so if your throughput is under this, set NumGoroutines to 1. Reducing the number of streams can improve performance by decreasing overhead. Currently, there is an issue where setting NumGoroutines greater than 1 interacts poorly with flow control: since each StreamingPull stream has its own flow control, the server-side flow control will not match what is available locally.
Going above 100 streams can lead to increasingly poor behavior, such as acks/modacks not succeeding in a reasonable amount of time, leading to message expiration. In these cases, we recommend horizontally scaling by increasing the number of subscriber client applications.
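One way to picture byte-based flow control (and the temporary MaxOutstandingBytes overflow described above) is a weighted semaphore. The sketch below is a toy stdlib model, not the library's implementation; the names flowController, acquire, and release are invented for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// flowController is a toy byte-based limiter in the spirit of
// MaxOutstandingBytes; the real library's flow control is more elaborate.
type flowController struct {
	mu          sync.Mutex
	cond        *sync.Cond
	maxBytes    int
	outstanding int
}

func newFlowController(maxBytes int) *flowController {
	fc := &flowController{maxBytes: maxBytes}
	fc.cond = sync.NewCond(&fc.mu)
	return fc
}

// acquire blocks until size more bytes fit under the limit. A batch is
// always admitted when nothing is outstanding, which is how a 100 KiB
// batch can temporarily overflow a 50 KiB limit.
func (fc *flowController) acquire(size int) {
	fc.mu.Lock()
	defer fc.mu.Unlock()
	for fc.outstanding > 0 && fc.outstanding+size > fc.maxBytes {
		fc.cond.Wait()
	}
	fc.outstanding += size
}

// release frees size bytes once the message is acked or nacked.
func (fc *flowController) release(size int) {
	fc.mu.Lock()
	defer fc.mu.Unlock()
	fc.outstanding -= size
	fc.cond.Broadcast()
}

func main() {
	fc := newFlowController(50 * 1024)
	fc.acquire(100 * 1024)                    // oversized first batch is admitted
	fmt.Println(fc.outstanding > fc.maxBytes) // true: temporary overflow
	fc.release(100 * 1024)
}
```

Subsequent acquire calls would block until enough bytes are released, which is the behavior MaxOutstandingBytes gives you for message intake.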
General tips ¶
Each application should use a single PubSub client instead of creating many. In addition, when publishing to a single topic, a publisher should be instantiated once and reused to take advantage of flow control and batching capabilities.
    const (
        MaxPublishRequestCount = 1000
        MaxPublishRequestBytes = 1e7
    )
    const (
        ScopePubSub        = "https://www.googleapis.com/auth/pubsub"
        ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
    )
    const (
        ShutdownBehaviorWaitForProcessing ShutdownBehavior = iota
        ShutdownBehaviorNackImmediately
    )
const DetectProjectID = "detect-project-id"
DetectProjectID is a sentinel value that instructs NewClient to detect the project ID. It is given in place of the projectID argument. NewClient will use the project ID from the given credentials or the default credentials (https://developers.google.com/accounts/docs/application-default-credentials) if no credentials were provided. When providing credentials, not all options will allow NewClient to extract the project ID. Specifically a JWT does not have the project ID encoded.
    var (
        ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded")
        ErrFlowControllerMaxOutstandingBytes    = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded")
    )
    var (
        PublishedMessages            = stats.Int64(statsPrefix+"published_messages", "Number of PubSub message published", stats.UnitDimensionless)
        PublishLatency               = stats.Float64(statsPrefix+"publish_roundtrip_latency", "The latency in milliseconds per publish batch", stats.UnitMilliseconds)
        PullCount                    = stats.Int64(statsPrefix+"pull_count", "Number of PubSub messages pulled", stats.UnitDimensionless)
        AckCount                     = stats.Int64(statsPrefix+"ack_count", "Number of PubSub messages acked", stats.UnitDimensionless)
        NackCount                    = stats.Int64(statsPrefix+"nack_count", "Number of PubSub messages nacked", stats.UnitDimensionless)
        ModAckCount                  = stats.Int64(statsPrefix+"mod_ack_count", "Number of ack-deadlines modified", stats.UnitDimensionless)
        ModAckTimeoutCount           = stats.Int64(statsPrefix+"mod_ack_timeout_count", "Number of ModifyAckDeadline RPCs that timed out", stats.UnitDimensionless)
        StreamOpenCount              = stats.Int64(statsPrefix+"stream_open_count", "Number of calls opening a new streaming pull", stats.UnitDimensionless)
        StreamRetryCount             = stats.Int64(statsPrefix+"stream_retry_count", "Number of retries of a stream send or receive", stats.UnitDimensionless)
        StreamRequestCount           = stats.Int64(statsPrefix+"stream_request_count", "Number gRPC StreamingPull request messages sent", stats.UnitDimensionless)
        StreamResponseCount          = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)
        OutstandingMessages          = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless)
        OutstandingBytes             = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)
        PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless)
        PublisherOutstandingBytes    = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless)
    )
The following are measures recorded in publish/subscribe flows.
These arrays hold the default OpenCensus views that keep track of publish/subscribe operations. It is EXPERIMENTAL and subject to change or removal without notice.
    var DefaultPublishSettings = PublishSettings{
        DelayThreshold: 10 * time.Millisecond,
        CountThreshold: 100,
        ByteThreshold:  1e6,
        Timeout:        60 * time.Second,
        FlowControlSettings: FlowControlSettings{
            MaxOutstandingMessages: 1000,
            MaxOutstandingBytes:    -1,
            LimitExceededBehavior:  FlowControlIgnore,
        },
        EnableCompression:         false,
        CompressionBytesThreshold: 240,
    }
DefaultPublishSettings holds the default values for topics' PublishSettings.
    var DefaultReceiveSettings = ReceiveSettings{
        MaxExtension:               60 * time.Minute,
        MaxDurationPerAckExtension: 0,
        MinDurationPerAckExtension: 0,
        MaxOutstandingMessages:     1000,
        MaxOutstandingBytes:        1e9,
        NumGoroutines:              1,
    }
DefaultReceiveSettings holds the default values for ReceiveSettings.
ErrEmptyProjectID denotes that the project string passed into NewClient was empty. Please provide a valid project ID or use the DetectProjectID sentinel value to detect project ID from well defined sources.
ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
ErrPublisherStopped indicates that the publisher has been stopped and further publishing will fail.
NewMessageCarrierFromPB creates a propagation.TextMapCarrier that can be used to extract the trace context from a protobuf PubsubMessage.
Example: ctx = propagation.TraceContext{}.Extract(ctx, pubsub.NewMessageCarrierFromPB(msg))
AckResult holds the result from a call to Ack or Nack.
Call Get to obtain the result of the Ack/NackWithResult call. Example:
    // Get blocks until Ack/NackWithResult completes or ctx is done.
    ackStatus, err := r.Get(ctx)
    if err != nil {
        // TODO: Handle error.
    }
AcknowledgeStatus represents the status of an Ack or Nack request.
    const (
        AcknowledgeStatusSuccess AcknowledgeStatus = iota
        AcknowledgeStatusPermissionDenied
        AcknowledgeStatusFailedPrecondition
        AcknowledgeStatusInvalidAckID
        AcknowledgeStatusOther
    )
Client is a Pub/Sub client scoped to a single project.
Clients should be reused rather than being created as needed. A Client may be shared by multiple goroutines.
NewClient creates a new PubSub client. It uses a default configuration.
    ctx := context.Background()
    _, err := pubsub.NewClient(ctx, "project-id")
    if err != nil {
        // TODO: Handle error.
    }

    // See the other examples to learn how to use the Client.
NewClientWithConfig creates a new PubSub client.
Close releases any resources held by the client, such as memory and goroutines.
If the client is available for the lifetime of the program, then Close need not be called at exit.
Project returns the project ID or number for this instance of the client, which may have either been explicitly specified or autodetected.
func (c *Client) Publisher(topicNameOrID string) *Publisher
Publisher constructs a publisher client from either a topicID or a topic name, otherwise known as a full path.
The returned Publisher is only a reference; no error is returned if the topic does not exist. Errors are instead returned when attempting to Publish. Once a Publisher's Publish method has been called, it has background goroutines associated with it; clean them up by calling Publisher.Stop.

It is best practice to reuse a Publisher when publishing to the same topic. Avoid creating a new Publisher instance for every publish call.
Use Publisher to refer to a topic that is not in the client's project, such as a public topic.
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, "project-id")
    if err != nil {
        // TODO: Handle error.
    }

    otherProjectID := "another-project-id"
    publisher := client.Publisher(fmt.Sprintf("projects/%s/topics/%s", otherProjectID, "my-topic"))
    _ = publisher // TODO: use the publisher client.
func (c *Client) Subscriber(nameOrID string) *Subscriber
Subscriber creates a subscriber client which references a single subscription.
ClientConfig has configurations for the client.
type ErrPublishingPaused struct { OrderingKey string }
ErrPublishingPaused is a custom error indicating that the publish paused for the specified ordering key.
    type FlowControlSettings struct {
        MaxOutstandingMessages int
        MaxOutstandingBytes    int
        LimitExceededBehavior  LimitExceededBehavior
    }
FlowControlSettings controls flow control for messages while publishing or subscribing.
type LimitExceededBehavior int
LimitExceededBehavior configures the behavior that flowController can use in case the flow control limits are exceeded.
    const (
        FlowControlIgnore LimitExceededBehavior = iota
        FlowControlBlock
        FlowControlSignalError
    )
Message represents a Pub/Sub message.
Message can be passed to Publisher.Publish for publishing.
If received in the callback passed to Subscriber.Receive, client code must call Message.Ack or Message.Nack when finished processing the Message. Calls to Ack or Nack have no effect after the first call.
Ack indicates successful processing of a Message. If message acknowledgement fails, the Message will be redelivered. Nack indicates that the client will not or cannot process a Message. Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
If using exactly once delivery, call Message.AckWithResult and Message.NackWithResult instead. These methods return an AckResult, which tracks the state of the acknowledgement operation. If the AckResult reports success, the message is guaranteed NOT to be redelivered. Otherwise, the AckResult returns an error with more details about the failure, and the message may be redelivered.
A PublishResult holds the result from a call to Publish.
Call Get to obtain the result of the Publish call. Example:
    // Get blocks until Publish completes or ctx is done.
    id, err := r.Get(ctx)
    if err != nil {
        // TODO: Handle error.
    }
    type PublishSettings struct {
        DelayThreshold            time.Duration
        CountThreshold            int
        ByteThreshold             int
        NumGoroutines             int
        Timeout                   time.Duration
        FlowControlSettings       FlowControlSettings
        EnableCompression         bool
        CompressionBytesThreshold int
    }
PublishSettings control the bundling of published messages.
    type Publisher struct {
        PublishSettings       PublishSettings
        EnableMessageOrdering bool
    }
Publisher is a reference to a PubSub publisher, associated with a single topic.
The methods of Publisher are safe for use by multiple goroutines.
func (t *Publisher) Flush()
Flush blocks until all remaining messages are sent.
ID returns the unique identifier of the topic within its project.
Publish publishes msg to the topic asynchronously. Messages are batched and sent according to the topic's PublishSettings. Publish never blocks.
Publish returns a non-nil PublishResult which will be ready when the message has been sent (or has failed to be sent) to the server.
Publish creates goroutines for batching and sending messages. These goroutines need to be stopped by calling t.Stop(). Once stopped, future calls to Publish will immediately return a PublishResult with an error.
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, "project-id")
    if err != nil {
        // TODO: Handle error.
    }

    publisher := client.Publisher("topicName")
    defer publisher.Stop()
    var results []*pubsub.PublishResult
    r := publisher.Publish(ctx, &pubsub.Message{
        Data: []byte("hello world"),
    })
    results = append(results, r)
    // Do other work ...
    for _, r := range results {
        id, err := r.Get(ctx)
        if err != nil {
            // TODO: Handle error.
        }
        fmt.Printf("Published a message with a message ID: %s\n", id)
    }
func (t *Publisher) ResumePublish(orderingKey string)
ResumePublish resumes accepting messages for the provided ordering key. Publishing using an ordering key might be paused if an error is encountered while publishing, to prevent messages from being published out of order.
func (t *Publisher) Stop()
Stop sends all remaining published messages and stops the goroutines created for handling publishing. It returns once all outstanding messages have been sent or have failed to be sent.
String returns the printable globally unique name for the topic.
    type ReceiveSettings struct {
        MaxExtension               time.Duration
        MaxDurationPerAckExtension time.Duration
        MinDurationPerAckExtension time.Duration
        MaxOutstandingMessages     int
        MaxOutstandingBytes        int
        EnablePerStreamFlowControl bool
        NumGoroutines              int
        ShutdownOptions            *ShutdownOptions
    }
ReceiveSettings configure the Receive method. A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
type ShutdownBehavior int
ShutdownBehavior defines the strategy the subscriber should take when shutting down. Current options are graceful shutdown vs nacking messages.
ShutdownOptions configures the shutdown behavior of the subscriber. When ShutdownOptions is nil, the client library will assume disabled/infinite timeout.
Warning: The interaction between Timeout and Behavior might be surprising. Read about the interaction of these below to ensure you get the desired behavior.
    type Subscriber struct {
        ReceiveSettings ReceiveSettings
    }
Subscriber is a subscriber client that references a subscription.
ID returns the unique identifier of the subscription within its project.
Receive calls f with the outstanding messages from the subscription. It blocks until ctx is done, or the service returns a non-retryable error.
The standard way to terminate a Receive is to cancel its context:
    cctx, cancel := context.WithCancel(ctx)
    err := sub.Receive(cctx, callback)
    // Call cancel from the callback, or another goroutine.
If the service returns a non-retryable error, Receive returns that error after all of the outstanding calls to f have returned. If ctx is done, Receive returns nil after all of the outstanding calls to f have returned and all messages have been acknowledged or have expired.
Receive calls f concurrently from multiple goroutines. It is encouraged to process messages synchronously in f, even if that processing is relatively time-consuming; Receive will spawn new goroutines for incoming messages, limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
The context passed to f will be canceled when ctx is Done or there is a fatal service error.
Receive will send an ack deadline extension on message receipt, then automatically extend the ack deadline of all fetched Messages up to the period specified by s.ReceiveSettings.MaxExtension.
Each Subscriber may have only one invocation of Receive active at a time.
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, "project-id")
    if err != nil {
        // TODO: Handle error.
    }

    // Can use either "projects/project-id/subscriptions/sub-id" or just "sub-id" here.
    sub := client.Subscriber("sub-id")
    err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        // TODO: Handle message.
        // NOTE: May be called concurrently; synchronize access to shared memory.
        m.Ack()
    })
    if err != nil && !errors.Is(err, context.Canceled) {
        // TODO: Handle error.
    }
This example shows how to configure keepalive so that unacknowledged messages expire quickly, allowing other subscribers to take them.
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, "project-id")
    if err != nil {
        // TODO: Handle error.
    }

    sub := client.Subscriber("subNameOrID")
    // This program is expected to process and acknowledge messages in 30 seconds. If
    // not, the Pub/Sub API will assume the message is not acknowledged.
    sub.ReceiveSettings.MaxExtension = 30 * time.Second
    err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        // TODO: Handle message.
        m.Ack()
    })
    if err != nil && !errors.Is(err, context.Canceled) {
        // TODO: Handle error.
    }
This example shows how to throttle Subscriber.Receive, which aims for high throughput by default. By limiting the number of messages and/or bytes being processed at once, you can bound your program's resource consumption.
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, "project-id")
    if err != nil {
        // TODO: Handle error.
    }

    sub := client.Subscriber("subNameOrID")
    sub.ReceiveSettings.MaxOutstandingMessages = 5
    sub.ReceiveSettings.MaxOutstandingBytes = 10e6
    err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        // TODO: Handle message.
        m.Ack()
    })
    if err != nil && !errors.Is(err, context.Canceled) {
        // TODO: Handle error.
    }
String returns the globally unique printable name of the subscription.