Skip to content

Commit 308e4c3

Browse files
author
Oleh Dokuka
committed
Merge #1090 into 1.2.0-SNAPSHOT
Signed-off-by: OlegDokuka <[email protected]>
2 parents 47e4e3b + 5547cb1 commit 308e4c3

25 files changed

+1180
-785
lines changed

rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,8 @@ public void accept(RSocket rSocket, Throwable t) {
450450

451451
@Override
452452
public void request(long n) {
453-
this.main.request(n);
454453
super.request(n);
454+
this.main.request(n);
455455
}
456456

457457
public void cancel() {

rsocket-core/src/test/java/io/rsocket/core/ClientServerInputMultiplexerTest.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,20 @@ public void clientSplits() {
5656
clientMultiplexer
5757
.asClientConnection()
5858
.receive()
59-
.doOnNext(f -> clientFrames.incrementAndGet())
59+
.doOnNext(
60+
f -> {
61+
clientFrames.incrementAndGet();
62+
f.release();
63+
})
6064
.subscribe();
6165
clientMultiplexer
6266
.asServerConnection()
6367
.receive()
64-
.doOnNext(f -> serverFrames.incrementAndGet())
68+
.doOnNext(
69+
f -> {
70+
serverFrames.incrementAndGet();
71+
f.release();
72+
})
6573
.subscribe();
6674

6775
source.addToReceivedBuffer(errorFrame(1).retain());
@@ -101,12 +109,20 @@ public void serverSplits() {
101109
serverMultiplexer
102110
.asClientConnection()
103111
.receive()
104-
.doOnNext(f -> clientFrames.incrementAndGet())
112+
.doOnNext(
113+
f -> {
114+
clientFrames.incrementAndGet();
115+
f.release();
116+
})
105117
.subscribe();
106118
serverMultiplexer
107119
.asServerConnection()
108120
.receive()
109-
.doOnNext(f -> serverFrames.incrementAndGet())
121+
.doOnNext(
122+
f -> {
123+
serverFrames.incrementAndGet();
124+
f.release();
125+
})
110126
.subscribe();
111127

112128
source.addToReceivedBuffer(errorFrame(1).retain());

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

+56
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.util.CharsetUtil;
2020
import io.netty.util.ReferenceCountUtil;
2121
import io.netty.util.ReferenceCounted;
22+
import io.rsocket.FrameAssert;
2223
import io.rsocket.Payload;
2324
import io.rsocket.RSocket;
2425
import io.rsocket.RaceTestConstants;
@@ -434,6 +435,8 @@ public void shouldBeAbleToResolveOriginalSource() {
434435
assertSubscriber1.assertTerminated().assertValueCount(1);
435436

436437
Assertions.assertThat(assertSubscriber1.values()).isEqualTo(assertSubscriber.values());
438+
439+
rule.allocator.assertHasNoLeaks();
437440
}
438441

439442
@Test
@@ -457,6 +460,13 @@ public void shouldDisposeOriginalSource() {
457460
.assertErrorMessage("Disposed");
458461

459462
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
463+
464+
FrameAssert.assertThat(rule.connection.awaitFrame())
465+
.hasStreamIdZero()
466+
.hasData("Disposed")
467+
.hasNoLeaks();
468+
469+
rule.allocator.assertHasNoLeaks();
460470
}
461471

462472
@Test
@@ -494,6 +504,13 @@ public Mono<Void> onClose() {
494504
onCloseSubscriber.assertTerminated().assertComplete();
495505

496506
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
507+
508+
FrameAssert.assertThat(rule.connection.awaitFrame())
509+
.hasStreamIdZero()
510+
.hasData("Disposed")
511+
.hasNoLeaks();
512+
513+
rule.allocator.assertHasNoLeaks();
497514
}
498515

499516
@Test
@@ -515,6 +532,13 @@ public void shouldResolveOnStartSource() {
515532
assertSubscriber1.assertTerminated().assertComplete();
516533

517534
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
535+
536+
FrameAssert.assertThat(rule.connection.awaitFrame())
537+
.hasStreamIdZero()
538+
.hasData("Disposed")
539+
.hasNoLeaks();
540+
541+
rule.allocator.assertHasNoLeaks();
518542
}
519543

520544
@Test
@@ -536,6 +560,13 @@ public void shouldNotStartIfAlreadyDisposed() {
536560
assertSubscriber1.assertTerminated().assertComplete();
537561

538562
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
563+
564+
FrameAssert.assertThat(rule.connection.awaitFrame())
565+
.hasStreamIdZero()
566+
.hasData("Disposed")
567+
.hasNoLeaks();
568+
569+
rule.allocator.assertHasNoLeaks();
539570
}
540571

541572
@Test
@@ -553,6 +584,11 @@ public void shouldBeRestartedIfSourceWasClosed() {
553584

554585
rule.socket.dispose();
555586

587+
FrameAssert.assertThat(rule.connection.awaitFrame())
588+
.hasStreamIdZero()
589+
.hasData("Disposed")
590+
.hasNoLeaks();
591+
556592
terminateSubscriber.assertNotTerminated();
557593
Assertions.assertThat(rule.client.isDisposed()).isFalse();
558594

@@ -576,6 +612,13 @@ public void shouldBeRestartedIfSourceWasClosed() {
576612
Assertions.assertThat(rule.client.connect()).isFalse();
577613

578614
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
615+
616+
FrameAssert.assertThat(rule.connection.awaitFrame())
617+
.hasStreamIdZero()
618+
.hasData("Disposed")
619+
.hasNoLeaks();
620+
621+
rule.allocator.assertHasNoLeaks();
579622
}
580623

581624
@Test
@@ -603,6 +646,13 @@ public void shouldDisposeOriginalSourceIfRacing() {
603646
.assertTerminated()
604647
.assertError(CancellationException.class)
605648
.assertErrorMessage("Disposed");
649+
650+
ByteBuf buf;
651+
while ((buf = rule.connection.pollFrame()) != null) {
652+
FrameAssert.assertThat(buf).hasStreamIdZero().hasData("Disposed").hasNoLeaks();
653+
}
654+
655+
rule.allocator.assertHasNoLeaks();
606656
}
607657
}
608658

@@ -632,8 +682,14 @@ public void shouldStartOriginalSourceOnceIfRacing() {
632682
AssertSubscriber<Void> assertSubscriber1 = AssertSubscriber.create();
633683

634684
rule.client.onClose().subscribe(assertSubscriber1);
685+
FrameAssert.assertThat(rule.connection.awaitFrame())
686+
.hasStreamIdZero()
687+
.hasData("Disposed")
688+
.hasNoLeaks();
635689

636690
assertSubscriber1.assertTerminated().assertComplete();
691+
692+
rule.allocator.assertHasNoLeaks();
637693
}
638694
}
639695

0 commit comments

Comments
 (0)