Skip to content

Commit 47fb96e

Browse files
committed
feat: enable refresh token rotation for salesforce
previously, we were not using the new refresh token if one was provided
1 parent 494e7d2 commit 47fb96e

4 files changed

Lines changed: 92 additions & 18 deletions

File tree

apps/api/src/app/routes/route.middleware.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -361,9 +361,6 @@ export async function getOrgForRequest(
361361
// Refresh event will be fired when renewed access token
362362
// to store it in your storage for next request
363363
try {
364-
if (!refreshToken) {
365-
return;
366-
}
367364
await salesforceOrgsDb.updateAccessToken_UNSAFE({ accessToken, refreshToken, org, userId: user.id });
368365
} catch (ex) {
369366
logger.error({ requestId, ...getExceptionLog(ex) }, '[ORG][REFRESH] Error saving refresh token');
@@ -378,6 +375,25 @@ export async function getOrgForRequest(
378375
}
379376
};
380377

378+
// Re-reads current tokens from DB so concurrent workers that lose the refresh token rotation race
379+
// can retry with the tokens written by the worker that won.
380+
const getFreshTokens = async () => {
381+
try {
382+
const freshOrg = await salesforceOrgsDb.findByUniqueId_UNSAFE(user.id, uniqueId);
383+
if (!freshOrg) {
384+
return null;
385+
}
386+
const [freshAccessToken, freshRefreshToken] = await sfdcEncService.decryptAccessToken({
387+
encryptedAccessToken: freshOrg.accessToken,
388+
userId: user.id,
389+
});
390+
return { accessToken: freshAccessToken, refreshToken: freshRefreshToken };
391+
} catch (ex) {
392+
logger.error({ requestId, ...getExceptionLog(ex) }, '[ORG][REFRESH] Error fetching fresh tokens for race condition check');
393+
return null;
394+
}
395+
};
396+
381397
const jetstreamConn = new ApiConnection(
382398
{
383399
apiRequestAdapter: getApiRequestFactoryFn(fetch),
@@ -392,6 +408,7 @@ export async function getOrgForRequest(
392408
logger,
393409
sfdcClientId: ENV.SFDC_CONSUMER_KEY,
394410
sfdcClientSecret: ENV.SFDC_CONSUMER_SECRET,
411+
getFreshTokens,
395412
},
396413
handleRefresh,
397414
handleConnectionError,
@@ -574,7 +591,6 @@ export function setPermissionPolicy(_req: express.Request, res: express.Response
574591
next();
575592
}
576593

577-
578594
export function setCacheControlForApiRoutes(_req: express.Request, res: express.Response, next: express.NextFunction) {
579595
res.setHeader('Cache-Control', 'no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0');
580596
next();

apps/jetstream-desktop/src/utils/route.utils.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,6 @@ export function initApiConnection(
204204
// Refresh event will be fired when renewed access token
205205
// to store it in your storage for next request
206206
try {
207-
if (!refreshToken) {
208-
return;
209-
}
210207
await updateAccessTokens(org.uniqueId, { accessToken, refreshToken });
211208
} catch (ex) {
212209
logger.error('[ORG][REFRESH] Error saving refresh token', getErrorMessage(ex));
@@ -221,6 +218,26 @@ export function initApiConnection(
221218
}
222219
};
223220

221+
// Re-reads current tokens from the in-memory store so concurrent requests that lose the
222+
// refresh token rotation race can retry with the tokens written by the request that won.
223+
const getFreshTokens = async () => {
224+
try {
225+
const freshOrg = getSalesforceOrgById(org.uniqueId);
226+
if (!freshOrg) {
227+
return null;
228+
}
229+
const plaintext = decryptTokenPortable(freshOrg.accessToken);
230+
const spaceIndex = plaintext.indexOf(' ');
231+
if (spaceIndex === -1) {
232+
return null;
233+
}
234+
return { accessToken: plaintext.slice(0, spaceIndex), refreshToken: plaintext.slice(spaceIndex + 1) };
235+
} catch (ex) {
236+
logger.error('[ORG][REFRESH] Error fetching fresh tokens for race condition check', getErrorMessage(ex));
237+
return null;
238+
}
239+
};
240+
224241
const jetstreamConn = new ApiConnection(
225242
{
226243
apiRequestAdapter: getApiRequestFactoryFn(fetch),
@@ -234,6 +251,7 @@ export function initApiConnection(
234251
logger: logger as any,
235252
enableLogging: false,
236253
sfdcClientId: ENV.DESKTOP_SFDC_CLIENT_ID,
254+
getFreshTokens,
237255
},
238256
handleRefresh,
239257
handleConnectionError,

libs/salesforce-api/src/lib/callout-adapter.ts

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { ERROR_MESSAGES, HTTP } from '@jetstream/shared/constants';
2+
import { getErrorMessageAndStackObj } from '@jetstream/shared/utils';
23
import { parse } from '@jetstreamapp/simple-xml';
34
import isObject from 'lodash/isObject';
45
import { ApiRequestOptions, ApiRequestOutputType, BulkXmlErrorResponse, FetchFn, FetchResponse, Logger, SoapErrorResponse } from './types';
@@ -46,9 +47,14 @@ function parseXml(value: string) {
4647
export function getApiRequestFactoryFn(fetch: FetchFn) {
4748
return (
4849
logger: Logger,
49-
onRefresh?: (accessToken: string) => void,
50-
onConnectionError?: (accessToken: string) => void,
50+
onRefresh?: (accessToken: string, refreshToken?: string) => void,
51+
onConnectionError?: (error: string) => void,
52+
/**
53+
* Enable logging only applies to request/response data
54+
* other logging for refresh flow and logic errors will still be logged
55+
*/
5156
enableLogging?: boolean,
57+
getFreshTokens?: () => Promise<{ accessToken: string; refreshToken: string } | null>,
5258
) => {
5359
const apiRequest = async <Response = unknown>(options: ApiRequestOptions, attemptRefresh = true): Promise<Response> => {
5460
// eslint-disable-next-line prefer-const
@@ -125,18 +131,37 @@ export function getApiRequestFactoryFn(fetch: FetchFn) {
125131
sessionInfo.refreshToken
126132
) {
127133
try {
128-
// if 401 and we have a refresh token, then attempt to refresh the token
129-
const { access_token: newAccessToken } = await exchangeRefreshToken(fetch, sessionInfo);
130-
onRefresh?.(newAccessToken);
134+
logger.debug({ url, method, status: response.status }, '[TOKEN REFRESH] Attempting token refresh');
135+
const { access_token: newAccessToken, refresh_token: newRefreshToken } = await exchangeRefreshToken(fetch, sessionInfo);
136+
logger.debug({ url, method, tokenRotated: !!newRefreshToken }, '[TOKEN REFRESH] Token refresh successful');
137+
onRefresh?.(newAccessToken, newRefreshToken);
131138
// replace token in body
132139
if (typeof options.body === 'string' && options.body.includes(accessToken)) {
133140
// if the response is soap, we need to return the response as is
134141
options.body = options.body.replace(accessToken, newAccessToken);
135142
}
136143

137144
return apiRequest({ ...options, sessionInfo: { ...sessionInfo, accessToken: newAccessToken } }, false);
138-
} catch {
139-
logger.warn('Unable to refresh accessToken');
145+
} catch (ex) {
146+
logger.warn({ url, method, ...getErrorMessageAndStackObj(ex) }, '[TOKEN REFRESH] Unable to refresh accessToken');
147+
148+
// Check if another worker already refreshed (race condition on token rotation).
149+
// If the DB has a different access token, a concurrent request won the race — retry with fresh tokens.
150+
if (getFreshTokens) {
151+
try {
152+
const freshTokens = await getFreshTokens();
153+
if (freshTokens && freshTokens.accessToken !== accessToken) {
154+
logger.info({ url, method }, '[TOKEN REFRESH] Concurrent refresh detected — retrying with tokens from another worker');
155+
return apiRequest({ ...options, sessionInfo: { ...sessionInfo, ...freshTokens } }, false);
156+
}
157+
} catch (freshEx) {
158+
logger.warn(
159+
{ url, method, ...getErrorMessageAndStackObj(freshEx) },
160+
'[TOKEN REFRESH] Failed to retrieve fresh tokens for race condition check',
161+
);
162+
}
163+
}
164+
140165
responseText = ERROR_MESSAGES.SFDC_EXPIRED_TOKEN;
141166
onConnectionError?.(ERROR_MESSAGES.SFDC_EXPIRED_TOKEN);
142167
}
@@ -192,7 +217,10 @@ function handleSalesforceApiError(outputType: ApiRequestOutputType, responseText
192217
return output;
193218
}
194219

195-
function exchangeRefreshToken(fetch: FetchFn, sessionInfo: ApiRequestOptions['sessionInfo']): Promise<{ access_token: string }> {
220+
function exchangeRefreshToken(
221+
fetch: FetchFn,
222+
sessionInfo: ApiRequestOptions['sessionInfo'],
223+
): Promise<{ access_token: string; refresh_token?: string }> {
196224
const searchParams = new URLSearchParams({
197225
grant_type: 'refresh_token',
198226
});
@@ -221,6 +249,6 @@ function exchangeRefreshToken(fetch: FetchFn, sessionInfo: ApiRequestOptions['se
221249
})
222250
.then((response) => response.json())
223251
.then((response) => {
224-
return response as { access_token: string };
252+
return response as { access_token: string; refresh_token?: string };
225253
});
226254
}

libs/salesforce-api/src/lib/connection.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ export interface ApiConnectionOptions {
2323
sfdcClientId?: string;
2424
sfdcClientSecret?: string;
2525
logger: Logger;
26+
/** Re-reads current tokens from the source of truth (e.g. DB) to handle concurrent refresh token rotation across workers */
27+
getFreshTokens?: () => Promise<{ accessToken: string; refreshToken: string } | null>;
2628
}
2729

2830
export class ApiConnection {
@@ -56,12 +58,19 @@ export class ApiConnection {
5658
sfdcClientId,
5759
sfdcClientSecret,
5860
logger,
61+
getFreshTokens,
5962
}: ApiConnectionOptions,
6063
refreshCallback?: (accessToken: string, refreshToken: string) => void,
6164
onConnectionError?: (error: string) => void,
6265
) {
6366
this.logger = logger;
64-
this.apiRequest = apiRequestAdapter(logger, this.handleRefresh.bind(this), this.handleConnectionError.bind(this), enableLogging);
67+
this.apiRequest = apiRequestAdapter(
68+
logger,
69+
this.handleRefresh.bind(this),
70+
this.handleConnectionError.bind(this),
71+
enableLogging,
72+
getFreshTokens,
73+
);
6574
this.refreshCallback = refreshCallback;
6675
this.onConnectionError = onConnectionError;
6776
this.sessionInfo = {
@@ -122,8 +131,11 @@ export class ApiConnection {
122131
this.sessionInfo.userId = userId ?? this.sessionInfo.userId;
123132
}
124133

125-
public handleRefresh(accessToken: string) {
134+
public handleRefresh(accessToken: string, newRefreshToken?: string) {
126135
this.sessionInfo.accessToken = accessToken;
136+
if (newRefreshToken) {
137+
this.sessionInfo.refreshToken = newRefreshToken;
138+
}
127139
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
128140
this.refreshCallback?.(accessToken, this.sessionInfo.refreshToken!);
129141
}

0 commit comments

Comments
 (0)