From e36663aa758c6d1c9e259d1711b8b756d5dcf7f7 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Thu, 2 Apr 2026 15:43:26 +0800 Subject: [PATCH 1/4] evict backend --- conf/proxy.toml | 8 ++ lib/config/proxy.go | 24 ++++ lib/config/proxy_test.go | 22 +++ pkg/balance/router/group.go | 35 +++++ pkg/balance/router/mock_test.go | 10 ++ pkg/balance/router/router.go | 67 ++++++++- pkg/balance/router/router_score.go | 64 +++++++-- pkg/balance/router/router_score_test.go | 158 +++++++++++++++++++++ pkg/manager/config/config_test.go | 20 +++ pkg/proxy/backend/backend_conn_mgr.go | 19 +++ pkg/proxy/backend/backend_conn_mgr_test.go | 31 ++++ 11 files changed, 448 insertions(+), 10 deletions(-) diff --git a/conf/proxy.toml b/conf/proxy.toml index d05b6c9a7..d1fa70e57 100644 --- a/conf/proxy.toml +++ b/conf/proxy.toml @@ -23,6 +23,14 @@ graceful-close-conn-timeout = 15 +# fail-backend-list marks backend pod names or backend addresses as failed. TiProxy will stop routing new +# connections to them and migrate existing connections away. +# fail-backend-list = ["db-2033841436272623616-0f6e346b-tidb-0", "10.0.0.10:4000"] + +# failover-timeout is measured in seconds. If a failed backend still has remaining connections after the timeout, +# TiProxy will force close them. +# failover-timeout = 60 + # possible values: # "" => enable static routing. # "pd-addr:pd-port" => automatically tidb discovery. diff --git a/lib/config/proxy.go b/lib/config/proxy.go index 3ba73fba3..80b0c3056 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -70,6 +70,11 @@ type ProxyServerOnline struct { // BackendClusters represents multiple backend clusters that the proxy can route to. It can be reloaded // online. 
BackendClusters []BackendCluster `yaml:"backend-clusters,omitempty" toml:"backend-clusters,omitempty" json:"backend-clusters,omitempty" reloadable:"true"` + // FailBackendList contains backend pod names or backend addresses (IP:port) that should be drained immediately + // and excluded from new routing. + FailBackendList []string `yaml:"fail-backend-list,omitempty" toml:"fail-backend-list,omitempty" json:"fail-backend-list,omitempty" reloadable:"true"` + // FailoverTimeout is the grace period in seconds before force closing the remaining connections on failed backends. + FailoverTimeout int `yaml:"failover-timeout,omitempty" toml:"failover-timeout,omitempty" json:"failover-timeout,omitempty" reloadable:"true"` } type ProxyServer struct { @@ -136,6 +141,7 @@ func NewConfig() *Config { cfg.Proxy.FrontendKeepalive, cfg.Proxy.BackendHealthyKeepalive, cfg.Proxy.BackendUnhealthyKeepalive = DefaultKeepAlive() cfg.Proxy.PDAddrs = "127.0.0.1:2379" cfg.Proxy.GracefulCloseConnTimeout = 15 + cfg.Proxy.FailoverTimeout = 60 cfg.API.Addr = "0.0.0.0:3080" @@ -162,6 +168,7 @@ func (cfg *Config) Clone() *Config { newCfg.Labels = maps.Clone(cfg.Labels) newCfg.Proxy.PublicEndpoints = slices.Clone(cfg.Proxy.PublicEndpoints) newCfg.Proxy.BackendClusters = slices.Clone(cfg.Proxy.BackendClusters) + newCfg.Proxy.FailBackendList = slices.Clone(cfg.Proxy.FailBackendList) for i := range newCfg.Proxy.BackendClusters { newCfg.Proxy.BackendClusters[i].NSServers = slices.Clone(newCfg.Proxy.BackendClusters[i].NSServers) } @@ -281,6 +288,23 @@ func (ps *ProxyServer) Check() error { return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.backend-clusters.ns-servers: %s", err.Error()) } } + if ps.FailoverTimeout < 0 { + return errors.Wrapf(ErrInvalidConfigValue, "proxy.failover-timeout must be greater than or equal to 0") + } + failBackends := ps.FailBackendList[:0] + failBackendSet := make(map[string]struct{}, len(ps.FailBackendList)) + for i, backendName := range ps.FailBackendList { + 
backendName = strings.TrimSpace(backendName) + if backendName == "" { + return errors.Wrapf(ErrInvalidConfigValue, "proxy.fail-backend-list[%d] is empty", i) + } + if _, ok := failBackendSet[backendName]; ok { + return errors.Wrapf(ErrInvalidConfigValue, "duplicate proxy.fail-backend-list entry %s", backendName) + } + failBackendSet[backendName] = struct{}{} + failBackends = append(failBackends, backendName) + } + ps.FailBackendList = failBackends return nil } diff --git a/lib/config/proxy_test.go b/lib/config/proxy_test.go index 14a3bec55..62c91b348 100644 --- a/lib/config/proxy_test.go +++ b/lib/config/proxy_test.go @@ -26,6 +26,8 @@ var testProxyConfig = Config{ FrontendKeepalive: KeepAlive{Enabled: true}, ProxyProtocol: "v2", GracefulWaitBeforeShutdown: 10, + FailBackendList: []string{"db-tidb-0", "db-tidb-1"}, + FailoverTimeout: 60, ConnBufferSize: 32 * 1024, BackendClusters: []BackendCluster{ { @@ -188,6 +190,24 @@ func TestProxyCheck(t *testing.T) { }, err: ErrInvalidConfigValue, }, + { + pre: func(t *testing.T, c *Config) { + c.Proxy.FailBackendList = []string{"db-tidb-0", " "} + }, + err: ErrInvalidConfigValue, + }, + { + pre: func(t *testing.T, c *Config) { + c.Proxy.FailBackendList = []string{"db-tidb-0", "db-tidb-0"} + }, + err: ErrInvalidConfigValue, + }, + { + pre: func(t *testing.T, c *Config) { + c.Proxy.FailoverTimeout = -1 + }, + err: ErrInvalidConfigValue, + }, } for _, tc := range testcases { cfg := testProxyConfig @@ -311,10 +331,12 @@ func TestCloneConfig(t *testing.T) { require.Equal(t, cfg, *clone) cfg.Labels["c"] = "d" cfg.Proxy.PublicEndpoints[0] = "2.2.2.0/24" + cfg.Proxy.FailBackendList[0] = "db-tidb-9" cfg.Proxy.BackendClusters[0].Name = "cluster-updated" cfg.Proxy.BackendClusters[0].NSServers[0] = "10.0.0.9" require.NotContains(t, clone.Labels, "c") require.Equal(t, []string{"1.1.1.0/24"}, clone.Proxy.PublicEndpoints) + require.Equal(t, []string{"db-tidb-0", "db-tidb-1"}, clone.Proxy.FailBackendList) require.Equal(t, "cluster-a", 
clone.Proxy.BackendClusters[0].Name) require.Equal(t, []string{"10.0.0.2", "10.0.0.3"}, clone.Proxy.BackendClusters[0].NSServers) } diff --git a/pkg/balance/router/group.go b/pkg/balance/router/group.go index 924fe5981..431dccd9f 100644 --- a/pkg/balance/router/group.go +++ b/pkg/balance/router/group.go @@ -256,6 +256,9 @@ func (g *Group) Balance(ctx context.Context) { i := 0 for ele := fromBackend.connList.Front(); ele != nil && ctx.Err() == nil && i < count; ele = ele.Next() { conn := ele.Value + if conn.forceClosing { + continue + } switch conn.phase { case phaseRedirectNotify: // A connection cannot be redirected again when it has not finished redirecting. @@ -282,6 +285,7 @@ func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, suc RedirectableConn: conn, createTime: time.Now(), phase: phaseNotRedirected, + forceClosing: false, } g.addConn(backend, connWrapper) conn.SetEventReceiver(g) @@ -290,6 +294,37 @@ func (g *Group) onCreateConn(backendInst BackendInst, conn RedirectableConn, suc } } +func (g *Group) CloseTimedOutFailoverConnections(now time.Time, timeout time.Duration) { + g.Lock() + defer g.Unlock() + for _, backend := range g.backends { + active, since := backend.Failover() + if !active { + continue + } + if timeout > 0 && since.Add(timeout).After(now) { + continue + } + for ele := backend.connList.Front(); ele != nil; ele = ele.Next() { + conn := ele.Value + if conn.phase == phaseClosed || conn.forceClosing { + continue + } + fields := []zap.Field{ + zap.Uint64("connID", conn.ConnectionID()), + zap.String("backend_addr", backend.addr), + zap.String("backend_pod", backend.PodName()), + zap.Duration("failover_timeout", timeout), + zap.Duration("failover_elapsed", now.Sub(since)), + } + if conn.ForceClose() { + conn.forceClosing = true + g.lg.Warn("force close connection on failover backend", fields...) 
+ } + } + } +} + func (g *Group) removeConn(backend *backendWrapper, ce *glist.Element[*connWrapper]) { backend.connList.Remove(ce) setBackendConnMetrics(backend.addr, backend.connList.Len()) diff --git a/pkg/balance/router/mock_test.go b/pkg/balance/router/mock_test.go index b701657ee..216bec1e6 100644 --- a/pkg/balance/router/mock_test.go +++ b/pkg/balance/router/mock_test.go @@ -69,6 +69,16 @@ func (conn *mockRedirectableConn) Redirect(inst BackendInst) bool { return true } +func (conn *mockRedirectableConn) ForceClose() bool { + conn.Lock() + defer conn.Unlock() + if conn.closing { + return false + } + conn.closing = true + return true +} + func (conn *mockRedirectableConn) GetRedirectingBackendID() string { conn.Lock() defer conn.Unlock() diff --git a/pkg/balance/router/router.go b/pkg/balance/router/router.go index 9f62833ad..998d76620 100644 --- a/pkg/balance/router/router.go +++ b/pkg/balance/router/router.go @@ -4,6 +4,7 @@ package router import ( + "net" "strings" "sync" "time" @@ -69,6 +70,8 @@ type RedirectableConn interface { Value(key any) any // Redirect returns false if the current conn is not redirectable. Redirect(backend BackendInst) bool + // ForceClose closes the connection immediately and returns false if it's already closing. + ForceClose() bool ConnectionID() uint64 ConnInfo() []zap.Field } @@ -88,9 +91,12 @@ type backendWrapper struct { mu struct { sync.RWMutex observer.BackendHealth + failoverActive bool + failoverSince time.Time } - id string - addr string + id string + addr string + podName string // connScore is used for calculating backend scores and check if the backend can be removed from the list. // connScore = connList.Len() + incoming connections - outgoing connections. 
connScore int @@ -105,6 +111,7 @@ func newBackendWrapper(id string, health observer.BackendHealth) *backendWrapper wrapper := &backendWrapper{ id: id, addr: health.Addr, + podName: backendPodNameFromAddr(health.Addr), connList: glist.New[*connWrapper](), } wrapper.setHealth(health) @@ -137,12 +144,50 @@ func (b *backendWrapper) Addr() string { } func (b *backendWrapper) Healthy() bool { + b.mu.RLock() + healthy := b.mu.Healthy && !b.mu.failoverActive + b.mu.RUnlock() + return healthy +} + +func (b *backendWrapper) ObservedHealthy() bool { b.mu.RLock() healthy := b.mu.Healthy b.mu.RUnlock() return healthy } +func (b *backendWrapper) PodName() string { + return b.podName +} + +func (b *backendWrapper) setFailover(active bool, since time.Time) (changed bool, failoverSince time.Time) { + b.mu.Lock() + defer b.mu.Unlock() + if active { + if b.mu.failoverActive { + return false, b.mu.failoverSince + } + b.mu.failoverActive = true + b.mu.failoverSince = since + return true, b.mu.failoverSince + } + if !b.mu.failoverActive { + return false, time.Time{} + } + b.mu.failoverActive = false + b.mu.failoverSince = time.Time{} + return true, time.Time{} +} + +func (b *backendWrapper) Failover() (active bool, since time.Time) { + b.mu.RLock() + active = b.mu.failoverActive + since = b.mu.failoverSince + b.mu.RUnlock() + return +} + func (b *backendWrapper) ServerVersion() string { b.mu.RLock() version := b.mu.ServerVersion @@ -236,4 +281,22 @@ type connWrapper struct { lastRedirect time.Time createTime time.Time phase connPhase + forceClosing bool +} + +func backendPodNameFromAddr(addr string) string { + host, _, err := net.SplitHostPort(addr) + if err != nil { + host = addr + } + if host == "" { + return "" + } + if ip := net.ParseIP(host); ip != nil { + return host + } + if idx := strings.IndexByte(host, '.'); idx >= 0 { + return host[:idx] + } + return host } diff --git a/pkg/balance/router/router_score.go b/pkg/balance/router/router_score.go index 97b22da67..8fb5f4b36 100644 
--- a/pkg/balance/router/router_score.go +++ b/pkg/balance/router/router_score.go @@ -51,14 +51,17 @@ type ScoreBasedRouter struct { serverVersion string // The backend supports redirection only when they have signing certs. supportRedirection bool + failoverBackends map[string]struct{} + failoverTimeout time.Duration } // NewScoreBasedRouter creates a ScoreBasedRouter. func NewScoreBasedRouter(logger *zap.Logger) *ScoreBasedRouter { return &ScoreBasedRouter{ - logger: logger, - backends: make(map[string]*backendWrapper), - groups: make([]*Group, 0), + logger: logger, + backends: make(map[string]*backendWrapper), + groups: make([]*Group, 0), + failoverBackends: make(map[string]struct{}), } } @@ -83,6 +86,9 @@ func (r *ScoreBasedRouter) Init(ctx context.Context, ob observer.BackendObserver default: r.logger.Error("unsupported routing rule, use the default rule", zap.String("rule", cfg.Balance.RoutingRule)) } + r.Lock() + r.setFailoverConfigLocked(cfg) + r.Unlock() childCtx, cancelFunc := context.WithCancel(ctx) r.cancelFunc = cancelFunc @@ -214,12 +220,15 @@ func (router *ScoreBasedRouter) updateBackendHealth(healthResults observer.Healt } var serverVersion string supportRedirection := true + now := time.Now() for backendID, health := range backends { backend, ok := router.backends[backendID] if !ok && health.Healthy { router.logger.Debug("add new backend to router", zap.String("backend_id", backendID), zap.String("addr", health.Addr), zap.Stringer("health", health)) - router.backends[backendID] = newBackendWrapper(backendID, *health) + backend = newBackendWrapper(backendID, *health) + router.backends[backendID] = backend + router.setBackendFailoverLocked(backend, now) serverVersion = health.ServerVersion } else if ok { if !health.Equals(backend.getHealth()) { @@ -227,6 +236,7 @@ func (router *ScoreBasedRouter) updateBackendHealth(healthResults observer.Healt zap.String("backend_id", backendID), zap.String("addr", health.Addr), zap.Stringer("health", health)) } 
backend.setHealth(*health) + router.setBackendFailoverLocked(backend, now) if health.Healthy { serverVersion = health.ServerVersion } @@ -292,7 +302,7 @@ func (router *ScoreBasedRouter) updateGroups() { for _, backend := range router.backends { // If connList.Len() == 0, there won't be any outgoing connections. // And if also connScore == 0, there won't be any incoming connections. - if !backend.Healthy() && backend.connList.Len() == 0 && backend.connScore <= 0 { + if !backend.ObservedHealthy() && backend.connList.Len() == 0 && backend.connScore <= 0 { delete(router.backends, backend.id) if backend.group != nil { backend.group.RemoveBackend(backend.id) @@ -392,6 +402,7 @@ func (router *ScoreBasedRouter) rebalanceLoop(ctx context.Context) { func (router *ScoreBasedRouter) setConfig(cfg *config.Config) { router.Lock() defer router.Unlock() + router.setFailoverConfigLocked(cfg) for _, group := range router.groups { group.SetConfig(cfg) } @@ -404,12 +415,49 @@ func (router *ScoreBasedRouter) rebalance(ctx context.Context) { router.Lock() defer router.Unlock() - if !router.supportRedirection { - return + if router.supportRedirection { + for _, group := range router.groups { + group.Balance(ctx) + } } for _, group := range router.groups { - group.Balance(ctx) + group.CloseTimedOutFailoverConnections(time.Now(), router.failoverTimeout) + } +} + +func (router *ScoreBasedRouter) setFailoverConfigLocked(cfg *config.Config) { + failoverBackends := make(map[string]struct{}, len(cfg.Proxy.FailBackendList)) + for _, backend := range cfg.Proxy.FailBackendList { + failoverBackends[backend] = struct{}{} + } + router.failoverBackends = failoverBackends + router.failoverTimeout = time.Duration(cfg.Proxy.FailoverTimeout) * time.Second + now := time.Now() + for _, backend := range router.backends { + router.setBackendFailoverLocked(backend, now) + } +} + +func (router *ScoreBasedRouter) setBackendFailoverLocked(backend *backendWrapper, now time.Time) { + _, active := 
router.failoverBackends[backend.PodName()] + if !active { + _, active = router.failoverBackends[backend.Addr()] + } + changed, since := backend.setFailover(active, now) + if !changed { + return + } + fields := []zap.Field{ + zap.String("backend_addr", backend.Addr()), + zap.String("backend_pod", backend.PodName()), + zap.Duration("failover_timeout", router.failoverTimeout), + } + if active { + fields = append(fields, zap.Time("failover_since", since)) + router.logger.Warn("backend enters failover", fields...) + return } + router.logger.Info("backend exits failover", fields...) } func (router *ScoreBasedRouter) ConnCount() int { diff --git a/pkg/balance/router/router_score_test.go b/pkg/balance/router/router_score_test.go index fd2a0877a..efca4d9e2 100644 --- a/pkg/balance/router/router_score_test.go +++ b/pkg/balance/router/router_score_test.go @@ -136,6 +136,13 @@ func (tester *routerTester) updateBackendLocalityByAddr(addr string, local bool) tester.notifyHealth() } +func (tester *routerTester) updateBackendRedirectSupportByAddr(addr string, support bool) { + health, ok := tester.backends[addr] + require.True(tester.t, ok) + health.SupportRedirection = support + tester.notifyHealth() +} + func (tester *routerTester) getBackendByIndex(index int) *backendWrapper { addr := strconv.Itoa(index + 1) backend := tester.router.backends[addr] @@ -735,6 +742,94 @@ func TestSetBackendStatus(t *testing.T) { } } +func TestBackendPodNameFromAddr(t *testing.T) { + require.Equal(t, "db-2033841436272623616-0f6e346b-tidb-0", backendPodNameFromAddr("db-2033841436272623616-0f6e346b-tidb-0.peer.ns.svc:4000")) + require.Equal(t, "127.0.0.1", backendPodNameFromAddr("127.0.0.1:4000")) + require.Equal(t, "backend-host", backendPodNameFromAddr("backend-host")) +} + +func TestFailoverBackend(t *testing.T) { + tester := newRouterTester(t, nil) + tester.addBackends(2) + tester.addConnections(20) + + fromBackend := tester.getBackendByIndex(0) + toBackend := tester.getBackendByIndex(1) + 
tester.router.setConfig(&config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailBackendList: []string{fromBackend.PodName()}, + FailoverTimeout: 60, + }, + }, + }) + + require.False(t, fromBackend.Healthy()) + selector := tester.router.GetBackendSelector(ClientInfo{}) + backend, err := selector.Next() + require.NoError(t, err) + selector.Finish(nil, false) + require.NotNil(t, backend) + require.Equal(t, toBackend.Addr(), backend.Addr()) + + tester.rebalance(1) + require.Equal(t, 10, fromBackend.ConnCount()) + tester.checkRedirectingNum(10) + tester.redirectFinish(10, true) + require.Equal(t, 0, fromBackend.ConnCount()) + require.Equal(t, 20, toBackend.ConnCount()) + tester.checkBackendConnMetrics() +} + +func TestFailoverBackendByAddr(t *testing.T) { + tester := newRouterTester(t, nil) + tester.addBackends(2) + + fromBackend := tester.getBackendByIndex(0) + toBackend := tester.getBackendByIndex(1) + tester.router.setConfig(&config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailBackendList: []string{fromBackend.Addr()}, + FailoverTimeout: 60, + }, + }, + }) + + require.False(t, fromBackend.Healthy()) + require.True(t, toBackend.Healthy()) + selector := tester.router.GetBackendSelector(ClientInfo{}) + backend, err := selector.Next() + require.NoError(t, err) + selector.Finish(nil, false) + require.NotNil(t, backend) + require.Equal(t, toBackend.Addr(), backend.Addr()) +} + +func TestFailoverTimeoutForceClose(t *testing.T) { + tester := newRouterTester(t, nil) + tester.addBackends(1) + tester.addConnections(3) + + backend := tester.getBackendByIndex(0) + tester.updateBackendRedirectSupportByAddr(backend.Addr(), false) + tester.router.setConfig(&config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailBackendList: []string{backend.PodName()}, + FailoverTimeout: 0, + }, + }, + }) + + tester.rebalance(1) + for _, conn := range tester.conns { + 
require.True(t, conn.closing) + } + tester.closeConnections(3, false) + tester.checkBackendConnMetrics() +} + func TestGetServerVersion(t *testing.T) { lg, _ := logger.CreateLoggerForTest(t) rt := NewScoreBasedRouter(lg) @@ -907,6 +1002,69 @@ func TestChannelClosed(t *testing.T) { } } +func TestWatchFailoverConfig(t *testing.T) { + lg, _ := logger.CreateLoggerForTest(t) + router := NewScoreBasedRouter(lg) + cfgCh := make(chan *config.Config) + addr := "db-2033841436272623616-0f6e346b-tidb-0.peer.ns.svc:4000" + cfgGetter := newMockConfigGetter(&config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailoverTimeout: 60, + }, + }, + }) + bo := newMockBackendObserver() + router.Init(context.Background(), bo, simpleBpCreator, cfgGetter, cfgCh) + t.Cleanup(bo.Close) + t.Cleanup(router.Close) + + bo.addBackend(addr, nil) + bo.notify(nil) + require.Eventually(t, func() bool { + backend := router.backends[addr] + return backend != nil && backend.Healthy() + }, 3*time.Second, 10*time.Millisecond) + + cfgCh <- &config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailBackendList: []string{"db-2033841436272623616-0f6e346b-tidb-0"}, + FailoverTimeout: 60, + }, + }, + } + require.Eventually(t, func() bool { + backend := router.backends[addr] + return backend != nil && !backend.Healthy() + }, 3*time.Second, 10*time.Millisecond) + + cfgCh <- &config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailBackendList: []string{addr}, + FailoverTimeout: 60, + }, + }, + } + require.Eventually(t, func() bool { + backend := router.backends[addr] + return backend != nil && !backend.Healthy() + }, 3*time.Second, 10*time.Millisecond) + + cfgCh <- &config.Config{ + Proxy: config.ProxyServer{ + ProxyServerOnline: config.ProxyServerOnline{ + FailoverTimeout: 60, + }, + }, + } + require.Eventually(t, func() bool { + backend := router.backends[addr] + return backend != nil && 
backend.Healthy() + }, 3*time.Second, 10*time.Millisecond) +} + func TestControlSpeed(t *testing.T) { tests := []struct { balanceCount float64 diff --git a/pkg/manager/config/config_test.go b/pkg/manager/config/config_test.go index 2ea3557eb..8569553d4 100644 --- a/pkg/manager/config/config_test.go +++ b/pkg/manager/config/config_test.go @@ -88,6 +88,26 @@ func TestConfigReload(t *testing.T) { return c.API.Addr == "0.0.0.0:3081" }, }, + { + name: "failover override", + precfg: ` +proxy.fail-backend-list = ["db-tidb-0", "db-tidb-1"] +proxy.failover-timeout = 90 +`, + precheck: func(c *config.Config) bool { + return c.Proxy.FailoverTimeout == 90 && + len(c.Proxy.FailBackendList) == 2 && + c.Proxy.FailBackendList[0] == "db-tidb-0" && + c.Proxy.FailBackendList[1] == "db-tidb-1" + }, + postcfg: ` +proxy.fail-backend-list = [] +proxy.failover-timeout = 0 +`, + postcheck: func(c *config.Config) bool { + return c.Proxy.FailoverTimeout == 0 && len(c.Proxy.FailBackendList) == 0 + }, + }, { name: "non empty fields should not be override by empty fields", precfg: `proxy.addr = "gg"`, diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index c490817aa..fc6521e08 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -710,6 +710,25 @@ func (mgr *BackendConnManager) Redirect(backendInst router.BackendInst) bool { return true } +func (mgr *BackendConnManager) ForceClose() bool { + for { + status := mgr.closeStatus.Load() + if status >= statusClosing { + return false + } + if mgr.closeStatus.CompareAndSwap(status, statusClosing) { + break + } + } + mgr.quitSource = SrcProxyQuit + if mgr.clientIO != nil { + if err := mgr.clientIO.Close(); err != nil && !pnet.IsDisconnectError(err) { + mgr.logger.Warn("force close client IO error", zap.Error(err)) + } + } + return true +} + func (mgr *BackendConnManager) notifyRedirectResult(ctx context.Context, rs *redirectResult) { _ = ctx if rs == nil { diff --git 
a/pkg/proxy/backend/backend_conn_mgr_test.go b/pkg/proxy/backend/backend_conn_mgr_test.go index c9c7a0581..db8b09d13 100644 --- a/pkg/proxy/backend/backend_conn_mgr_test.go +++ b/pkg/proxy/backend/backend_conn_mgr_test.go @@ -904,6 +904,37 @@ func TestGracefulCloseBeforeHandshake(t *testing.T) { ts.runTests(runners) } +func TestForceClose(t *testing.T) { + ts := newBackendMgrTester(t) + runners := []runner{ + // 1st handshake + { + client: ts.mc.authenticate, + proxy: ts.firstHandshake4Proxy, + backend: ts.handshake4Backend, + }, + // force close + { + proxy: func(_, _ pnet.PacketIO) error { + require.True(t, ts.mp.ForceClose()) + return nil + }, + }, + // really closed + { + proxy: ts.checkConnClosed4Proxy, + }, + { + proxy: func(clientIO, backendIO pnet.PacketIO) error { + require.Equal(t, SrcProxyQuit, ts.mp.QuitSource()) + require.False(t, ts.mp.ForceClose()) + return nil + }, + }, + } + ts.runTests(runners) +} + func TestHandlerReturnError(t *testing.T) { tests := []struct { cfg cfgOverrider From 5b0b1e1031f2b5b25c1993539ca5c69ffe51905d Mon Sep 17 00:00:00 2001 From: djshow832 Date: Tue, 14 Apr 2026 10:43:53 +0800 Subject: [PATCH 2/4] remove active --- lib/config/proxy.go | 7 ++----- lib/config/proxy_test.go | 6 ------ pkg/balance/router/group.go | 6 +++--- pkg/balance/router/router.go | 14 +++++--------- 4 files changed, 10 insertions(+), 23 deletions(-) diff --git a/lib/config/proxy.go b/lib/config/proxy.go index 80b0c3056..371a01a43 100644 --- a/lib/config/proxy.go +++ b/lib/config/proxy.go @@ -267,10 +267,6 @@ func (ps *ProxyServer) Check() error { if _, err := ps.GetSQLAddrs(); err != nil { return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.addr or proxy.port-range: %s", err.Error()) } - if len(ps.BackendClusters) == 0 { - return nil - } - clusterNames := make(map[string]struct{}, len(ps.BackendClusters)) for i, cluster := range ps.BackendClusters { name := strings.TrimSpace(cluster.Name) @@ -288,6 +284,7 @@ func (ps *ProxyServer) Check() error { 
return errors.Wrapf(ErrInvalidConfigValue, "invalid proxy.backend-clusters.ns-servers: %s", err.Error()) } } + if ps.FailoverTimeout < 0 { return errors.Wrapf(ErrInvalidConfigValue, "proxy.failover-timeout must be greater than or equal to 0") } @@ -299,7 +296,7 @@ func (ps *ProxyServer) Check() error { return errors.Wrapf(ErrInvalidConfigValue, "proxy.fail-backend-list[%d] is empty", i) } if _, ok := failBackendSet[backendName]; ok { - return errors.Wrapf(ErrInvalidConfigValue, "duplicate proxy.fail-backend-list entry %s", backendName) + continue } failBackendSet[backendName] = struct{}{} failBackends = append(failBackends, backendName) diff --git a/lib/config/proxy_test.go b/lib/config/proxy_test.go index 62c91b348..196bd5f47 100644 --- a/lib/config/proxy_test.go +++ b/lib/config/proxy_test.go @@ -196,12 +196,6 @@ func TestProxyCheck(t *testing.T) { }, err: ErrInvalidConfigValue, }, - { - pre: func(t *testing.T, c *Config) { - c.Proxy.FailBackendList = []string{"db-tidb-0", "db-tidb-0"} - }, - err: ErrInvalidConfigValue, - }, { pre: func(t *testing.T, c *Config) { c.Proxy.FailoverTimeout = -1 diff --git a/pkg/balance/router/group.go b/pkg/balance/router/group.go index 431dccd9f..e87936743 100644 --- a/pkg/balance/router/group.go +++ b/pkg/balance/router/group.go @@ -298,8 +298,8 @@ func (g *Group) CloseTimedOutFailoverConnections(now time.Time, timeout time.Dur g.Lock() defer g.Unlock() for _, backend := range g.backends { - active, since := backend.Failover() - if !active { + since := backend.FailoverSince() + if since.IsZero() { continue } if timeout > 0 && since.Add(timeout).After(now) { @@ -319,7 +319,7 @@ func (g *Group) CloseTimedOutFailoverConnections(now time.Time, timeout time.Dur } if conn.ForceClose() { conn.forceClosing = true - g.lg.Warn("force close connection on failover backend", fields...) + g.lg.Info("force close connection on failover backend", fields...) 
} } } diff --git a/pkg/balance/router/router.go b/pkg/balance/router/router.go index 998d76620..073a6227e 100644 --- a/pkg/balance/router/router.go +++ b/pkg/balance/router/router.go @@ -91,8 +91,7 @@ type backendWrapper struct { mu struct { sync.RWMutex observer.BackendHealth - failoverActive bool - failoverSince time.Time + failoverSince time.Time } id string addr string @@ -145,7 +144,7 @@ func (b *backendWrapper) Addr() string { func (b *backendWrapper) Healthy() bool { b.mu.RLock() - healthy := b.mu.Healthy && !b.mu.failoverActive + healthy := b.mu.Healthy && b.mu.failoverSince.IsZero() b.mu.RUnlock() return healthy } @@ -165,24 +164,21 @@ func (b *backendWrapper) setFailover(active bool, since time.Time) (changed bool b.mu.Lock() defer b.mu.Unlock() if active { - if b.mu.failoverActive { + if !b.mu.failoverSince.IsZero() { return false, b.mu.failoverSince } - b.mu.failoverActive = true b.mu.failoverSince = since return true, b.mu.failoverSince } - if !b.mu.failoverActive { + if b.mu.failoverSince.IsZero() { return false, time.Time{} } - b.mu.failoverActive = false b.mu.failoverSince = time.Time{} return true, time.Time{} } -func (b *backendWrapper) Failover() (active bool, since time.Time) { +func (b *backendWrapper) FailoverSince() (since time.Time) { b.mu.RLock() - active = b.mu.failoverActive since = b.mu.failoverSince b.mu.RUnlock() return From 7ff9c560352fc336b875c4eeca8e37a0cad9b4a3 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Tue, 14 Apr 2026 11:32:30 +0800 Subject: [PATCH 3/4] refactor --- pkg/balance/router/router.go | 4 +-- pkg/balance/router/router_score.go | 53 ++++++++++++++++-------------- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/pkg/balance/router/router.go b/pkg/balance/router/router.go index 073a6227e..f32a68b9e 100644 --- a/pkg/balance/router/router.go +++ b/pkg/balance/router/router.go @@ -160,10 +160,10 @@ func (b *backendWrapper) PodName() string { return b.podName } -func (b *backendWrapper) setFailover(active 
bool, since time.Time) (changed bool, failoverSince time.Time) { +func (b *backendWrapper) setFailover(since time.Time) (changed bool, failoverSince time.Time) { b.mu.Lock() defer b.mu.Unlock() - if active { + if !since.IsZero() { if !b.mu.failoverSince.IsZero() { return false, b.mu.failoverSince } diff --git a/pkg/balance/router/router_score.go b/pkg/balance/router/router_score.go index 8fb5f4b36..70e164ac3 100644 --- a/pkg/balance/router/router_score.go +++ b/pkg/balance/router/router_score.go @@ -228,7 +228,6 @@ func (router *ScoreBasedRouter) updateBackendHealth(healthResults observer.Healt zap.String("backend_id", backendID), zap.String("addr", health.Addr), zap.Stringer("health", health)) backend = newBackendWrapper(backendID, *health) router.backends[backendID] = backend - router.setBackendFailoverLocked(backend, now) serverVersion = health.ServerVersion } else if ok { if !health.Equals(backend.getHealth()) { @@ -236,7 +235,6 @@ func (router *ScoreBasedRouter) updateBackendHealth(healthResults observer.Healt zap.String("backend_id", backendID), zap.String("addr", health.Addr), zap.Stringer("health", health)) } backend.setHealth(*health) - router.setBackendFailoverLocked(backend, now) if health.Healthy { serverVersion = health.ServerVersion } @@ -247,6 +245,7 @@ func (router *ScoreBasedRouter) updateBackendHealth(healthResults observer.Healt supportRedirection = health.SupportRedirection && supportRedirection } + router.updateBackendFailoverLocked(now) router.updateGroups() if len(serverVersion) > 0 { router.serverVersion = serverVersion @@ -432,32 +431,36 @@ func (router *ScoreBasedRouter) setFailoverConfigLocked(cfg *config.Config) { } router.failoverBackends = failoverBackends router.failoverTimeout = time.Duration(cfg.Proxy.FailoverTimeout) * time.Second - now := time.Now() - for _, backend := range router.backends { - router.setBackendFailoverLocked(backend, now) - } + router.updateBackendFailoverLocked(time.Now()) } -func (router *ScoreBasedRouter) 
setBackendFailoverLocked(backend *backendWrapper, now time.Time) { - _, active := router.failoverBackends[backend.PodName()] - if !active { - _, active = router.failoverBackends[backend.Addr()] - } - changed, since := backend.setFailover(active, now) - if !changed { - return - } - fields := []zap.Field{ - zap.String("backend_addr", backend.Addr()), - zap.String("backend_pod", backend.PodName()), - zap.Duration("failover_timeout", router.failoverTimeout), - } - if active { - fields = append(fields, zap.Time("failover_since", since)) - router.logger.Warn("backend enters failover", fields...) - return +func (router *ScoreBasedRouter) updateBackendFailoverLocked(now time.Time) { + for _, backend := range router.backends { + _, active := router.failoverBackends[backend.PodName()] + if !active { + // Also support IP:port. + _, active = router.failoverBackends[backend.Addr()] + } + since := time.Time{} + if active { + since = now + } + changed, since := backend.setFailover(since) + if !changed { + // NOTE(review): must continue, not return — a `return` here aborts the + // loop on the first unchanged backend and skips updating the rest. + continue + } + fields := []zap.Field{ + zap.String("backend_addr", backend.Addr()), + zap.String("backend_pod", backend.PodName()), + zap.Duration("failover_timeout", router.failoverTimeout), + } + if active { + fields = append(fields, zap.Time("failover_since", since)) + router.logger.Warn("backend enters failover", fields...) + // NOTE(review): continue to the next backend instead of returning. + continue + } + router.logger.Info("backend exits failover", fields...)
} func (router *ScoreBasedRouter) ConnCount() int { From cfcc8c7a8faa1a6b465160f665af555995165d97 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Tue, 14 Apr 2026 22:21:46 +0800 Subject: [PATCH 4/4] add comment --- pkg/proxy/backend/backend_conn_mgr.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/proxy/backend/backend_conn_mgr.go b/pkg/proxy/backend/backend_conn_mgr.go index fc6521e08..ac0e4e2f5 100644 --- a/pkg/proxy/backend/backend_conn_mgr.go +++ b/pkg/proxy/backend/backend_conn_mgr.go @@ -710,6 +710,7 @@ func (mgr *BackendConnManager) Redirect(backendInst router.BackendInst) bool { return true } +// ForceClose forces closing the connection when the failover times out. func (mgr *BackendConnManager) ForceClose() bool { for { status := mgr.closeStatus.Load()