Commit 1353954

[client] Fix/grpc retry (#5750)
* [client] Fix flow client Receive retry loop not stopping after Close

  Use backoff.Permanent for canceled gRPC errors so Receive returns immediately instead of retrying until the context deadline when the connection is already closed. Add TestNewClient_PermanentClose to verify the behavior. The connectivity.Shutdown check was meaningless: when the connection is shut down, c.realClient.Events(ctx, grpc.WaitForReady(true)) on the next line already fails with codes.Canceled, which is now handled as a permanent error. The explicit state check merely duplicated what gRPC already reports through its normal error path.

* [client] Remove WaitForReady from stream open call

  grpc.WaitForReady(true) parks the RPC internally until the connection reaches READY, unblocking only on ctx cancellation. This means the external backoff.Retry loop in Receive() never gets control back during a connection outage: it cannot tick, log, or apply its retry intervals while WaitForReady is blocking. Removing it restores fail-fast behaviour: Events() returns immediately with codes.Unavailable when the connection is not ready, which is exactly what the backoff loop expects. The backoff becomes the single authority over retry timing and cadence, as originally intended.

* [client] Add connection recreation and improve flow client error handling

  Store gRPC dial options on the client to enable connection recreation on Internal errors (RST_STREAM/PROTOCOL_ERROR). Treat Unauthenticated, PermissionDenied, and Unimplemented as permanent failures. Unify mutex usage and add reconnection logging for better observability.
* [client] Remove Unauthenticated, PermissionDenied, and Unimplemented from permanent error handling

* [client] Fix error handling in Receive to properly re-establish the stream and improve reconnection messaging

* Fix test

* [client] Add graceful shutdown handling and a test for concurrent Close during Receive

  Prevent reconnection attempts after client closure by tracking a `closed` flag. Use `backoff.Permanent` for errors caused by operations on a closed client. Add a test to ensure `Close` does not block when `Receive` is actively running.

* [client] Fix connection swap to properly close the old gRPC connection

  Close the old `grpc.ClientConn` after successfully swapping to a new connection during reconnection.

* [client] Reset backoff

* [client] Ensure stream closure on error during initialization

* [client] Add test for handling server-side stream closure and reconnection

  Introduce `TestReceive_ServerClosesStream` to verify the client's ability to recover and process acknowledgments after the server closes the stream. Enhance the test server with a controlled stream closure mechanism.

* [client] Add protocol error simulation and enhance reconnection test

  Introduce `connTrackListener` to simulate HTTP/2 RST_STREAM with PROTOCOL_ERROR for testing. Refactor and rename `TestReceive_ServerClosesStream` to `TestReceive_ProtocolErrorStreamReconnect` to verify client recovery on protocol errors.

* [client] Update Close error message in test for clarity

* [client] Fine-tune the tests

* [client] Adjust connection tracking in reconnection test

* [client] Wait for Events handler to exit in RST_STREAM reconnection test

  Ensure the old `Events` handler exits fully before proceeding in the reconnection test to avoid dropped acknowledgments on a broken stream. Add a `handlerDone` channel to synchronize handler exits.
* [client] Prevent panic on nil connection during Close

* [client] Refactor connection handling to use explicit target tracking

  Introduce a `target` field to store the gRPC connection target directly, simplifying reconnections and ensuring consistent connection reuse logic.

* [client] Rename `isCancellation` to `isContextDone` and extend handling for `DeadlineExceeded`

  Refactor error handling to cover `DeadlineExceeded` scenarios alongside `Canceled`. Update related condition checks for consistency.

* [client] Add connection generation tracking to prevent stale reconnections

  Introduce `connGen` to track connection generations and ensure that stale `recreateConnection` calls do not override newer connections. Update stream establishment and reconnection logic to incorporate generation validation.

* [client] Add backoff reset condition to prevent short-lived retry cycles

  Refine backoff reset logic so it only occurs for sufficiently long-lived stream connections, avoiding interference with `MaxElapsedTime`.

* [client] Introduce `minHealthyDuration` to refine backoff reset logic

  Add a `minHealthyDuration` constant to ensure stream retries only reset the backoff timer if the stream survives beyond a minimum duration. This prevents unhealthy, short-lived streams from interfering with `MaxElapsedTime`.

* [client] IPv6-friendly connection

  parsedURL.Hostname() strips IPv6 brackets. For http://[::1]:443, this turns the target into ::1:443, which is not a valid host:port target for gRPC. Additionally, fmt.Sprintf("%s:%s", hostname, port) produces a trailing colon when the URL has no explicit port: http://example.com becomes example.com:. Both cases break the initial dial and reconnect paths. Use parsedURL.Host directly instead.
* [client] Add `handlerStarted` channel to synchronize stream establishment in tests

  Introduce a `handlerStarted` channel in the test server to signal when the server-side handler begins, ensuring robust synchronization between client and server during stream establishment. Update the relevant test cases to wait for this signal before proceeding.

* [client] Replace `receivedAcks` map with atomic counter and improve stream establishment sync in tests

  Refactor acknowledgment tracking in tests to use an `atomic.Int32` counter instead of a map. Replace the fixed sleep with robust synchronization by waiting on the `handlerStarted` signal for stream establishment.

* [client] Extract `handleReceiveError` to simplify receive logic

  Refactor error handling in `receive` into a dedicated `handleReceiveError` method. This streamlines the main logic and isolates error recovery, including backoff reset and connection recreation.

* [client] Recreate gRPC ClientConn on every retry to prevent dual backoff

  The flow client had two competing retry loops: our custom exponential backoff and gRPC's internal subchannel reconnection. When establishStream failed, the same ClientConn was reused, allowing gRPC's internal backoff state to accumulate and control dial timing independently.

  Changes:
  - Consolidate error handling into handleRetryableError, which now handles context cancellation, permanent errors, backoff reset, and connection recreation in a single path
  - Call recreateConnection on every retryable error so each retry gets a fresh ClientConn with no internal backoff state
  - Remove connGen tracking since Receive is sequential and protected by a new `receiving` guard against concurrent calls
  - Reduce RandomizationFactor from 1 to 0.5 to avoid near-zero backoff intervals
1 parent 7483fec commit 1353954

2 files changed

Lines changed: 450 additions & 53 deletions

File tree

flow/client/client.go

Lines changed: 154 additions & 37 deletions
```diff
--- a/flow/client/client.go
+++ b/flow/client/client.go
@@ -14,7 +14,6 @@ import (
 	log "github.com/sirupsen/logrus"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/keepalive"
@@ -26,11 +25,22 @@ import (
 	"github.com/netbirdio/netbird/util/wsproxy"
 )
 
+var ErrClientClosed = errors.New("client is closed")
+
+// minHealthyDuration is the minimum time a stream must survive before a failure
+// resets the backoff timer. Streams that fail faster are considered unhealthy and
+// should not reset backoff, so that MaxElapsedTime can eventually stop retries.
+const minHealthyDuration = 5 * time.Second
+
 type GRPCClient struct {
 	realClient proto.FlowServiceClient
 	clientConn *grpc.ClientConn
 	stream     proto.FlowService_EventsClient
-	streamMu   sync.Mutex
+	target     string
+	opts       []grpc.DialOption
+	closed     bool       // prevent creating conn in the middle of the Close
+	receiving  bool       // prevent concurrent Receive calls
+	mu         sync.Mutex // protects clientConn, realClient, stream, closed, and receiving
 }
 
 func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCClient, error) {
@@ -65,38 +75,82 @@ func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCCl
 		grpc.WithDefaultServiceConfig(`{"healthCheckConfig": {"serviceName": ""}}`),
 	)
 
-	conn, err := grpc.NewClient(fmt.Sprintf("%s:%s", parsedURL.Hostname(), parsedURL.Port()), opts...)
+	target := parsedURL.Host
+	conn, err := grpc.NewClient(target, opts...)
 	if err != nil {
 		return nil, fmt.Errorf("creating new grpc client: %w", err)
 	}
 
 	return &GRPCClient{
 		realClient: proto.NewFlowServiceClient(conn),
 		clientConn: conn,
+		target:     target,
+		opts:       opts,
 	}, nil
 }
 
 func (c *GRPCClient) Close() error {
-	c.streamMu.Lock()
-	defer c.streamMu.Unlock()
-
+	c.mu.Lock()
+	c.closed = true
 	c.stream = nil
-	if err := c.clientConn.Close(); err != nil && !errors.Is(err, context.Canceled) {
+	conn := c.clientConn
+	c.clientConn = nil
+	c.mu.Unlock()
+
+	if conn == nil {
+		return nil
+	}
+
+	if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
 		return fmt.Errorf("close client connection: %w", err)
 	}
 
 	return nil
 }
 
+func (c *GRPCClient) Send(event *proto.FlowEvent) error {
+	c.mu.Lock()
+	stream := c.stream
+	c.mu.Unlock()
+
+	if stream == nil {
+		return errors.New("stream not initialized")
+	}
+
+	if err := stream.Send(event); err != nil {
+		return fmt.Errorf("send flow event: %w", err)
+	}
+
+	return nil
+}
+
 func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHandler func(msg *proto.FlowEventAck) error) error {
+	c.mu.Lock()
+	if c.receiving {
+		c.mu.Unlock()
+		return errors.New("concurrent Receive calls are not supported")
+	}
+	c.receiving = true
+	c.mu.Unlock()
+	defer func() {
+		c.mu.Lock()
+		c.receiving = false
+		c.mu.Unlock()
+	}()
+
 	backOff := defaultBackoff(ctx, interval)
 	operation := func() error {
-		if err := c.establishStreamAndReceive(ctx, msgHandler); err != nil {
-			if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
-				return fmt.Errorf("receive: %w: %w", err, context.Canceled)
-			}
+		stream, err := c.establishStream(ctx)
+		if err != nil {
+			log.Errorf("failed to establish flow stream, retrying: %v", err)
+			return c.handleRetryableError(err, time.Time{}, backOff)
+		}
+
+		streamStart := time.Now()
+
+		if err := c.receive(stream, msgHandler); err != nil {
 			log.Errorf("receive failed: %v", err)
-			return fmt.Errorf("receive: %w", err)
+			return c.handleRetryableError(err, streamStart, backOff)
 		}
 		return nil
 	}
@@ -108,37 +162,106 @@ func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHan
 	return nil
 }
 
-func (c *GRPCClient) establishStreamAndReceive(ctx context.Context, msgHandler func(msg *proto.FlowEventAck) error) error {
-	if c.clientConn.GetState() == connectivity.Shutdown {
-		return errors.New("connection to flow receiver has been shut down")
+// handleRetryableError resets the backoff timer if the stream was healthy long
+// enough and recreates the underlying ClientConn so that gRPC's internal
+// subchannel backoff does not accumulate and compete with our own retry timer.
+// A zero streamStart means the stream was never established.
+func (c *GRPCClient) handleRetryableError(err error, streamStart time.Time, backOff backoff.BackOff) error {
+	if isContextDone(err) {
+		return backoff.Permanent(err)
 	}
 
-	stream, err := c.realClient.Events(ctx, grpc.WaitForReady(true))
+	var permErr *backoff.PermanentError
+	if errors.As(err, &permErr) {
+		return err
+	}
+
+	// Reset the backoff so the next retry starts with a short delay instead of
+	// continuing the already-elapsed timer. Only do this if the stream was healthy
+	// long enough; short-lived connect/drop cycles must not defeat MaxElapsedTime.
+	if !streamStart.IsZero() && time.Since(streamStart) >= minHealthyDuration {
+		backOff.Reset()
+	}
+
+	if recreateErr := c.recreateConnection(); recreateErr != nil {
+		log.Errorf("recreate connection: %v", recreateErr)
+		return recreateErr
+	}
+
+	log.Infof("connection recreated, retrying stream")
+	return fmt.Errorf("retrying after error: %w", err)
+}
+
+func (c *GRPCClient) recreateConnection() error {
+	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return backoff.Permanent(ErrClientClosed)
+	}
+
+	conn, err := grpc.NewClient(c.target, c.opts...)
 	if err != nil {
-		return fmt.Errorf("create event stream: %w", err)
+		c.mu.Unlock()
+		return fmt.Errorf("create new connection: %w", err)
+	}
+
+	old := c.clientConn
+	c.clientConn = conn
+	c.realClient = proto.NewFlowServiceClient(conn)
+	c.stream = nil
+	c.mu.Unlock()
+
+	_ = old.Close()
+
+	return nil
+}
+
+func (c *GRPCClient) establishStream(ctx context.Context) (proto.FlowService_EventsClient, error) {
+	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return nil, backoff.Permanent(ErrClientClosed)
 	}
+	cl := c.realClient
+	c.mu.Unlock()
 
-	err = stream.Send(&proto.FlowEvent{IsInitiator: true})
+	// open stream outside the lock — blocking operation
+	stream, err := cl.Events(ctx)
 	if err != nil {
-		log.Infof("failed to send initiator message to flow receiver but will attempt to continue. Error: %s", err)
+		return nil, fmt.Errorf("create event stream: %w", err)
+	}
+	streamReady := false
+	defer func() {
+		if !streamReady {
+			_ = stream.CloseSend()
+		}
+	}()
+
+	if err = stream.Send(&proto.FlowEvent{IsInitiator: true}); err != nil {
+		return nil, fmt.Errorf("send initiator: %w", err)
 	}
 
 	if err = checkHeader(stream); err != nil {
-		return fmt.Errorf("check header: %w", err)
+		return nil, fmt.Errorf("check header: %w", err)
 	}
 
-	c.streamMu.Lock()
+	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return nil, backoff.Permanent(ErrClientClosed)
+	}
 	c.stream = stream
-	c.streamMu.Unlock()
+	c.mu.Unlock()
+	streamReady = true
 
-	return c.receive(stream, msgHandler)
+	return stream, nil
 }
 
 func (c *GRPCClient) receive(stream proto.FlowService_EventsClient, msgHandler func(msg *proto.FlowEventAck) error) error {
 	for {
 		msg, err := stream.Recv()
 		if err != nil {
-			return fmt.Errorf("receive from stream: %w", err)
+			return err
 		}
 
 		if msg.IsInitiator {
@@ -169,7 +292,7 @@ func checkHeader(stream proto.FlowService_EventsClient) error {
 func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff {
 	return backoff.WithContext(&backoff.ExponentialBackOff{
 		InitialInterval:     800 * time.Millisecond,
-		RandomizationFactor: 1,
+		RandomizationFactor: 0.5,
 		Multiplier:          1.7,
 		MaxInterval:         interval / 2,
 		MaxElapsedTime:      3 * 30 * 24 * time.Hour, // 3 months
@@ -178,18 +301,12 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff
 	}, ctx)
 }
 
-func (c *GRPCClient) Send(event *proto.FlowEvent) error {
-	c.streamMu.Lock()
-	stream := c.stream
-	c.streamMu.Unlock()
-
-	if stream == nil {
-		return errors.New("stream not initialized")
+func isContextDone(err error) bool {
+	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+		return true
 	}
-
-	if err := stream.Send(event); err != nil {
-		return fmt.Errorf("send flow event: %w", err)
+	if s, ok := status.FromError(err); ok {
+		return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded
 	}
-
-	return nil
+	return false
 }
```
