11/*
2- * Copyright 2015-2020 the original author or authors.
2+ * Copyright 2015-2021 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
3939class RSocketPool extends ResolvingOperator <Object >
4040 implements CoreSubscriber <List <LoadbalanceTarget >> {
4141
42- final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket (this );
43- final RSocketConnector connector ;
44- final LoadbalanceStrategy loadbalanceStrategy ;
45-
46- volatile PooledRSocket [] activeSockets ;
47-
4842 static final AtomicReferenceFieldUpdater <RSocketPool , PooledRSocket []> ACTIVE_SOCKETS =
4943 AtomicReferenceFieldUpdater .newUpdater (
5044 RSocketPool .class , PooledRSocket [].class , "activeSockets" );
51-
5245 static final PooledRSocket [] EMPTY = new PooledRSocket [0 ];
5346 static final PooledRSocket [] TERMINATED = new PooledRSocket [0 ];
54-
55- volatile Subscription s ;
5647 static final AtomicReferenceFieldUpdater <RSocketPool , Subscription > S =
5748 AtomicReferenceFieldUpdater .newUpdater (RSocketPool .class , Subscription .class , "s" );
49+ final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket (this );
50+ final RSocketConnector connector ;
51+ final LoadbalanceStrategy loadbalanceStrategy ;
52+ volatile PooledRSocket [] activeSockets ;
53+ volatile Subscription s ;
5854
5955 public RSocketPool (
6056 RSocketConnector connector ,
@@ -85,31 +81,27 @@ public void onSubscribe(Subscription s) {
8581 }
8682 }
8783
88- /**
89- * This operation should happen rarely relatively compares the number of the {@link #select()}
90- * method invocations, therefore it is acceptable to have it algorithmically inefficient. The
91- * algorithmic complexity of this method is
92- *
93- * @param targets set which represents RSocket targets to balance on
94- */
9584 @ Override
9685 public void onNext (List <LoadbalanceTarget > targets ) {
9786 if (isDisposed ()) {
9887 return ;
9988 }
10089
90+ // This operation should happen less frequently than calls to select() (which are per request)
91+ // and therefore it is acceptable somewhat less efficient.
92+
10193 PooledRSocket [] previouslyActiveSockets ;
102- PooledRSocket [] activeSockets ;
10394 PooledRSocket [] inactiveSockets ;
95+ PooledRSocket [] socketsToUse ;
10496 for (; ; ) {
105- HashMap <LoadbalanceTarget , Integer > rSocketSuppliersCopy = new HashMap <>();
97+ HashMap <LoadbalanceTarget , Integer > rSocketSuppliersCopy = new HashMap <>(targets . size () );
10698
10799 int j = 0 ;
108100 for (LoadbalanceTarget target : targets ) {
109101 rSocketSuppliersCopy .put (target , j ++);
110102 }
111103
112- // checking intersection of active RSocket with the newly received set
104+ // Intersect current and new list of targets and find the ones to keep vs dispose
113105 previouslyActiveSockets = this .activeSockets ;
114106 inactiveSockets = new PooledRSocket [previouslyActiveSockets .length ];
115107 PooledRSocket [] nextActiveSockets =
@@ -141,20 +133,18 @@ public void onNext(List<LoadbalanceTarget> targets) {
141133 }
142134 }
143135
144- // going though brightly new rsocket
136+ // The remainder are the brand new targets
145137 for (LoadbalanceTarget target : rSocketSuppliersCopy .keySet ()) {
146138 nextActiveSockets [activeSocketsPosition ++] =
147139 new PooledRSocket (this , this .connector .connect (target .getTransport ()), target );
148140 }
149141
150- // shrank to actual length
151142 if (activeSocketsPosition == 0 ) {
152- activeSockets = EMPTY ;
143+ socketsToUse = EMPTY ;
153144 } else {
154- activeSockets = Arrays .copyOf (nextActiveSockets , activeSocketsPosition );
145+ socketsToUse = Arrays .copyOf (nextActiveSockets , activeSocketsPosition );
155146 }
156-
157- if (ACTIVE_SOCKETS .compareAndSet (this , previouslyActiveSockets , activeSockets )) {
147+ if (ACTIVE_SOCKETS .compareAndSet (this , previouslyActiveSockets , socketsToUse )) {
158148 break ;
159149 }
160150 }
@@ -169,7 +159,7 @@ public void onNext(List<LoadbalanceTarget> targets) {
169159
170160 if (isPending ()) {
171161 // notifies that upstream is resolved
172- if (activeSockets != EMPTY ) {
162+ if (socketsToUse != EMPTY ) {
173163 //noinspection ConstantConditions
174164 complete (this );
175165 }
0 commit comments