@@ -75,7 +75,16 @@ public class GrpcClient {
7575 private final long defaultTimeoutMs ;
7676
7777 public interface AuthProvider {
78+ /**
79+ * Called from a synchronized context
80+ */
7881 CallCredentials refreshCredentials ();
82+ default CallCredentials refreshCredentials (Throwable trigger ) {
83+ return refreshCredentials ();
84+ }
85+ /**
86+ * Not called from a synchronized context
87+ */
7988 boolean requiresReauth (Throwable t );
8089 }
8190
@@ -166,7 +175,7 @@ public Executor getResponseExecutor() {
166175
167176 public void authenticateNow () {
168177 assert authProvider != NO_AUTH ;
169- reauthenticate (getCallOptions ());
178+ reauthenticate (getCallOptions (), null );
170179 }
171180
172181 protected CallOptions getCallOptions () {
@@ -201,13 +210,13 @@ public static <R> RetryDecision<R> retryDecision(boolean idempotent) {
201210 public <ReqT ,R > ListenableFuture <R > call (MethodDescriptor <ReqT ,R > method ,
202211 ReqT request , boolean idempotent ) {
203212 return call (method , null , request , null , retryDecision (idempotent ),
204- 0 , false , null , 0L );
213+ 0 , false , false , null , 0L );
205214 }
206215
207216 public <ReqT ,R > ListenableFuture <R > call (MethodDescriptor <ReqT ,R > method ,
208217 ReqT request , boolean idempotent , long timeoutMillis , Executor executor ) {
209218 return call (method , null , request , executor , retryDecision (idempotent ),
210- 0 , false , null , timeoutMillis );
219+ 0 , false , false , null , timeoutMillis );
211220 }
212221
213222 //TODO probably move this
@@ -218,14 +227,14 @@ public static interface RetryDecision<ReqT> {
218227 public <ReqT ,R > ListenableFuture <R > call (MethodDescriptor <ReqT ,R > method , Condition precondition ,
219228 ReqT request , Executor executor , RetryDecision <ReqT > retry , boolean backoff ,
220229 Deadline deadline , long timeoutMs ) {
221- return call (method , precondition , request , executor , retry , 0 , backoff , deadline , timeoutMs );
230+ return call (method , precondition , request , executor , retry , 0 , false , backoff , deadline , timeoutMs );
222231 }
223232
224233 // deadline is for entire request (including retry pauses),
225234 // timeout is per-attempt and 0 means not specified
226235 private <ReqT ,R > ListenableFuture <R > call (MethodDescriptor <ReqT ,R > method ,
227236 Condition precondition , ReqT request , Executor executor , RetryDecision <ReqT > retry ,
228- int attempt , boolean backoff , Deadline deadline , long timeoutMs ) {
237+ int attempt , boolean afterReauth , boolean backoff , Deadline deadline , long timeoutMs ) {
229238 if (precondition != null && !precondition .satisfied ()) {
230239 return failInExecutor (new CancellationException ("precondition false" ), executor );
231240 }
@@ -237,18 +246,29 @@ private <ReqT,R> ListenableFuture<R> call(MethodDescriptor<ReqT,R> method,
237246 callOpts = callOpts .withExecutor (executor );
238247 }
239248 return Futures .catchingAsync (fuCall (method , request , callOpts , timeoutMs ), Exception .class , t -> {
240- boolean reauth ;
241- // first case: if no retries to do then fail
242- if ((!backoff && attempt > 0 ) || (deadline != null && deadline .isExpired ())
243- || (!(reauth = reauthIfRequired (t , baseCallOpts ))
244- && !retry .retry (t , request ))) {
249+ // first cases: determine if we fail immediately
250+ if ((!backoff && attempt > 0 ) || (deadline != null && deadline .isExpired ())) {
251+ // multiple retries disabled or deadline expired
252+ return Futures .immediateFailedFuture (t );
253+ }
254+ boolean reauth = false ;
255+ if (authProvider .requiresReauth (t )) {
256+ if (afterReauth ) {
257+ // if we have an auth failure immediately following a reauth, give up
258+ // (important to avoid infinite loop of auth failures)
259+ return Futures .immediateFailedFuture (t );
260+ }
261+ reauthenticate (baseCallOpts , t );
262+ reauth = true ;
263+ } else if (!retry .retry (t , request )) {
264+ // retry predicate says no (non retryable request and/or error)
245265 return Futures .immediateFailedFuture (t );
246266 }
247267
248268 // second case: immediate retry (first failure or after auth failure + reauth)
249269 if (reauth || attempt == 0 && immediateRetryLimiter .tryAcquire ()) {
250270 return call (method , precondition , request , executor , retry ,
251- reauth ? attempt : 1 , backoff , deadline , timeoutMs );
271+ reauth ? attempt : 1 , reauth , backoff , deadline , timeoutMs );
252272 }
253273 int nextAttempt = attempt <= 1 ? 2 : attempt + 1 ; // skip attempt if we were rate-limited
254274
@@ -257,8 +277,8 @@ private <ReqT,R> ListenableFuture<R> call(MethodDescriptor<ReqT,R> method,
257277 if (deadline != null && deadline .timeRemaining (MILLISECONDS ) < delayMs ) {
258278 return Futures .immediateFailedFuture (t );
259279 }
260- return Futures .scheduleAsync (() -> call (method , precondition , request , executor ,
261- retry , nextAttempt , backoff , deadline , timeoutMs ), delayMs , MILLISECONDS , ses );
280+ return Futures .scheduleAsync (() -> call (method , precondition , request , executor , retry ,
281+ nextAttempt , false , backoff , deadline , timeoutMs ), delayMs , MILLISECONDS , ses );
262282 }, executor != null ? executor : directExecutor ());
263283 }
264284
@@ -313,19 +333,15 @@ protected <ReqT,R> ListenableFuture<R> fuCall(MethodDescriptor<ReqT,R> method, R
313333
314334 protected boolean retryableStreamError (Throwable error ) {
315335 return (Status .fromThrowable (error ).getCode () != Code .INVALID_ARGUMENT
316- && !causedByJavaError (error ));
317- }
318-
319- protected static boolean causedByJavaError (Throwable t ) {
320- return causedBy (t , Error .class );
336+ && !causedBy (error , Error .class ));
321337 }
322338
323339 /**
324340 * @return true if reauthentication was required and attempted
325341 */
326342 protected boolean reauthIfRequired (Throwable error , CallOptions callOpts ) {
327343 if (authProvider .requiresReauth (error )) {
328- reauthenticate (callOpts );
344+ reauthenticate (callOpts , error );
329345 return true ;
330346 }
331347 return false ;
@@ -336,18 +352,18 @@ public static boolean isConnectException(Throwable t) {
336352 }
337353
338354 public static Code codeFromThrowable (Throwable t ) {
339- Status status = Status .fromThrowable (t );
340- return status != null ? status .getCode () : null ;
355+ return Status .fromThrowable (t ).getCode (); // fromThrowable won't return null
341356 }
342357
343358
344- private void reauthenticate (CallOptions failedOpts ) {
359+ private void reauthenticate (CallOptions failedOpts , Throwable authFailure ) {
345360 // assert name != null && password != null;
346361 if (getCallOptions () == failedOpts ) { // obj identity comparison intentional
347362 synchronized (this ) {
348363 CallOptions callOpts = getCallOptions ();
349364 if (callOpts == failedOpts ) {
350- callOptions = callOpts .withCallCredentials (authProvider .refreshCredentials ());
365+ callOptions = callOpts .withCallCredentials (
366+ authProvider .refreshCredentials (authFailure ));
351367 }
352368 }
353369 }
@@ -413,6 +429,8 @@ final class ResilientBiDiStream<ReqT,RespT> {
413429 private boolean finished ;
414430 private Throwable error ;
415431
432+ private boolean lastAuthFailed ;
433+
416434 /**
417435 *
418436 * @param method
@@ -605,6 +623,7 @@ public void beforeStart(ClientCallStreamObserver<ReqT> rs) {
605623 // called from grpc response thread
606624 @ Override
607625 public void onNext (RespT value ) {
626+ lastAuthFailed = false ;
608627 respStream .onNext (value );
609628 }
610629 // called from grpc response thread
@@ -614,9 +633,10 @@ public void onError(Throwable t) {
614633 if (finished ) {
615634 finalError = true ;
616635 } else {
617- reauthed = reauthIfRequired (t , sentCallOptions );
636+ reauthed = ! lastAuthFailed && reauthIfRequired (t , sentCallOptions );
618637 finalError = !reauthed && !retryableStreamError (t );
619638 }
639+ lastAuthFailed = reauthed ;
620640 if (!finalError ) {
621641 int errCount = -1 ;
622642 String msg ;
@@ -670,6 +690,7 @@ public void onError(Throwable t) {
670690 // called from grpc response thread
671691 @ Override
672692 public void onCompleted () {
693+ lastAuthFailed = false ;
673694 if (!finished ) {
674695 logger .warn ("Unexpected onCompleted received"
675696 + " for stream of method " + method .getFullMethodName ());
0 commit comments