Pub/Sub fixes for subscribe/re-subscribe by NickCraver · Pull Request #1947 · StackExchange/StackExchange.Redis (original) (raw)

Expand Up

@@ -8,7 +8,6 @@

using System.Threading.Channels;

using System.Threading.Tasks;

using static StackExchange.Redis.ConnectionMultiplexer;

using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;

#if !NETCOREAPP

using Pipelines.Sockets.Unofficial.Threading;

using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;

Expand Down Expand Up

@@ -102,7 +101,6 @@ public enum State : byte

public void Dispose()

{

isDisposed = true;

ShutdownSubscriptionQueue();

using (var tmp = physical)

{

physical = null;

Expand Down Expand Up

@@ -220,71 +218,6 @@ internal void GetCounters(ConnectionCounters counters)

physical?.GetCounters(counters);

}

private Channel _subscriptionBackgroundQueue;

private static readonly UnboundedChannelOptions s_subscriptionQueueOptions = new UnboundedChannelOptions

{

AllowSynchronousContinuations = false, // we do *not* want the async work to end up on the caller's thread

SingleReader = true, // only one reader will be started per channel

SingleWriter = true, // writes will be synchronized, because order matters

};

private Channel GetSubscriptionQueue()

{

var queue = _subscriptionBackgroundQueue;

if (queue == null)

{

queue = Channel.CreateUnbounded(s_subscriptionQueueOptions);

var existing = Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, queue, null);

if (existing != null) return existing; // we didn't win, but that's fine

// we won (_subqueue is now queue)

// this means we have a new channel without a reader; let's fix that!

Task.Run(() => ExecuteSubscriptionLoop());

}

return queue;

}

private void ShutdownSubscriptionQueue()

{

try

{

Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null)?.Writer.TryComplete();

}

catch { }

}

private async Task ExecuteSubscriptionLoop() // pushes items that have been enqueued over the bridge

{

// note: this will execute on the default pool rather than our dedicated pool; I'm... OK with this

var queue = _subscriptionBackgroundQueue ?? Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null); // just to be sure we can read it!

try

{

while (await queue.Reader.WaitToReadAsync().ForAwait() && queue.Reader.TryRead(out var next))

{

try

{

// Treat these commands as background/handshake and do not allow queueing to backlog

if ((await TryWriteAsync(next.Message, next.IsReplica).ForAwait()) != WriteResult.Success)

{

next.Abort();

}

}

catch (Exception ex)

{

next.Fail(ex);

}

}

}

catch (Exception ex)

{

Multiplexer.OnInternalError(ex, ServerEndPoint?.EndPoint, ConnectionType);

}

}

internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state)

=> !isDisposed && (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);

internal readonly struct BridgeStatus

{

///

Expand Down