@@ -22,6 +22,7 @@ import (
2222const (
2323 stateConnected = 1
2424 stateDataStale = 2
25+ stateLoading = 3
2526)
2627
2728type BackendConn struct {
@@ -137,6 +138,55 @@ func (bc *BackendConn) KeepAlive() bool {
137138 bc , bc .addr , bc .database )
138139 }
139140 }
141+
142+ case stateLoading :
143+ m := & Request {}
144+ m .Multi = []* redis.Resp {
145+ redis .NewBulkBytes ([]byte ("INFO" )),
146+ }
147+ m .Batch = & sync.WaitGroup {}
148+ bc .PushBack (m )
149+
150+ keepAliveCallback <- func () {
151+ m .Batch .Wait ()
152+ var err = func () error {
153+ if err := m .Err ; err != nil {
154+ return err
155+ }
156+ switch resp := m .Resp ; {
157+ case resp == nil :
158+ return ErrRespIsRequired
159+ case resp .IsError ():
160+ return fmt .Errorf ("bad info resp: %s" , resp .Value )
161+ case resp .IsBulkBytes ():
162+ var info = make (map [string ]string )
163+ for _ , line := range strings .Split (string (resp .Value ), "\n " ) {
164+ kv := strings .SplitN (line , ":" , 2 )
165+ if len (kv ) != 2 {
166+ continue
167+ }
168+ if key := strings .TrimSpace (kv [0 ]); key != "" {
169+ info [key ] = strings .TrimSpace (kv [1 ])
170+ }
171+ }
172+ if info ["loading" ] == "1" {
173+ return nil
174+ }
175+ if bc .state .CompareAndSwap (stateLoading , stateConnected ) {
176+ log .Warnf ("backend conn [%p] to %s, db-%d state = Connected (keepalive)" ,
177+ bc , bc .addr , bc .database )
178+ }
179+ return nil
180+ default :
181+ return fmt .Errorf ("bad info resp: should be string, but got %s" , resp .Type )
182+ }
183+ }()
184+ if err != nil && bc .closed .IsFalse () {
185+ log .WarnErrorf (err , "backend conn [%p] to %s, db-%d recover from Loading failed" ,
186+ bc , bc .addr , bc .database )
187+ }
188+ }
189+
140190 }
141191 return true
142192}
@@ -266,6 +316,7 @@ func (bc *BackendConn) run() {
266316}
267317
268318var errMasterDown = []byte ("MASTERDOWN" )
319+ var errLoading = []byte ("LOADING" )
269320
270321func (bc * BackendConn ) loopReader (tasks <- chan * Request , c * redis.Conn , round int ) (err error ) {
271322 defer func () {
@@ -288,6 +339,11 @@ func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round in
288339 log .Warnf ("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'" ,
289340 bc , bc .addr , bc .database )
290341 }
342+ case bytes .HasPrefix (resp .Value , errLoading ):
343+ if bc .state .CompareAndSwap (stateConnected , stateLoading ) {
344+ log .Warnf ("backend conn [%p] to %s, db-%d state = Loading" ,
345+ bc , bc .addr , bc .database )
346+ }
291347 }
292348 }
293349 bc .setResponse (r , resp , nil )
0 commit comments