@@ -109,9 +109,10 @@ func (s *Sentinel) dispatch(ctx context.Context, sentinel string, timeout time.D
109109func (s * Sentinel ) subscribeCommand (client * Client , sentinel string ,
110110 onSubscribed func ()) error {
111111 var channels = []interface {}{"+switch-master" }
112- if err := client .Flush ("SUBSCRIBE" , channels ... ); err != nil {
113- return errors .Trace (err )
114- }
112+ go func () {
113+ client .conn .Send ("SUBSCRIBE" , channels ... )
114+ client .conn .Flush ()
115+ }()
115116 for _ , sub := range channels {
116117 values , err := redigo .Values (client .Receive ())
117118 if err != nil {
@@ -217,46 +218,66 @@ func (s *Sentinel) Subscribe(sentinels []string, timeout time.Duration, onMajori
217218 }
218219}
219220
220- func (s * Sentinel ) existsCommand (client * Client , name string ) (bool , error ) {
221- r , err := client .Do ("SENTINEL" , "get-master-addr-by-name" , name )
222- if err != nil {
223- return false , errors .Trace (err )
221+ func (s * Sentinel ) existsCommand (client * Client , names []string ) (map [string ]bool , error ) {
222+ go func () {
223+ var pending int
224+ for _ , name := range names {
225+ pending ++
226+ client .conn .Send ("SENTINEL" , "get-master-addr-by-name" , name )
227+ }
228+ if pending != 0 {
229+ client .conn .Flush ()
230+ }
231+ }()
232+ exists := make (map [string ]bool , len (names ))
233+ for _ , name := range names {
234+ r , err := client .Receive ()
235+ if err != nil {
236+ return nil , errors .Trace (err )
237+ }
238+ exists [name ] = (r != nil )
224239 }
225- return r != nil , nil
240+ return exists , nil
226241}
227242
228- func (s * Sentinel ) masterCommand (client * Client , name string ) (map [string ]string , error ) {
229- if exists , err := s .existsCommand (client , name ); err != nil {
230- return nil , err
231- } else if ! exists {
232- return nil , nil
233- }
234- m , err := redigo .StringMap (client .Do ("SENTINEL" , "master" , name ))
243+ func (s * Sentinel ) slavesCommand (client * Client , names []string ) (map [string ][]map [string ]string , error ) {
244+ exists , err := s .existsCommand (client , names )
235245 if err != nil {
236- return nil , errors .Trace (err )
237- }
238- return m , nil
239- }
240-
241- func (s * Sentinel ) slavesCommand (client * Client , name string ) ([]map [string ]string , error ) {
242- if exists , err := s .existsCommand (client , name ); err != nil {
243246 return nil , err
244- } else if ! exists {
245- return nil , nil
246247 }
247- values , err := redigo .Values (client .Do ("SENTINEL" , "slaves" , name ))
248- if err != nil {
249- return nil , errors .Trace (err )
250- }
251- var slaves []map [string ]string
252- for i := range values {
253- m , err := redigo .StringMap (values [i ], nil )
248+ go func () {
249+ var pending int
250+ for _ , name := range names {
251+ if ! exists [name ] {
252+ continue
253+ }
254+ pending ++
255+ client .conn .Send ("SENTINEL" , "slaves" , name )
256+ }
257+ if pending != 0 {
258+ client .conn .Flush ()
259+ }
260+ }()
261+ results := make (map [string ][]map [string ]string , len (names ))
262+ for _ , name := range names {
263+ if ! exists [name ] {
264+ continue
265+ }
266+ values , err := redigo .Values (client .Receive ())
254267 if err != nil {
255268 return nil , errors .Trace (err )
256269 }
257- slaves = append (slaves , m )
270+ var slaves []map [string ]string
271+ for i := range values {
272+ m , err := redigo .StringMap (values [i ], nil )
273+ if err != nil {
274+ return nil , errors .Trace (err )
275+ }
276+ slaves = append (slaves , m )
277+ }
278+ results [name ] = slaves
258279 }
259- return slaves , nil
280+ return results , nil
260281}
261282
262283func (s * Sentinel ) mastersCommand (client * Client ) (map [int ]map [string ]string , error ) {
@@ -266,13 +287,13 @@ func (s *Sentinel) mastersCommand(client *Client) (map[int]map[string]string, er
266287 }
267288 var masters = make (map [int ]map [string ]string )
268289 for i := range values {
269- m , err := redigo .StringMap (values [i ], nil )
290+ p , err := redigo .StringMap (values [i ], nil )
270291 if err != nil {
271292 return nil , errors .Trace (err )
272293 }
273- gid , yes := s .isSameProduct (m ["name" ])
294+ gid , yes := s .isSameProduct (p ["name" ])
274295 if yes {
275- masters [gid ] = m
296+ masters [gid ] = p
276297 }
277298 }
278299 return masters , nil
@@ -281,11 +302,11 @@ func (s *Sentinel) mastersCommand(client *Client) (map[int]map[string]string, er
281302func (s * Sentinel ) mastersDispatch (ctx context.Context , sentinel string , timeout time.Duration ) (map [int ]* SentinelMaster , error ) {
282303 var masters = make (map [int ]* SentinelMaster )
283304 var err = s .dispatch (ctx , sentinel , timeout , func (c * Client ) error {
284- m , err := s .mastersCommand (c )
305+ p , err := s .mastersCommand (c )
285306 if err != nil {
286307 return err
287308 }
288- for gid , master := range m {
309+ for gid , master := range p {
289310 epoch , err := strconv .ParseInt (master ["config-epoch" ], 10 , 64 )
290311 if err != nil {
291312 s .printf ("sentinel-[%s] masters parse %s failed, config-epoch = '%s', %s" ,
@@ -398,22 +419,32 @@ type MonitorConfig struct {
398419}
399420
400421func (s * Sentinel ) monitorGroupsCommand (client * Client , sentniel string , config * MonitorConfig , groups map [int ]* net.TCPAddr ) error {
401- for gid , tcpAddr := range groups {
402- var name = s .NodeName (gid )
403- if exists , err := s .existsCommand (client , name ); err != nil {
404- return err
405- } else if exists {
406- _ , err := client .Do ("SENTINEL" , "remove" , name )
407- if err != nil {
408- return errors .Trace (err )
409- }
422+ var names []string
423+ for gid := range groups {
424+ names = append (names , s .NodeName (gid ))
425+ }
426+ if err := s .removeCommand (client , names ); err != nil {
427+ return err
428+ }
429+ go func () {
430+ for gid , tcpAddr := range groups {
431+ var ip , port = tcpAddr .IP .String (), tcpAddr .Port
432+ client .conn .Send ("SENTINEL" , "monitor" , s .NodeName (gid ), ip , port , config .Quorum )
433+ }
434+ if len (groups ) != 0 {
435+ client .conn .Flush ()
410436 }
411- var ip , port = tcpAddr .IP .String (), tcpAddr .Port
412- _ , err := client .Do ("SENTINEL" , "monitor" , name , ip , port , config .Quorum )
437+ }()
438+ for _ = range groups {
439+ _ , err := client .Receive ()
413440 if err != nil {
414441 return errors .Trace (err )
415- } else {
416- var args = []interface {}{"set" , name }
442+ }
443+ }
444+ go func () {
445+ var pending int
446+ for gid := range groups {
447+ var args = []interface {}{"set" , s .NodeName (gid )}
417448 if config .ParallelSyncs != 0 {
418449 args = append (args , "parallel-syncs" , config .ParallelSyncs )
419450 }
@@ -432,10 +463,20 @@ func (s *Sentinel) monitorGroupsCommand(client *Client, sentniel string, config
432463 if config .ClientReconfigScript != "" {
433464 args = append (args , "client-reconfig-script" , config .ClientReconfigScript )
434465 }
435- _ , err := client .Do ("SENTINEL" , args ... )
436- if err != nil {
437- return errors .Trace (err )
466+ if len (args ) == 2 {
467+ continue
438468 }
469+ pending ++
470+ client .conn .Send ("SENTINEL" , args ... )
471+ }
472+ if pending != 0 {
473+ client .conn .Flush ()
474+ }
475+ }()
476+ for _ = range groups {
477+ _ , err := client .Receive ()
478+ if err != nil {
479+ return errors .Trace (err )
439480 }
440481 }
441482 return nil
@@ -527,25 +568,44 @@ func (s *Sentinel) MonitorGroups(sentinels []string, timeout time.Duration, conf
527568 return last
528569}
529570
530- func (s * Sentinel ) removeGroupsCommand (client * Client , groups map [int ]bool ) error {
531- for gid := range groups {
532- var name = s .NodeName (gid )
533- if exists , err := s .existsCommand (client , name ); err != nil {
534- return err
535- } else if exists {
536- _ , err := client .Do ("SENTINEL" , "remove" , name )
537- if err != nil {
538- return errors .Trace (err )
571+ func (s * Sentinel ) removeCommand (client * Client , names []string ) error {
572+ exists , err := s .existsCommand (client , names )
573+ if err != nil {
574+ return err
575+ }
576+ go func () {
577+ var pending int
578+ for _ , name := range names {
579+ if ! exists [name ] {
580+ continue
539581 }
582+ pending ++
583+ client .conn .Send ("SENTINEL" , "remove" , name )
584+ }
585+ if pending != 0 {
586+ client .conn .Flush ()
587+ }
588+ }()
589+ for _ , name := range names {
590+ if ! exists [name ] {
591+ continue
592+ }
593+ _ , err := client .Receive ()
594+ if err != nil {
595+ return errors .Trace (err )
540596 }
541597 }
542598 return nil
543599}
544600
545601func (s * Sentinel ) removeGroupsDispatch (ctx context.Context , sentinel string , timeout time.Duration ,
546602 groups map [int ]bool ) error {
603+ var names []string
604+ for gid := range groups {
605+ names = append (names , s .NodeName (gid ))
606+ }
547607 var err = s .dispatch (ctx , sentinel , timeout , func (c * Client ) error {
548- return s .removeGroupsCommand (c , groups )
608+ return s .removeCommand (c , names )
549609 })
550610 if err != nil {
551611 switch errors .Cause (err ) {
@@ -594,15 +654,15 @@ func (s *Sentinel) RemoveGroups(sentinels []string, timeout time.Duration, group
594654
595655func (s * Sentinel ) removeGroupsAllDispatch (ctx context.Context , sentinel string , timeout time.Duration ) error {
596656 var err = s .dispatch (ctx , sentinel , timeout , func (c * Client ) error {
597- m , err := s .mastersCommand (c )
657+ masters , err := s .mastersCommand (c )
598658 if err != nil {
599659 return err
600660 }
601- var groups = make ( map [ int ] bool )
602- for gid := range m {
603- groups [ gid ] = true
661+ var names [] string
662+ for gid := range masters {
663+ names = append ( names , s . NodeName ( gid ))
604664 }
605- return s .removeGroupsCommand (c , groups )
665+ return s .removeCommand (c , names )
606666 })
607667 if err != nil {
608668 switch errors .Cause (err ) {
@@ -659,15 +719,19 @@ func (s *Sentinel) MastersAndSlavesClient(client *Client) (map[string]*SentinelG
659719 if err != nil {
660720 return nil , err
661721 }
662- results := make (map [string ]* SentinelGroup )
663- for _ , master := range masters {
664- var name = master ["name" ]
665- slaves , err := s .slavesCommand (client , name )
666- if err != nil {
667- return nil , err
668- }
722+ var names []string
723+ for gid := range masters {
724+ names = append (names , s .NodeName (gid ))
725+ }
726+ slaves , err := s .slavesCommand (client , names )
727+ if err != nil {
728+ return nil , err
729+ }
730+ results := make (map [string ]* SentinelGroup , len (masters ))
731+ for gid , master := range masters {
732+ var name = s .NodeName (gid )
669733 results [name ] = & SentinelGroup {
670- Master : master , Slaves : slaves ,
734+ Master : master , Slaves : slaves [ name ] ,
671735 }
672736 }
673737 return results , nil
0 commit comments