Support sharded pubsub commands by mgravell · Pull Request #2887 · StackExchange/StackExchange.Redis (original) (raw)

Expand Up

@@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable

private const int DefaultRedisDatabaseCount = 16;

private static readonly CommandBytes message = "message", pmessage = "pmessage";

private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";

private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(

i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();

Expand Down Expand Up

@@ -276,7 +276,11 @@ private enum ReadMode : byte

private RedisProtocol _protocol; // note starts at **zero**, not RESP2

public RedisProtocol? Protocol => _protocol == 0 ? null : _protocol;

internal void SetProtocol(RedisProtocol value) => _protocol = value;

internal void SetProtocol(RedisProtocol value)

{

_protocol = value;

BridgeCouldBeNull?.SetProtocol(value);

}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times", Justification = "Trust me yo")]

internal void Shutdown()

Expand Down Expand Up

@@ -384,7 +388,7 @@ public void RecordConnectionFailed(

bool isInitialConnect = false,

IDuplexPipe? connectingPipe = null)

{

bool weAskedForThis = false;

bool weAskedForThis;

Exception? outerException = innerException;

IdentifyFailureType(innerException, ref failureType);

var bridge = BridgeCouldBeNull;

Expand Down Expand Up

@@ -1644,9 +1648,9 @@ private void MatchResult(in RawResult result)

// out of band message does not match to a queued message

var items = result.GetItems();

if (items.Length >= 3 && items[0].IsEqual(message))

if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage)))

{

_readStatus = ReadStatus.PubSubMessage;

_readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;

// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)

var configChanged = muxer.ConfigurationChangedChannel;

Expand All

@@ -1668,8 +1672,17 @@ private void MatchResult(in RawResult result)

}

// invoke the handlers

var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);

Trace("MESSAGE: " + channel);

RedisChannel channel;

if (items[0].IsEqual(message))

{

channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);

Trace("MESSAGE: " + channel);

}

else // see check on outer-if that restricts to message / smessage

{

channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);

Trace("SMESSAGE: " + channel);

}

if (!channel.IsNull)

{

if (TryGetPubSubPayload(items[2], out var payload))

Expand All

@@ -1690,27 +1703,30 @@ private void MatchResult(in RawResult result)

{

_readStatus = ReadStatus.PubSubPMessage;

var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);

var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

Trace("PMESSAGE: " + channel);

if (!channel.IsNull)

{

if (TryGetPubSubPayload(items[3], out var payload))

{

var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);

var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

_readStatus = ReadStatus.InvokePubSub;

muxer.OnMessage(sub, channel, payload);

}

else if (TryGetMultiPubSubPayload(items[3], out var payloads))

{

var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);

var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

_readStatus = ReadStatus.InvokePubSub;

muxer.OnMessage(sub, channel, payloads);

}

}

return; // AND STOP PROCESSING!

}

// if it didn't look like "[p]message", then we still need to process the pending queue

// if it didn't look like "[p|s]message", then we still need to process the pending queue

}

Trace("Matching result...");

Expand Down Expand Up

@@ -2110,6 +2126,7 @@ internal enum ReadStatus

MatchResult,

PubSubMessage,

PubSubPMessage,

PubSubSMessage,

Reconfigure,

InvokePubSub,

ResponseSequenceCheck, // high-integrity mode only

Expand Down