Skip to content

Commit

Permalink
feat: async framework handling for failures (rudderlabs#5330)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Dec 3, 2024
1 parent da76f4e commit 7029d7d
Show file tree
Hide file tree
Showing 21 changed files with 899 additions and 329 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
. "github.com/onsi/gomega"

bingads_sdk "github.com/rudderlabs/bing-ads-go-sdk/bingads"
mock_bulkservice "github.com/rudderlabs/bing-ads-go-sdk/mocks"
mockbulkservice "github.com/rudderlabs/bing-ads-go-sdk/mocks"
"github.com/rudderlabs/rudder-go-kit/bytesize"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
mocks_oauth "github.com/rudderlabs/rudder-server/mocks/services/oauth"
mocksoauthservice "github.com/rudderlabs/rudder-server/mocks/services/oauth"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
"github.com/rudderlabs/rudder-server/services/oauth"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand Down Expand Up @@ -69,7 +69,7 @@ var _ = Describe("Bing ads Audience", func() {
It("TestBingAdsUploadPartialSuccessCase", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(&bingads_sdk.GetBulkUploadUrlResponse{
Expand Down Expand Up @@ -136,7 +136,7 @@ var _ = Describe("Bing ads Audience", func() {
It("TestBingAdsUploadFailedGetBulkUploadUrl", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, fmt.Errorf("Error in getting bulk upload url"))
Expand Down Expand Up @@ -189,7 +189,7 @@ var _ = Describe("Bing ads Audience", func() {
It("TestBingAdsUploadEmptyGetBulkUploadUrl", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
ClientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &ClientI)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, fmt.Errorf("unable to get bulk upload url, check your credentials"))
Expand Down Expand Up @@ -243,7 +243,7 @@ var _ = Describe("Bing ads Audience", func() {
It("TestBingAdsUploadFailedUploadBulkFile", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(&bingads_sdk.GetBulkUploadUrlResponse{
Expand Down Expand Up @@ -308,7 +308,7 @@ var _ = Describe("Bing ads Audience", func() {
It("TestBingAdsPollSuccessCase", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

Expand All @@ -331,7 +331,7 @@ var _ = Describe("Bing ads Audience", func() {
It("TestBingAdsPollFailureCase", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

Expand All @@ -350,7 +350,7 @@ var _ = Describe("Bing ads Audience", func() {
It("TestBingAdsPollPartialFailureCase", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

Expand All @@ -364,22 +364,22 @@ var _ = Describe("Bing ads Audience", func() {
}

expectedResp := common.PollStatusResponse{
Complete: true,
StatusCode: 200,
HasFailed: true,
FailedJobURLs: "https://dummy.url.com",
Complete: true,
StatusCode: 200,
HasFailed: true,
FailedJobParameters: "https://dummy.url.com",
}
recievedResponse := bulkUploader.Poll(pollInput)

os.Remove(expectedResp.FailedJobURLs)
os.Remove(expectedResp.FailedJobParameters)

Expect(recievedResponse).To(Equal(expectedResp))
})

It("TestBingAdsPollPendingStatusCase", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

Expand All @@ -398,15 +398,15 @@ var _ = Describe("Bing ads Audience", func() {
}
recievedResponse := bulkUploader.Poll(pollInput)

os.Remove(expectedResp.FailedJobURLs)
os.Remove(expectedResp.FailedJobParameters)

Expect(recievedResponse).To(Equal(expectedResp))
})

It("TestBingAdsPollFailedStatusCase", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

Expand All @@ -425,15 +425,15 @@ var _ = Describe("Bing ads Audience", func() {
}
recievedResponse := bulkUploader.Poll(pollInput)

os.Remove(expectedResp.FailedJobURLs)
os.Remove(expectedResp.FailedJobParameters)

Expect(recievedResponse).To(Equal(expectedResp))
})

It("TestBingAdsPollSuccessAndFailedStatusCase", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

Expand All @@ -454,21 +454,21 @@ var _ = Describe("Bing ads Audience", func() {
}

expectedResp := common.PollStatusResponse{
HasFailed: true,
StatusCode: 500,
FailedJobURLs: ",", // empty file
HasFailed: true,
StatusCode: 500,
FailedJobParameters: ",", // empty file
}
recievedResponse := bulkUploader.Poll(pollInput)

os.Remove(expectedResp.FailedJobURLs)
os.Remove(expectedResp.FailedJobParameters)

Expect(recievedResponse).To(Equal(expectedResp))
})

It("TestBingAdsGetUploadStats", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
errorsTemplateFilePath := filepath.Join(currentDir, "testdata/status-check.zip") // Path of the source file
// Create a test server with a custom handler function
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -484,7 +484,7 @@ var _ = Describe("Bing ads Audience", func() {
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

UploadStatsInput := common.GetUploadStatsInput{
FailedJobURLs: modifiedURL,
FailedJobParameters: modifiedURL,
ImportingList: []*jobsdb.JobT{
{
JobID: 5,
Expand All @@ -500,8 +500,8 @@ var _ = Describe("Bing ads Audience", func() {
expectedResp := common.GetUploadStatsResponse{
StatusCode: 200,
Metadata: common.EventStatMeta{
FailedKeys: []int64{6},
FailedReasons: map[int64]string{
AbortedKeys: []int64{6},
AbortedReasons: map[int64]string{
6: "EmailMustBeHashed",
},
SucceededKeys: []int64{5, 7},
Expand All @@ -518,7 +518,7 @@ var _ = Describe("Bing ads Audience", func() {
zipPath := "testdata/BulkUpload-02-28-2024-c7a38716-4d65-44a7-bf28-8879ab9b1da0-Results.zip"
err := ZipCSVFile(csvPath, zipPath)
Expect(err).To(BeNil())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
errorsTemplateFilePath := filepath.Join(currentDir, "testdata/BulkUpload-02-28-2024-c7a38716-4d65-44a7-bf28-8879ab9b1da0-Results.zip") // Path of the source file
// Create a test server with a custom handler function
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -534,7 +534,7 @@ var _ = Describe("Bing ads Audience", func() {
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)

UploadStatsInput := common.GetUploadStatsInput{
FailedJobURLs: modifiedURL,
FailedJobParameters: modifiedURL,
ImportingList: []*jobsdb.JobT{
{
JobID: 1,
Expand All @@ -544,8 +544,8 @@ var _ = Describe("Bing ads Audience", func() {
expectedResp := common.GetUploadStatsResponse{
StatusCode: 200,
Metadata: common.EventStatMeta{
FailedKeys: []int64{1},
FailedReasons: map[int64]string{
AbortedKeys: []int64{1},
AbortedReasons: map[int64]string{
1: "InvalidCustomerListId",
},
SucceededKeys: []int64{},
Expand All @@ -559,7 +559,7 @@ var _ = Describe("Bing ads Audience", func() {
It("TestNewManagerInternal", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
oauthService := mocks_oauth.NewMockAuthorizer(ctrl)
oauthService := mocksoauthservice.NewMockAuthorizer(ctrl)
oauthService.EXPECT().FetchToken(gomock.Any()).Return(200, &oauth.AuthResponse{
Account: oauth.AccountSecret{
ExpirationDate: "",
Expand Down Expand Up @@ -593,7 +593,7 @@ var _ = Describe("Bing ads Audience", func() {
It("TestBingAdsUploadNoTrackingId", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mock_bulkservice.NewMockBulkServiceI(ctrl)
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(&bingads_sdk.GetBulkUploadUrlResponse{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ func (b *BingAdsBulkUploader) pollSingleImport(requestId string) common.PollStat
}
case "CompletedWithErrors":
return common.PollStatusResponse{
Complete: true,
StatusCode: 200,
HasFailed: true,
FailedJobURLs: uploadStatusResp.ResultFileUrl,
Complete: true,
StatusCode: 200,
HasFailed: true,
FailedJobParameters: uploadStatusResp.ResultFileUrl,
}
case "FileUploaded", "InProgress", "PendingFileUpload":
return common.PollStatusResponse{
Expand Down Expand Up @@ -185,7 +185,7 @@ func (b *BingAdsBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatus
*/
pollStatusCode = append(pollStatusCode, resp.StatusCode)
completionStatus = append(completionStatus, resp.Complete)
failedJobURLs = append(failedJobURLs, resp.FailedJobURLs)
failedJobURLs = append(failedJobURLs, resp.FailedJobParameters)
cumulativeProgressStatus = cumulativeProgressStatus || resp.InProgress
cumulativeFailureStatus = cumulativeFailureStatus || resp.HasFailed
}
Expand All @@ -195,11 +195,11 @@ func (b *BingAdsBulkUploader) Poll(pollInput common.AsyncPoll) common.PollStatus
cumulativeStatusCode = 200
}
cumulativeResp = common.PollStatusResponse{
Complete: !lo.Contains(completionStatus, false),
InProgress: cumulativeProgressStatus,
StatusCode: cumulativeStatusCode,
HasFailed: cumulativeFailureStatus,
FailedJobURLs: strings.Join(failedJobURLs, commaSeparator), // creating a comma separated string of all the result file urls
Complete: !lo.Contains(completionStatus, false),
InProgress: cumulativeProgressStatus,
StatusCode: cumulativeStatusCode,
HasFailed: cumulativeFailureStatus,
FailedJobParameters: strings.Join(failedJobURLs, commaSeparator), // creating a comma separated string of all the result file urls
}

return cumulativeResp
Expand All @@ -217,16 +217,16 @@ func (b *BingAdsBulkUploader) getUploadStatsOfSingleImport(filePath string) (com
eventStatsResponse := common.GetUploadStatsResponse{
StatusCode: 200,
Metadata: common.EventStatMeta{
FailedKeys: lo.Keys(clientIDErrors),
FailedReasons: getFailedReasons(clientIDErrors),
AbortedKeys: lo.Keys(clientIDErrors),
AbortedReasons: getAbortedReasons(clientIDErrors),
},
}

eventsAbortedStat := b.statsFactory.NewTaggedStat("failed_job_count", stats.CountType, map[string]string{
"module": "batch_router",
"destType": b.destName,
})
eventsAbortedStat.Count(len(eventStatsResponse.Metadata.FailedKeys))
eventsAbortedStat.Count(len(eventStatsResponse.Metadata.AbortedKeys))

eventsSuccessStat := b.statsFactory.NewTaggedStat("success_job_count", stats.CountType, map[string]string{
"module": "batch_router",
Expand All @@ -245,9 +245,9 @@ func (b *BingAdsBulkUploader) GetUploadStats(uploadStatsInput common.GetUploadSt
initialEventList = append(initialEventList, job.JobID)
}
eventStatsResponse := common.GetUploadStatsResponse{}
var failedJobIds []int64
var cumulativeFailedReasons map[int64]string
fileURLs := lo.Reject(strings.Split(uploadStatsInput.FailedJobURLs, commaSeparator), func(url string, _ int) bool {
var abortedJobIDs []int64
var cumulativeAbortedReasons map[int64]string
fileURLs := lo.Reject(strings.Split(uploadStatsInput.FailedJobParameters, commaSeparator), func(url string, _ int) bool {
return url == ""
})
for _, fileURL := range fileURLs {
Expand All @@ -265,15 +265,15 @@ func (b *BingAdsBulkUploader) GetUploadStats(uploadStatsInput common.GetUploadSt
StatusCode: 500,
}
}
cumulativeFailedReasons = lo.Assign(cumulativeFailedReasons, response.Metadata.FailedReasons)
failedJobIds = append(failedJobIds, response.Metadata.FailedKeys...)
cumulativeAbortedReasons = lo.Assign(cumulativeAbortedReasons, response.Metadata.AbortedReasons)
abortedJobIDs = append(abortedJobIDs, response.Metadata.AbortedKeys...)
eventStatsResponse.StatusCode = response.StatusCode
}

eventStatsResponse.Metadata = common.EventStatMeta{
FailedKeys: failedJobIds,
FailedReasons: cumulativeFailedReasons,
SucceededKeys: getSuccessJobIDs(failedJobIds, initialEventList),
AbortedKeys: abortedJobIDs,
AbortedReasons: cumulativeAbortedReasons,
SucceededKeys: getSuccessJobIDs(abortedJobIDs, initialEventList),
}

return eventStatsResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func processPollStatusData(records [][]string) (map[int64]map[string]struct{}, e
// GetUploadStats Related utils

// get the list of unique error messages for a particular jobId.
func getFailedReasons(clientIDErrors map[int64]map[string]struct{}) map[int64]string {
func getAbortedReasons(clientIDErrors map[int64]map[string]struct{}) map[int64]string {
reasons := make(map[int64]string)
for key, errors := range clientIDErrors {
reasons[key] = strings.Join(lo.Keys(errors), commaSeparator)
Expand Down
Loading

0 comments on commit 7029d7d

Please sign in to comment.