-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathrouter.go
More file actions
298 lines (263 loc) · 6.73 KB
/
router.go
File metadata and controls
298 lines (263 loc) · 6.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
// Copyright 2023 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0
package router
import (
"net"
"strings"
"sync"
"time"
glist "github.com/bahlo/generic-list-go"
"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/errors"
"github.com/pingcap/tiproxy/pkg/balance/observer"
"go.uber.org/zap"
)
// Sentinel errors declared by the router package.
var (
// ErrNoBackend is the sentinel error with the message "no available backend".
// NOTE(review): presumably returned when routing finds no usable backend — confirm at the call sites.
ErrNoBackend = errors.New("no available backend")
// ErrPortConflict is the sentinel error with the message "port routing conflict".
// NOTE(review): presumably reported when port-based routing rules collide — confirm at the call sites.
ErrPortConflict = errors.New("port routing conflict")
)
// ConnEventReceiver receives connection events.
// Every callback is handed the connection the event concerns and returns an error.
type ConnEventReceiver interface {
// OnRedirectSucceed is the callback for a successful redirection of conn.
// NOTE(review): from and to presumably identify the source and target backends — confirm.
OnRedirectSucceed(from, to string, conn RedirectableConn) error
// OnRedirectFail is the callback for a failed redirection of conn.
// NOTE(review): from and to presumably identify the source and target backends — confirm.
OnRedirectFail(from, to string, conn RedirectableConn) error
// OnConnClosed is the callback for a closed connection.
// NOTE(review): redirectingBackendID presumably names the backend conn was being
// redirected to when it closed, if any — confirm against the implementation.
OnConnClosed(backendID, redirectingBackendID string, conn RedirectableConn) error
}
// Router routes client connections to backends.
type Router interface {
// GetBackendSelector returns a BackendSelector for the given client.
GetBackendSelector(clientInfo ClientInfo) BackendSelector
// HealthyBackendCount returns an int.
// NOTE(review): presumably the number of backends currently considered healthy — confirm.
HealthyBackendCount() int
// RefreshBackend takes no arguments and returns nothing.
// NOTE(review): presumably triggers a reload of the backend list — confirm.
RefreshBackend()
// RedirectConnections returns an error.
// NOTE(review): presumably asks connections to redirect to other backends — confirm.
RedirectConnections() error
// ConnCount returns an int.
// NOTE(review): presumably the number of connections tracked by the router — confirm.
ConnCount() int
// ServerVersion returns the TiDB version.
ServerVersion() string
// Close takes no arguments and returns nothing.
// NOTE(review): presumably releases the router's resources — confirm.
Close()
}
// connPhase is an int-based enumeration of the redirection/lifecycle states of a connection.
type connPhase int
// The connPhase values, numbered from 0 via iota in declaration order.
const (
// The session is never redirected.
phaseNotRedirected connPhase = iota
// The session is redirecting.
phaseRedirectNotify
// The session redirected successfully last time.
phaseRedirectEnd
// The session failed to redirect last time.
phaseRedirectFail
// The connection is closed.
phaseClosed
)
// Timing constants for rebalancing and redirection.
const (
// rebalanceInterval is the interval at which connections are rebalanced.
rebalanceInterval = 10 * time.Millisecond
// redirectFailMinInterval is the minimum interval between redirection attempts for a
// connection whose last redirection failed. Such a connection may hold state that
// cannot be migrated, so retries are throttled to avoid needless attempts and
// reduce latency jitter.
redirectFailMinInterval = 3 * time.Second
)
// RedirectableConn indicates a redirect-able connection.
type RedirectableConn interface {
// SetEventReceiver sets the receiver of this connection's events.
SetEventReceiver(receiver ConnEventReceiver)
// SetValue takes a key and a value of any type.
// NOTE(review): presumably stores val so that Value(key) returns it — confirm.
SetValue(key, val any)
// Value takes a key and returns a value of any type.
// NOTE(review): presumably the value stored by SetValue, or nil if absent — confirm.
Value(key any) any
// Redirect returns false if the current conn is not redirectable.
Redirect(backend BackendInst) bool
// ForceClose closes the connection immediately and returns false if it's already closing.
ForceClose() bool
// ConnectionID returns a uint64 identifier for the connection.
ConnectionID() uint64
// ConnInfo returns zap fields describing the connection.
// NOTE(review): presumably intended for structured logging — confirm.
ConnInfo() []zap.Field
}
// BackendInst defines a backend that a connection is redirecting to.
// In this file it is the read-only view that RedirectableConn.Redirect accepts.
type BackendInst interface {
// ID returns the backend's identifier.
ID() string
// Addr returns the backend's address.
Addr() string
// Healthy reports whether the backend is healthy.
Healthy() bool
// Local returns a bool.
// NOTE(review): presumably whether the backend is local to this proxy — confirm.
Local() bool
// Keyspace returns a string naming the backend's keyspace.
Keyspace() string
// ClusterName returns a string naming the backend's cluster.
ClusterName() string
}
// backendWrapper contains the connections on the backend.
// It also holds the backend's identity and its health state.
type backendWrapper struct {
// mu bundles an RWMutex with the state that the accessor methods in this file
// read and write while holding it.
mu struct {
sync.RWMutex
// The most recently stored health of the backend (see setHealth).
observer.BackendHealth
// Zero when the backend is not in failover; Healthy() requires it to be zero.
failoverSince time.Time
}
// id is the identifier passed to newBackendWrapper.
id string
// addr is copied from the health's Addr at construction time.
addr string
// podName is derived from addr by backendPodNameFromAddr at construction time.
podName string
// connScore is used for calculating backend scores and check if the backend can be removed from the list.
// connScore = connList.Len() + incoming connections - outgoing connections.
connScore int
// A list of *connWrapper and is ordered by the connecting or redirecting time.
// connList only includes the connections that are currently on this backend.
connList *glist.List[*connWrapper]
// The group that this backend belongs to.
group *Group
}
// newBackendWrapper builds a backendWrapper for the backend identified by id.
// The address is taken from health.Addr, the pod name is derived from that
// address, the connection list starts empty, and health is stored as the
// initial health state.
func newBackendWrapper(id string, health observer.BackendHealth) *backendWrapper {
	b := &backendWrapper{id: id}
	b.addr = health.Addr
	b.podName = backendPodNameFromAddr(b.addr)
	b.connList = glist.New[*connWrapper]()
	b.setHealth(health)
	return b
}
// setHealth replaces the stored health state with health while holding the write lock.
func (b *backendWrapper) setHealth(health observer.BackendHealth) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.mu.BackendHealth = health
}
// getHealth returns a copy of the stored health state, taken under the read lock.
func (b *backendWrapper) getHealth() observer.BackendHealth {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.BackendHealth
}
// ConnScore returns the connScore field. No lock is taken here.
// NOTE(review): presumably the caller synchronizes access to connScore — confirm.
func (b *backendWrapper) ConnScore() int {
return b.connScore
}
// ID returns the identifier the wrapper was constructed with.
func (b *backendWrapper) ID() string {
return b.id
}
// Addr returns the address recorded when the wrapper was constructed.
func (b *backendWrapper) Addr() string {
return b.addr
}
// Healthy reports whether the backend is usable: the stored health must be
// healthy and the backend must not be marked as in failover (failoverSince is zero).
func (b *backendWrapper) Healthy() bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.Healthy && b.mu.failoverSince.IsZero()
}
// ObservedHealthy reports the Healthy flag of the stored health state,
// ignoring the failover mark.
func (b *backendWrapper) ObservedHealthy() bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.Healthy
}
// PodName returns the pod name that was derived from the backend address at construction time.
func (b *backendWrapper) PodName() string {
return b.podName
}
// setFailover marks or clears the failover state under the write lock.
//
// A non-zero since marks the backend as in failover, unless it is already
// marked, in which case the existing mark is kept. A zero since clears the mark.
// changed reports whether the stored value was modified, and failoverSince is
// the value in effect afterwards (the zero time when not in failover).
func (b *backendWrapper) setFailover(since time.Time) (changed bool, failoverSince time.Time) {
	b.mu.Lock()
	defer b.mu.Unlock()

	current := b.mu.failoverSince
	marking, marked := !since.IsZero(), !current.IsZero()
	switch {
	case marking && marked:
		// Already in failover: keep the earlier timestamp.
		return false, current
	case marking:
		b.mu.failoverSince = since
		return true, since
	case marked:
		b.mu.failoverSince = time.Time{}
		return true, time.Time{}
	default:
		// Nothing to clear.
		return false, time.Time{}
	}
}
// FailoverSince returns the stored failover timestamp, read under the read lock.
// It is the zero time when the backend is not marked as in failover.
func (b *backendWrapper) FailoverSince() (since time.Time) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.failoverSince
}
// ServerVersion returns the ServerVersion of the stored health state, read under the read lock.
func (b *backendWrapper) ServerVersion() string {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.ServerVersion
}
// ConnCount returns the length of connList, i.e. the number of connections
// currently on this backend. No lock is taken here.
func (b *backendWrapper) ConnCount() int {
return b.connList.Len()
}
// Local returns the Local flag of the stored health state, read under the read lock.
func (b *backendWrapper) Local() bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.Local
}
// GetBackendInfo returns a copy of the BackendInfo held in the stored health state,
// read under the read lock.
func (b *backendWrapper) GetBackendInfo() observer.BackendInfo {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.BackendInfo
}
// SupportRedirection returns the SupportRedirection flag of the stored health state,
// read under the read lock.
func (b *backendWrapper) SupportRedirection() bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.SupportRedirection
}
// Keyspace returns the value of the keyspace label in the stored health state,
// or "" when the backend has no labels or no keyspace label.
func (b *backendWrapper) Keyspace() string {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if labels := b.mu.BackendHealth.Labels; labels != nil {
		return labels[config.KeyspaceLabelName]
	}
	return ""
}
// ClusterName returns the ClusterName of the stored health state, read under the read lock.
func (b *backendWrapper) ClusterName() string {
	b.mu.RLock()
	name := b.mu.BackendHealth.ClusterName
	b.mu.RUnlock()
	return name
}
// Cidr returns the entries of the backend's CIDR label.
//
// The label value is a comma-separated list; each entry is trimmed of
// surrounding whitespace and blank entries are dropped, preserving order.
// It returns nil when the label is missing or empty, and a non-nil empty
// slice when the label contains only blank entries.
func (b *backendWrapper) Cidr() []string {
	// Looking up a key in a nil or empty map yields "", which covers the
	// "no labels" case as well as the "no CIDR label" case.
	raw := b.getHealth().Labels[config.CidrLabelName]
	if raw == "" {
		return nil
	}
	parts := strings.Split(raw, ",")
	// Filter in place, reusing the backing array of parts.
	kept := parts[:0]
	for _, part := range parts {
		if part = strings.TrimSpace(part); part != "" {
			kept = append(kept, part)
		}
	}
	return kept
}
// TiProxyPort returns the backend's TiProxy port label with surrounding
// whitespace removed, or "" when the label is absent.
func (b *backendWrapper) TiProxyPort() string {
	// A lookup in a nil or empty map yields "", and trimming "" is still "".
	port := b.getHealth().Labels[config.TiProxyPortLabelName]
	return strings.TrimSpace(port)
}
// String returns the string form of the stored health state, produced under the read lock.
func (b *backendWrapper) String() string {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return b.mu.String()
}
// connWrapper wraps RedirectableConn.
// It embeds the connection and adds bookkeeping fields alongside it.
type connWrapper struct {
RedirectableConn
// The reason why the redirection happens.
redirectReason string
// Last redirect start time of this connection.
lastRedirect time.Time
// NOTE(review): presumably the time the connection was created — confirm where it is set.
createTime time.Time
// The current connPhase of the connection.
phase connPhase
// NOTE(review): presumably set once a forced close has been requested — confirm where it is set.
forceClosing bool
}
// backendPodNameFromAddr derives a pod name from a backend address.
//
// The host part of addr is used; if addr cannot be split into host and port,
// the whole of addr is treated as the host. An IP-literal host is returned
// unchanged. Otherwise the result is the host up to (not including) its first
// '.', or the whole host when it has no '.'. An empty host yields "".
func backendPodNameFromAddr(addr string) string {
	host := addr
	if h, _, err := net.SplitHostPort(addr); err == nil {
		host = h
	}
	if net.ParseIP(host) != nil {
		return host
	}
	// Cut returns the entire host when there is no '.', and "" for an empty host.
	name, _, _ := strings.Cut(host, ".")
	return name
}