ISubscriber.UnsubscribeAsync Memory Leak

Version: 2.6.111 (Latest Stable)

I've been having an issue with UnsubscribeAsync(Channel, Handler) where some objects are still being held by ConnectionMultiplexer.Subscription even after UnsubscribeAsync has been called.

ConnectionMultiplexer.GetCounters().Subscription.Subscriptions also never returns to zero after all my objects have unsubscribed.

This issue only seems to occur when there is a lot of concurrency.

I was able to reproduce it consistently with this small example:

using System.Collections.Concurrent;
using StackExchange.Redis;

public static class Program
{
    public static async Task Main(string[] args)
    {
        ConnectionMultiplexer pubsub = await ConnectionMultiplexer.ConnectAsync("localhost:6379");

        {
            const int ELEMENTS = 1000;
            //--------------------
            // Create Subscribers
            List<Subscriber> subscribers = new List<Subscriber>(ELEMENTS);
            for (int i = 0; i < ELEMENTS; i++)
                subscribers.Add(new Subscriber(pubsub.GetSubscriber()));

            //--------------------
            // Initialize Subscribers
            List<Func<Task>> initializes = new List<Func<Task>>(ELEMENTS);
            for (int i = 0; i < ELEMENTS; i++)
            {
                int index = i;
                initializes.Add(async () => { await subscribers[index].Initialize(); });
            }
            await Task.WhenAll(initializes.AsParallel().Select(async task => await task()));

            //--------------------
            // Publish Data
            await pubsub.GetSubscriber().PublishAsync(Subscriber.CHANNEL, "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789");
            await Task.Delay(1000);

            //--------------------
            // Shutdown Subscribers
            List<Func<Task>> shutdowns = new List<Func<Task>>(ELEMENTS);
            for (int i = 0; i < ELEMENTS; i++)
            {
                int index = i;
                shutdowns.Add(async () => { await subscribers[index].Shutdown(); });
            }
            await Task.WhenAll(shutdowns.AsParallel().Select(async task => await task()));
        }

        //--------------------
        // Random Delay + GC.Collect
        await Task.Delay(1000);
        GC.Collect();
        await Task.Delay(1000);

        //--------------------
        // Issue => subscriptions leaked: Refs is back to 0, but Subs is not
        Console.WriteLine($"Subs: {pubsub.GetCounters().Subscription.Subscriptions}");
        Console.WriteLine($"Refs: {Subscriber.REF_COUNT}");

        // If you were to take a heap snapshot here, you'd see a random number of Subscribers being held by ConnectionMultiplexer.Subscription
    }

    public class Subscriber
    {
        public const string CHANNEL = "CHANNEL:TEST";
        public static int REF_COUNT = 0;

        public Subscriber(ISubscriber sub)
        {
            _sub = sub;
        }

        public async Task Initialize()
        {
            await _sub.SubscribeAsync(CHANNEL, OnMessage);
            Interlocked.Increment(ref REF_COUNT);
        }

        public async Task Shutdown()
        {
            await _sub.UnsubscribeAsync(CHANNEL, OnMessage);
            Interlocked.Decrement(ref REF_COUNT);
        }

        private void OnMessage(RedisChannel channel, RedisValue value)
        {
            _pending.Enqueue(value);
        }

        private ISubscriber _sub;
        private ConcurrentQueue<string> _pending = new ConcurrentQueue<string>();
    }
}

Here's an example of it running with Watch + Heap Snapshot:

image

Here's another Heap Snapshot of objects still being held by the Subscription after they unsubscribed and GC was triggered manually:

image
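
One way to test whether concurrency really is the trigger would be to run the same repro with every subscribe/unsubscribe call serialized and see whether Subs then drops back to zero. Below is a minimal sketch of that experiment, not a confirmed fix or workaround. SerializedSubscriber and Gate are hypothetical names introduced here for illustration; only ISubscriber.SubscribeAsync and ISubscriber.UnsubscribeAsync come from StackExchange.Redis, and it assumes the same implicit usings as the repro above.

```csharp
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;

// Hypothetical variant of the Subscriber class from the repro.
// Every subscribe/unsubscribe goes through one process-wide gate,
// so no two of these calls ever overlap.
public class SerializedSubscriber
{
    public const string CHANNEL = "CHANNEL:TEST";

    // Hypothetical gate shared by all instances: one caller at a time.
    private static readonly SemaphoreSlim Gate = new SemaphoreSlim(1, 1);

    private readonly ISubscriber _sub;

    public SerializedSubscriber(ISubscriber sub)
    {
        _sub = sub;
    }

    public async Task Initialize()
    {
        await Gate.WaitAsync();
        try
        {
            await _sub.SubscribeAsync(CHANNEL, OnMessage);
        }
        finally
        {
            // Always release, even if the subscribe call throws.
            Gate.Release();
        }
    }

    public async Task Shutdown()
    {
        await Gate.WaitAsync();
        try
        {
            await _sub.UnsubscribeAsync(CHANNEL, OnMessage);
        }
        finally
        {
            Gate.Release();
        }
    }

    private void OnMessage(RedisChannel channel, RedisValue value)
    {
        // Message handling is irrelevant to the experiment.
    }
}
```

Swapping Subscriber for SerializedSubscriber in Main keeps the rest of the repro unchanged. If Subs still fails to reach zero with the calls serialized, concurrency alone is probably not the cause.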