improves BaseDuplexConnection and fixes PingClient impl (#1062) · rsocket/rsocket-java@d330a32 (original) (raw)

File tree

Original file line number Diff line number Diff line change
@@ -22,13 +22,10 @@
22 22 import reactor.core.publisher.Sinks;
23 23
24 24 public abstract class BaseDuplexConnection implements DuplexConnection {
25 -protected Sinks.Empty<Void> onClose = Sinks.empty();
25 +protected final Sinks.Empty<Void> onClose = Sinks.empty();
26 +protected final UnboundedProcessor sender = new UnboundedProcessor(onClose::tryEmitEmpty);
26 27
27 -protected UnboundedProcessor sender = new UnboundedProcessor();
28 -
29 -public BaseDuplexConnection() {
30 -onClose().doFinally(s -> doOnClose()).subscribe();
31 - }
28 +public BaseDuplexConnection() {}
32 29
33 30 @Override
34 31 public void sendFrame(int streamId, ByteBuf frame) {
@@ -48,7 +45,7 @@ public final Mono onClose() {
48 45
49 46 @Override
50 47 public final void dispose() {
51 -onClose.tryEmitEmpty();
48 +doOnClose();
52 49 }
53 50
54 51 @Override
Original file line number Diff line number Diff line change
@@ -63,8 +63,8 @@ Flux pingPong(
63 63 BiFunction<RSocket, ? super Payload, ? extends Publisher<Payload>> interaction,
64 64 int count,
65 65 final Recorder histogram) {
66 -return client
67 -.flatMapMany(
66 +return Flux.usingWhen(
67 + client,
68 68 rsocket ->
69 69 Flux.range(1, count)
70 70 .flatMap(
@@ -78,7 +78,11 @@ Flux pingPong(
78 78 histogram.recordValue(diff);
79 79 });
80 80 },
81 -64))
81 +64),
82 +rsocket -> {
83 +rsocket.dispose();
84 +return rsocket.onClose();
85 + })
82 86 .doOnError(Throwable::printStackTrace);
83 87 }
84 88 }