Subscription unsubscribes unexpectedly

Greetings.

We are seeing subscriptions occasionally get unsubscribed on their own.

The application code looks simple:

_redis = ConnectionMultiplexer.Connect(_redisConnectionString);
_subscriber = _redis.GetSubscriber();
_subscriber.Subscribe("inbox-channel").OnMessage(OnMessage);

The ConnectionMultiplexer log has nothing suspicious:

Connecting 172.22.0.2:6379/Interactive...
BeginConnect: 172.22.0.2:6379
Connected Interactive/172.22.0.2:6379
Server handshake
Authenticating (user/password)
Setting client name: SvcInboxApp
Auto-configure...
Sending critical tracer: Interactive/172.22.0.2:6379
Writing to Interactive/172.22.0.2:6379: ECHO
Flushing outbound buffer
Starting read
Response from Interactive/172.22.0.2:6379 / ECHO: BulkString: 16 bytes
1 unique nodes specified
Writing to Interactive/172.22.0.2:6379: PING
Requesting tie-break from 172.22.0.2:6379 > __Booksleeve_TieBreak...
Writing to Interactive/172.22.0.2:6379: GET __Booksleeve_TieBreak
Allowing endpoints 00:00:40 to respond...
Awaiting task completion, IOCP: (Busy=0,Free=1000,Min=2,Max=1000), WORKER: (Busy=3,Free=32764,Min=2,Max=32767)
Response from Interactive/172.22.0.2:6379 / PING: SimpleString: PONG
Response from Interactive/172.22.0.2:6379 / GET __Booksleeve_TieBreak: (null)
All tasks completed cleanly, IOCP: (Busy=0,Free=1000,Min=2,Max=1000), WORKER: (Busy=3,Free=32764,Min=2,Max=32767)
172.22.0.2:6379 returned with success
Waiting for tiebreakers...
All tasks are already complete
172.22.0.2:6379 had no tiebreaker set
Single master detected: 172.22.0.2:6379
172.22.0.2:6379: Standalone v2.0.0, master; keep-alive: 00:01:00; int: ConnectedEstablished; sub: ConnectedEstablishing
172.22.0.2:6379: int ops=11, qu=0, qs=0, qc=0, wr=0, socks=1; sub ops=4, qu=0, qs=4, qc=0, wr=0, socks=1
Circular op-count snapshot; int: 0+11=11 (1.10 ops/s; spans 10s); sub: 0+4=4 (0.40 ops/s; spans 10s)
Sync timeouts: 0; async timeouts: 0; fire and forget: 0; last heartbeat: -1s ago
Starting heartbeat...

But when we recompiled the library with the VERBOSE flag and captured the traces, we got:

08:17:28.621 +00:00|OnHeartbeat: ConnectedEstablished
08:17:28.621 +00:00|Subscription/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:29.607 +00:00|ReconfigureAsync|172.22.0.2:6379: WaitingForActivation
08:17:29.608 +00:00|172.22.0.2:6379|Now unusable: DidNotRespond
08:17:29.608 +00:00|ReconfigureAsync|Exiting reconfiguration...
08:17:29.609 +00:00|Subscription/172.22.0.2:6379|Writing: [-1]:UNSUBSCRIBE inbox-channel (TrackSubscriptionsProcessor)
08:17:29.610 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|MultiBulk: 3 items
08:17:29.610 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|Matching result...
08:17:29.610 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|Response to: [-1]:UNSUBSCRIBE inbox-channel (TrackSubscriptionsProcessor)
08:17:29.611 +00:00|StackExchange.Redis.ResultProcessor+TrackSubscriptionsProcessor|Completed with success: MultiBulk: 3 items (TrackSubscriptionsProcessor)
08:17:29.611 +00:00|Subscription#3@172.22.0.2:6379 (Idle)|Processed 1 messages
08:17:29.621 +00:00|OnHeartbeat|heartbeat
08:17:29.621 +00:00|Interactive/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:29.621 +00:00|172.22.0.2:6379|Now usable
08:17:29.621 +00:00|Subscription/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:30.621 +00:00|OnHeartbeat|heartbeat
08:17:30.622 +00:00|Interactive/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished
08:17:30.622 +00:00|Subscription/172.22.0.2:6379|OnHeartbeat: ConnectedEstablished

Please pay attention to the lines:

08:17:29.607 +00:00|ReconfigureAsync|172.22.0.2:6379: WaitingForActivation
08:17:29.608 +00:00|172.22.0.2:6379|Now unusable: DidNotRespond
08:17:29.609 +00:00|Subscription/172.22.0.2:6379|Writing: [-1]:UNSUBSCRIBE inbox-channel (TrackSubscriptionsProcessor)
08:17:29.621 +00:00|172.22.0.2:6379|Now usable

They mean that, for some reason, the ConnectionMultiplexer decided to unsubscribe and did not restore the subscription later.

Looking at the source code here gives some clues.

    Trace(Format.ToString(endpoints[i]) + ": " + task.Status);
    if (task.IsFaulted)
    {
        servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
        var aex = task.Exception;
        foreach (var ex in aex.InnerExceptions)
        {
            log?.WriteLine($"{Format.ToString(endpoints[i])} faulted: {ex.Message}");
            failureMessage = ex.Message;
        }
    }
    else if (task.IsCanceled)
    {
        servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
        log?.WriteLine($"{Format.ToString(endpoints[i])} was canceled");
    }
    else if (task.IsCompleted)
    {
        var server = servers[i];
        if (task.Result)
        {
            servers[i].ClearUnselectable(UnselectableFlags.DidNotRespond);
            log?.WriteLine($"{Format.ToString(endpoints[i])} returned with success");
            // count the server types
            switch (server.ServerType)
            {
                case ServerType.Twemproxy:
                case ServerType.Standalone:
                    standaloneCount++;
                    break;
                case ServerType.Sentinel:
                    sentinelCount++;
                    break;
                case ServerType.Cluster:
                    clusterCount++;
                    break;
            }
            if (clusterCount > 0 && !encounteredConnectedClusterServer)
            {
                // we have encountered a connected server with clustertype for the first time.
                // so we will get list of other nodes from this server using "CLUSTER NODES" command
                // and try to connect to these other nodes in the next iteration
                encounteredConnectedClusterServer = true;
                updatedClusterEndpointCollection = await GetEndpointsFromClusterNodes(server, log).ForAwait();
            }
            // set the server UnselectableFlags and update masters list
            switch (server.ServerType)
            {
                case ServerType.Twemproxy:
                case ServerType.Sentinel:
                case ServerType.Standalone:
                case ServerType.Cluster:
                    servers[i].ClearUnselectable(UnselectableFlags.ServerType);
                    if (server.IsReplica)
                    {
                        servers[i].ClearUnselectable(UnselectableFlags.RedundantMaster);
                    }
                    else
                    {
                        masters.Add(server);
                    }
                    break;
                default:
                    servers[i].SetUnselectable(UnselectableFlags.ServerType);
                    break;
            }
        }
        else
        {
            servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
            log?.WriteLine($"{Format.ToString(endpoints[i])} returned, but incorrectly");
        }
    }
    else
    {
        servers[i].SetUnselectable(UnselectableFlags.DidNotRespond);
        log?.WriteLine($"{Format.ToString(endpoints[i])} did not respond");
    }
}

The multiplexer sends ping/echo messages to the server and expects a response within a timeout. If the response is missing, wrong, or failed, the server is considered unreachable, and subscriptions are dropped.
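
To make the branch structure concrete, here is a minimal self-contained sketch that uses plain .NET tasks only. Classify is a hypothetical helper that mirrors the order of the checks in the snippet above; it is not library code, and the returned strings simply reuse the log messages for readability.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the checks quoted above (not StackExchange.Redis code).
// A task that is neither faulted, canceled, nor completed falls through to the last branch.
static string Classify(Task<bool> task)
{
    if (task.IsFaulted) return "faulted";
    if (task.IsCanceled) return "was canceled";
    if (task.IsCompleted) return task.Result ? "returned with success" : "returned, but incorrectly";
    return "did not respond"; // still pending at the moment of inspection
}

// Simulates a reply that has not arrived yet when the check runs.
var pending = new TaskCompletionSource<bool>();
Console.WriteLine($"{pending.Task.Status}: {Classify(pending.Task)}");

// The reply arrives a moment later; the same check would now succeed.
pending.SetResult(true);
Console.WriteLine($"{pending.Task.Status}: {Classify(pending.Task)}");
```

Under these assumptions, a reply that is only slightly late is classified the same way as a reply that never comes.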

But in our case, the ping task is in the WaitingForActivation state. My guess is that, perhaps due to high application load, it has not even started to execute.
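
One detail that may matter when reading the trace: in .NET, WaitingForActivation is also the status reported for a task returned by an async method that has already started and is awaiting something. The sketch below shows this with plain .NET types; PingAsync and reply are hypothetical stand-ins, not the library's actual ping implementation.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the server's reply to a ping.
var reply = new TaskCompletionSource<bool>();
bool started = false;

// Hypothetical async "ping": runs synchronously up to the first await,
// then stays incomplete until the reply arrives.
async Task<bool> PingAsync()
{
    started = true;
    return await reply.Task;
}

Task<bool> ping = PingAsync();
// The method body has begun, yet the task still reports WaitingForActivation.
Console.WriteLine($"started={started}, status={ping.Status}");

reply.SetResult(true);
bool ok = await ping;
Console.WriteLine($"result={ok}, status={ping.Status}");
```

So the status in the trace shows that the ping had not completed when it was inspected, but by itself it does not say whether the ping had been sent.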

This behavior looks like a bug to me, and we'd appreciate it if it could be addressed.