cache: Support WithProgressNotify() for cache watch #21575

Merged
serathius merged 1 commit into etcd-io:main from akstron:dev/cache-with-progress-notify
Apr 17, 2026

Conversation

@akstron
Contributor

@akstron akstron commented Apr 6, 2026

Added support for WithProgressNotify() on cache watches, with a default interval of 10 minutes.

Used AI to write the unit tests, which I then modified and cleaned up.

Part of #19371

@k8s-ci-robot

Hi @akstron. Thanks for your PR.

I'm waiting for a etcd-io member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work.

Tip

We noticed you've done this a few times! Consider joining the org to skip this step and gain /lgtm and other bot rights. We recommend asking approvers on your previous PRs to sponsor you.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@akstron akstron force-pushed the dev/cache-with-progress-notify branch from 232ba16 to 1c9a661 Compare April 6, 2026 15:08
@akstron akstron marked this pull request as ready for review April 6, 2026 15:10
@serathius
Member

serathius commented Apr 6, 2026

Please add integration tests. I don't care about AI-written unit tests; they just make the implementation rigid. Instead, please provide behavior-based integration tests.

Comment thread cache/cache.go Outdated
var progressTicker *time.Ticker
var progressTickC <-chan time.Time
if progressNotify {
progressTicker = time.NewTicker(c.cfg.ProgressNotifyInterval)
Member

Ticker doesn't work as you think. Please read https://pkg.go.dev/time#NewTicker: "The ticker will adjust the time interval or drop ticks to make up for slow receivers."

Contributor Author

@akstron akstron Apr 7, 2026

Used a timer to make sure we send out progress only when ProgressNotifyInterval has passed since the last event/progress.

Contributor Author

Still, I see that etcd uses a ticker, which means the interval between periodic progress notifications is progressNotifyInterval <= actualProgressSentInterval < 2 * progressNotifyInterval, not approximately equal to progressNotifyInterval.

Member

That might be an unintended behavior on the etcd server side. I don't think the Ticker guarantees are widely known. I only became aware of the difference when implementing the progress notifier, where the ticker caused issues because of its unstable period.
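One simplified model that reproduces the bound progressNotifyInterval <= actualProgressSentInterval < 2 * progressNotifyInterval from this thread is sketched below. It assumes a fixed-phase ticker plus a per-watcher flag, where the first tick at or after the last event only arms the flag and the next tick sends the notification. This is an illustrative assumption, not the etcd server's actual code; gapWithTicker and the integer time units are hypothetical.

```go
package main

import "fmt"

// gapWithTicker models a ticker firing at 0, interval, 2*interval, ... and a
// flag that is armed by the first tick at or after the last event. The tick
// after that sends the progress notification. It returns the time between
// the last event and that notification.
func gapWithTicker(lastEvent, interval int) int {
	// First tick at or after lastEvent (integer ceiling to a multiple).
	firstTick := (lastEvent + interval - 1) / interval * interval
	return firstTick + interval - lastEvent
}

func main() {
	const interval = 10
	for _, e := range []int{10, 11, 15, 19} {
		fmt.Printf("event at %d -> notify after %d\n", e, gapWithTicker(e, interval))
	}
}
```

Under this model the gap depends only on where the event falls relative to the ticker phase, which is why it is not approximately constant.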

Comment thread cache/cache.go Outdated
Comment thread cache/cache.go Outdated
Comment thread cache/watcher.go Outdated
@akstron akstron force-pushed the dev/cache-with-progress-notify branch from 1c9a661 to 4d28bad Compare April 7, 2026 13:57
Comment thread cache/cache.go Outdated
case <-c.internalCtx.Done():
return
case responseChan <- resp:
if len(resp.Events) > 0 && progressTimer != nil {
Member

Suggested change
- if len(resp.Events) > 0 && progressTimer != nil {
+ if progressTimer != nil {

Sending a progress notification here should also delay the need for a periodic progress notification.
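The effect of dropping the len(resp.Events) > 0 condition can be sketched with a small deterministic model. The response type, nextPeriodicNotify, and the integer timestamps are hypothetical and exist only for this illustration; the model assumes the timer is armed at watch creation (t=0) and is reset to sentAt + interval whenever the condition holds.

```go
package main

import "fmt"

// response is a hypothetical stand-in for a watch response. events == 0
// means the response is a progress notification.
type response struct {
	sentAt int
	events int
}

// nextPeriodicNotify returns when the periodic timer would fire after the
// given responses were sent. With resetOnProgress false, only responses
// carrying events reset the timer (the original condition). With true,
// every sent response resets it (the suggested condition).
func nextPeriodicNotify(sent []response, interval int, resetOnProgress bool) int {
	deadline := interval // timer armed at watch creation, t=0
	for _, r := range sent {
		if r.events > 0 || resetOnProgress {
			deadline = r.sentAt + interval
		}
	}
	return deadline
}

func main() {
	// An event at t=0, then a progress notification forwarded at t=7.
	sent := []response{{sentAt: 0, events: 1}, {sentAt: 7, events: 0}}
	fmt.Println(nextPeriodicNotify(sent, 10, false)) // 10: only 3 units after the last notification
	fmt.Println(nextPeriodicNotify(sent, 10, true))  // 17: a full interval after it
}
```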

Contributor Author

done

Comment thread cache/cache.go Outdated
case <-c.internalCtx.Done():
return
case <-progressChan:
if needsProgress {
Member

@serathius serathius Apr 8, 2026

Do we need to check it? I think we can just depend on resetting the timer.

My understanding of the flow:

T0 Watch is created. Let's say a watch event is returned, resetting the timer and setting needsProgress = false.
T1 Timer sends to progressChan. No progress notify is sent, as needsProgress is false. Timer is reset and needsProgress = true.
T2 Timer sends to progressChan. Progress notify is sent. Timer is reset and needsProgress = true.
T3 Repeat the previous step.

So basically the first progress notify is delayed to 2*ProgressNotifyInterval, while all following ones are sent every ProgressNotifyInterval.

The only question is whether we can depend on just the timer, basically what happens if we call "Reset" on a timer that has already emitted a value. Based on the Go documentation we should be good as of Go 1.23 https://pkg.go.dev/time#NewTimer:

Before Go 1.23, the channel associated with a Timer was asynchronous (buffered, capacity 1), which meant that stale time values could be received even after [Timer.Stop](https://pkg.go.dev/time#Timer.Stop) or [Timer.Reset](https://pkg.go.dev/time#Timer.Reset) returned. As of Go 1.23, the channel is synchronous (unbuffered, capacity 0), eliminating the possibility of those stale values.

Please note these flows should be tested with a fake timer, like progressNotifier, to ensure this behavior doesn't change.

Contributor Author

done. Added a test. Now the above scenario is handled: the first progress notify, as well as all following ones, is sent at an interval of ProgressNotifyInterval.

@akstron akstron force-pushed the dev/cache-with-progress-notify branch from 4d28bad to 941f0ab Compare April 9, 2026 05:56
@serathius
Member

/ok-to-test

Comment thread cache/cache.go
//
// Note: gRPC proxy is not supported. Cache relies on RequestProgress RPCs,
// which the gRPC proxy does not forward.
func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) {
Member

Please split the constructor into two layers: a public layer for external users, and an internal layer for testing and internal needs that lets us pass a clock.

func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) {
  cfg := defaultConfig()
  for _, opt := range opts {
    opt(&cfg)
  }
  return newCache(client, prefix, cfg, realClock{})
}

func newCache(client *clientv3.Client, prefix string, cfg Config, clock Clock) (*Cache, error) {
  ...
}

Member

@serathius serathius Apr 10, 2026

Using newCache would help simplify the code in tests you added. No need to update all tests with it.

Contributor Author

done.

Comment thread cache/watcher.go Outdated
)

- // watcher holds one clients buffered stream of events.
+ // watcher holds one client's buffered stream of events.
Member

Please remove unrelated changes. They prolong review, obfuscate git history, and make cherry-picking harder. I recommend sending a separate PR to fix this.

Contributor Author

done.

Comment thread cache/cache_test.go Outdated
}, st
}

func TestCacheWatchProgressNotify(t *testing.T) {
Member

The tests are a little hard to follow. Maybe you could take a look at TestConditionalProgressRequestor in progress_requestor_test.go to see how to implement similar tests in a more readable way?

Contributor Author

how about now?

Member

Can you split the test into separate tests, each focusing on a single scenario?

Contributor Author

done. thanks

@codecov

codecov Bot commented Apr 10, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 68.40%. Comparing base (501190c) to head (236179a).
⚠️ Report is 38 commits behind head on main.

Additional details and impacted files

see 35 files with indirect coverage changes

@@            Coverage Diff             @@
##             main   #21575      +/-   ##
==========================================
+ Coverage   68.35%   68.40%   +0.05%     
==========================================
  Files         428      432       +4     
  Lines       35397    35413      +16     
==========================================
+ Hits        24194    24223      +29     
+ Misses       9791     9784       -7     
+ Partials     1412     1406       -6     

@akstron akstron force-pushed the dev/cache-with-progress-notify branch 3 times, most recently from 665e8c5 to cfef4b7 Compare April 15, 2026 16:40
Comment thread cache/cache.go Outdated
Comment thread cache/cache_test.go Outdated
Comment thread cache/cache_test.go Outdated
Comment thread cache/cache_test.go Outdated
Comment thread cache/cache_test.go Outdated
Comment thread cache/cache_test.go Outdated
@akstron akstron force-pushed the dev/cache-with-progress-notify branch from cfef4b7 to b278a4d Compare April 16, 2026 17:11
Signed-off-by: Alok Kumar Singh <dev.alok.singh123@gmail.com>
@akstron akstron force-pushed the dev/cache-with-progress-notify branch from b278a4d to 236179a Compare April 16, 2026 17:16
@k8s-ci-robot

@akstron: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-etcd-coverage-report 236179a link true /test pull-etcd-coverage-report

Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR.


}
})

t.Run("progress notification is elided when events are flowing", func(t *testing.T) {
Member

@serathius serathius Apr 16, 2026

This test doesn't validate anything, as it just logs "progress notification received during active writes (elision not guaranteed but noted)", which is not useful. Better to remove it than to keep a false test.

@k8s-ci-robot

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: akstron, serathius

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@serathius
Member

Validating in robustness test

@serathius
Member

serathius commented Apr 17, 2026

Ran robustness tests for 20 minutes without an issue. Draft #21485
Getting close to being able to merge.

@serathius serathius merged commit cf3527b into etcd-io:main Apr 17, 2026
31 of 32 checks passed
3 participants