Pub/Sub fixes for subscribe/re-subscribe by NickCraver · Pull Request #1947 · StackExchange/StackExchange.Redis (original) (raw)
@@ -8,7 +8,6 @@
using System.Threading.Channels;
using System.Threading.Tasks;
using static StackExchange.Redis.ConnectionMultiplexer;
using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;
#if !NETCOREAPP
using Pipelines.Sockets.Unofficial.Threading;
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
@@ -102,7 +101,6 @@ public enum State : byte
public void Dispose()
{
isDisposed = true;
ShutdownSubscriptionQueue();
using (var tmp = physical)
{
physical = null;
@@ -220,71 +218,6 @@ internal void GetCounters(ConnectionCounters counters)
physical?.GetCounters(counters);
}
private Channel _subscriptionBackgroundQueue;
private static readonly UnboundedChannelOptions s_subscriptionQueueOptions = new UnboundedChannelOptions
{
AllowSynchronousContinuations = false, // we do *not* want the async work to end up on the caller's thread
SingleReader = true, // only one reader will be started per channel
SingleWriter = true, // writes will be synchronized, because order matters
};
private Channel GetSubscriptionQueue()
{
var queue = _subscriptionBackgroundQueue;
if (queue == null)
{
queue = Channel.CreateUnbounded(s_subscriptionQueueOptions);
var existing = Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, queue, null);
if (existing != null) return existing; // we didn't win, but that's fine
// we won (_subqueue is now queue)
// this means we have a new channel without a reader; let's fix that!
Task.Run(() => ExecuteSubscriptionLoop());
}
return queue;
}
private void ShutdownSubscriptionQueue()
{
try
{
Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null)?.Writer.TryComplete();
}
catch { }
}
private async Task ExecuteSubscriptionLoop() // pushes items that have been enqueued over the bridge
{
// note: this will execute on the default pool rather than our dedicated pool; I'm... OK with this
var queue = _subscriptionBackgroundQueue ?? Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null); // just to be sure we can read it!
try
{
while (await queue.Reader.WaitToReadAsync().ForAwait() && queue.Reader.TryRead(out var next))
{
try
{
// Treat these commands as background/handshake and do not allow queueing to backlog
if ((await TryWriteAsync(next.Message, next.IsReplica).ForAwait()) != WriteResult.Success)
{
next.Abort();
}
}
catch (Exception ex)
{
next.Fail(ex);
}
}
}
catch (Exception ex)
{
Multiplexer.OnInternalError(ex, ServerEndPoint?.EndPoint, ConnectionType);
}
}
internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state)
=> !isDisposed && (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal readonly struct BridgeStatus
{
///