2525import io .rsocket .DuplexConnection ;
2626import io .rsocket .Payload ;
2727import io .rsocket .frame .FrameType ;
28+ import io .rsocket .plugins .RequestInterceptor ;
2829import java .time .Duration ;
2930import java .util .concurrent .atomic .AtomicLongFieldUpdater ;
3031import org .reactivestreams .Subscription ;
@@ -51,21 +52,31 @@ final class FireAndForgetRequesterMono extends Mono<Void> implements Subscriptio
5152 final RequesterResponderSupport requesterResponderSupport ;
5253 final DuplexConnection connection ;
5354
55+ @ Nullable final RequestInterceptor requestInterceptor ;
56+
5457 FireAndForgetRequesterMono (Payload payload , RequesterResponderSupport requesterResponderSupport ) {
5558 this .allocator = requesterResponderSupport .getAllocator ();
5659 this .payload = payload ;
5760 this .mtu = requesterResponderSupport .getMtu ();
5861 this .maxFrameLength = requesterResponderSupport .getMaxFrameLength ();
5962 this .requesterResponderSupport = requesterResponderSupport ;
6063 this .connection = requesterResponderSupport .getDuplexConnection ();
64+ this .requestInterceptor = requesterResponderSupport .getRequestInterceptor ();
6165 }
6266
6367 @ Override
6468 public void subscribe (CoreSubscriber <? super Void > actual ) {
6569 long previousState = markSubscribed (STATE , this );
6670 if (isSubscribedOrTerminated (previousState )) {
67- Operators .error (
68- actual , new IllegalStateException ("FireAndForgetMono allows only a single Subscriber" ));
71+ final IllegalStateException e =
72+ new IllegalStateException ("FireAndForgetMono allows only a single Subscriber" );
73+
74+ final RequestInterceptor requestInterceptor = this .requestInterceptor ;
75+ if (requestInterceptor != null ) {
76+ requestInterceptor .onReject (e , FrameType .REQUEST_FNF , null );
77+ }
78+
79+ Operators .error (actual , e );
6980 return ;
7081 }
7182
@@ -76,14 +87,28 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
7687 try {
7788 if (!isValid (mtu , this .maxFrameLength , p , false )) {
7889 lazyTerminate (STATE , this );
79- p . release ();
80- actual . onError (
90+
91+ final IllegalArgumentException e =
8192 new IllegalArgumentException (
82- String .format (INVALID_PAYLOAD_ERROR_MESSAGE , this .maxFrameLength )));
93+ String .format (INVALID_PAYLOAD_ERROR_MESSAGE , this .maxFrameLength ));
94+ final RequestInterceptor requestInterceptor = this .requestInterceptor ;
95+ if (requestInterceptor != null ) {
96+ requestInterceptor .onReject (e , FrameType .REQUEST_FNF , p .metadata ());
97+ }
98+
99+ p .release ();
100+
101+ actual .onError (e );
83102 return ;
84103 }
85104 } catch (IllegalReferenceCountException e ) {
86105 lazyTerminate (STATE , this );
106+
107+ final RequestInterceptor requestInterceptor = this .requestInterceptor ;
108+ if (requestInterceptor != null ) {
109+ requestInterceptor .onReject (e , FrameType .REQUEST_FNF , null );
110+ }
111+
87112 actual .onError (e );
88113 return ;
89114 }
@@ -93,26 +118,54 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
93118 streamId = this .requesterResponderSupport .getNextStreamId ();
94119 } catch (Throwable t ) {
95120 lazyTerminate (STATE , this );
121+
122+ final Throwable ut = Exceptions .unwrap (t );
123+ final RequestInterceptor requestInterceptor = this .requestInterceptor ;
124+ if (requestInterceptor != null ) {
125+ requestInterceptor .onReject (ut , FrameType .REQUEST_FNF , p .metadata ());
126+ }
127+
96128 p .release ();
97- actual .onError (Exceptions .unwrap (t ));
129+
130+ actual .onError (ut );
98131 return ;
99132 }
100133
134+ final RequestInterceptor interceptor = this .requestInterceptor ;
135+ if (interceptor != null ) {
136+ interceptor .onStart (streamId , FrameType .REQUEST_FNF , p .metadata ());
137+ }
138+
101139 try {
102140 if (isTerminated (this .state )) {
103141 p .release ();
142+
143+ if (interceptor != null ) {
144+ interceptor .onCancel (streamId );
145+ }
146+
104147 return ;
105148 }
106149
107150 sendReleasingPayload (
108151 streamId , FrameType .REQUEST_FNF , mtu , p , this .connection , this .allocator , true );
109152 } catch (Throwable e ) {
110153 lazyTerminate (STATE , this );
154+
155+ if (interceptor != null ) {
156+ interceptor .onTerminate (streamId , e );
157+ }
158+
111159 actual .onError (e );
112160 return ;
113161 }
114162
115163 lazyTerminate (STATE , this );
164+
165+ if (interceptor != null ) {
166+ interceptor .onTerminate (streamId , null );
167+ }
168+
116169 actual .onComplete ();
117170 }
118171
@@ -137,19 +190,41 @@ public Void block(Duration m) {
137190 public Void block () {
138191 long previousState = markSubscribed (STATE , this );
139192 if (isSubscribedOrTerminated (previousState )) {
140- throw new IllegalStateException ("FireAndForgetMono allows only a single Subscriber" );
193+ final IllegalStateException e =
194+ new IllegalStateException ("FireAndForgetMono allows only a single Subscriber" );
195+ final RequestInterceptor requestInterceptor = this .requestInterceptor ;
196+ if (requestInterceptor != null ) {
197+ requestInterceptor .onReject (e , FrameType .REQUEST_FNF , null );
198+ }
199+ throw e ;
141200 }
142201
143202 final Payload p = this .payload ;
144203 try {
145204 if (!isValid (this .mtu , this .maxFrameLength , p , false )) {
146205 lazyTerminate (STATE , this );
206+
207+ final IllegalArgumentException e =
208+ new IllegalArgumentException (
209+ String .format (INVALID_PAYLOAD_ERROR_MESSAGE , this .maxFrameLength ));
210+
211+ final RequestInterceptor requestInterceptor = this .requestInterceptor ;
212+ if (requestInterceptor != null ) {
213+ requestInterceptor .onReject (e , FrameType .REQUEST_FNF , p .metadata ());
214+ }
215+
147216 p .release ();
148- throw new IllegalArgumentException (
149- String . format ( INVALID_PAYLOAD_ERROR_MESSAGE , this . maxFrameLength )) ;
217+
218+ throw e ;
150219 }
151220 } catch (IllegalReferenceCountException e ) {
152221 lazyTerminate (STATE , this );
222+
223+ final RequestInterceptor requestInterceptor = this .requestInterceptor ;
224+ if (requestInterceptor != null ) {
225+ requestInterceptor .onReject (e , FrameType .REQUEST_FNF , null );
226+ }
227+
153228 throw Exceptions .propagate (e );
154229 }
155230
@@ -158,10 +233,22 @@ public Void block() {
158233 streamId = this .requesterResponderSupport .getNextStreamId ();
159234 } catch (Throwable t ) {
160235 lazyTerminate (STATE , this );
236+
237+ final RequestInterceptor requestInterceptor = this .requestInterceptor ;
238+ if (requestInterceptor != null ) {
239+ requestInterceptor .onReject (Exceptions .unwrap (t ), FrameType .REQUEST_FNF , p .metadata ());
240+ }
241+
161242 p .release ();
243+
162244 throw Exceptions .propagate (t );
163245 }
164246
247+ final RequestInterceptor interceptor = this .requestInterceptor ;
248+ if (interceptor != null ) {
249+ interceptor .onStart (streamId , FrameType .REQUEST_FNF , p .metadata ());
250+ }
251+
165252 try {
166253 sendReleasingPayload (
167254 streamId ,
@@ -173,10 +260,20 @@ public Void block() {
173260 true );
174261 } catch (Throwable e ) {
175262 lazyTerminate (STATE , this );
263+
264+ if (interceptor != null ) {
265+ interceptor .onTerminate (streamId , e );
266+ }
267+
176268 throw Exceptions .propagate (e );
177269 }
178270
179271 lazyTerminate (STATE , this );
272+
273+ if (interceptor != null ) {
274+ interceptor .onTerminate (streamId , null );
275+ }
276+
180277 return null ;
181278 }
182279
0 commit comments