Pub/Sub fixes for subscribe/re-subscribe by NickCraver · Pull Request #1947 · StackExchange/StackExchange.Redis
We're working on pub/sub - breaking it out explicitly from #1912. This relates to several issues and, in general, to handling resubscriptions on reconnect.
Issues: #1110, #1586, #1830, #1835
There are a few things in play we're investigating:
- Subscription heartbeat not going over the subscription connection (due to `PING` and `GetBridge`)
- Subscriptions not reconnecting at all (or potentially doing so and then unsubscribing, according to some issues)
- Subscriptions always going to a single cluster node (due to `default(RedisKey)`) - see the hash slot sketch after this list
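As background for that last item, here is an illustrative sketch only - not the library's routing code: Redis Cluster maps a key to one of 16384 hash slots via CRC16, and each slot is owned by a single node. An empty key, which is effectively what `default(RedisKey)` gives you, always produces the same slot, so every subscription is routed the same way and lands on one node; hashing the channel name instead spreads subscriptions out. The channel names below are made up.

```csharp
using System;
using System.Text;

class SlotDemo
{
    // CRC16/XMODEM, the variant Redis Cluster uses (real slot calculation also honors {hash tags}).
    static ushort Crc16(byte[] data)
    {
        ushort crc = 0;
        foreach (byte b in data)
        {
            crc ^= (ushort)(b << 8);
            for (int i = 0; i < 8; i++)
            {
                crc = (crc & 0x8000) != 0 ? (ushort)((crc << 1) ^ 0x1021) : (ushort)(crc << 1);
            }
        }
        return crc;
    }

    static int Slot(string channel) => Crc16(Encoding.UTF8.GetBytes(channel)) % 16384;

    static void Main()
    {
        // An empty key always yields the same slot, so everything routes identically.
        Console.WriteLine($"empty key         -> slot {Slot("")}");
        // Hashing the channel name spreads subscriptions across different slots/nodes.
        Console.WriteLine($"orders:created    -> slot {Slot("orders:created")}");
        Console.WriteLine($"users:updated     -> slot {Slot("users:updated")}");
    }
}
```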
Overall this set of changes:
- Completely restructures how `RedisSubscriber` works
  - No more `PendingSubscriptionState` (`Subscription` has the needed bits to reconnect)
  - Cleaner method topology (in `RedisSubscriber`, rather than `Subscriber`, `RedisSubscriber`, and `ConnectionMultiplexer`)
    * By placing these on `RedisSubscriber`, we can cleanly use the `ExecuteSync/Async` bits, get proper profiling, etc.
  - Proper sync/async split (rather than `Wait()` in sync paths)
- Changes how subscriptions work (see the subscribe-flow sketch after this list)
  - The `Subscription` object is added to the `ConnectionMultiplexer` tracking immediately, but the command itself actually goes to the server and back (unless FireAndForget) before returning, for proper ordering like other commands.
  - No more `Task.Run()` loop - we now ensure reconnects as part of the handshake
  - Subscriptions are marked as not having a server the moment a disconnect is fired
    * Question: Should we have a throttle around this for massive numbers of connections, or async it?
- Changes how connecting works
  - The connection completion handler will now fire when the second bridge/connection completes. This means we won't have `interactive` connected but `subscription` in an unknown state - both are connected before we fire the handler, meaning the moment we come back from connect, subscriptions are in business.
- Moves to a `ConcurrentDictionary`, since we only need limited locking around this and we only have it once per multiplexer (see the tracking sketch after this list).
  - TODO: This needs eyes; we could shift it - the implementation changed along the way to where this isn't a critical detail.
- Fixes the `TrackSubscriptionsProcessor` - this was never setting the result, but it went unnoticed for 8 years because downstream code never cared.
  - Note: each `Subscription` has a processor instance (with minimal state) because when the subscription command comes back, we need to decide whether it successfully registered (if it didn't, we need to maintain that it has no successful server).
- `ConnectionMultiplexer` grew a `DefaultSubscriber` for running some commands without lots of method duplication, e.g. ensuring servers are connected.
- Overrides `GetHashSlot` on `CommandChannelBase` with the new `RedisChannel`-based methods so that it operates correctly.
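To make the subscribe-flow change concrete, here is a hedged usage sketch against the public API (the connection string and channel names are made up): an awaited `SubscribeAsync` doesn't complete until the `SUBSCRIBE` command has gone to the server and come back, while `CommandFlags.FireAndForget` returns once the subscription is registered locally.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

class SubscribeFlowExample
{
    static async Task Main()
    {
        using var muxer = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
        ISubscriber sub = muxer.GetSubscriber();

        // Awaited subscribe: the subscription is tracked immediately, then the call
        // waits for the SUBSCRIBE command to round-trip to the server, so ordering
        // matches other commands.
        await sub.SubscribeAsync("orders:created", (channel, message) =>
            Console.WriteLine($"Got {message} on {channel}"));

        // Fire-and-forget subscribe: returns without waiting for the server ack.
        sub.Subscribe("orders:audit", (channel, message) =>
            Console.WriteLine($"Audit: {message}"), CommandFlags.FireAndForget);

        // Publishing after an awaited subscribe should be received, since the
        // subscription is known to be established on the server.
        await sub.PublishAsync("orders:created", "order-42");

        await Task.Delay(500); // give the handler a moment before disposing
    }
}
```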
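And a rough tracking sketch of the idea described above - not the actual implementation, just an illustration under invented names: subscriptions live in a `ConcurrentDictionary` keyed by channel, lose their server the moment a disconnect fires, and get re-ensured during the reconnect handshake rather than via a `Task.Run()` loop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Illustrative only: invented types standing in for the internal subscription tracking.
class TrackedSubscription
{
    public string Channel { get; }
    public object CurrentServer { get; private set; } // null => not subscribed anywhere

    public TrackedSubscription(string channel) => Channel = channel;

    public bool IsConnected => CurrentServer != null;

    public void SetCurrentServer(object server) => CurrentServer = server;
}

class SubscriptionTracker
{
    // One dictionary per multiplexer with limited locking needs, so ConcurrentDictionary fits.
    private readonly ConcurrentDictionary<string, TrackedSubscription> _subscriptions = new();

    public TrackedSubscription Track(string channel) =>
        _subscriptions.GetOrAdd(channel, c => new TrackedSubscription(c));

    // Mirrors "marked as not having a server the moment a disconnect is fired".
    public void OnDisconnected(object server)
    {
        foreach (var sub in _subscriptions.Values.Where(s => ReferenceEquals(s.CurrentServer, server)))
        {
            sub.SetCurrentServer(null);
        }
    }

    // Mirrors "ensure reconnects as part of the handshake": anything without a
    // server is re-issued against the newly connected endpoint.
    public async Task EnsureSubscriptionsAsync(object connectedServer, Func<string, Task> resubscribe)
    {
        foreach (var sub in _subscriptions.Values.Where(s => !s.IsConnected))
        {
            await resubscribe(sub.Channel);
            sub.SetCurrentServer(connectedServer);
        }
    }
}
```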
Not directly related changes which helped here:
- Better profiler helpers for tests and profiler logging in them (see the profiling sketch below)
- Re-enables a few `PubSub` tests that were unreliable before... but correctly so.
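For context on the profiling angle, here is a hedged sketch of the public profiling API wrapped around a pub/sub check. The session wiring follows the documented v2 profiling pattern; the test body, channel names, and connection string are assumptions, not this PR's actual test helpers.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;
using StackExchange.Redis.Profiling;

class ProfiledPubSubCheck
{
    static async Task Main()
    {
        using var muxer = await ConnectionMultiplexer.ConnectAsync("localhost:6379");

        // Hand the multiplexer a session factory, then drain the session afterwards
        // to see which commands were sent where.
        var session = new ProfilingSession();
        muxer.RegisterProfiler(() => session);

        var sub = muxer.GetSubscriber();
        await sub.SubscribeAsync("profiled:channel", (_, msg) => Console.WriteLine(msg));
        await sub.PublishAsync("profiled:channel", "hello");

        foreach (var cmd in session.FinishProfiling())
        {
            // e.g. SUBSCRIBE / PUBLISH with their endpoints and timings - handy when
            // asserting that a subscription went over the expected connection.
            Console.WriteLine($"{cmd.Command} -> {cmd.EndPoint} ({cmd.ElapsedTime.TotalMilliseconds:0.00} ms)");
        }
    }
}
```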
TODO: I'd like to add a few more test scenarios here:
- Simple Subscribe/Publish/await Until/check pattern to ensure back-to-back subscribe/publish works well (sketched after this list)
- Cluster connection failure and subscriptions moving to another node
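A hedged sketch of what that first scenario could look like as an xUnit test. The test name, timeout, polling loop, and fixture-free connection are assumptions, not the repo's actual test infrastructure.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;
using Xunit;

public class PubSubOrderingTests
{
    [Fact]
    public async Task SubscribeThenPublish_IsReceived()
    {
        using var muxer = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
        var sub = muxer.GetSubscriber();

        string received = null;
        await sub.SubscribeAsync("test:orders", (_, msg) => Volatile.Write(ref received, (string)msg));

        // Because the awaited subscribe round-trips to the server, a publish issued
        // immediately afterwards should be delivered.
        await sub.PublishAsync("test:orders", "order-1");

        // Simple "await until" polling: wait for the handler to fire or time out.
        var sw = Stopwatch.StartNew();
        while (Volatile.Read(ref received) is null && sw.Elapsed < TimeSpan.FromSeconds(5))
        {
            await Task.Delay(25);
        }

        Assert.Equal("order-1", received);
    }
}
```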
To consider:
- Subscription await loop from `EnsureSubscriptionsAsync` and connection impact in large reconnect situations
  - In a reconnect case, this is background and only the affected nodes have any latency... but still.
- TODOs in code around variadic commands, e.g. re-subscribing with far fewer commands by using `SUBSCRIBE <key1> <key2> ...` (see the batching sketch after this list)
  - In cluster, we'd have to batch per slot... or just go for the first available node
  - ...but if we go for the first available node, the semantics of `IsConnected` are slightly off in the not-connected (`CurrentServer` is null) case, because we'd say we're connected to wherever it would go, even though that'd be non-deterministic without hash slot batching. I think this is really minor and shouldn't affect our decision.
- `ConcurrentDictionary` vs. returning to locks around a `Dictionary`
  - ...but if we have to lock on firing consumption of handlers anyway, the concurrency overhead is probably a wash.
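To make the variadic idea concrete, a hedged batching sketch (the `getHashSlot` delegate, channel names, and command-string output are placeholders, not code from the PR): group the channels by hash slot, then issue one multi-channel `SUBSCRIBE` per group instead of one command per channel.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class VariadicSubscribeSketch
{
    // Illustrative only: batch channels per hash slot so each SUBSCRIBE can carry
    // many channels while still targeting a node that owns all of them.
    static IEnumerable<string> BuildSubscribeCommands(
        IEnumerable<string> channels,
        Func<string, int> getHashSlot) // placeholder for a RedisChannel-based slot calculation
    {
        foreach (var slotGroup in channels.GroupBy(getHashSlot))
        {
            // One "SUBSCRIBE <chan1> <chan2> ..." per slot instead of one per channel.
            yield return "SUBSCRIBE " + string.Join(" ", slotGroup);
        }
    }

    static void Main()
    {
        var channels = new[] { "orders:created", "orders:updated", "users:created" };

        // Fake slot function for the demo; the real one would be CRC16-based.
        int FakeSlot(string channel) => channel.StartsWith("orders:") ? 100 : 200;

        foreach (var command in BuildSubscribeCommands(channels, FakeSlot))
        {
            Console.WriteLine(command);
        }
    }
}
```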