@@ -376,7 +376,7 @@ public void onError(Throwable t) {
376376 }
377377
378378 // called only from listenerExecutor context
379- protected void revisionUpdate (long upToRev ) {
379+ protected void revisionUpdate (final long upToRev ) {
380380 if (seenUpToRev .get () >= upToRev ) {
381381 return ;
382382 }
@@ -511,31 +511,37 @@ protected KeyValue offerUpdate(final KeyValue keyValue, boolean watchThread) {
511511 continue ; // update failed
512512 }
513513 // update succeeded
514+ boolean deletionReplaced = isDeleted (existKv );
515+ if (deletionReplaced ) {
516+ deletionQueue .remove (existKv );
517+ }
514518 if (isDeleted ) {
515519 deletionQueue .add (keyValue );
516- if (!isDeleted ( existKv ) ) { // previous value
520+ if (!deletionReplaced ) { // previous value
517521 notifyListeners (EventType .DELETED , existKv , watchThread );
518522 }
519523 return null ;
520524 }
521- // added or updated
522- notifyListeners (EventType .UPDATED , keyValue , false );
523- return keyValue ;
525+ break ; // added or updated
524526 }
525527 // here existKv == null
526528 if (modRevision <= seenUpToRev .get ()) {
527529 return null ;
528530 }
531+ //TODO in some cases it might be better to return null here
532+ // when isDeleted == true, rather than inserting deletion record
533+ // (e.g. when this comes from a getRemote() or keyExistsRemote() call)
529534 if ((existKv = entries .putIfAbsent (key , keyValue )) == null ) {
530535 // update succeeded
531536 if (isDeleted ) {
532537 deletionQueue .add (keyValue );
533538 return null ;
534539 }
535- notifyListeners (EventType .UPDATED , keyValue , false );
536- return keyValue ;
540+ break ; // added
537541 }
538542 }
543+ notifyListeners (EventType .UPDATED , keyValue , false );
544+ return keyValue ;
539545 }
540546
541547 protected static KeyValue kvOrNullIfDeleted (KeyValue fromCache ) {
@@ -588,7 +594,22 @@ public KeyValue getRemoteWeak(ByteString key) {
588594 }
589595
590596 public int size () {
591- return Math .max (0 , entries .size () - deletionQueue .size ());
597+ int total = entries .size ();
598+ if (total > 0 ) {
599+ // We need to exclude deletion records but can't just subtract
600+ // deletionQueue.size() since it can contain "stale" records
601+ // which persist until the next watch update flushes them
602+ KeyValue deletion = deletionQueue .pollFirst ();
603+ while (deletion != null ) {
604+ if (entries .get (deletion .getKey ()) != deletion ) {
605+ deletionQueue .remove (deletion );
606+ } else if (--total == 0 ) {
607+ return 0 ;
608+ }
609+ deletion = deletionQueue .higher (deletion );
610+ }
611+ }
612+ return total ;
592613 }
593614
594615 //TODO maybe add sizeRemote() ?
0 commit comments