Skip to content

Commit 248210c

Browse files
authored
balance, config: add configs to tune the migration speed (#1032)
1 parent 628c546 commit 248210c

15 files changed

Lines changed: 412 additions & 50 deletions

lib/config/balance.go

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,26 @@ const (
1010
BalancePolicyLocation = "location"
1111
BalancePolicyConnection = "connection"
1212

13-
RoutePolicyPreferIdle = "prefer-idle"
14-
RoutePolicyRandom = "random"
13+
RoutingPolicyPreferIdle = "prefer-idle"
14+
RoutingPolicyRandom = "random"
15+
RoutingPolicyIdlest = "idlest"
1516
)
1617

1718
type Balance struct {
18-
LabelName string `yaml:"label-name,omitempty" toml:"label-name,omitempty" json:"label-name,omitempty" reloadable:"true"`
19-
RoutingRule string `yaml:"routing-rule,omitempty" toml:"routing-rule,omitempty" json:"routing-rule,omitempty" reloadable:"false"`
20-
Policy string `yaml:"policy,omitempty" toml:"policy,omitempty" json:"policy,omitempty" reloadable:"true"`
21-
RoutePolicy string `yaml:"route-policy,omitempty" toml:"route-policy,omitempty" json:"route-policy,omitempty" reloadable:"true"`
19+
LabelName string `yaml:"label-name,omitempty" toml:"label-name,omitempty" json:"label-name,omitempty" reloadable:"true"`
20+
RoutingRule string `yaml:"routing-rule,omitempty" toml:"routing-rule,omitempty" json:"routing-rule,omitempty" reloadable:"false"`
21+
Policy string `yaml:"policy,omitempty" toml:"policy,omitempty" json:"policy,omitempty" reloadable:"true"`
22+
RoutingPolicy string `yaml:"routing-policy,omitempty" toml:"routing-policy,omitempty" json:"routing-policy,omitempty" reloadable:"true"`
23+
Status Factor `yaml:"status,omitempty" toml:"status,omitempty" json:"status,omitempty" reloadable:"true"`
24+
Health Factor `yaml:"health,omitempty" toml:"health,omitempty" json:"health,omitempty" reloadable:"true"`
25+
Memory Factor `yaml:"memory,omitempty" toml:"memory,omitempty" json:"memory,omitempty" reloadable:"true"`
26+
CPU Factor `yaml:"cpu,omitempty" toml:"cpu,omitempty" json:"cpu,omitempty" reloadable:"true"`
27+
Location Factor `yaml:"location,omitempty" toml:"location,omitempty" json:"location,omitempty" reloadable:"true"`
28+
ConnCount Factor `yaml:"conn-count,omitempty" toml:"conn-count,omitempty" json:"conn-count,omitempty" reloadable:"true"`
29+
}
30+
31+
type Factor struct {
32+
MigrationsPerSecond float64 `yaml:"migrations-per-second,omitempty" toml:"migrations-per-second,omitempty" json:"migrations-per-second,omitempty" reloadable:"true"`
2233
}
2334

2435
func (b *Balance) Check() error {
@@ -30,20 +41,38 @@ func (b *Balance) Check() error {
3041
default:
3142
return errors.Wrapf(ErrInvalidConfigValue, "invalid balance.policy")
3243
}
33-
switch b.RoutePolicy {
34-
case RoutePolicyPreferIdle, RoutePolicyRandom:
44+
switch b.RoutingPolicy {
45+
case RoutingPolicyPreferIdle, RoutingPolicyRandom:
3546
return nil
3647
case "":
37-
b.RoutePolicy = RoutePolicyPreferIdle
48+
b.RoutingPolicy = RoutingPolicyPreferIdle
3849
default:
3950
return errors.Wrapf(ErrInvalidConfigValue, "invalid balance.route-policy")
4051
}
52+
if b.Status.MigrationsPerSecond < 0 {
53+
return errors.Wrapf(ErrInvalidConfigValue, "invalid balance.status.migrations-per-second")
54+
}
55+
if b.Health.MigrationsPerSecond < 0 {
56+
return errors.Wrapf(ErrInvalidConfigValue, "invalid balance.health.migrations-per-second")
57+
}
58+
if b.Memory.MigrationsPerSecond < 0 {
59+
return errors.Wrapf(ErrInvalidConfigValue, "invalid balance.memory.migrations-per-second")
60+
}
61+
if b.CPU.MigrationsPerSecond < 0 {
62+
return errors.Wrapf(ErrInvalidConfigValue, "invalid balance.cpu.migrations-per-second")
63+
}
64+
if b.Location.MigrationsPerSecond < 0 {
65+
return errors.Wrapf(ErrInvalidConfigValue, "invalid balance.location.migrations-per-second")
66+
}
67+
if b.ConnCount.MigrationsPerSecond < 0 {
68+
return errors.Wrapf(ErrInvalidConfigValue, "invalid balance.conn-count.migrations-per-second")
69+
}
4170
return nil
4271
}
4372

4473
func DefaultBalance() Balance {
4574
return Balance{
46-
Policy: BalancePolicyResource,
47-
RoutePolicy: RoutePolicyPreferIdle,
75+
Policy: BalancePolicyResource,
76+
RoutingPolicy: RoutingPolicyPreferIdle,
4877
}
4978
}

pkg/balance/factor/factor_balance.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,32 @@ func (fbb *FactorBasedBalance) BackendToRoute(backends []policy.BackendCtx) poli
193193
}
194194

195195
switch fbb.routePolicy {
196-
case config.RoutePolicyRandom:
196+
case config.RoutingPolicyRandom:
197197
return fbb.routeRandom(scoredBackends, &fields)
198+
case config.RoutingPolicyIdlest:
199+
return fbb.routeIdlest(scoredBackends, &fields)
198200
default:
199201
return fbb.routePreferIdle(scoredBackends, &fields)
200202
}
201203
}
202204

205+
func (fbb *FactorBasedBalance) routeIdlest(scoredBackends []scoredBackend, fields *[]zap.Field) policy.BackendCtx {
206+
// Evict the backends that are can't be routed to, and then choose the idlest one.
207+
// It's like least-connection algorithm.
208+
idx := -1
209+
for i := range scoredBackends {
210+
if fbb.canBeRouted(scoredBackends[i].scoreBits) {
211+
idx = i
212+
break
213+
}
214+
}
215+
if idx < 0 {
216+
return nil
217+
}
218+
*fields = append(*fields, zap.String("target", scoredBackends[idx].Addr()))
219+
return scoredBackends[idx].BackendCtx
220+
}
221+
203222
func (fbb *FactorBasedBalance) routeRandom(scoredBackends []scoredBackend, fields *[]zap.Field) policy.BackendCtx {
204223
// Evict the backends that are can't be routed to, and then choose one randomly, with the idlest one having 10% higher priority.
205224
// It works when new connections are more intensive. If new connections are all routed to the newly scaled backend,
@@ -370,7 +389,7 @@ func (fbb *FactorBasedBalance) SetConfig(cfg *config.Config) {
370389
fbb.Lock()
371390
defer fbb.Unlock()
372391
fbb.setFactors(cfg)
373-
fbb.routePolicy = cfg.Balance.RoutePolicy
392+
fbb.routePolicy = cfg.Balance.RoutingPolicy
374393
}
375394

376395
func (fbb *FactorBasedBalance) Close() {

pkg/balance/factor/factor_balance_test.go

Lines changed: 75 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,62 +33,67 @@ func TestRouteWithOneFactor(t *testing.T) {
3333
{
3434
scores: []int{10},
3535
idxRange: []int{0},
36-
policy: config.RoutePolicyRandom,
36+
policy: config.RoutingPolicyRandom,
3737
},
3838
{
3939
scores: []int{10, 20},
4040
idxRange: []int{0, 1},
41-
policy: config.RoutePolicyRandom,
41+
policy: config.RoutingPolicyRandom,
4242
},
4343
{
4444
scores: []int{10, 20, 30},
4545
idxRange: []int{0, 1, 2},
46-
policy: config.RoutePolicyRandom,
46+
policy: config.RoutingPolicyRandom,
4747
},
4848
{
4949
scores: []int{30, 20, 10},
5050
idxRange: []int{0, 1, 2},
51-
policy: config.RoutePolicyRandom,
51+
policy: config.RoutingPolicyRandom,
5252
},
5353
{
5454
scores: []int{30, 11, 10},
5555
idxRange: []int{0, 1, 2},
56-
policy: config.RoutePolicyRandom,
56+
policy: config.RoutingPolicyRandom,
5757
},
5858
{
5959
scores: []int{11, 11, 10},
6060
idxRange: []int{0, 1, 2},
61-
policy: config.RoutePolicyRandom,
61+
policy: config.RoutingPolicyRandom,
6262
},
6363
{
6464
scores: []int{10},
6565
idxRange: []int{0},
66-
policy: config.RoutePolicyPreferIdle,
66+
policy: config.RoutingPolicyPreferIdle,
6767
},
6868
{
6969
scores: []int{10, 20},
7070
idxRange: []int{0},
71-
policy: config.RoutePolicyPreferIdle,
71+
policy: config.RoutingPolicyPreferIdle,
7272
},
7373
{
7474
scores: []int{10, 20, 30},
7575
idxRange: []int{0},
76-
policy: config.RoutePolicyPreferIdle,
76+
policy: config.RoutingPolicyPreferIdle,
7777
},
7878
{
7979
scores: []int{30, 20, 10},
8080
idxRange: []int{2},
81-
policy: config.RoutePolicyPreferIdle,
81+
policy: config.RoutingPolicyPreferIdle,
8282
},
8383
{
8484
scores: []int{30, 11, 10},
8585
idxRange: []int{1, 2},
86-
policy: config.RoutePolicyPreferIdle,
86+
policy: config.RoutingPolicyPreferIdle,
8787
},
8888
{
8989
scores: []int{11, 11, 10},
9090
idxRange: []int{0, 1, 2},
91-
policy: config.RoutePolicyPreferIdle,
91+
policy: config.RoutingPolicyPreferIdle,
92+
},
93+
{
94+
scores: []int{10, 10, 9},
95+
idxRange: []int{2},
96+
policy: config.RoutingPolicyIdlest,
9297
},
9398
{
9499
scores: []int{10, 20},
@@ -114,10 +119,10 @@ func TestRouteWithOneFactor(t *testing.T) {
114119
}
115120
}
116121

117-
func TestRouteIdlestWith2Factors(t *testing.T) {
122+
func TestRoutePreferIdleWith2Factors(t *testing.T) {
118123
lg, _ := logger.CreateLoggerForTest(t)
119124
fm := NewFactorBasedBalance(lg, newMockMetricsReader())
120-
fm.routePolicy = config.RoutePolicyPreferIdle
125+
fm.routePolicy = config.RoutingPolicyPreferIdle
121126
factor1 := &mockFactor{bitNum: 4, balanceCount: 1, threshold: 1, canBeRouted: true}
122127
factor2 := &mockFactor{bitNum: 12, balanceCount: 1, threshold: 1, canBeRouted: true}
123128
fm.factors = []Factor{factor1, factor2}
@@ -189,7 +194,7 @@ func TestRouteIdlestWith2Factors(t *testing.T) {
189194
func TestRouteRandomWith2Factors(t *testing.T) {
190195
lg, _ := logger.CreateLoggerForTest(t)
191196
fm := NewFactorBasedBalance(lg, newMockMetricsReader())
192-
fm.routePolicy = config.RoutePolicyRandom
197+
fm.routePolicy = config.RoutingPolicyRandom
193198
factor1 := &mockFactor{bitNum: 4, balanceCount: 1, threshold: 1, canBeRouted: false}
194199
factor2 := &mockFactor{bitNum: 12, balanceCount: 1, threshold: 1, canBeRouted: true}
195200
fm.factors = []Factor{factor1, factor2}
@@ -253,6 +258,58 @@ func TestRouteRandomWith2Factors(t *testing.T) {
253258
}
254259
}
255260

261+
func TestRouteIdlestWith2Factors(t *testing.T) {
262+
lg, _ := logger.CreateLoggerForTest(t)
263+
fm := NewFactorBasedBalance(lg, newMockMetricsReader())
264+
fm.routePolicy = config.RoutingPolicyIdlest
265+
factor1 := &mockFactor{bitNum: 4, balanceCount: 1, threshold: 1, canBeRouted: true}
266+
factor2 := &mockFactor{bitNum: 12, balanceCount: 1, threshold: 1, canBeRouted: true}
267+
fm.factors = []Factor{factor1, factor2}
268+
require.NoError(t, fm.updateBitNum())
269+
tests := []struct {
270+
scores1 []int
271+
scores2 []int
272+
idxRange []int
273+
}{
274+
{
275+
scores1: []int{10, 0, 0},
276+
scores2: []int{0, 100, 200},
277+
idxRange: []int{1},
278+
},
279+
{
280+
scores1: []int{10, 10, 0},
281+
scores2: []int{0, 100, 200},
282+
idxRange: []int{2},
283+
},
284+
{
285+
scores1: []int{10, 10, 10},
286+
scores2: []int{0, 100, 200},
287+
idxRange: []int{0},
288+
},
289+
}
290+
for tIdx, test := range tests {
291+
factor1.updateScore = func(backends []scoredBackend) {
292+
for i := range backends {
293+
backends[i].addScore(test.scores1[i], factor1.bitNum)
294+
}
295+
}
296+
factor2.updateScore = func(backends []scoredBackend) {
297+
for i := range backends {
298+
backends[i].addScore(test.scores2[i], factor2.bitNum)
299+
}
300+
}
301+
backends := createBackends(len(test.scores1))
302+
targets := make(map[int]struct{}, len(test.scores1))
303+
for i := 0; len(targets) < len(test.idxRange) || i < 100; i++ {
304+
require.Less(t, i, 100000, "test index %d", tIdx)
305+
backend := fm.BackendToRoute(backends)
306+
idx := slices.Index(backends, backend)
307+
require.Contains(t, test.idxRange, idx, "test index %d", tIdx)
308+
targets[idx] = struct{}{}
309+
}
310+
}
311+
}
312+
256313
func TestBalanceWithOneFactor(t *testing.T) {
257314
lg, _ := logger.CreateLoggerForTest(t)
258315
fm := NewFactorBasedBalance(lg, newMockMetricsReader())
@@ -639,11 +696,11 @@ func TestSetConfigsConcurrently(t *testing.T) {
639696
wg.Run(func() {
640697
defer wg.Done()
641698
policies := []string{config.BalancePolicyConnection, config.BalancePolicyResource, config.BalancePolicyLocation}
642-
routePolicies := []string{config.RoutePolicyRandom, config.RoutePolicyPreferIdle}
699+
routePolicies := []string{config.RoutingPolicyRandom, config.RoutingPolicyPreferIdle, config.RoutingPolicyIdlest}
643700
for i := 0; ctx.Err() != nil; i++ {
644701
cfg.Balance = config.Balance{
645-
Policy: policies[i%len(policies)],
646-
RoutePolicy: routePolicies[i%len(routePolicies)],
702+
Policy: policies[i%len(policies)],
703+
RoutingPolicy: routePolicies[i%len(routePolicies)],
647704
}
648705
fbb.SetConfig(cfg)
649706
}

pkg/balance/factor/factor_conn.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ var _ Factor = (*FactorConnCount)(nil)
2525
// - The backend CPU usages are unavailable
2626
// - The workload just starts and the backend CPU usages are low
2727
type FactorConnCount struct {
28-
bitNum int
28+
bitNum int
29+
migrationsPerSecond float64
2930
}
3031

3132
func NewFactorConnCount() *FactorConnCount {
@@ -58,6 +59,9 @@ func (fcc *FactorConnCount) BalanceCount(from, to scoredBackend) (BalanceAdvice,
5859
if float64(from.ConnScore()) <= float64(to.ConnScore()+1)*connBalancedRatio {
5960
return AdviceNeutral, 0, nil
6061
}
62+
if fcc.migrationsPerSecond > 0 {
63+
return AdvicePositive, fcc.migrationsPerSecond, nil
64+
}
6165
targetTo := float64(from.ConnScore()+to.ConnScore()+1) / (1 + connBalancedRatio)
6266
count := (targetTo - float64(to.ConnScore()+1)) / balanceSeconds4Conn
6367
if count < 0 {
@@ -67,6 +71,7 @@ func (fcc *FactorConnCount) BalanceCount(from, to scoredBackend) (BalanceAdvice,
6771
}
6872

6973
func (fcc *FactorConnCount) SetConfig(cfg *config.Config) {
74+
fcc.migrationsPerSecond = cfg.Balance.ConnCount.MigrationsPerSecond
7075
}
7176

7277
func (fcc *FactorConnCount) CanBeRouted(_ uint64) bool {

pkg/balance/factor/factor_conn_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package factor
66
import (
77
"testing"
88

9+
"github.com/pingcap/tiproxy/lib/config"
910
"github.com/stretchr/testify/require"
1011
"go.uber.org/zap"
1112
)
@@ -94,3 +95,36 @@ func TestFactorConnSpeed(t *testing.T) {
9495
require.LessOrEqual(t, backend2.connScore, test.targetRange[1], "case id: %d", i)
9596
}
9697
}
98+
99+
func TestFactorConnConfig(t *testing.T) {
100+
tests := []struct {
101+
score1 int
102+
score2 int
103+
speed float64
104+
}{
105+
{
106+
score1: 120,
107+
score2: 100,
108+
speed: 0,
109+
},
110+
{
111+
score1: 150,
112+
score2: 100,
113+
speed: 10,
114+
},
115+
}
116+
117+
factor := NewFactorConnCount()
118+
factor.SetConfig(&config.Config{Balance: config.Balance{ConnCount: config.Factor{MigrationsPerSecond: 10}}})
119+
require.EqualValues(t, 10, factor.migrationsPerSecond)
120+
backend1 := newMockBackend(true, 0)
121+
backend2 := newMockBackend(true, 0)
122+
scoredBackend1 := newScoredBackend(backend1, zap.NewNop())
123+
scoredBackend2 := newScoredBackend(backend2, zap.NewNop())
124+
for i, test := range tests {
125+
backend1.connScore = test.score1
126+
backend2.connScore = test.score2
127+
_, balanceCount, _ := factor.BalanceCount(scoredBackend1, scoredBackend2)
128+
require.EqualValues(t, balanceCount, test.speed, "case id: %d", i)
129+
}
130+
}

pkg/balance/factor/factor_cpu.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,11 @@ type FactorCPU struct {
9494
// The updated time of the metric that we've read last time.
9595
lastMetricTime time.Time
9696
// The estimated average CPU usage used by one connection.
97-
usagePerConn float64
98-
mr metricsreader.MetricsReader
99-
bitNum int
100-
lg *zap.Logger
97+
usagePerConn float64
98+
mr metricsreader.MetricsReader
99+
bitNum int
100+
migrationsPerSecond float64
101+
lg *zap.Logger
101102
}
102103

103104
func NewFactorCPU(mr metricsreader.MetricsReader, lg *zap.Logger) *FactorCPU {
@@ -278,10 +279,14 @@ func (fc *FactorCPU) BalanceCount(from, to scoredBackend) (BalanceAdvice, float6
278279
if 1.3-toAvgUsage < (1.3-fromAvgUsage)*cpuBalancedRatio || 1.3-toLatestUsage < (1.3-fromLatestUsage)*cpuBalancedRatio {
279280
return AdviceNeutral, 0, fields
280281
}
282+
if fc.migrationsPerSecond > 0 {
283+
return AdvicePositive, fc.migrationsPerSecond, fields
284+
}
281285
return AdvicePositive, 1 / fc.usagePerConn / balanceRatio4Cpu, fields
282286
}
283287

284288
func (fc *FactorCPU) SetConfig(cfg *config.Config) {
289+
fc.migrationsPerSecond = cfg.Balance.CPU.MigrationsPerSecond
285290
}
286291

287292
func (fc *FactorCPU) CanBeRouted(_ uint64) bool {

0 commit comments

Comments
 (0)