Skip to content

Commit 104690d

Browse files
committed
fix(polling): retry failed idempotency keys, fix drive cursor overshoot, fix calendar inclusive updatedMin
1 parent e14ff04 commit 104690d

3 files changed

Lines changed: 39 additions & 19 deletions

File tree

apps/sim/lib/core/idempotency/service.ts

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ const logger = createLogger('IdempotencyService')
1313
export interface IdempotencyConfig {
1414
ttlSeconds?: number
1515
namespace?: string
16+
/** When true, failed keys are deleted rather than stored so the operation is retried on the next attempt. */
17+
retryFailures?: boolean
1618
}
1719

1820
export interface IdempotencyResult {
@@ -58,6 +60,7 @@ export class IdempotencyService {
5860
this.config = {
5961
ttlSeconds: config.ttlSeconds ?? DEFAULT_TTL,
6062
namespace: config.namespace ?? 'default',
63+
retryFailures: config.retryFailures ?? false,
6164
}
6265
this.storageMethod = getStorageMethod()
6366
logger.info(`IdempotencyService using ${this.storageMethod} storage`, {
@@ -340,6 +343,21 @@ export class IdempotencyService {
340343
logger.debug(`Stored idempotency result in database: ${normalizedKey}`)
341344
}
342345

346+
private async deleteKey(
347+
normalizedKey: string,
348+
storageMethod: 'redis' | 'database'
349+
): Promise<void> {
350+
if (storageMethod === 'redis') {
351+
const redis = getRedisClient()
352+
if (redis) await redis.del(`${REDIS_KEY_PREFIX}${normalizedKey}`).catch(() => {})
353+
} else {
354+
await db
355+
.delete(idempotencyKey)
356+
.where(eq(idempotencyKey.key, normalizedKey))
357+
.catch(() => {})
358+
}
359+
}
360+
343361
async executeWithIdempotency<T>(
344362
provider: string,
345363
identifier: string,
@@ -360,6 +378,10 @@ export class IdempotencyService {
360378
}
361379

362380
if (existingResult?.status === 'failed') {
381+
if (this.config.retryFailures) {
382+
await this.deleteKey(claimResult.normalizedKey, claimResult.storageMethod)
383+
return this.executeWithIdempotency(provider, identifier, operation, additionalContext)
384+
}
363385
logger.info(`Previous operation failed for: ${claimResult.normalizedKey}`)
364386
throw new Error(existingResult.error || 'Previous operation failed')
365387
}
@@ -391,11 +413,15 @@ export class IdempotencyService {
391413
} catch (error) {
392414
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
393415

394-
await this.storeResult(
395-
claimResult.normalizedKey,
396-
{ success: false, error: errorMessage, status: 'failed' },
397-
claimResult.storageMethod
398-
)
416+
if (this.config.retryFailures) {
417+
await this.deleteKey(claimResult.normalizedKey, claimResult.storageMethod)
418+
} else {
419+
await this.storeResult(
420+
claimResult.normalizedKey,
421+
{ success: false, error: errorMessage, status: 'failed' },
422+
claimResult.storageMethod
423+
)
424+
}
399425

400426
logger.warn(`Operation failed: ${claimResult.normalizedKey} - ${errorMessage}`)
401427
throw error
@@ -454,4 +480,5 @@ export const webhookIdempotency = new IdempotencyService({
454480
export const pollingIdempotency = new IdempotencyService({
455481
namespace: 'polling',
456482
ttlSeconds: 60 * 60 * 24 * 3, // 3 days
483+
retryFailures: true,
457484
})

apps/sim/lib/webhooks/polling/google-calendar.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ export const googleCalendarPollingHandler: PollingProviderHandler = {
143143
const newTimestamp =
144144
processedCount === 0 && failedCount > 0
145145
? config.lastCheckedTimestamp
146-
: latestUpdated || now.toISOString()
146+
: latestUpdated
147+
? new Date(new Date(latestUpdated).getTime() + 1).toISOString()
148+
: now.toISOString()
147149
await updateWebhookProviderConfig(webhookId, { lastCheckedTimestamp: newTimestamp }, logger)
148150

149151
if (failedCount > 0 && processedCount === 0) {

apps/sim/lib/webhooks/polling/google-drive.ts

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -255,24 +255,12 @@ async function fetchChanges(
255255
newStartPageToken = data.newStartPageToken as string
256256
}
257257

258-
// Only advance the resume cursor when we'll actually use all changes from this page.
259-
// If allChanges exceeds maxFiles, we'll slice off the extras — so we must NOT
260-
// advance past this page, otherwise the sliced changes are lost permanently.
261258
const hasMore = !!data.nextPageToken
262259
const overLimit = allChanges.length >= maxFiles
263260

264261
if (!hasMore || overLimit || pages >= MAX_PAGES) {
265-
// If we stopped mid-stream and haven't consumed all changes from this page,
266-
// keep currentPageToken so the next poll re-fetches this page.
267-
// If we consumed everything on this page but there are more pages,
268-
// advance to nextPageToken so we don't re-process this page.
269262
if (hasMore && !overLimit) {
270263
lastNextPageToken = data.nextPageToken as string
271-
} else if (hasMore && overLimit && allChanges.length > maxFiles) {
272-
// We got more changes than maxFiles from this page — don't advance,
273-
// re-fetch this page next time (idempotency deduplicates already-processed ones)
274-
} else if (hasMore) {
275-
lastNextPageToken = data.nextPageToken as string
276264
}
277265
break
278266
}
@@ -281,7 +269,10 @@ async function fetchChanges(
281269
currentPageToken = data.nextPageToken as string
282270
}
283271

284-
const resumeToken = newStartPageToken ?? lastNextPageToken ?? config.pageToken!
272+
const slicingOccurs = allChanges.length > maxFiles
273+
const resumeToken = slicingOccurs
274+
? (lastNextPageToken ?? config.pageToken!)
275+
: (newStartPageToken ?? lastNextPageToken ?? config.pageToken!)
285276

286277
return { changes: allChanges.slice(0, maxFiles), newStartPageToken: resumeToken }
287278
}

0 commit comments

Comments
 (0)