Skip to content

Commit bd1e5c0

Browse files
committed
MEDIUM: add retry mechanism to bootstrap cluster procedure
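This adds exponential backoff to the cluster join request: on network errors the request is retried up to 10 times, with the delay doubling on each attempt and capped at 60 seconds.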
1 parent b71f406 commit bd1e5c0

2 files changed

Lines changed: 85 additions & 40 deletions

File tree

configuration/cluster_sync.go

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"strings"
3434
"time"
3535

36-
"github.com/GehirnInc/crypt"
3736
"github.com/google/renameio"
3837
client_native "github.com/haproxytech/client-native/v3"
3938
"github.com/haproxytech/config-parser/v4/types"
@@ -115,7 +114,7 @@ func (c *ClusterSync) monitorCertificateRefresh() {
115114
log.Warning(err)
116115
continue
117116
}
118-
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s-csr.crt", c.cfg.Name.Load())), []byte(csr), 0644)
117+
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s-csr.crt", c.cfg.Name.Load())), []byte(csr), 0o644)
119118
if err != nil {
120119
log.Warning(err)
121120
continue
@@ -174,12 +173,12 @@ func (c *ClusterSync) issueRefreshRequest(url, port, basePath string, nodesPath
174173
return err
175174
}
176175
log.Infof("Cluster re joined, status: %s", responseData.Status)
177-
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s.crt", c.cfg.Name.Load())), []byte(csr), 0644)
176+
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s.crt", c.cfg.Name.Load())), []byte(csr), 0o644)
178177
if err != nil {
179178
log.Warning(err)
180179
return err
181180
}
182-
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s.key", c.cfg.Name.Load())), []byte(key), 0644)
181+
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s.key", c.cfg.Name.Load())), []byte(key), 0o644)
183182
if err != nil {
184183
log.Warning(err)
185184
return err
@@ -254,12 +253,12 @@ func (c *ClusterSync) monitorBootstrapKey() {
254253
log.Warning(err)
255254
break
256255
}
257-
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s.key", c.cfg.Name.Load())), []byte(key), 0644)
256+
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s.key", c.cfg.Name.Load())), []byte(key), 0o644)
258257
if err != nil {
259258
log.Warning(err)
260259
break
261260
}
262-
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s-csr.crt", c.cfg.Name.Load())), []byte(csr), 0644)
261+
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s-csr.crt", c.cfg.Name.Load())), []byte(csr), 0o644)
263262
if err != nil {
264263
log.Warning(err)
265264
break
@@ -273,11 +272,45 @@ func (c *ClusterSync) monitorBootstrapKey() {
273272
registerMerhod = method
274273
}
275274
log.Warningf("issuing cluster join request to cluster %s at %s", data["name"], data["address"])
276-
err = c.issueJoinRequest(url, data["port"], data["api-base-path"], c.cfg.Cluster.APIRegisterPath.Load(), registerMerhod, csr, key)
275+
userStore := GetUsersStore()
276+
user, pwd, err := misc.CreateClusterUser()
277+
if err != nil {
278+
log.Error(err)
279+
break
280+
}
281+
err = userStore.AddUser(user)
282+
if err != nil {
283+
log.Error(err)
284+
break
285+
}
286+
backOff := 1
287+
numTries := 0
288+
maxTries := 10
289+
for {
290+
err = c.issueJoinRequest(url, data["port"], data["api-base-path"], c.cfg.Cluster.APIRegisterPath.Load(), registerMerhod, csr, key, user, pwd)
291+
if err == nil {
292+
break
293+
}
294+
log.Error(err)
295+
if !misc.IsNetworkErr(err) {
296+
break
297+
}
298+
numTries++
299+
backOff *= 2
300+
if backOff > 60 {
301+
backOff = 60
302+
}
303+
if numTries > maxTries {
304+
log.Error("Joining cluster failed")
305+
break
306+
}
307+
log.Warningf("Joining cluster will be retried after %d seconds [%d/%d]", backOff, numTries, maxTries)
308+
time.Sleep(time.Second * time.Duration(backOff))
309+
}
277310
if err != nil {
278-
log.Warning(err)
279311
break
280312
}
313+
281314
if !c.cfg.Cluster.CertificateFetched.Load() {
282315
log.Warningf("starting certificate fetch")
283316
c.certFetch <- struct{}{}
@@ -288,37 +321,9 @@ func (c *ClusterSync) monitorBootstrapKey() {
288321
}
289322
}
290323

291-
func (c *ClusterSync) issueJoinRequest(url, port, basePath string, registerPath string, registerMethod string, csr, key string) error {
324+
func (c *ClusterSync) issueJoinRequest(url, port, basePath string, registerPath string, registerMethod string, csr, key string, user types.User, userPWD string) error {
292325
url = fmt.Sprintf("%s:%s/%s", url, port, strings.TrimLeft(path.Join(basePath, registerPath), "/"))
293326
apiCfg := c.cfg.APIOptions
294-
userStore := GetUsersStore()
295-
296-
// create a new user for connecting to cluster
297-
name, err := misc.RandomString(8)
298-
if err != nil {
299-
return err
300-
}
301-
pwd, err := misc.RandomString(24)
302-
if err != nil {
303-
return err
304-
}
305-
306-
cryptAlg := crypt.New(crypt.SHA512)
307-
hash, err := cryptAlg.Generate([]byte(pwd), nil)
308-
if err != nil {
309-
return err
310-
}
311-
name = fmt.Sprintf("dpapi-c-%s", name)
312-
log.Infof("Creating user %s for cluster connection", name)
313-
user := types.User{
314-
Name: name,
315-
IsInsecure: false,
316-
Password: hash,
317-
}
318-
err = userStore.AddUser(user)
319-
if err != nil {
320-
return err
321-
}
322327

323328
apiAddress := apiCfg.APIAddress
324329
if apiAddress == "" {
@@ -333,7 +338,7 @@ func (c *ClusterSync) issueJoinRequest(url, port, basePath string, registerPath
333338
// ID: "",
334339
Address: apiAddress,
335340
APIBasePath: c.cfg.RuntimeData.APIBasePath,
336-
APIPassword: pwd,
341+
APIPassword: userPWD,
337342
APIUser: user.Name,
338343
Certificate: csr,
339344
Description: "",
@@ -380,7 +385,7 @@ func (c *ClusterSync) issueJoinRequest(url, port, basePath string, registerPath
380385
return errCfg
381386
}
382387
// write id to file
383-
errFID := ioutil.WriteFile(c.cfg.HAProxy.NodeIDFile, []byte(responseData.ID), 0644) // nolint:gosec
388+
errFID := ioutil.WriteFile(c.cfg.HAProxy.NodeIDFile, []byte(responseData.ID), 0o644) // nolint:gosec
384389
if errFID != nil {
385390
return errFID
386391
}
@@ -459,7 +464,7 @@ func (c *ClusterSync) checkCertificate(node Node) (fetched bool, err error) {
459464
c.cfg.Status.Store("unconfigured")
460465
return false, nil
461466
}
462-
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s.crt", c.cfg.Name.Load())), []byte(node.Certificate), 0644)
467+
err = renameio.WriteFile(path.Join(c.cfg.GetClusterCertDir(), fmt.Sprintf("dataplane-%s.crt", c.cfg.Name.Load())), []byte(node.Certificate), 0o644)
463468
if err != nil {
464469
c.cfg.Status.Store("unconfigured")
465470
return false, err

misc/misc.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,21 @@ import (
2121
"encoding/json"
2222
"errors"
2323
"fmt"
24+
"net"
2425
"net/http"
2526
"os"
2627
"strconv"
2728
"strings"
2829

30+
"github.com/GehirnInc/crypt"
2931
"github.com/haproxytech/client-native/v3/configuration"
3032
client_errors "github.com/haproxytech/client-native/v3/errors"
3133
"github.com/haproxytech/client-native/v3/models"
34+
"github.com/haproxytech/config-parser/v4/types"
3235
jsoniter "github.com/json-iterator/go"
3336

3437
"github.com/haproxytech/dataplaneapi/haproxy"
38+
"github.com/haproxytech/dataplaneapi/log"
3539
"github.com/haproxytech/dataplaneapi/rate"
3640
)
3741

@@ -260,3 +264,39 @@ func RandomString(size int) (string, error) {
260264
result = strings.ReplaceAll(result, `_`, ``)
261265
return result[:size], err
262266
}
267+
268+
// IsNetworkErr reports whether err, or any error it wraps, implements
// net.Error.
//
// The error chain is walked with errors.As, so a network error that was
// wrapped by a caller (for example with fmt.Errorf and %w) is still
// recognized. A nil err yields false.
func IsNetworkErr(err error) bool {
	var netErr net.Error
	return errors.As(err, &netErr)
}
277+
278+
func CreateClusterUser() (types.User, string, error) {
279+
// create a new user for connecting to cluster
280+
name, err := RandomString(8)
281+
if err != nil {
282+
return types.User{}, "", err
283+
}
284+
pwd, err := RandomString(24)
285+
if err != nil {
286+
return types.User{}, "", err
287+
}
288+
289+
cryptAlg := crypt.New(crypt.SHA512)
290+
hash, err := cryptAlg.Generate([]byte(pwd), nil)
291+
if err != nil {
292+
return types.User{}, "", err
293+
}
294+
name = fmt.Sprintf("dpapi-c-%s", name)
295+
log.Infof("Creating user %s for cluster connection", name)
296+
user := types.User{
297+
Name: name,
298+
IsInsecure: false,
299+
Password: hash,
300+
}
301+
return user, pwd, nil
302+
}

0 commit comments

Comments
 (0)