Skip to content

Commit 8959385

Browse files
author
Oleh Dokuka
committed
ensures resumable connection awaits termination of all component
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: OlegDokuka <[email protected]>
1 parent 0ab392a commit 8959385

8 files changed

+190
-112
lines changed

rsocket-core/src/main/java/io/rsocket/core/ClientSetup.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.netty.buffer.Unpooled;
55
import io.rsocket.DuplexConnection;
66
import java.nio.channels.ClosedChannelException;
7+
import reactor.core.Disposable;
78
import reactor.core.publisher.Mono;
89
import reactor.util.function.Tuple2;
910
import reactor.util.function.Tuples;
@@ -25,8 +26,24 @@ class ResumableClientSetup extends ClientSetup {
2526

2627
@Override
2728
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
28-
return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
29-
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)))
30-
.or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
29+
return Mono.create(
30+
sink -> {
31+
sink.onRequest(
32+
__ -> {
33+
new SetupHandlingDuplexConnection(connection, sink);
34+
});
35+
36+
Disposable subscribe =
37+
connection
38+
.onClose()
39+
.doFinally(__ -> sink.error(new ClosedChannelException()))
40+
.subscribe();
41+
sink.onCancel(
42+
() -> {
43+
subscribe.dispose();
44+
connection.dispose();
45+
connection.receive().subscribe();
46+
});
47+
});
3148
}
3249
}

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ void dispose() {}
6060

6161
void sendError(DuplexConnection duplexConnection, RSocketErrorException exception) {
6262
duplexConnection.sendErrorAndClose(exception);
63+
duplexConnection.receive().subscribe();
6364
}
6465

6566
static class DefaultServerSetup extends ServerSetup {

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

+71-45
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.rsocket.frame.ResumeOkFrameCodec;
3030
import io.rsocket.keepalive.KeepAliveSupport;
3131
import java.time.Duration;
32-
import java.util.concurrent.TimeoutException;
3332
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
3433
import java.util.function.Function;
3534
import org.reactivestreams.Subscription;
@@ -79,31 +78,49 @@ public ClientRSocketSession(
7978
this.resumeToken = resumeToken;
8079
this.session = resumeToken.toString(CharsetUtil.UTF_8);
8180
this.connectionFactory =
82-
connectionFactory.flatMap(
83-
dc -> {
84-
final long impliedPosition = resumableFramesStore.frameImpliedPosition();
85-
final long position = resumableFramesStore.framePosition();
86-
dc.sendFrame(
87-
0,
88-
ResumeFrameCodec.encode(
89-
dc.alloc(),
90-
resumeToken.retain(),
91-
// server uses this to release its cache
92-
impliedPosition, // observed on the client side
93-
// server uses this to check whether there is no mismatch
94-
position // sent from the client sent
95-
));
96-
97-
if (logger.isDebugEnabled()) {
98-
logger.debug(
99-
"Side[client]|Session[{}]. ResumeFrame[impliedPosition[{}], position[{}]] has been sent.",
100-
session,
101-
impliedPosition,
102-
position);
103-
}
104-
105-
return connectionTransformer.apply(dc);
106-
});
81+
connectionFactory
82+
.doOnDiscard(
83+
DuplexConnection.class,
84+
c -> {
85+
final ConnectionErrorException connectionErrorException =
86+
new ConnectionErrorException("resumption_server=[Session Expired]");
87+
c.sendErrorAndClose(connectionErrorException);
88+
c.receive().subscribe();
89+
})
90+
.flatMap(
91+
dc -> {
92+
final long impliedPosition = resumableFramesStore.frameImpliedPosition();
93+
final long position = resumableFramesStore.framePosition();
94+
dc.sendFrame(
95+
0,
96+
ResumeFrameCodec.encode(
97+
dc.alloc(),
98+
resumeToken.retain(),
99+
// server uses this to release its cache
100+
impliedPosition, // observed on the client side
101+
// server uses this to check whether there is no mismatch
102+
position // sent from the client sent
103+
));
104+
105+
if (logger.isDebugEnabled()) {
106+
logger.debug(
107+
"Side[client]|Session[{}]. ResumeFrame[impliedPosition[{}], position[{}]] has been sent.",
108+
session,
109+
impliedPosition,
110+
position);
111+
}
112+
113+
return connectionTransformer
114+
.apply(dc)
115+
.doOnDiscard(
116+
Tuple2.class,
117+
tuple2 -> {
118+
if (logger.isDebugEnabled()) {
119+
logger.debug("try to reestablish from discard");
120+
}
121+
tryReestablishSession(tuple2);
122+
});
123+
});
107124
this.resumableFramesStore = resumableFramesStore;
108125
this.allocator = resumableDuplexConnection.alloc();
109126
this.resumeSessionDuration = resumeSessionDuration;
@@ -160,11 +177,20 @@ public void onImpliedPosition(long remoteImpliedPos) {
160177

161178
@Override
162179
public void dispose() {
163-
Operators.terminate(S, this);
180+
if (logger.isDebugEnabled()) {
181+
logger.debug("Side[client]|Session[{}]. Disposing", session);
182+
}
183+
184+
boolean result = Operators.terminate(S, this);
185+
186+
if (logger.isDebugEnabled()) {
187+
logger.debug("Side[client]|Session[{}]. Sessions[isDisposed={}]", session, result);
188+
}
164189

165190
reconnectDisposable.dispose();
166191
resumableConnection.dispose();
167-
resumableFramesStore.dispose();
192+
// frame store is disposed by resumable connection
193+
// resumableFramesStore.dispose();
168194

169195
if (resumeToken.refCnt() > 0) {
170196
resumeToken.release();
@@ -177,6 +203,9 @@ public boolean isDisposed() {
177203
}
178204

179205
void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
206+
if (logger.isDebugEnabled()) {
207+
logger.debug("Active subscription is canceled {}", s == Operators.cancelledSubscription());
208+
}
180209
ByteBuf shouldBeResumeOKFrame = tuple2.getT1();
181210
DuplexConnection nextDuplexConnection = tuple2.getT2();
182211

@@ -189,9 +218,9 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
189218
}
190219
final ConnectionErrorException connectionErrorException =
191220
new ConnectionErrorException("RESUME_OK frame must be received before any others");
192-
resumableConnection.dispose(connectionErrorException);
221+
resumableConnection.dispose(nextDuplexConnection, connectionErrorException);
193222
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
194-
nextDuplexConnection.receive().subscribe().dispose();
223+
nextDuplexConnection.receive().subscribe();
195224

196225
throw connectionErrorException; // throw to retry connection again
197226
}
@@ -227,10 +256,10 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
227256
}
228257
final ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e);
229258

230-
resumableConnection.dispose(t);
259+
resumableConnection.dispose(nextDuplexConnection, t);
231260

232261
nextDuplexConnection.sendErrorAndClose(t);
233-
nextDuplexConnection.receive().subscribe().dispose();
262+
nextDuplexConnection.receive().subscribe();
234263

235264
return;
236265
}
@@ -244,7 +273,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
244273
final ConnectionErrorException connectionErrorException =
245274
new ConnectionErrorException("resumption_server=[Session Expired]");
246275
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
247-
nextDuplexConnection.receive().subscribe().dispose();
276+
nextDuplexConnection.receive().subscribe();
248277
return;
249278
}
250279

@@ -263,7 +292,7 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
263292
final ConnectionErrorException connectionErrorException =
264293
new ConnectionErrorException("resumption_server_pos=[Session Expired]");
265294
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
266-
nextDuplexConnection.receive().subscribe().dispose();
295+
nextDuplexConnection.receive().subscribe();
267296
// no need to do anything since connection resumable connection is liklly to
268297
// be disposed
269298
}
@@ -278,10 +307,10 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
278307
final ConnectionErrorException connectionErrorException =
279308
new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]");
280309

281-
resumableConnection.dispose(connectionErrorException);
310+
resumableConnection.dispose(nextDuplexConnection, connectionErrorException);
282311

283312
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
284-
nextDuplexConnection.receive().subscribe().dispose();
313+
nextDuplexConnection.receive().subscribe();
285314
}
286315
} else if (frameType == FrameType.ERROR) {
287316
final RuntimeException exception = Exceptions.from(0, shouldBeResumeOKFrame);
@@ -292,13 +321,14 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
292321
exception);
293322
}
294323
if (exception instanceof RejectedResumeException) {
295-
resumableConnection.dispose(exception);
324+
resumableConnection.dispose(nextDuplexConnection, exception);
296325
nextDuplexConnection.dispose();
297-
nextDuplexConnection.receive().subscribe().dispose();
326+
nextDuplexConnection.receive().subscribe();
298327
return;
299328
}
300329

301330
nextDuplexConnection.dispose();
331+
nextDuplexConnection.receive().subscribe();
302332
throw exception; // assume retryable exception
303333
} else {
304334
if (logger.isDebugEnabled()) {
@@ -309,10 +339,10 @@ void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
309339
final ConnectionErrorException connectionErrorException =
310340
new ConnectionErrorException("RESUME_OK frame must be received before any others");
311341

312-
resumableConnection.dispose(connectionErrorException);
342+
resumableConnection.dispose(nextDuplexConnection, connectionErrorException);
313343

314344
nextDuplexConnection.sendErrorAndClose(connectionErrorException);
315-
nextDuplexConnection.receive().subscribe().dispose();
345+
nextDuplexConnection.receive().subscribe();
316346

317347
// no need to do anything since remote server rejected our connection completely
318348
}
@@ -349,11 +379,7 @@ public void onError(Throwable t) {
349379
Operators.onErrorDropped(t, currentContext());
350380
}
351381

352-
if (t instanceof TimeoutException) {
353-
resumableConnection.dispose();
354-
} else {
355-
resumableConnection.dispose(t);
356-
}
382+
resumableConnection.dispose();
357383
}
358384

359385
@Override

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

+17-3
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,9 @@ void drain(long expectedState) {
168168

169169
if (isConnected(expectedState)) {
170170
if (isTerminated(expectedState)) {
171-
handleTerminal(this.terminal);
171+
handleTerminated(qs, this.terminal);
172172
} else if (isDisposed()) {
173-
handleTerminal(new CancellationException("Disposed"));
173+
handleDisposed();
174174
} else if (hasFrames(expectedState)) {
175175
handlePendingFrames(qs);
176176
}
@@ -402,14 +402,28 @@ void handleFrame(ByteBuf frame) {
402402
handleConnectionFrame(frame);
403403
}
404404

405-
void handleTerminal(@Nullable Throwable t) {
405+
void handleTerminated(Fuseable.QueueSubscription<ByteBuf> qs, @Nullable Throwable t) {
406+
for (; ; ) {
407+
final ByteBuf frame = qs.poll();
408+
final boolean empty = frame == null;
409+
410+
if (empty) {
411+
break;
412+
}
413+
414+
handleFrame(frame);
415+
}
406416
if (t != null) {
407417
this.actual.onError(t);
408418
} else {
409419
this.actual.onComplete();
410420
}
411421
}
412422

423+
void handleDisposed() {
424+
this.actual.onError(new CancellationException("Disposed"));
425+
}
426+
413427
void handleConnectionFrame(ByteBuf frame) {
414428
this.actual.onNext(frame);
415429
}

0 commit comments

Comments
 (0)