Support sharded pubsub commands by mgravell · Pull Request #2887 · StackExchange/StackExchange.Redis (original) (raw)
@@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable
private const int DefaultRedisDatabaseCount = 16;
private static readonly CommandBytes message = "message", pmessage = "pmessage";
private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
@@ -276,7 +276,11 @@ private enum ReadMode : byte
private RedisProtocol _protocol; // note starts at **zero**, not RESP2
public RedisProtocol? Protocol => _protocol == 0 ? null : _protocol;
internal void SetProtocol(RedisProtocol value) => _protocol = value;
internal void SetProtocol(RedisProtocol value)
{
_protocol = value;
BridgeCouldBeNull?.SetProtocol(value);
}
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times", Justification = "Trust me yo")]
internal void Shutdown()
@@ -384,7 +388,7 @@ public void RecordConnectionFailed(
bool isInitialConnect = false,
IDuplexPipe? connectingPipe = null)
{
bool weAskedForThis = false;
bool weAskedForThis;
Exception? outerException = innerException;
IdentifyFailureType(innerException, ref failureType);
var bridge = BridgeCouldBeNull;
@@ -1644,9 +1648,9 @@ private void MatchResult(in RawResult result)
// out of band message does not match to a queued message
var items = result.GetItems();
if (items.Length >= 3 && items[0].IsEqual(message))
if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage)))
{
_readStatus = ReadStatus.PubSubMessage;
_readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = muxer.ConfigurationChangedChannel;
@@ -1668,8 +1672,17 @@ private void MatchResult(in RawResult result)
}
// invoke the handlers
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Trace("MESSAGE: " + channel);
RedisChannel channel;
if (items[0].IsEqual(message))
{
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
Trace("MESSAGE: " + channel);
}
else // see check on outer-if that restricts to message / smessage
{
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
Trace("SMESSAGE: " + channel);
}
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[2], out var payload))
@@ -1690,27 +1703,30 @@ private void MatchResult(in RawResult result)
{
_readStatus = ReadStatus.PubSubPMessage;
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
Trace("PMESSAGE: " + channel);
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[3], out var payload))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payload);
}
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payloads);
}
}
return; // AND STOP PROCESSING!
}
// if it didn't look like "[p]message", then we still need to process the pending queue
// if it didn't look like "[p|s]message", then we still need to process the pending queue
}
Trace("Matching result...");
@@ -2110,6 +2126,7 @@ internal enum ReadStatus
MatchResult,
PubSubMessage,
PubSubPMessage,
PubSubSMessage,
Reconfigure,
InvokePubSub,
ResponseSequenceCheck, // high-integrity mode only