cache: Support WithProgressNotify() for cache watch #21575

serathius merged 1 commit into etcd-io:main
Conversation
Hi @akstron. Thanks for your PR. I'm waiting for an etcd-io member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test. Once the patch is verified, the new status will be reflected by the ok-to-test label.

Tip: We noticed you've done this a few times! Consider joining the org to skip this step.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
Force-pushed 232ba16 to 1c9a661 (compare)
Please add integration tests; I don't care about AI-written unit tests, which just make the implementation rigid. Instead, please provide behavior-based integration tests.
var progressTicker *time.Ticker
var progressTickC <-chan time.Time
if progressNotify {
    progressTicker = time.NewTicker(c.cfg.ProgressNotifyInterval)
Ticker doesn't work the way you think. Please read https://pkg.go.dev/time#NewTicker: "The ticker will adjust the time interval or drop ticks to make up for slow receivers."
Used a timer to make sure we send out progress only when ProgressNotifyInterval has passed since the last event/progress.
Still, I see that etcd uses a ticker, which means the actual interval between periodic progress notifications satisfies progressNotifyInterval <= actualProgressSentInterval < 2 * progressNotifyInterval, rather than being almost equal to progressNotifyInterval.
That might be unintended behavior on the etcd server side. I don't think the Ticker guarantees are widely known; I only became aware of the difference when implementing the progress notifier, where the ticker caused issues because of its unstable period.
Force-pushed 1c9a661 to 4d28bad (compare)
case <-c.internalCtx.Done():
    return
case responseChan <- resp:
    if len(resp.Events) > 0 && progressTimer != nil {
Suggested change:
-    if len(resp.Events) > 0 && progressTimer != nil {
+    if progressTimer != nil {
Sending a progress notify here also should delay a need for periodic progress notify.
case <-c.internalCtx.Done():
    return
case <-progressChan:
    if needsProgress {
Do we need to check it? I think we can just depend on resetting the timer.
My understanding of the flow:
T0: Watch is created; say a watch event is returned, resetting the timer and setting needsProgress = false.
T1: Timer sends an event to progressChan; no progress is sent, as needsProgress is false. Timer is reset and needsProgress = true.
T2: Timer sends an event to progressChan. Progress is sent. Timer is reset and needsProgress = true.
T3: Repeat the previous step.
So basically the first progress notify is delayed to 2*ProgressNotifyInterval, while all following ones are sent with ProgressNotifyInterval.
The only question is whether we can depend on just the timer, i.e. what happens if we call Reset on a timer that has already emitted a value. Based on the Go documentation we should be good as of Go 1.23 (https://pkg.go.dev/time#NewTimer):
"Before Go 1.23, the channel associated with a Timer was asynchronous (buffered, capacity 1), which meant that stale time values could be received even after Timer.Stop or Timer.Reset returned. As of Go 1.23, the channel is synchronous (unbuffered, capacity 0), eliminating the possibility of those stale values."
Please note these flows should be tested with a fake timer, like progressNotifier does, to ensure this behavior doesn't change.
Done, added a test. The above scenario is now handled: the first progress notify, as well as all following ones, is sent at ProgressNotifyInterval.
Force-pushed 4d28bad to 941f0ab (compare)
/ok-to-test
//
// Note: gRPC proxy is not supported. Cache relies on RequestProgress RPCs,
// which the gRPC proxy does not forward.
func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) {
Please split the constructor into two layers: a public layer for external users, and an internal layer for testing/internal needs that allows us to pass a clock.
func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) {
    cfg := defaultConfig()
    for _, opt := range opts {
        opt(&cfg)
    }
    return newCache(client, prefix, cfg, realClock{})
}

func newCache(client *clientv3.Client, prefix string, cfg Config, clock Clock) (*Cache, error) {
    ...
}
Using newCache would help simplify the code in the tests you added. No need to update all existing tests with it.
-// watcher holds one client’s buffered stream of events.
+// watcher holds one client's buffered stream of events.
Please remove unrelated changes. They prolong review, obfuscate git history, and make cherry-picking harder. I recommend sending a separate PR for this fix.
    }, st
}

func TestCacheWatchProgressNotify(t *testing.T) {
The tests are a little hard to follow. Maybe you could take a look at how TestConditionalProgressRequestor in progress_requestor_test.go is written, to see how to implement similar tests in a more readable way?
Can you split the test into separate tests, each focusing on a single scenario?
Codecov Report: ✅ All modified and coverable lines are covered by tests (35 files with indirect coverage changes).

@@            Coverage Diff             @@
##             main   #21575      +/-   ##
==========================================
+ Coverage   68.35%   68.40%   +0.05%
==========================================
  Files         428      432       +4
  Lines       35397    35413      +16
==========================================
+ Hits        24194    24223      +29
+ Misses       9791     9784       -7
+ Partials     1412     1406       -6
Force-pushed 665e8c5 to cfef4b7 (compare)
Force-pushed cfef4b7 to b278a4d (compare)
Signed-off-by: Alok Kumar Singh <dev.alok.singh123@gmail.com>
Force-pushed b278a4d to 236179a (compare)
@akstron: The following test failed:

Full PR test history. Your PR dashboard. Please help us cut down on flakes by linking to an open issue when you hit one in your PR.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
    }
})

t.Run("progress notification is elided when events are flowing", func(t *testing.T) {
This test doesn't validate anything: it just logs "progress notification received during active writes (elision not guaranteed but noted)", which is not useful. Better to remove it than to keep a false test.
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: akstron, serathius

The full list of commands accepted by this bot can be found here. The pull request process is described here.
Validating in robustness test.
Ran robustness tests for 20 minutes without an issue. Draft #21485
Added support for WithProgressNotify() for cache watch, with a default interval of 10 minutes.
Used AI for writing the unit tests, which I then modified and cleaned up.
Part of #19371