Skip to content

Commit 41eec02

Browse files
committed
migrates to RequestInterceptor to track Stats
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
1 parent e122454 commit 41eec02

17 files changed

Lines changed: 1521 additions & 533 deletions

benchmarks/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ Specify extra profilers:
1717
Prominent profilers (for full list call `jmhProfilers` task):
1818
- comp - JitCompilations, tune your iterations
1919
- stack - which methods used most time
20-
- gc - print garbage collection stats
20+
- gc - print garbage collection defaultWeightedStats
2121
- hs_thr - thread usage
2222

2323
Change report format from JSON to one of [CSV, JSON, NONE, SCSV, TEXT]:
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
package io.rsocket.loadbalance;
2+
3+
import io.rsocket.util.Clock;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
6+
7+
/**
8+
* The base implementation of the {@link WeightedStats} interface
9+
*
10+
* @since 1.1
11+
*/
12+
public class BaseWeightedStats implements WeightedStats {
13+
14+
private static final double DEFAULT_LOWER_QUANTILE = 0.5;
15+
private static final double DEFAULT_HIGHER_QUANTILE = 0.8;
16+
private static final int INACTIVITY_FACTOR = 500;
17+
private static final long DEFAULT_INITIAL_INTER_ARRIVAL_TIME =
18+
Clock.unit().convert(1L, TimeUnit.SECONDS);
19+
20+
private static final double STARTUP_PENALTY = Long.MAX_VALUE >> 12;
21+
22+
private final Quantile lowerQuantile;
23+
private final Quantile higherQuantile;
24+
private final Ewma availabilityPercentage;
25+
private final Median median;
26+
private final Ewma interArrivalTime;
27+
28+
private final long tau;
29+
private final long inactivityFactor;
30+
31+
private long errorStamp; // last we got an error
32+
private long stamp; // last timestamp we sent a request
33+
private long stamp0; // last timestamp we sent a request or receive a response
34+
private long duration; // instantaneous cumulative duration
35+
36+
private double availability = 1.0;
37+
38+
private volatile int pendingRequests; // instantaneous rate
39+
private static final AtomicIntegerFieldUpdater<BaseWeightedStats> PENDING_REQUESTS =
40+
AtomicIntegerFieldUpdater.newUpdater(BaseWeightedStats.class, "pendingRequests");
41+
private volatile int pendingStreams; // number of active streams
42+
private static final AtomicIntegerFieldUpdater<BaseWeightedStats> PENDING_STREAMS =
43+
AtomicIntegerFieldUpdater.newUpdater(BaseWeightedStats.class, "pendingStreams");
44+
45+
protected BaseWeightedStats() {
46+
this(
47+
new FrugalQuantile(DEFAULT_LOWER_QUANTILE),
48+
new FrugalQuantile(DEFAULT_HIGHER_QUANTILE),
49+
INACTIVITY_FACTOR);
50+
}
51+
52+
private BaseWeightedStats(
53+
Quantile lowerQuantile, Quantile higherQuantile, long inactivityFactor) {
54+
this.lowerQuantile = lowerQuantile;
55+
this.higherQuantile = higherQuantile;
56+
this.inactivityFactor = inactivityFactor;
57+
58+
long now = Clock.now();
59+
this.stamp = now;
60+
this.errorStamp = now;
61+
this.stamp0 = now;
62+
this.duration = 0L;
63+
this.pendingRequests = 0;
64+
this.median = new Median();
65+
this.interArrivalTime = new Ewma(1, TimeUnit.MINUTES, DEFAULT_INITIAL_INTER_ARRIVAL_TIME);
66+
this.availabilityPercentage = new Ewma(5, TimeUnit.SECONDS, 1.0);
67+
this.tau = Clock.unit().convert((long) (5 / Math.log(2)), TimeUnit.SECONDS);
68+
}
69+
70+
@Override
71+
public double lowerQuantileLatency() {
72+
return lowerQuantile.estimation();
73+
}
74+
75+
@Override
76+
public double higherQuantileLatency() {
77+
return higherQuantile.estimation();
78+
}
79+
80+
@Override
81+
public int pending() {
82+
return pendingRequests + pendingStreams;
83+
}
84+
85+
@Override
86+
public double availability() {
87+
if (Clock.now() - stamp > tau) {
88+
updateAvailability(1.0);
89+
}
90+
return availability * availabilityPercentage.value();
91+
}
92+
93+
@Override
94+
public double predictedLatency() {
95+
final long now = Clock.now();
96+
final long elapsed;
97+
98+
synchronized (this) {
99+
elapsed = Math.max(now - stamp, 1L);
100+
}
101+
102+
final double latency;
103+
final double prediction = median.estimation();
104+
105+
final int pending = this.pending();
106+
if (prediction == 0.0) {
107+
if (pending == 0) {
108+
latency = 0.0; // first request
109+
} else {
110+
// subsequent requests while we don't have any history
111+
latency = STARTUP_PENALTY + pending;
112+
}
113+
} else if (pending == 0 && elapsed > inactivityFactor * interArrivalTime.value()) {
114+
// if we did't see any data for a while, we decay the prediction by inserting
115+
// artificial 0.0 into the median
116+
median.insert(0.0);
117+
latency = median.estimation();
118+
} else {
119+
final double predicted = prediction * pending;
120+
final double instant = instantaneous(now, pending);
121+
122+
if (predicted < instant) { // NB: (0.0 < 0.0) == false
123+
latency = instant / pending; // NB: pending never equal 0 here
124+
} else {
125+
// we are under the predictions
126+
latency = prediction;
127+
}
128+
}
129+
130+
return latency;
131+
}
132+
133+
long instantaneous(long now, int pending) {
134+
return duration + (now - stamp0) * pending;
135+
}
136+
137+
void startStream() {
138+
PENDING_STREAMS.incrementAndGet(this);
139+
}
140+
141+
void stopStream() {
142+
PENDING_STREAMS.decrementAndGet(this);
143+
}
144+
145+
synchronized long startRequest() {
146+
final long now = Clock.now();
147+
final int pendingRequests = this.pendingRequests;
148+
149+
interArrivalTime.insert(now - stamp);
150+
duration += Math.max(0, now - stamp0) * pendingRequests;
151+
PENDING_REQUESTS.lazySet(this, pendingRequests + 1);
152+
stamp = now;
153+
stamp0 = now;
154+
155+
return now;
156+
}
157+
158+
synchronized long stopRequest(long timestamp) {
159+
final long now = Clock.now();
160+
final int pendingRequests = this.pendingRequests;
161+
162+
duration += Math.max(0, now - stamp0) * pendingRequests - (now - timestamp);
163+
PENDING_REQUESTS.lazySet(this, pendingRequests - 1);
164+
stamp0 = now;
165+
166+
return now;
167+
}
168+
169+
synchronized void record(double roundTripTime) {
170+
median.insert(roundTripTime);
171+
lowerQuantile.insert(roundTripTime);
172+
higherQuantile.insert(roundTripTime);
173+
}
174+
175+
void updateAvailability(double value) {
176+
availabilityPercentage.insert(value);
177+
if (value == 0.0d) {
178+
synchronized (this) {
179+
errorStamp = Clock.now();
180+
}
181+
}
182+
}
183+
184+
void setAvailability(double availability) {
185+
this.availability = availability;
186+
}
187+
188+
@Override
189+
public String toString() {
190+
return "Stats{"
191+
+ "lowerQuantile="
192+
+ lowerQuantile.estimation()
193+
+ ", higherQuantile="
194+
+ higherQuantile.estimation()
195+
+ ", inactivityFactor="
196+
+ inactivityFactor
197+
+ ", tau="
198+
+ tau
199+
+ ", errorPercentage="
200+
+ availabilityPercentage.value()
201+
+ ", pending="
202+
+ pendingRequests
203+
+ ", errorStamp="
204+
+ errorStamp
205+
+ ", stamp="
206+
+ stamp
207+
+ ", stamp0="
208+
+ stamp0
209+
+ ", duration="
210+
+ duration
211+
+ ", median="
212+
+ median.estimation()
213+
+ ", interArrivalTime="
214+
+ interArrivalTime.value()
215+
+ ", pendingStreams="
216+
+ pendingStreams
217+
+ ", availability="
218+
+ availability
219+
+ '}';
220+
}
221+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.rsocket.loadbalance;
2+
3+
import io.rsocket.core.RSocketConnector;
4+
5+
/**
6+
* Extension for {@link LoadbalanceStrategy} which allows pre-setup {@link RSocketConnector} for
7+
* {@link LoadbalanceStrategy} needs
8+
*
9+
* @since 1.1
10+
*/
11+
public interface ClientLoadbalanceStrategy extends LoadbalanceStrategy {
12+
13+
void initialize(RSocketConnector connector);
14+
}

rsocket-core/src/main/java/io/rsocket/loadbalance/Ewma.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.rsocket.util.Clock;
2020
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2122

2223
/**
2324
* Compute the exponential weighted moving average of a series of values. The time at which you
@@ -28,20 +29,27 @@
2829
* equal to (200 - 100)/2 = 150 (half of the distance between the new and the old value)
2930
*/
3031
class Ewma {
31-
private final long tau;
32-
private volatile long stamp;
33-
private volatile double ewma;
32+
33+
final long tau;
34+
35+
volatile long stamp;
36+
static final AtomicLongFieldUpdater<Ewma> STAMP =
37+
AtomicLongFieldUpdater.newUpdater(Ewma.class, "stamp");
38+
volatile double ewma;
3439

3540
public Ewma(long halfLife, TimeUnit unit, double initialValue) {
3641
this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
37-
stamp = 0L;
38-
ewma = initialValue;
42+
43+
this.ewma = initialValue;
44+
45+
STAMP.lazySet(this, 0L);
3946
}
4047

4148
public synchronized void insert(double x) {
42-
long now = Clock.now();
43-
double elapsed = Math.max(0, now - stamp);
44-
stamp = now;
49+
final long now = Clock.now();
50+
final double elapsed = Math.max(0, now - stamp);
51+
52+
STAMP.lazySet(this, now);
4553

4654
double w = Math.exp(-elapsed / tau);
4755
ewma = w * ewma + (1.0 - w) * x;

rsocket-core/src/main/java/io/rsocket/loadbalance/FluxDeferredResolution.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public final Context currentContext() {
8383

8484
@Nullable
8585
@Override
86-
public Object scanUnsafe(Attr key) {
86+
public final Object scanUnsafe(Attr key) {
8787
long state = this.requested;
8888

8989
if (key == Attr.PARENT) {
@@ -145,7 +145,7 @@ public final void onNext(Payload payload) {
145145
}
146146

147147
@Override
148-
public void onError(Throwable t) {
148+
public final void onError(Throwable t) {
149149
if (this.done) {
150150
Operators.onErrorDropped(t, this.actual.currentContext());
151151
return;
@@ -156,7 +156,7 @@ public void onError(Throwable t) {
156156
}
157157

158158
@Override
159-
public void onComplete() {
159+
public final void onComplete() {
160160
if (this.done) {
161161
return;
162162
}
@@ -206,7 +206,7 @@ public final void request(long n) {
206206
}
207207
}
208208

209-
public void cancel() {
209+
public final void cancel() {
210210
long state = REQUESTED.getAndSet(this, STATE_TERMINATED);
211211
if (state == STATE_TERMINATED) {
212212
return;

rsocket-core/src/main/java/io/rsocket/loadbalance/FrugalQuantile.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626
* <p>More info: http://blog.aggregateknowledge.com/2013/09/16/sketch-of-the-day-frugal-streaming/
2727
*/
2828
class FrugalQuantile implements Quantile {
29-
private final double increment;
30-
volatile double estimate;
29+
final double increment;
30+
final SplittableRandom rnd;
31+
3132
int step;
3233
int sign;
33-
private double quantile;
34-
private SplittableRandom rnd;
34+
double quantile;
35+
36+
volatile double estimate;
3537

3638
public FrugalQuantile(double quantile, double increment) {
3739
this.increment = increment;

0 commit comments

Comments
 (0)