Subscription unsubscribes unexpectedly
Greetings.
We are facing a situation where subscriptions sometimes get unsubscribed of their own accord.
The application code looks simple:
_redis = ConnectionMultiplexer.Connect(_redisConnectionString);
_subscriber = _redis.GetSubscriber();
_subscriber.Subscribe("inbox-channel").OnMessage(OnMessage);
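For reference, here is a self-contained sketch of the same setup; the connection string and the handler body are illustrative placeholders, not our production code:

// Illustrative, self-contained version of the setup above; the connection
// string and handler body are placeholders, not our production code.
using System;
using StackExchange.Redis;

var redis = ConnectionMultiplexer.Connect("172.22.0.2:6379,name=SvcInboxApp");
var subscriber = redis.GetSubscriber();

// Subscribe returns a ChannelMessageQueue; OnMessage delivers messages
// for this channel sequentially to the handler.
subscriber.Subscribe("inbox-channel").OnMessage(message =>
{
    Console.WriteLine($"inbox-channel: {message.Message}");
});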
The ConnectionMultiplexer log has nothing suspicious:
Connecting 172.22.0.2:6379/Interactive...
BeginConnect: 172.22.0.2:6379
Connected Interactive/172.22.0.2:6379
Server handshake
Authenticating (user/password)
Setting client name: SvcInboxApp
Auto-configure...
Sending critical tracer: Interactive/172.22.0.2:6379
Writing to Interactive/172.22.0.2:6379: ECHO
Flushing outbound buffer
Starting read
Response from Interactive/172.22.0.2:6379 / ECHO: BulkString: 16 bytes
1 unique nodes specified
Writing to Interactive/172.22.0.2:6379: PING
Requesting tie-break from 172.22.0.2:6379 > __Booksleeve_TieBreak...
Writing to Interactive/172.22.0.2:6379: GET __Booksleeve_TieBreak
Allowing endpoints 00:00:40 to respond...
Awaiting task completion, IOCP: (Busy=0,Free=1000,Min=2,Max=1000), WORKER: (Busy=3,Free=32764,Min=2,Max=32767)
Response from Interactive/172.22.0.2:6379 / PING: SimpleString: PONG
Response from Interactive/172.22.0.2:6379 / GET __Booksleeve_TieBreak: (null)
All tasks completed cleanly, IOCP: (Busy=0,Free=1000,Min=2,Max=1000), WORKER: (Busy=3,Free=32764,Min=2,Max=32767)
172.22.0.2:6379 returned with success
Waiting for tiebreakers...
All tasks are already complete
172.22.0.2:6379 had no tiebreaker set
Single master detected: 172.22.0.2:6379
172.22.0.2:6379: Standalone v2.0.0, master; keep-alive: 00:01:00; int: ConnectedEstablished; sub: ConnectedEstablishing
172.22.0.2:6379: int ops=11, qu=0, qs=0, qc=0, wr=0, socks=1; sub ops=4, qu=0, qs=4, qc=0, wr=0, socks=1
Circular op-count snapshot; int: 0+11=11 (1.10 ops/s; spans 10s); sub: 0+4=4 (0.40 ops/s; spans 10s)
Sync timeouts: 0; async timeouts: 0; fire and forget: 0; last heartbeat: -1s ago
Starting heartbeat...
But when we recompiled the library with the VERBOSE flag and captured the traces, we got:
08:17:28.621 +00:00|OnHeartbeat: ConnectedEstablished
08:17:28.621 +00:00|Subscription/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:29.607 +00:00|ReconfigureAsync|172.22.0.2:6379: WaitingForActivation
08:17:29.608 +00:00|172.22.0.2:6379|Now unusable: DidNotRespond
08:17:29.608 +00:00|ReconfigureAsync|Exiting reconfiguration...
08:17:29.609 +00:00|Subscription/172.22.0.2:6379|Writing: [-1]:UNSUBSCRIBE inbox-channel (TrackSubscriptionsProcessor)
08:17:29.610 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|MultiBulk: 3 items
08:17:29.610 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|Matching result...
08:17:29.610 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|Response to: [-1]:UNSUBSCRIBE inbox-channel (TrackSubscriptionsProcessor)
08:17:29.611 +00:00|StackExchange.Redis.ResultProcessor+TrackSubscriptionsProcessor|Completed with success: MultiBulk: 3 items (TrackSubscriptionsProcessor)
08:17:29.611 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|Processed 1 messages
08:17:29.621 +00:00|OnHeartbeat|heartbeat
08:17:29.621 +00:00|Interactive/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:29.621 +00:00|172.22.0.2:6379|Now usable
08:17:29.621 +00:00|Subscription/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:30.621 +00:00|OnHeartbeat|heartbeat
08:17:30.622 +00:00|Interactive/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:30.622 +00:00|Subscription/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
Please pay attention to the lines:
08:17:29.607 +00:00|ReconfigureAsync|172.22.0.2:6379: WaitingForActivation
08:17:29.608 +00:00|172.22.0.2:6379|Now unusable: DidNotRespond
08:17:29.609 +00:00|Subscription/172.22.0.2:6379|Writing: [-1]:UNSUBSCRIBE inbox-channel (TrackSubscriptionsProcessor)
08:17:29.621 +00:00|172.22.0.2:6379|Now usable
They mean that, for some reason, the ConnectionMultiplexer decided to unsubscribe and did not restore the subscription later.
Looking at the source code here gives some clues.
Trace(Format.ToString(endpoints[i]) + ": " + task.Status);
if (task.IsFaulted)
{
    servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
    var aex = task.Exception;
    foreach (var ex in aex.InnerExceptions)
    {
        log?.WriteLine($"{Format.ToString(endpoints[i])} faulted: {ex.Message}");
        failureMessage = ex.Message;
    }
}
else if (task.IsCanceled)
{
    servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
    log?.WriteLine($"{Format.ToString(endpoints[i])} was canceled");
}
else if (task.IsCompleted)
{
    var server = servers[i];
    if (task.Result)
    {
        servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond);
        log?.WriteLine($"{Format.ToString(endpoints[i])} returned with success");
        // count the server types
        switch (server.ServerType)
        {
            case ServerType.Twemproxy:
            case ServerType.Standalone:
                standaloneCount++;
                break;
            case ServerType.Sentinel:
                sentinelCount++;
                break;
            case ServerType.Cluster:
                clusterCount++;
                break;
        }
        if (clusterCount > 0 && !encounteredConnectedClusterServer)
        {
            // we have encountered a connected server with clustertype for the first time.
            // so we will get list of other nodes from this server using "CLUSTER NODES" command
            // and try to connect to these other nodes in the next iteration
            encounteredConnectedClusterServer = true;
            updatedClusterEndpointCollection = await GetEndpointsFromClusterNodes(server, log).ForAwait();
        }
        // set the server UnselectableFlags and update masters list
        switch (server.ServerType)
        {
            case ServerType.Twemproxy:
            case ServerType.Sentinel:
            case ServerType.Standalone:
            case ServerType.Cluster:
                servers[i].ClearUnselectable(UnselectableFlags.ServerType);
                if (server.IsReplica)
                {
                    servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
                }
                else
                {
                    masters.Add(server);
                }
                break;
            default:
                servers[i].SetUnselectable(UnselectableFlags.ServerType);
                break;
        }
    }
    else
    {
        servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
        log?.WriteLine($"{Format.ToString(endpoints[i])} returned, but incorrectly");
    }
}
else
{
    servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
    log?.WriteLine($"{Format.ToString(endpoints[i])} did not respond");
}
}
The multiplexer sends ping/echo messages to the server and expects a response within a timeout. If the response is missing, wrong, or failed, the server is considered unreachable and its subscriptions are dropped.
But in our case the ping task is still in the WaitingForActivation state; my guess is that (perhaps due to high application load?) it has not even started executing.
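For illustration only, a standalone snippet (not library code) showing why such a task takes the last branch: a task that has not been completed yet reports all three status checks used in that loop as false.

// Standalone illustration: a task that has not been completed yet
// (for example, one backed by a TaskCompletionSource) sits in
// WaitingForActivation and fails all three checks used above.
using System;
using System.Threading.Tasks;

var tcs = new TaskCompletionSource<bool>();
Task<bool> pingLikeTask = tcs.Task;

Console.WriteLine(pingLikeTask.Status);      // WaitingForActivation
Console.WriteLine(pingLikeTask.IsFaulted);   // False
Console.WriteLine(pingLikeTask.IsCanceled);  // False
Console.WriteLine(pingLikeTask.IsCompleted); // False
// => in the reconfiguration loop this falls through to the final else branch,
//    so the endpoint is marked DidNotRespond even though the connection is healthy.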
The behavior looks like a bug to me, and we would appreciate it if it could be addressed.