1818 */
1919package org .jooby .reactor ;
2020
21+ import static java .util .Objects .requireNonNull ;
2122import static javaslang .API .$ ;
2223import static javaslang .API .Case ;
2324import static javaslang .API .Match ;
2425import static javaslang .Predicates .instanceOf ;
2526
26- import java .util .Optional ;
27- import java .util .function .Supplier ;
27+ import java .util .function .Function ;
2828
2929import org .jooby .Deferred ;
3030import org .jooby .Env ;
3131import org .jooby .Jooby ;
3232import org .jooby .Route ;
33- import org .jooby .Routes ;
3433
3534import com .google .inject .Binder ;
3635import com .typesafe .config .Config ;
3736import com .typesafe .config .ConfigFactory ;
3837
3938import reactor .core .publisher .Flux ;
4039import reactor .core .publisher .Mono ;
41- import reactor .core .scheduler .Scheduler ;
4240
4341/**
4442 * <h1>reactor</h1>
6563 * {
6664 * use(new Reactor());
6765 *
68- * get("/", req -> Flux.just("reactive programming in jooby!"))
69- * .map(Reactor.reactor());
66+ * get("/", req -> Flux.just("reactive programming in jooby!"));
7067 * }
7168 * }</pre>
7269 *
9188 * }</pre>
9289 *
9390 * <p>
94- * Translation is done with the {@link Reactor#reactor()} route operator . If you are a
91+ * Translation is done via {@link Reactor#reactor()} route mapper . If you are a
9592 * <a href="http://projectreactor.io">reactor</a> programmer then you don't need to worry
9693 * for learning a new API and semantic. The {@link Reactor#reactor()} route operator deal and take
9794 * cares of the {@link Deferred} API.
9895 * </p>
9996 *
100- * <h2>reactor() </h2>
97+ * <h2>reactor mapper </h2>
10198 * <p>
102- * We just learn that we are not force to learn a new API, just write
103- * <a href="http://projectreactor.io">reactor</a> code. That's cool!
104- * </p>
105- *
106- * <p>
107- * But.. what if you have 10 routes? 50 routes?
108- * </p>
109- *
110- * <pre>{@code
111- *
112- * ...
113- * import org.jooby.reactor.Reactor;
114- * ...
115- *
116- * {
117- * use(new Reactor());
118- *
119- * get("/1", req -> Observable...)
120- * .map(Reactor.reactor());
121- *
122- * get("/2", req -> Observable...)
123- * .map(Reactor.reactor());
124- *
125- * ....
126- *
127- * get("/N", req -> Observable...)
128- * .map(Reactor.reactor());
129- * }
130- * }</pre>
131- *
132- * <p>
133- * This is better than written N routes using the {@link Deferred} API route by route... but still
134- * there is one more option to help you (and your fingers) to right less code:
99+ * Advanced flux/mono configuration is allowed via function adapters:
135100 * </p>
136101 *
137102 * <pre>{@code
138- * ...
139- * import org.jooby.reactor.Reactor;
140- * ...
141- *
142- * {
143- * use(new Reactor());
144- *
145- * with(() -> {
146- * get("/1", req -> Observable...);
147- *
148- * get("/2", req -> Observable...);
149- *
150- * ....
151- *
152- * get("/N", req -> Observable...);
153- *
154- * }).map(Reactor.reactor());
155- * }
156- * }</pre>
157- *
158- * <p>
159- * <strong>Beautiful, hugh?</strong>
160- * </p>
161- *
162- * <p>
163- * The {@link Routes#with(Runnable) with} operator let you group any number of routes and apply
164- * common attributes and/or operator to all them!!!
165- * </p>
166- *
167- * <h2>reactor()+scheduler</h2>
168- * <p>
169- * You can provide a {@link Scheduler} to the {@link #reactor(Supplier)} operator:
170- * </p>
171103 *
172- * <pre>{@code
173104 * ...
174105 * import org.jooby.reactor.Reactor;
175106 * ...
176107 *
177108 * {
178- * use(new Reactor());
179- *
180- * with(() -> {
181- * get("/1", req -> Observable...);
182- *
183- * get("/2", req -> Observable...);
109+ * use(new Reactor()
110+ * .withFlux(f -> f.publishOn(Computations.concurrent())
111+ * .withMono(m -> m.publishOn(Computations.concurrent()));
184112 *
185- * ....
113+ * get("/flux", req -> Flux ...);
186114 *
187- * get("/N ", req -> Observable ...);
115+ * get("/mono ", req -> Mono ...);
188116 *
189- * }).map(Reactor.reactor(Computations::concurrent));
190117 * }
191118 * }</pre>
192119 *
193120 * <p>
194- * All the routes here will {@link Flux#subscribeOn(Scheduler) subscribe-on} the provided
195- * {@link Scheduler}.
121+ * Here every Flux/Mono from a route handler will publish on the <code>concurrent</code> scheduler.
196122 * </p>
197123 *
198124 * @author edgar
199125 * @since 1.0.0.CR3
200126 */
127+ @ SuppressWarnings ("rawtypes" )
201128public class Reactor implements Jooby .Module {
202129
130+ private Function <Flux , Flux > flux = Function .identity ();
131+
132+ private Function <Mono , Mono > mono = Function .identity ();
133+
134+ public Reactor withFlux (final Function <Flux , Flux > adapter ) {
135+ this .flux = requireNonNull (adapter , "Flux's adapter is required." );
136+ return this ;
137+ }
138+
139+ public Reactor withMono (final Function <Mono , Mono > adapter ) {
140+ this .mono = requireNonNull (adapter , "Mono's adapter is required." );
141+ return this ;
142+ }
143+
203144 /**
204145 * Map a reactor object like {@link Flux} or {@link Mono} into a {@link Deferred} object.
205146 *
@@ -209,26 +150,38 @@ public class Reactor implements Jooby.Module {
209150 * ...
210151 *
211152 * {
212- * use(new Reactor());
213- *
214153 * with(() -> {
215- * get("/1", req -> Observable...);
216- *
217- * get("/2", req -> Observable...);
154+ * get("/lux", req -> Flux...);
218155 *
219- * ....
156+ * get("/mono", req -> Mono ...);
220157 *
221- * get("/N", req -> Observable...);
222- *
223- * }).map(Reactor.reactor( ));
158+ * }).map(Reactor.reactor(
159+ * flux -> flux.publishOn(Computations.concurrent()),
160+ * mono -> mono.publishOn(Computations.concurrent() ));
224161 * }
225162 * }</pre>
226163 *
227- * @param subscribeOn An scheduler to subscribeOn.
164+ * @param flux A flux adapter.
165+ * @param mono A mono adapter.
228166 * @return A new mapper.
229167 */
230- public static Route .Mapper <Object > reactor (final Supplier <Scheduler > subscribeOn ) {
231- return reactor (Optional .of (subscribeOn ));
168+ @ SuppressWarnings ("unchecked" )
169+ public static Route .Mapper <Object > reactor (final Function <Flux , Flux > flux ,
170+ final Function <Mono , Mono > mono ) {
171+ requireNonNull (flux , "Flux's adapter is required." );
172+ requireNonNull (mono , "Mono's adapter is required." );
173+ return Route .Mapper .create ("reactor" , value -> Match (value ).of (
174+ /** Flux: */
175+ Case (instanceOf (Flux .class ),
176+ it -> new Deferred (deferred -> flux .apply (it )
177+ .consume (deferred ::set , deferred ::set ))),
178+ /** Mono: */
179+ Case (instanceOf (Mono .class ),
180+ it -> new Deferred (deferred -> mono .apply (it )
181+ .consume (deferred ::set , deferred ::set ))),
182+ /** Ignore */
183+ Case ($ (), value )));
184+
232185 }
233186
234187 /**
@@ -240,48 +193,29 @@ public static Route.Mapper<Object> reactor(final Supplier<Scheduler> subscribeOn
240193 * ...
241194 *
242195 * {
243- * use(new Reactor());
244- *
245196 * with(() -> {
246- * get("/1", req -> Observable...);
247- *
248- * get("/2", req -> Observable...);
249- *
250- * ....
197+ * get("/lux", req -> Flux...);
251198 *
252- * get("/N ", req -> Observable ...);
199+ * get("/mono ", req -> Mono ...);
253200 *
254- * }).map(Reactor.reactor());
201+ * }).map(Reactor.reactor(
202+ * flux -> flux.publishOn(Computations.concurrent()),
203+ * mono -> mono.publishOn(Computations.concurrent()));
255204 * }
256205 * }</pre>
257206 *
207+ * @param flux A flux adapter.
208+ * @param mono A mono adapter.
258209 * @return A new mapper.
259210 */
260211 public static Route .Mapper <Object > reactor () {
261- return reactor (Optional .empty ());
262- }
263-
264- @ SuppressWarnings ("unchecked" )
265- private static Route .Mapper <Object > reactor (final Optional <Supplier <Scheduler >> subscribeOn ) {
266- return Route .Mapper .create ("reactor" , value -> Match (value ).of (
267- /** Flux: */
268- Case (instanceOf (Flux .class ),
269- it -> new Deferred (deferred -> subscribeOn
270- .map (s -> it .subscribeOn (s .get ()))
271- .orElse (it )
272- .consume (deferred ::set , deferred ::set ))),
273- /** Mono: */
274- Case (instanceOf (Mono .class ),
275- it -> new Deferred (deferred -> subscribeOn
276- .map (s -> it .subscribeOn (s .get ()))
277- .orElse (it )
278- .consume (deferred ::set , deferred ::set ))),
279- /** Ignore */
280- Case ($ (), value )));
212+ return reactor (Function .identity (), Function .identity ());
281213 }
282214
283215 @ Override
284216 public void configure (final Env env , final Config conf , final Binder binder ) {
217+ env .routes ()
218+ .map (reactor (flux , mono ));
285219 }
286220
287221 @ Override
0 commit comments