Skip to content

Commit a5a060a

Browse files
OlegDokukaOlegDokuka
authored and
OlegDokuka
committed
wip
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent aa9b2cb commit a5a060a

File tree

2 files changed

+3
-19
lines changed

2 files changed

+3
-19
lines changed

rsocket-core/src/main/java/io/rsocket/core/RequesterResponderSupport.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public boolean remove(int streamId, FrameHandler frameHandler) {
177177
return false;
178178
}
179179
activeStreams.remove(streamId);
180-
if (activeStreams.size() == 0) {
180+
if (this.terminating && activeStreams.size() == 0) {
181181
terminated = true;
182182
this.terminated = true;
183183
} else {

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

+2-18
Original file line numberDiff line numberDiff line change
@@ -491,10 +491,6 @@ public Mono<Void> onClose() {
491491

492492
onCloseDelayer.tryEmitEmpty();
493493

494-
onCloseSubscriber.assertNotTerminated();
495-
496-
rule.otherClosedSink.tryEmitEmpty();
497-
498494
onCloseSubscriber.assertTerminated().assertComplete();
499495

500496
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
@@ -516,10 +512,6 @@ public void shouldResolveOnStartSource() {
516512

517513
rule.client.onClose().subscribe(assertSubscriber1);
518514

519-
assertSubscriber1.assertNotTerminated();
520-
521-
rule.otherClosedSink.tryEmitEmpty();
522-
523515
assertSubscriber1.assertTerminated().assertComplete();
524516

525517
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
@@ -579,8 +571,6 @@ public void shouldBeRestartedIfSourceWasClosed() {
579571

580572
rule.client.dispose();
581573

582-
rule.otherClosedSink.tryEmitEmpty();
583-
584574
terminateSubscriber.assertTerminated().assertComplete();
585575

586576
Assertions.assertThat(rule.client.connect()).isFalse();
@@ -636,8 +626,6 @@ public void shouldStartOriginalSourceOnceIfRacing() {
636626

637627
rule.client.dispose();
638628

639-
rule.otherClosedSink.tryEmitEmpty();
640-
641629
Assertions.assertThat(rule.client.isDisposed()).isTrue();
642630
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
643631

@@ -656,10 +644,8 @@ public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
656644
protected Sinks.One<RSocket> producer;
657645

658646
protected Sinks.Empty<Void> onGracefulShutdownStartedSink;
659-
protected Sinks.Empty<Void> otherGracefulShutdownSink;
660647
protected Sinks.Empty<Void> thisGracefulShutdownSink;
661648
protected Sinks.Empty<Void> thisClosedSink;
662-
protected Sinks.Empty<Void> otherClosedSink;
663649

664650
@Override
665651
protected void doInit() {
@@ -679,10 +665,8 @@ protected void doInit() {
679665
@Override
680666
protected RSocket newRSocket() {
681667
this.onGracefulShutdownStartedSink = Sinks.empty();
682-
this.otherGracefulShutdownSink = Sinks.empty();
683668
this.thisGracefulShutdownSink = Sinks.empty();
684669
this.thisClosedSink = Sinks.empty();
685-
this.otherClosedSink = Sinks.empty();
686670
return new RSocketRequester(
687671
connection,
688672
PayloadDecoder.ZERO_COPY,
@@ -698,8 +682,8 @@ protected RSocket newRSocket() {
698682
onGracefulShutdownStartedSink,
699683
thisGracefulShutdownSink,
700684
thisClosedSink,
701-
otherGracefulShutdownSink.asMono().and(thisGracefulShutdownSink.asMono()),
702-
otherClosedSink.asMono().and(thisClosedSink.asMono()));
685+
thisGracefulShutdownSink.asMono(),
686+
thisClosedSink.asMono());
703687
}
704688
}
705689
}

0 commit comments

Comments
 (0)