Skip to content

Commit 5da37cd

Browse files
authored
ensures onClose awaits all underlying components to be closed (#1085)
1 parent 8959385 commit 5da37cd

30 files changed

+445
-88
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package io.rsocket.resume;
2+
3+
import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;
4+
5+
import io.netty.buffer.ByteBuf;
6+
import io.netty.buffer.ByteBufAllocator;
7+
import io.netty.buffer.ByteBufUtil;
8+
import io.netty.buffer.Unpooled;
9+
import io.rsocket.exceptions.ConnectionErrorException;
10+
import io.rsocket.frame.ErrorFrameCodec;
11+
import io.rsocket.frame.PayloadFrameCodec;
12+
import io.rsocket.internal.UnboundedProcessor;
13+
import org.openjdk.jcstress.annotations.Actor;
14+
import org.openjdk.jcstress.annotations.Arbiter;
15+
import org.openjdk.jcstress.annotations.JCStressTest;
16+
import org.openjdk.jcstress.annotations.Outcome;
17+
import org.openjdk.jcstress.annotations.State;
18+
import org.openjdk.jcstress.infra.results.LL_Result;
19+
import reactor.core.Disposable;
20+
21+
public class InMemoryResumableFramesStoreStressTest {
22+
boolean storeClosed;
23+
24+
InMemoryResumableFramesStore store =
25+
new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 128);
26+
boolean processorClosed;
27+
UnboundedProcessor processor = new UnboundedProcessor(() -> processorClosed = true);
28+
29+
void subscribe() {
30+
store.saveFrames(processor).subscribe();
31+
store.onClose().subscribe(null, t -> storeClosed = true, () -> storeClosed = true);
32+
}
33+
34+
@JCStressTest
35+
@Outcome(
36+
id = {"true, true"},
37+
expect = ACCEPTABLE)
38+
@State
39+
public static class TwoSubscribesRaceStressTest extends InMemoryResumableFramesStoreStressTest {
40+
41+
Disposable d1;
42+
43+
final ByteBuf b1 =
44+
PayloadFrameCodec.encode(
45+
ByteBufAllocator.DEFAULT,
46+
1,
47+
false,
48+
true,
49+
false,
50+
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello1"),
51+
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello2"));
52+
final ByteBuf b2 =
53+
PayloadFrameCodec.encode(
54+
ByteBufAllocator.DEFAULT,
55+
3,
56+
false,
57+
true,
58+
false,
59+
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello3"),
60+
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello4"));
61+
final ByteBuf b3 =
62+
PayloadFrameCodec.encode(
63+
ByteBufAllocator.DEFAULT,
64+
5,
65+
false,
66+
true,
67+
false,
68+
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello5"),
69+
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello6"));
70+
71+
final ByteBuf c1 =
72+
ErrorFrameCodec.encode(ByteBufAllocator.DEFAULT, 0, new ConnectionErrorException("closed"));
73+
74+
{
75+
subscribe();
76+
d1 = store.doOnDiscard(ByteBuf.class, ByteBuf::release).subscribe(ByteBuf::release, t -> {});
77+
}
78+
79+
@Actor
80+
public void producer1() {
81+
processor.tryEmitNormal(b1);
82+
processor.tryEmitNormal(b2);
83+
processor.tryEmitNormal(b3);
84+
}
85+
86+
@Actor
87+
public void producer2() {
88+
processor.tryEmitFinal(c1);
89+
}
90+
91+
@Actor
92+
public void producer3() {
93+
d1.dispose();
94+
store
95+
.doOnDiscard(ByteBuf.class, ByteBuf::release)
96+
.subscribe(ByteBuf::release, t -> {})
97+
.dispose();
98+
store
99+
.doOnDiscard(ByteBuf.class, ByteBuf::release)
100+
.subscribe(ByteBuf::release, t -> {})
101+
.dispose();
102+
store.doOnDiscard(ByteBuf.class, ByteBuf::release).subscribe(ByteBuf::release, t -> {});
103+
}
104+
105+
@Actor
106+
public void producer4() {
107+
store.releaseFrames(0);
108+
store.releaseFrames(0);
109+
store.releaseFrames(0);
110+
}
111+
112+
@Arbiter
113+
public void arbiter(LL_Result r) {
114+
r.r1 = storeClosed;
115+
r.r2 = processorClosed;
116+
}
117+
}
118+
}

rsocket-core/src/main/java/io/rsocket/core/ClientServerInputMultiplexer.java

+43-3
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ public ClientServerInputMultiplexer(
6767
this.source = source;
6868
this.isClient = isClient;
6969

70-
this.serverReceiver = new InternalDuplexConnection(this, source);
71-
this.clientReceiver = new InternalDuplexConnection(this, source);
70+
this.serverReceiver = new InternalDuplexConnection(Type.SERVER, this, source);
71+
this.clientReceiver = new InternalDuplexConnection(Type.CLIENT, this, source);
7272
this.serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
7373
this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
7474
}
@@ -195,8 +195,33 @@ int incrementAndGetCheckingState() {
195195
}
196196
}
197197

198+
@Override
199+
public String toString() {
200+
return "ClientServerInputMultiplexer{"
201+
+ "serverReceiver="
202+
+ serverReceiver
203+
+ ", clientReceiver="
204+
+ clientReceiver
205+
+ ", serverConnection="
206+
+ serverConnection
207+
+ ", clientConnection="
208+
+ clientConnection
209+
+ ", source="
210+
+ source
211+
+ ", isClient="
212+
+ isClient
213+
+ ", s="
214+
+ s
215+
+ ", t="
216+
+ t
217+
+ ", state="
218+
+ state
219+
+ '}';
220+
}
221+
198222
private static class InternalDuplexConnection extends Flux<ByteBuf>
199223
implements Subscription, DuplexConnection {
224+
private final Type type;
200225
private final ClientServerInputMultiplexer clientServerInputMultiplexer;
201226
private final DuplexConnection source;
202227

@@ -207,7 +232,10 @@ private static class InternalDuplexConnection extends Flux<ByteBuf>
207232
CoreSubscriber<? super ByteBuf> actual;
208233

209234
public InternalDuplexConnection(
210-
ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection source) {
235+
Type type,
236+
ClientServerInputMultiplexer clientServerInputMultiplexer,
237+
DuplexConnection source) {
238+
this.type = type;
211239
this.clientServerInputMultiplexer = clientServerInputMultiplexer;
212240
this.source = source;
213241
}
@@ -304,5 +332,17 @@ public Mono<Void> onClose() {
304332
public double availability() {
305333
return source.availability();
306334
}
335+
336+
@Override
337+
public String toString() {
338+
return "InternalDuplexConnection{"
339+
+ "type="
340+
+ type
341+
+ ", source="
342+
+ source
343+
+ ", state="
344+
+ state
345+
+ '}';
346+
}
307347
}
308348
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.function.Supplier;
4848
import reactor.core.Disposable;
4949
import reactor.core.publisher.Mono;
50+
import reactor.core.publisher.Sinks;
5051
import reactor.util.annotation.Nullable;
5152
import reactor.util.function.Tuples;
5253
import reactor.util.retry.Retry;
@@ -633,8 +634,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
633634
wrappedConnection = resumableDuplexConnection;
634635
} else {
635636
keepAliveHandler =
636-
new KeepAliveHandler.DefaultKeepAliveHandler(
637-
clientServerConnection);
637+
new KeepAliveHandler.DefaultKeepAliveHandler();
638638
wrappedConnection = clientServerConnection;
639639
}
640640

@@ -655,6 +655,11 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
655655
requesterLeaseTracker = null;
656656
}
657657

658+
final Sinks.Empty<Void> requesterOnAllClosedSink =
659+
Sinks.unsafe().empty();
660+
final Sinks.Empty<Void> responderOnAllClosedSink =
661+
Sinks.unsafe().empty();
662+
658663
RSocket rSocketRequester =
659664
new RSocketRequester(
660665
multiplexer.asClientConnection(),
@@ -667,7 +672,11 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
667672
(int) keepAliveMaxLifeTime.toMillis(),
668673
keepAliveHandler,
669674
interceptors::initRequesterRequestInterceptor,
670-
requesterLeaseTracker);
675+
requesterLeaseTracker,
676+
requesterOnAllClosedSink,
677+
Mono.whenDelayError(
678+
responderOnAllClosedSink.asMono(),
679+
requesterOnAllClosedSink.asMono()));
671680

672681
RSocket wrappedRSocketRequester =
673682
interceptors.initRequester(rSocketRequester);
@@ -715,7 +724,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
715724
(RequestInterceptor)
716725
leases.sender)
717726
: interceptors
718-
::initResponderRequestInterceptor);
727+
::initResponderRequestInterceptor,
728+
responderOnAllClosedSink);
719729

720730
return wrappedRSocketRequester;
721731
})

0 commit comments

Comments
 (0)