11using System ;
22using System . Diagnostics ;
3- using System . Text ;
43using System . Threading ;
5- using System . Threading . Tasks ;
64using ServiceStack . Logging ;
75using ServiceStack . Text ;
86
@@ -34,6 +32,7 @@ public class RedisPubSubServer : IRedisPubSubServer
3432
3533 public Action < string > OnControlCommand { get ; set ; }
3634 public Action < string > OnUnSubscribe { get ; set ; }
35+ public Action < string > OnEvent { get ; set ; }
3736 public Action < Exception > OnError { get ; set ; }
3837 public Action < IRedisPubSubServer > OnFailover { get ; set ; }
3938 public bool IsSentinelSubscription { get ; set ; }
@@ -97,6 +96,8 @@ public IRedisPubSubServer Start()
9796 //Only 1 thread allowed past
9897 if ( Interlocked . CompareExchange ( ref status , Status . Starting , Status . Stopped ) == Status . Stopped ) //Should only be 1 thread past this point
9998 {
99+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} Stopped] Start()> Stopped -> Starting") ;
100+
100101 var initErrors = 0 ;
101102 bool hasInit = false ;
102103 while ( ! hasInit )
@@ -108,6 +109,7 @@ public IRedisPubSubServer Start()
108109 }
109110 catch ( Exception ex )
110111 {
112+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { GetStatus ( ) } ] Start().Init()> Exception: { ex . Message } ") ;
111113 OnError ? . Invoke ( ex ) ;
112114 SleepBackOffMultiplier ( initErrors ++ ) ;
113115 }
@@ -188,6 +190,8 @@ void SendHeartbeat(object state)
188190 if ( DateTime . UtcNow - new DateTime ( lastHeartbeatTicks ) > HeartbeatTimeout )
189191 {
190192 currentStatus = Interlocked . CompareExchange ( ref status , 0 , 0 ) ;
193+
194+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { Status . GetStatus ( currentStatus ) } ] SendHeartbeat()> Exceeded HeartbeatTimeout") ;
191195 if ( currentStatus == Status . Started )
192196 {
193197 Restart ( ) ;
@@ -210,7 +214,7 @@ private void DisposeHeartbeatTimer()
210214 try
211215 {
212216 if ( Log . IsDebugEnabled )
213- Log . DebugFormat ( "RedisPubServer.DisposeHeartbeatTimer()" ) ;
217+ Log . Debug ( "RedisPubServer.DisposeHeartbeatTimer()" ) ;
214218
215219 heartbeatTimer . Dispose ( ) ;
216220 }
@@ -227,6 +231,8 @@ private void RunLoop()
227231 if ( Interlocked . CompareExchange ( ref status , Status . Started , Status . Starting ) != Status . Starting ) return ;
228232 Interlocked . Increment ( ref timesStarted ) ;
229233
234+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} Started] RunLoop().Stop> Starting -> Started, timesStarted: { timesStarted } ") ;
235+
230236 try
231237 {
232238 //RESET
@@ -280,22 +286,31 @@ bool IsCtrlMessage(byte[] msg)
280286 if ( Log . IsDebugEnabled )
281287 Log . Debug ( "Stop Command Issued" ) ;
282288
289+ var holdStatus = GetStatus ( ) ;
290+
283291 Interlocked . CompareExchange ( ref status , Status . Stopping , Status . Started ) ;
292+
293+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { holdStatus } ] RunLoop().Stop> Started -> Stopping") ;
284294 try
285295 {
286296 if ( Log . IsDebugEnabled )
287297 Log . Debug ( "UnSubscribe From All Channels..." ) ;
288298
299+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { GetStatus ( ) } ] RunLoop().Stop> subscription.UnSubscribeFromAllChannels()") ;
300+
289301 // ReSharper disable once AccessToDisposedClosure
290302 subscription . UnSubscribeFromAllChannels ( ) ; //Un block thread.
291303 }
292304 finally
293305 {
306+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { GetStatus ( ) } ] RunLoop().Stop> Stopping -> Stopped") ;
294307 Interlocked . CompareExchange ( ref status , Status . Stopped , Status . Stopping ) ;
295308 }
296309 return ;
297310
298311 case Operation . Reset :
312+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { GetStatus ( ) } ] RunLoop().Reset> subscription.UnSubscribeFromAllChannels()") ;
313+
299314 // ReSharper disable once AccessToDisposedClosure
300315 subscription . UnSubscribeFromAllChannels ( ) ; //Un block thread.
301316 return ;
@@ -330,10 +345,14 @@ bool IsCtrlMessage(byte[] msg)
330345 lastExMsg = ex . Message ;
331346 Interlocked . Increment ( ref noOfErrors ) ;
332347 Interlocked . Increment ( ref noOfContinuousErrors ) ;
348+
349+ var holdStatus = GetStatus ( ) ;
333350
334351 if ( Interlocked . CompareExchange ( ref status , Status . Stopped , Status . Started ) != Status . Started )
335352 Interlocked . CompareExchange ( ref status , Status . Stopped , Status . Stopping ) ;
336353
354+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { holdStatus } ] RunLoop().Stop> Started|Stopping -> Stopped") ;
355+
337356 OnStop ? . Invoke ( ) ;
338357
339358 OnError ? . Invoke ( ex ) ;
@@ -343,6 +362,8 @@ bool IsCtrlMessage(byte[] msg)
343362 {
344363 if ( WaitBeforeNextRestart != null )
345364 TaskUtils . Sleep ( WaitBeforeNextRestart . Value ) ;
365+
366+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { GetStatus ( ) } ] RunLoop().AutoRestart> Start()") ;
346367 Start ( ) ;
347368 }
348369 }
@@ -361,6 +382,8 @@ private void Stop(bool shouldRestart)
361382
362383 if ( Interlocked . CompareExchange ( ref status , Status . Stopping , Status . Started ) == Status . Started )
363384 {
385+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { GetStatus ( ) } ] Stop()> Started -> Stopping") ;
386+
364387 if ( Log . IsDebugEnabled )
365388 Log . Debug ( "Stopping RedisPubSubServer..." ) ;
366389
@@ -392,7 +415,7 @@ private void NotifyAllSubscribers(string commandType=null)
392415 catch ( Exception ex )
393416 {
394417 OnError ? . Invoke ( ex ) ;
395- Log . Warn ( "Could not send '{0}' message to bg thread: {1}" . Fmt ( msg , ex . Message ) ) ;
418+ Log . WarnFormat ( "Could not send '{0}' message to bg thread: {1}" , msg , ex . Message ) ;
396419 }
397420 }
398421
@@ -447,10 +470,12 @@ private void KillBgThreadIfExists()
447470 {
448471#if ! NETSTANDARD2_0
449472 //Ideally we shouldn't get here, but lets try our hardest to clean it up
473+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { GetStatus ( ) } ] KillBgThreadIfExists()> bgThread.Interrupt()") ;
450474 Log . Warn ( "Interrupting previous Background Thread: " + bgThread . Name ) ;
451475 bgThread . Interrupt ( ) ;
452476 if ( ! bgThread . Join ( TimeSpan . FromSeconds ( 3 ) ) )
453477 {
478+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { GetStatus ( ) } ] KillBgThreadIfExists()> bgThread.Abort()") ;
454479 Log . Warn ( bgThread . Name + " just wont die, so we're now aborting it..." ) ;
455480 bgThread . Abort ( ) ;
456481 }
@@ -471,7 +496,7 @@ private void SleepBackOffMultiplier(int continuousErrorsCount)
471496 maxSleepMs ) ;
472497
473498 if ( Log . IsDebugEnabled )
474- Log . Debug ( "Sleeping for {0}ms after {1} continuous errors" . Fmt ( nextTry , continuousErrorsCount ) ) ;
499+ Log . DebugFormat ( "Sleeping for {0}ms after {1} continuous errors" , nextTry , continuousErrorsCount ) ;
475500
476501 TaskUtils . Sleep ( nextTry ) ;
477502 }
@@ -514,26 +539,22 @@ class Status //dep-free copy of WorkerStatus
514539 public const int Stopping = 1 ;
515540 public const int Starting = 2 ;
516541 public const int Started = 3 ;
517- }
518542
519- public string GetStatus ( )
520- {
521- switch ( Interlocked . CompareExchange ( ref status , 0 , 0 ) )
543+ public static string GetStatus ( int status )
522544 {
523- case Status . Disposed :
524- return "Disposed" ;
525- case Status . Stopped :
526- return "Stopped" ;
527- case Status . Stopping :
528- return "Stopping" ;
529- case Status . Starting :
530- return "Starting" ;
531- case Status . Started :
532- return "Started" ;
545+ return status switch {
546+ Disposed => nameof ( Disposed ) ,
547+ Stopped => nameof ( Stopped ) ,
548+ Stopping => nameof ( Stopping ) ,
549+ Starting => nameof ( Starting ) ,
550+ Started => nameof ( Started ) ,
551+ _ => throw new NotSupportedException ( "Unknown status: " + status )
552+ } ;
533553 }
534- return null ;
535554 }
536555
556+ public string GetStatus ( ) => Status . GetStatus ( Interlocked . CompareExchange ( ref status , 0 , 0 ) ) ;
557+
537558 public string GetStatsDescription ( )
538559 {
539560 var sb = StringBuilderCache . Allocate ( ) ;
@@ -555,11 +576,17 @@ public virtual void Dispose()
555576 if ( Log . IsDebugEnabled )
556577 Log . Debug ( "RedisPubServer.Dispose()..." ) ;
557578
579+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { GetStatus ( ) } ] Dispose()>") ;
580+
558581 Stop ( ) ;
559582
583+ var holdStatus = GetStatus ( ) ;
584+
560585 if ( Interlocked . CompareExchange ( ref status , Status . Disposed , Status . Stopped ) != Status . Stopped )
561586 Interlocked . CompareExchange ( ref status , Status . Disposed , Status . Stopping ) ;
562587
588+ OnEvent ? . Invoke ( $ "[{ DateTime . UtcNow . TimeOfDay : g} { holdStatus } ] Dispose()> -> Disposed") ;
589+
563590 try
564591 {
565592 OnDispose ? . Invoke ( ) ;
0 commit comments