@@ -134,6 +134,14 @@ default Payload createTestPayload(int metadataPresent) {
134134 return ByteBufPayload .create (MOCK_DATA , metadata );
135135 }
136136
137+ default RSocket getClient () {
138+ return getTransportPair ().getClient ();
139+ }
140+
141+ Duration getTimeout ();
142+
143+ TransportPair getTransportPair ();
144+
137145 @ DisplayName ("makes 10 fireAndForget requests" )
138146 @ Test
139147 default void fireAndForget10 () {
@@ -143,7 +151,8 @@ default void fireAndForget10() {
143151 .expectComplete ()
144152 .verify (getTimeout ());
145153
146- getTransportPair ().responder .awaitUntilObserved (10 , getTimeout ());
154+ Assertions .assertThat (getTransportPair ().responder .awaitUntilObserved (10 , getTimeout ()))
155+ .isTrue ();
147156 }
148157
149158 @ DisplayName ("makes 10 fireAndForget with Large Payload in Requests" )
@@ -155,17 +164,10 @@ default void largePayloadFireAndForget10() {
155164 .expectComplete ()
156165 .verify (getTimeout ());
157166
158- getTransportPair ().responder .awaitUntilObserved (10 , getTimeout ());
167+ Assertions .assertThat (getTransportPair ().responder .awaitUntilObserved (10 , getTimeout ()))
168+ .isTrue ();
159169 }
160170
161- default RSocket getClient () {
162- return getTransportPair ().getClient ();
163- }
164-
165- Duration getTimeout ();
166-
167- TransportPair getTransportPair ();
168-
169171 @ DisplayName ("makes 10 metadataPush requests" )
170172 @ Test
171173 default void metadataPush10 () {
@@ -176,7 +178,8 @@ default void metadataPush10() {
176178 .expectComplete ()
177179 .verify (getTimeout ());
178180
179- getTransportPair ().responder .awaitUntilObserved (10 , getTimeout ());
181+ Assertions .assertThat (getTransportPair ().responder .awaitUntilObserved (10 , getTimeout ()))
182+ .isTrue ();
180183 }
181184
182185 @ DisplayName ("makes 10 metadataPush with Large Metadata in requests" )
@@ -189,7 +192,8 @@ default void largePayloadMetadataPush10() {
189192 .expectComplete ()
190193 .verify (getTimeout ());
191194
192- getTransportPair ().responder .awaitUntilObserved (10 , getTimeout ());
195+ Assertions .assertThat (getTransportPair ().responder .awaitUntilObserved (10 , getTimeout ()))
196+ .isTrue ();
193197 }
194198
195199 @ DisplayName ("makes 1 requestChannel request with 0 payloads" )
@@ -452,6 +456,7 @@ class TransportPair<T, S extends Closeable> implements Disposable {
452456 private static final String metadata = "metadata" ;
453457
454458 private final boolean withResumability ;
459+ private final boolean withAsyncSupport ;
455460
456461 private final LeaksTrackingByteBufAllocator byteBufAllocator1 =
457462 LeaksTrackingByteBufAllocator .instrument (
@@ -492,122 +497,143 @@ public TransportPair(
492497 BiFunction <T , ByteBufAllocator , ServerTransport <S >> serverTransportSupplier ,
493498 boolean withRandomFragmentation ,
494499 boolean withResumability ) {
495- this .withResumability = withResumability ;
496-
497- T address = addressSupplier .get ();
498-
499- final boolean runClientWithAsyncInterceptors = ThreadLocalRandom .current ().nextBoolean ();
500- final boolean runServerWithAsyncInterceptors = ThreadLocalRandom .current ().nextBoolean ();
501-
502- ByteBufAllocator allocatorToSupply1 ;
503- ByteBufAllocator allocatorToSupply2 ;
504- if (ResourceLeakDetector .getLevel () == ResourceLeakDetector .Level .ADVANCED
505- || ResourceLeakDetector .getLevel () == ResourceLeakDetector .Level .PARANOID ) {
506- logger .info ("Using LeakTrackingByteBufAllocator" );
507- allocatorToSupply1 = byteBufAllocator1 ;
508- allocatorToSupply2 = byteBufAllocator2 ;
509- } else {
510- allocatorToSupply1 = ByteBufAllocator .DEFAULT ;
511- allocatorToSupply2 = ByteBufAllocator .DEFAULT ;
512- }
513- responder = new TestRSocket (TransportPair .data , metadata );
514- final RSocketServer rSocketServer =
515- RSocketServer .create ((setup , sendingSocket ) -> Mono .just (responder ))
516- .payloadDecoder (PayloadDecoder .ZERO_COPY )
517- .interceptors (
518- registry -> {
519- if (runServerWithAsyncInterceptors && !withResumability ) {
520- logger .info (
521- "Perform Integration Test with Async Interceptors Enabled For Server" );
522- registry
523- .forConnection (
524- (type , duplexConnection ) ->
525- new AsyncDuplexConnection (duplexConnection ))
526- .forSocketAcceptor (
527- delegate ->
528- (connectionSetupPayload , sendingSocket ) ->
529- delegate
530- .accept (connectionSetupPayload , sendingSocket )
531- .subscribeOn (Schedulers .parallel ()));
532- }
533-
534- if (withResumability ) {
535- registry .forConnection (
536- (type , duplexConnection ) ->
537- type == DuplexConnectionInterceptor .Type .SOURCE
538- ? new DisconnectingDuplexConnection (
539- "Server" ,
540- duplexConnection ,
541- Duration .ofMillis (
542- ThreadLocalRandom .current ().nextInt (100 , 1000 )))
543- : duplexConnection );
544- }
545- });
546-
547- if (withResumability ) {
548- rSocketServer .resume (
549- new Resume ()
550- .storeFactory (
551- token -> new InMemoryResumableFramesStore ("server" , token , Integer .MAX_VALUE )));
552- }
500+ this (
501+ addressSupplier ,
502+ clientTransportSupplier ,
503+ serverTransportSupplier ,
504+ withRandomFragmentation ,
505+ withResumability ,
506+ true );
507+ }
553508
554- if (withRandomFragmentation ) {
555- rSocketServer .fragment (ThreadLocalRandom .current ().nextInt (256 , 512 ));
556- }
509+ public TransportPair (
510+ Supplier <T > addressSupplier ,
511+ TriFunction <T , S , ByteBufAllocator , ClientTransport > clientTransportSupplier ,
512+ BiFunction <T , ByteBufAllocator , ServerTransport <S >> serverTransportSupplier ,
513+ boolean withRandomFragmentation ,
514+ boolean withResumability ,
515+ boolean withAsyncSupport ) {
516+
517+ this .withResumability = withResumability ;
518+ this .withAsyncSupport = withAsyncSupport ;
519+
520+ T address = addressSupplier .get ();
521+
522+ final boolean runClientWithAsyncInterceptors =
523+ ThreadLocalRandom .current ().nextBoolean () && withAsyncSupport ;
524+ final boolean runServerWithAsyncInterceptors =
525+ ThreadLocalRandom .current ().nextBoolean () && withAsyncSupport ;
526+
527+ ByteBufAllocator allocatorToSupply1 ;
528+ ByteBufAllocator allocatorToSupply2 ;
529+ if (ResourceLeakDetector .getLevel () == ResourceLeakDetector .Level .ADVANCED
530+ || ResourceLeakDetector .getLevel () == ResourceLeakDetector .Level .PARANOID ) {
531+ logger .info ("Using LeakTrackingByteBufAllocator" );
532+ allocatorToSupply1 = byteBufAllocator1 ;
533+ allocatorToSupply2 = byteBufAllocator2 ;
534+ } else {
535+ allocatorToSupply1 = ByteBufAllocator .DEFAULT ;
536+ allocatorToSupply2 = ByteBufAllocator .DEFAULT ;
537+ }
538+ responder = new TestRSocket (TransportPair .data , metadata );
539+ final RSocketServer rSocketServer =
540+ RSocketServer .create ((setup , sendingSocket ) -> Mono .just (responder ))
541+ .payloadDecoder (PayloadDecoder .ZERO_COPY )
542+ .interceptors (
543+ registry -> {
544+ if (runServerWithAsyncInterceptors && !withResumability ) {
545+ logger .info (
546+ "Perform Integration Test with Async Interceptors Enabled For Server" );
547+ registry
548+ .forConnection (
549+ (type , duplexConnection ) ->
550+ new AsyncDuplexConnection (duplexConnection ))
551+ .forSocketAcceptor (
552+ delegate ->
553+ (connectionSetupPayload , sendingSocket ) ->
554+ delegate
555+ .accept (connectionSetupPayload , sendingSocket )
556+ .subscribeOn (Schedulers .parallel ()));
557+ }
558+
559+ if (withResumability ) {
560+ registry .forConnection (
561+ (type , duplexConnection ) ->
562+ type == DuplexConnectionInterceptor .Type .SOURCE
563+ ? new DisconnectingDuplexConnection (
564+ "Server" ,
565+ duplexConnection ,
566+ Duration .ofMillis (
567+ ThreadLocalRandom .current ().nextInt (100 , 1000 )))
568+ : duplexConnection );
569+ }
570+ });
571+
572+ if (withResumability ) {
573+ rSocketServer .resume (
574+ new Resume ()
575+ .storeFactory (
576+ token -> new InMemoryResumableFramesStore ("server" , token ,Integer .MAX_VALUE )));
577+ }
557578
558- server =
559- rSocketServer .bind (serverTransportSupplier .apply (address , allocatorToSupply2 )).block ();
560-
561- final RSocketConnector rSocketConnector =
562- RSocketConnector .create ()
563- .payloadDecoder (PayloadDecoder .ZERO_COPY )
564- .keepAlive (Duration .ofMillis (10 ), Duration .ofHours (1 ))
565- .interceptors (
566- registry -> {
567- if (runClientWithAsyncInterceptors && !withResumability ) {
568- logger .info (
569- "Perform Integration Test with Async Interceptors Enabled For Client" );
570- registry
571- .forConnection (
572- (type , duplexConnection ) ->
573- new AsyncDuplexConnection (duplexConnection ))
574- .forSocketAcceptor (
575- delegate ->
576- (connectionSetupPayload , sendingSocket ) ->
577- delegate
578- .accept (connectionSetupPayload , sendingSocket )
579- .subscribeOn (Schedulers .parallel ()));
580- }
581-
582- if (withResumability ) {
583- registry .forConnection (
584- (type , duplexConnection ) ->
585- type == DuplexConnectionInterceptor .Type .SOURCE
586- ? new DisconnectingDuplexConnection (
587- "Client" ,
588- duplexConnection ,
589- Duration .ofMillis (
590- ThreadLocalRandom .current ().nextInt (10 , 1500 )))
591- : duplexConnection );
592- }
593- });
594-
595- if (withResumability ) {
596- rSocketConnector .resume (
597- new Resume ()
598- .storeFactory (
599- token -> new InMemoryResumableFramesStore ("client" , token , Integer .MAX_VALUE )));
600- }
579+ if (withRandomFragmentation ) {
580+ rSocketServer .fragment (ThreadLocalRandom .current ().nextInt (256 , 512 ));
581+ }
601582
602- if (withRandomFragmentation ) {
603- rSocketConnector .fragment (ThreadLocalRandom .current ().nextInt (256 , 512 ));
604- }
583+ server =
584+ rSocketServer .bind (serverTransportSupplier .apply (address , allocatorToSupply2 )).block ();
585+
586+ final RSocketConnector rSocketConnector =
587+ RSocketConnector .create ()
588+ .payloadDecoder (PayloadDecoder .ZERO_COPY )
589+ .keepAlive (
590+ Duration .ofMillis (10 ), Duration .ofSeconds (10 ))
591+ .interceptors (
592+ registry -> {
593+ if (runClientWithAsyncInterceptors && !withResumability ) {
594+ logger .info (
595+ "Perform Integration Test with Async Interceptors Enabled For Client" );
596+ registry
597+ .forConnection (
598+ (type , duplexConnection ) ->
599+ new AsyncDuplexConnection (duplexConnection ))
600+ .forSocketAcceptor (
601+ delegate ->
602+ (connectionSetupPayload , sendingSocket ) ->
603+ delegate
604+ .accept (connectionSetupPayload , sendingSocket )
605+ .subscribeOn (Schedulers .parallel ()));
606+ }
607+
608+ if (withResumability ) {
609+ registry .forConnection (
610+ (type , duplexConnection ) ->
611+ type == DuplexConnectionInterceptor .Type .SOURCE
612+ ? new DisconnectingDuplexConnection (
613+ "Client" ,
614+ duplexConnection ,
615+ Duration .ofMillis (
616+ ThreadLocalRandom .current ().nextInt (10 , 1500 )))
617+ : duplexConnection );
618+ }
619+ });
620+
621+ if (withResumability ) {
622+ rSocketConnector .resume (
623+ new Resume ()
624+ .storeFactory (
625+ token -> new InMemoryResumableFramesStore ("client" , token , Integer .MAX_VALUE )));
626+ }
627+
628+ if (withRandomFragmentation ) {
629+ rSocketConnector .fragment (ThreadLocalRandom .current ().nextInt (256 , 512 ));
630+ }
605631
606- client =
607- rSocketConnector
608- .connect (clientTransportSupplier .apply (address , server , allocatorToSupply1 ))
609- .doOnError (Throwable ::printStackTrace )
610- .block ();
632+ client =
633+ rSocketConnector
634+ .connect (clientTransportSupplier .apply (address , server , allocatorToSupply1 ))
635+ .doOnError (Throwable ::printStackTrace )
636+ .block ();
611637 }
612638
613639 @ Override
0 commit comments