Skip to content

Commit 6433e1d

Browse files
committed
feat: introduce new utils
1 parent 1605a6e commit 6433e1d

6 files changed

Lines changed: 1414 additions & 3 deletions

File tree

src/servers/server.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ export class ExpressServer {
460460
spec: {
461461
content: await fs.promises.readFile(openApiFilePath, 'utf8')
462462
}
463-
})
463+
} as any)
464464
);
465465
if (this.config.openApi?.verbose) {
466466
getLogger().info(`Mounted OpenAPI docs at ${openApiMountPath}`);
@@ -811,7 +811,7 @@ export class ExpressServer {
811811
*
812812
* @param signals Array of process signals to listen for (default: SIGINT, SIGTERM)
813813
*/
814-
public enableGracefulShutdown(signals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM']): void {
814+
public enableGracefulShutdown(signals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM']): this {
815815
signals.forEach(signal => {
816816
process.on(signal, async () => {
817817
getLogger().info(`Received ${signal}, initiating graceful shutdown...`);
@@ -830,14 +830,16 @@ export class ExpressServer {
830830
}
831831
});
832832
});
833+
return this;
833834
}
834835

835836
/**
836837
* Set an externally created base router.
837838
* This will override the internal rootRouter.
838839
*/
839-
public setBaseRouter(router: Router): void {
840+
public setBaseRouter(router: Router): this {
840841
this.externalRouter = router;
842+
return this;
841843
}
842844

843845
/**

src/utils/async.utils.ts

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
* SOFTWARE.
2323
*/
2424

25+
import { getLogger } from './logger.utils';
26+
2527
/**
2628
* Delays execution for a specified number of milliseconds.
2729
*
@@ -494,3 +496,241 @@ export function rateLimit<T>(
494496
return fn(...args);
495497
};
496498
}
499+
500+
/**
501+
* Circuit breaker pattern implementation for protecting against cascading failures.
502+
* Tracks failures and prevents calling the function when too many failures occur.
503+
*
504+
* @param fn - Function to protect with circuit breaker
505+
* @param options - Circuit breaker options
506+
* @returns Function wrapped with circuit breaker logic
507+
*
508+
* @example
509+
* ```typescript
510+
* const protectedFetch = circuitBreaker(
511+
* async (url) => {
512+
* const response = await fetch(url);
513+
* if (!response.ok) throw new Error(`HTTP error: ${response.status}`);
514+
* return response.json();
515+
* },
516+
* {
517+
* failureThreshold: 3,
518+
* resetTimeout: 30000,
519+
* onOpen: () => console.log('Circuit breaker opened'),
520+
* onClose: () => console.log('Circuit breaker closed')
521+
* }
522+
* );
523+
*
524+
* // Will throw CircuitBreakerOpenError after failureThreshold consecutive failures
525+
* try {
526+
* const data = await protectedFetch('https://api.example.com');
527+
* } catch (error) {
528+
* if (error instanceof CircuitBreakerOpenError) {
529+
* console.log('Service is currently unavailable, please try again later');
530+
* }
531+
* }
532+
* ```
533+
*/
534+
export class CircuitBreakerOpenError extends Error {
535+
constructor(message = 'Circuit breaker is open') {
536+
super(message);
537+
this.name = 'CircuitBreakerOpenError';
538+
}
539+
}
540+
541+
/**
542+
* Circuit breaker states
543+
*/
544+
export enum CircuitBreakerState {
545+
CLOSED = 'CLOSED',
546+
OPEN = 'OPEN',
547+
HALF_OPEN = 'HALF_OPEN'
548+
}
549+
550+
export interface CircuitBreakerOptions {
551+
/** Number of consecutive failures before opening circuit (default: 5) */
552+
failureThreshold?: number;
553+
/** Time in milliseconds to wait before trying again (default: 10000) */
554+
resetTimeout?: number;
555+
/** Number of successful calls to close the circuit again (default: 1) */
556+
successThreshold?: number;
557+
/** Callback when circuit opens */
558+
onOpen?: () => void;
559+
/** Callback when circuit closes */
560+
onClose?: () => void;
561+
/** Callback when circuit enters half-open state */
562+
onHalfOpen?: () => void;
563+
}
564+
565+
export function circuitBreaker<T, Args extends any[]>(
566+
fn: (...args: Args) => Promise<T>,
567+
options: CircuitBreakerOptions = {}
568+
): (...args: Args) => Promise<T> {
569+
const { failureThreshold = 5, resetTimeout = 10000, successThreshold = 1, onOpen, onClose, onHalfOpen } = options;
570+
571+
let state = CircuitBreakerState.CLOSED;
572+
let failureCount = 0;
573+
let successCount = 0;
574+
let nextAttempt = Date.now();
575+
576+
return async function (...args: Args): Promise<T> {
577+
if (state === CircuitBreakerState.OPEN) {
578+
if (Date.now() < nextAttempt) {
579+
throw new CircuitBreakerOpenError();
580+
}
581+
582+
// Move to half-open state
583+
state = CircuitBreakerState.HALF_OPEN;
584+
if (onHalfOpen) onHalfOpen();
585+
}
586+
587+
try {
588+
const result = await fn(...args);
589+
590+
// On success in half-open state
591+
if (state === CircuitBreakerState.HALF_OPEN) {
592+
successCount++;
593+
if (successCount >= successThreshold) {
594+
successCount = 0;
595+
failureCount = 0;
596+
state = CircuitBreakerState.CLOSED;
597+
if (onClose) onClose();
598+
}
599+
} else {
600+
// Reset failure count on success in closed state
601+
failureCount = 0;
602+
}
603+
604+
return result;
605+
} catch (error) {
606+
// Track failures
607+
failureCount++;
608+
609+
// Check if we need to open the circuit
610+
if (
611+
(state === CircuitBreakerState.CLOSED || state === CircuitBreakerState.HALF_OPEN) &&
612+
failureCount >= failureThreshold
613+
) {
614+
state = CircuitBreakerState.OPEN;
615+
nextAttempt = Date.now() + resetTimeout;
616+
if (onOpen) onOpen();
617+
}
618+
619+
throw error;
620+
}
621+
};
622+
}
623+
624+
/**
625+
* Run multiple async tasks with concurrency control.
626+
*
627+
* @param tasks - Array of async tasks to run
628+
* @param options - Concurrency options
629+
* @returns Promise that resolves when all tasks are complete
630+
*
631+
* @example
632+
* ```typescript
633+
* const urls = ['https://example.com/1', 'https://example.com/2', many more ];
634+
*
635+
* // Process up to 5 requests at a time, with progress reporting
636+
* const results = await runWithConcurrency(
637+
* urls.map(url => () => fetch(url).then(res => res.json())),
638+
* {
639+
* concurrency: 5,
640+
* onProgress: (completed, total) => {
641+
* console.log(`Progress: ${completed}/${total}`);
642+
* }
643+
* }
644+
* );
645+
* ```
646+
*/
647+
export async function runWithConcurrency<T>(
648+
tasks: Array<() => Promise<T>>,
649+
options: {
650+
/** Maximum number of tasks to run at once (default: 3) */
651+
concurrency?: number;
652+
/** Called whenever a task completes */
653+
onProgress?: (completed: number, total: number) => void;
654+
/** Abort signal to cancel execution */
655+
signal?: AbortSignal;
656+
} = {}
657+
): Promise<T[]> {
658+
const { concurrency = 3, onProgress, signal } = options;
659+
const results: T[] = [];
660+
const totalTasks = tasks.length;
661+
let completed = 0;
662+
663+
// If no tasks, return empty array
664+
if (totalTasks === 0) return results;
665+
666+
// Check if execution is already aborted
667+
if (signal?.aborted) {
668+
throw new Error('Aborted');
669+
}
670+
671+
return new Promise((resolve, reject) => {
672+
let taskIndex = 0;
673+
674+
// Process next task function
675+
const processNext = async () => {
676+
// Get current task index and increment
677+
const currentTaskIndex = taskIndex++;
678+
679+
// Skip if we've processed all tasks
680+
if (currentTaskIndex >= totalTasks) return;
681+
682+
try {
683+
// Run the task
684+
const result = await tasks[currentTaskIndex]();
685+
686+
// Check if aborted during task execution
687+
if (signal?.aborted) {
688+
reject(new Error('Aborted'));
689+
return;
690+
}
691+
692+
// Store result and update counters
693+
results[currentTaskIndex] = result;
694+
completed++;
695+
696+
// Call progress callback if provided
697+
if (onProgress) {
698+
try {
699+
onProgress(completed, totalTasks);
700+
} catch (err) {
701+
getLogger().error({ err }, 'Error in onProgress callback');
702+
}
703+
}
704+
} catch (error) {
705+
reject(error);
706+
return;
707+
}
708+
709+
// Check if all tasks are completed
710+
if (completed === totalTasks) {
711+
resolve(results);
712+
return;
713+
}
714+
715+
// Process next task
716+
processNext();
717+
};
718+
719+
// Start initial batch of tasks
720+
const initialBatch = Math.min(concurrency, totalTasks);
721+
for (let i = 0; i < initialBatch; i++) {
722+
processNext();
723+
}
724+
725+
// Set up abort handler
726+
if (signal) {
727+
signal.addEventListener(
728+
'abort',
729+
() => {
730+
reject(new Error('Aborted'));
731+
},
732+
{ once: true }
733+
);
734+
}
735+
});
736+
}

0 commit comments

Comments
 (0)