Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: salesforce v2 refactor #3773

Closed
wants to merge 14 commits into from
2 changes: 2 additions & 0 deletions src/v0/destinations/salesforce/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const DESTINATION = 'Salesforce';
const SALESFORCE_OAUTH_SANDBOX = 'salesforce_oauth_sandbox';
const OAUTH = 'oauth';
const LEGACY = 'legacy';
const SALESFORCE_OAUTH = 'salesforce_oauth';

const mappingConfig = getMappingConfig(ConfigCategory, __dirname);

Expand All @@ -42,5 +43,6 @@ module.exports = {
DESTINATION,
OAUTH,
LEGACY,
SALESFORCE_OAUTH,
SALESFORCE_OAUTH_SANDBOX,
};
16 changes: 10 additions & 6 deletions src/v0/destinations/salesforce/networkHandler.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
const { proxyRequest, prepareProxyRequest } = require('../../../adapters/network');
const { processAxiosResponse } = require('../../../adapters/utils/networkUtils');
const { isHttpStatusSuccess } = require('../../util');
const { LEGACY } = require('./config');
const { salesforceResponseHandler } = require('./utils');

const responseHandler = (responseParams) => {
const { destinationResponse, destType, rudderJobMetadata } = responseParams;
const message = `Request for destination: ${destType} Processed Successfully`;
const { status } = destinationResponse;

salesforceResponseHandler(
destinationResponse,
'during Salesforce Response Handling',
rudderJobMetadata?.destInfo?.authKey,
LEGACY,
);
if (!isHttpStatusSuccess(status) && status >= 400) {
salesforceResponseHandler(
destinationResponse,
'during Salesforce Response Handling',
rudderJobMetadata?.destInfo?.authKey,
LEGACY,
);
}

// else successfully return status as 200, message and original destination response
return {
Expand Down
177 changes: 119 additions & 58 deletions src/v0/destinations/salesforce/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
ThrottledError,
AbortedError,
OAuthSecretError,
isDefinedAndNotNull,
} = require('@rudderstack/integrations-lib');
const { handleHttpRequest } = require('../../../adapters/network');
const {
isHttpStatusSuccess,
getAuthErrCategoryFromStCode,
isDefinedAndNotNull,
} = require('../../util');
const { getAuthErrCategoryFromStCode } = require('../../util');
const Cache = require('../../util/cache');
const {
ACCESS_TOKEN_CACHE_TTL,
Expand All @@ -18,27 +15,42 @@
DESTINATION,
LEGACY,
OAUTH,
SALESFORCE_OAUTH,
SALESFORCE_OAUTH_SANDBOX,
} = require('./config');

const ACCESS_TOKEN_CACHE = new Cache(ACCESS_TOKEN_CACHE_TTL);

// ref: https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/errorcodes.htm?q=error%20code
/**
* ref: https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/errorcodes.htm
* handles Salesforce application level failures
* @param {*} destResponse
* @param {*} sourceMessage
* @param {*} stage
* @param {String} authKey
*
* @param {*} response is of structure
* [
* {
* "message" : "The requested resource does not exist",
* "errorCode" : "NOT_FOUND"
}
]
* @returns error message
*/
const salesforceResponseHandler = (destResponse, sourceMessage, authKey, authorizationFlow) => {
const { status, response } = destResponse;
const getErrorMessage = (response) => {
if (response && Array.isArray(response) && response[0]?.message?.length > 0) {
return response[0].message;
}
return JSON.stringify(response);
};

// if the response from destination is not a success case build an explicit error
if (!isHttpStatusSuccess(status) && status >= 400) {
const matchErrorCode = (errorCode) =>
response && Array.isArray(response) && response.some((resp) => resp?.errorCode === errorCode);
if (status === 401 && authKey && matchErrorCode('INVALID_SESSION_ID')) {
const handleAuthError = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we can rewrite the function in a simpler way like below

const handleAuthError = (
  errorCode,
  authKey,
  authorizationFlow,
  sourceMessage,
  destResponse,
  status,
) => {
  if (errorCode === 'INVALID_SESSION_ID') {
    let authErrCategory = '';
    if (authorizationFlow === OAUTH) {
      authErrCategory = getAuthErrCategoryFromStCode(status);
    } else {
      ACCESS_TOKEN_CACHE.del(authKey);
    }
    throw new RetryableError(
      `${DESTINATION} Request Failed - due to "INVALID_SESSION_ID", (${authErrCategory}) ${sourceMessage}`,
      500,
      destResponse,
      authErrCategory,
    );
  }

  throw new AbortedError(
    `${DESTINATION} Request Failed: "${status}" due to "${getErrorMessage(destResponse.response)}", (Aborted) ${sourceMessage}`,
    400,
    destResponse,
  );
};

errorCode,
authKey,
authorizationFlow,
sourceMessage,
destResponse,
status,
) => {
// eslint-disable-next-line sonarjs/no-small-switch
switch (errorCode) {
case 'INVALID_SESSION_ID':
if (authorizationFlow === OAUTH) {
throw new RetryableError(
`${DESTINATION} Request Failed - due to "INVALID_SESSION_ID", (Retryable) ${sourceMessage}`,
Expand All @@ -47,59 +59,108 @@
getAuthErrCategoryFromStCode(status),
);
}
// checking for invalid/expired token errors and evicting cache in that case
// rudderJobMetadata contains some destination info which is being used to evict the cache
ACCESS_TOKEN_CACHE.del(authKey);
throw new RetryableError(
`${DESTINATION} Request Failed - due to "INVALID_SESSION_ID", (Retryable) ${sourceMessage}`,
500,
destResponse,
);
} else if (status === 403 && matchErrorCode('REQUEST_LIMIT_EXCEEDED')) {
// If the error code is REQUEST_LIMIT_EXCEEDED, you’ve exceeded API request limits in your org.
throw new ThrottledError(
`${DESTINATION} Request Failed - due to "REQUEST_LIMIT_EXCEEDED", (Throttled) ${sourceMessage}`,
default:
throw new AbortedError(
`${DESTINATION} Request Failed: "${status}" due to "${getErrorMessage(destResponse.response)}", (Aborted) ${sourceMessage}`,
400,
destResponse,
);
} else if (
status === 400 &&
matchErrorCode('CANNOT_INSERT_UPDATE_ACTIVATE_ENTITY') &&
(response?.message?.includes('UNABLE_TO_LOCK_ROW') ||
response?.message?.includes('Too many SOQL queries'))
) {
// handling the error case where the record is locked by another background job
// this is a retryable error
}
};

const handleCommonAbortableError = (destResponse, sourceMessage, status) => {
throw new AbortedError(
`${DESTINATION} Request Failed: "${status}" due to "${getErrorMessage(destResponse.response)}", (Aborted) ${sourceMessage}`,
400,
destResponse,
);
};

/**
* ref: https://developer.salesforce.com/docs/atlas.en-us.api_rest.meta/api_rest/errorcodes.htm
* handles Salesforce application level failures
* @param {*} destResponse
* @param {*} sourceMessage
* @param {*} stage
* @param {String} authKey
*/
const salesforceResponseHandler = (destResponse, sourceMessage, authKey, authorizationFlow) => {
const { status, response } = destResponse;

/**
*
* @param {*} errorCode
* response is of structure
* [
* {
* "message" : "Request limit exceeded",
* "errorCode" : "REQUEST_LIMIT_EXCEEDED"
* }
* ]
* @returns true if errorCode is found in the response
*/
const matchErrorCode = (errorCode) =>
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
response && Array.isArray(response) && response.some((resp) => resp?.errorCode === errorCode);

switch (status) {
case 401:
if (authKey && matchErrorCode('INVALID_SESSION_ID')) {
handleAuthError(
'INVALID_SESSION_ID',
authKey,
authorizationFlow,
sourceMessage,
destResponse,
status,
);
}
handleAuthError('DEFAULT', authKey, authorizationFlow, sourceMessage, destResponse, status);
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
break;

Check warning on line 124 in src/v0/destinations/salesforce/utils.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/salesforce/utils.js#L124

Added line #L124 was not covered by tests
case 403:
if (matchErrorCode('REQUEST_LIMIT_EXCEEDED')) {
throw new ThrottledError(
`${DESTINATION} Request Failed - due to "REQUEST_LIMIT_EXCEEDED", (Throttled) ${sourceMessage}`,
destResponse,
);
}
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
break;

Check warning on line 132 in src/v0/destinations/salesforce/utils.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/salesforce/utils.js#L132

Added line #L132 was not covered by tests

case 400:
if (
matchErrorCode('CANNOT_INSERT_UPDATE_ACTIVATE_ENTITY') &&
(response?.message?.includes('UNABLE_TO_LOCK_ROW') ||
response?.message?.includes('Too many SOQL queries'))

Check warning on line 138 in src/v0/destinations/salesforce/utils.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/salesforce/utils.js#L137-L138

Added lines #L137 - L138 were not covered by tests
) {
throw new RetryableError(

Check warning on line 140 in src/v0/destinations/salesforce/utils.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/salesforce/utils.js#L140

Added line #L140 was not covered by tests
`${DESTINATION} Request Failed - "${response.message}", (Retryable) ${sourceMessage}`,
500,
destResponse,
);
}
handleCommonAbortableError(destResponse, sourceMessage, status);
break;

Check warning on line 147 in src/v0/destinations/salesforce/utils.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/salesforce/utils.js#L147

Added line #L147 was not covered by tests

case 503:
case 500:
throw new RetryableError(
`${DESTINATION} Request Failed - "${response.message}", (Retryable) ${sourceMessage}`,
`${DESTINATION} Request Failed - due to "${getErrorMessage(response)}", (Retryable) ${sourceMessage}`,
500,
destResponse,
);
sanpj2292 marked this conversation as resolved.
Show resolved Hide resolved
} else if (status === 503 || status === 500) {
// The salesforce server is unavailable to handle the request. Typically this occurs if the server is down
// for maintenance or is currently overloaded.
throw new RetryableError(
`${DESTINATION} Request Failed - due to "${
response && Array.isArray(response) && response[0]?.message?.length > 0
? response[0].message
: JSON.stringify(response)
}", (Retryable) ${sourceMessage}`,
500,

default:

Check warning on line 157 in src/v0/destinations/salesforce/utils.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/salesforce/utils.js#L157

Added line #L157 was not covered by tests
// Default case: aborting for all other error codes
throw new AbortedError(

Check warning on line 159 in src/v0/destinations/salesforce/utils.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/salesforce/utils.js#L159

Added line #L159 was not covered by tests
`${DESTINATION} Request Failed: "${status}" due to "${getErrorMessage(response)}", (Aborted) ${sourceMessage}`,
400,
destResponse,
);
}
// check the error message
let errorMessage = '';
if (response && Array.isArray(response)) {
errorMessage = response[0].message;
}
// aborting for all other error codes
throw new AbortedError(
`${DESTINATION} Request Failed: "${status}" due to "${
errorMessage || JSON.stringify(response)
}", (Aborted) ${sourceMessage}`,
400,
destResponse,
);
}
};

Expand Down Expand Up @@ -182,7 +243,7 @@
let authorizationData;
const { Name } = event.destination.DestinationDefinition;
const lowerCaseName = Name?.toLowerCase?.();
if (isDefinedAndNotNull(event?.metadata?.secret) || lowerCaseName === SALESFORCE_OAUTH_SANDBOX) {
if (lowerCaseName === SALESFORCE_OAUTH_SANDBOX || lowerCaseName === SALESFORCE_OAUTH) {
authorizationFlow = OAUTH;
authorizationData = getAccessTokenOauth(event.metadata);
} else {
Expand Down
16 changes: 10 additions & 6 deletions src/v0/destinations/salesforce_oauth/networkHandler.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
const { proxyRequest, prepareProxyRequest } = require('../../../adapters/network');
const { processAxiosResponse } = require('../../../adapters/utils/networkUtils');
const { isHttpStatusSuccess } = require('../../util');
const { OAUTH } = require('../salesforce/config');
const { salesforceResponseHandler } = require('../salesforce/utils');

const responseHandler = (responseParams) => {
const { destinationResponse, destType, rudderJobMetadata } = responseParams;
const message = `Request for destination: ${destType} Processed Successfully`;
const { status } = destinationResponse;

salesforceResponseHandler(
destinationResponse,
'during Salesforce Response Handling',
rudderJobMetadata?.destInfo?.authKey,
OAUTH,
);
if (!isHttpStatusSuccess(status) && status >= 400) {
salesforceResponseHandler(
destinationResponse,
'during Salesforce Response Handling',
rudderJobMetadata?.destInfo?.authKey,
OAUTH,
);
}

// else successfully return status as 200, message and original destination response
return {
Expand Down
Loading
Loading