19
19
import io .netty .util .CharsetUtil ;
20
20
import io .netty .util .ReferenceCountUtil ;
21
21
import io .netty .util .ReferenceCounted ;
22
+ import io .rsocket .FrameAssert ;
22
23
import io .rsocket .Payload ;
23
24
import io .rsocket .RSocket ;
24
25
import io .rsocket .RaceTestConstants ;
@@ -434,6 +435,8 @@ public void shouldBeAbleToResolveOriginalSource() {
434
435
assertSubscriber1 .assertTerminated ().assertValueCount (1 );
435
436
436
437
Assertions .assertThat (assertSubscriber1 .values ()).isEqualTo (assertSubscriber .values ());
438
+
439
+ rule .allocator .assertHasNoLeaks ();
437
440
}
438
441
439
442
@ Test
@@ -457,6 +460,13 @@ public void shouldDisposeOriginalSource() {
457
460
.assertErrorMessage ("Disposed" );
458
461
459
462
Assertions .assertThat (rule .socket .isDisposed ()).isTrue ();
463
+
464
+ FrameAssert .assertThat (rule .connection .awaitFrame ())
465
+ .hasStreamIdZero ()
466
+ .hasData ("Disposed" )
467
+ .hasNoLeaks ();
468
+
469
+ rule .allocator .assertHasNoLeaks ();
460
470
}
461
471
462
472
@ Test
@@ -494,6 +504,13 @@ public Mono<Void> onClose() {
494
504
onCloseSubscriber .assertTerminated ().assertComplete ();
495
505
496
506
Assertions .assertThat (rule .socket .isDisposed ()).isTrue ();
507
+
508
+ FrameAssert .assertThat (rule .connection .awaitFrame ())
509
+ .hasStreamIdZero ()
510
+ .hasData ("Disposed" )
511
+ .hasNoLeaks ();
512
+
513
+ rule .allocator .assertHasNoLeaks ();
497
514
}
498
515
499
516
@ Test
@@ -515,6 +532,13 @@ public void shouldResolveOnStartSource() {
515
532
assertSubscriber1 .assertTerminated ().assertComplete ();
516
533
517
534
Assertions .assertThat (rule .socket .isDisposed ()).isTrue ();
535
+
536
+ FrameAssert .assertThat (rule .connection .awaitFrame ())
537
+ .hasStreamIdZero ()
538
+ .hasData ("Disposed" )
539
+ .hasNoLeaks ();
540
+
541
+ rule .allocator .assertHasNoLeaks ();
518
542
}
519
543
520
544
@ Test
@@ -536,6 +560,13 @@ public void shouldNotStartIfAlreadyDisposed() {
536
560
assertSubscriber1 .assertTerminated ().assertComplete ();
537
561
538
562
Assertions .assertThat (rule .socket .isDisposed ()).isTrue ();
563
+
564
+ FrameAssert .assertThat (rule .connection .awaitFrame ())
565
+ .hasStreamIdZero ()
566
+ .hasData ("Disposed" )
567
+ .hasNoLeaks ();
568
+
569
+ rule .allocator .assertHasNoLeaks ();
539
570
}
540
571
541
572
@ Test
@@ -553,6 +584,11 @@ public void shouldBeRestartedIfSourceWasClosed() {
553
584
554
585
rule .socket .dispose ();
555
586
587
+ FrameAssert .assertThat (rule .connection .awaitFrame ())
588
+ .hasStreamIdZero ()
589
+ .hasData ("Disposed" )
590
+ .hasNoLeaks ();
591
+
556
592
terminateSubscriber .assertNotTerminated ();
557
593
Assertions .assertThat (rule .client .isDisposed ()).isFalse ();
558
594
@@ -576,6 +612,13 @@ public void shouldBeRestartedIfSourceWasClosed() {
576
612
Assertions .assertThat (rule .client .connect ()).isFalse ();
577
613
578
614
Assertions .assertThat (rule .socket .isDisposed ()).isTrue ();
615
+
616
+ FrameAssert .assertThat (rule .connection .awaitFrame ())
617
+ .hasStreamIdZero ()
618
+ .hasData ("Disposed" )
619
+ .hasNoLeaks ();
620
+
621
+ rule .allocator .assertHasNoLeaks ();
579
622
}
580
623
581
624
@ Test
@@ -603,6 +646,13 @@ public void shouldDisposeOriginalSourceIfRacing() {
603
646
.assertTerminated ()
604
647
.assertError (CancellationException .class )
605
648
.assertErrorMessage ("Disposed" );
649
+
650
+ ByteBuf buf ;
651
+ while ((buf = rule .connection .pollFrame ()) != null ) {
652
+ FrameAssert .assertThat (buf ).hasStreamIdZero ().hasData ("Disposed" ).hasNoLeaks ();
653
+ }
654
+
655
+ rule .allocator .assertHasNoLeaks ();
606
656
}
607
657
}
608
658
@@ -632,8 +682,14 @@ public void shouldStartOriginalSourceOnceIfRacing() {
632
682
AssertSubscriber <Void > assertSubscriber1 = AssertSubscriber .create ();
633
683
634
684
rule .client .onClose ().subscribe (assertSubscriber1 );
685
+ FrameAssert .assertThat (rule .connection .awaitFrame ())
686
+ .hasStreamIdZero ()
687
+ .hasData ("Disposed" )
688
+ .hasNoLeaks ();
635
689
636
690
assertSubscriber1 .assertTerminated ().assertComplete ();
691
+
692
+ rule .allocator .assertHasNoLeaks ();
637
693
}
638
694
}
639
695
0 commit comments