Skip to content

Commit

Permalink
refactor: error reporting to drive category from stats (rudderlabs#5327)
Browse files Browse the repository at this point in the history
* refactor: error reporting to drive category fron stats

propagate statTags from transformer response to reporting.

* fix: pr comments
  • Loading branch information
koladilip authored Dec 3, 2024
1 parent e4d2abf commit da76f4e
Show file tree
Hide file tree
Showing 16 changed files with 281 additions and 160 deletions.
133 changes: 88 additions & 45 deletions enterprise/reporting/error_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,63 +96,88 @@ func checkForGoMapOrList(value interface{}) bool {
return false
}

func (ext *ExtractorHandle) getSimpleMessage(jsonStr string) string {
if !IsJSON(jsonStr) {
return jsonStr
func (ext *ExtractorHandle) getSimpleMessage(sampleResponse string) string {
if !IsJSON(sampleResponse) {
return sampleResponse
}

var jsonMap map[string]interface{}
er := json.Unmarshal([]byte(jsonStr), &jsonMap)
er := json.Unmarshal([]byte(sampleResponse), &jsonMap)
if er != nil {
ext.log.Debugf("%v is not a unmarshallable into interface{}", jsonStr)
return jsonStr
ext.log.Debugn("sampleResponse is not a unmarshallable into interface{}", logger.NewStringField("sampleResponse", sampleResponse))
return sampleResponse
}

for key, erRes := range jsonMap {
erResStr, isString := erRes.(string)
if !isString {
ext.log.Debugf("Type-assertion failed for %v with value %v: not a string", key, erRes)
if result := ext.handleKey(key, erRes); result != "" {
return result
}
}
return ""
}

func (ext *ExtractorHandle) handleKey(key string, value interface{}) string {
switch key {
case "reason", "Error", responseKey, errorKey:
valueStr, ok := value.(string)
if !ok {
ext.log.Debugn("Handling key", logger.NewStringField("key", key), logger.NewField("value", value))
return ""
}

switch key {
case "reason":
return erResStr
return valueStr
case "Error":
if !IsJSON(erResStr) {
return strings.Split(erResStr, "\n")[0]
}
return ""
return handleError(valueStr)
case responseKey, errorKey:
if IsJSON(erResStr) {
var unmarshalledJson interface{}
unmarshalledErr := json.Unmarshal([]byte(erResStr), &unmarshalledJson)
if unmarshalledErr != nil {
return erResStr
}
return getErrorMessageFromResponse(unmarshalledJson, ext.ErrorMessageKeys)
}
lowerErResStr := strings.ToLower(erResStr)
if strings.Contains(lowerErResStr, "<body") && strings.Contains(lowerErResStr, "</body>") {
return getHTMLErrorMessage(erResStr)
}

if len(erResStr) == 0 {
return ""
}
return erResStr
// Warehouse related errors
case "internal_processing_failed", "fetching_remote_schema_failed", "exporting_data_failed":
valAsMap, isMap := erRes.(map[string]interface{})
if !isMap {
ext.log.Debugf("Failed while type asserting to map[string]interface{} warehouse error with whKey:%s", key)
return ""
}
return getErrorFromWarehouse(valAsMap)
return ext.handleResponseOrErrorKey(valueStr)
}

case "internal_processing_failed", "fetching_remote_schema_failed", "exporting_data_failed":
// Allow handleWarehouseError to process the value, regardless of its type
return ext.handleWarehouseError(value, key)
}

return ""
}

func handleError(valueStr string) string {
if !IsJSON(valueStr) {
firstLine := strings.Split(valueStr, "\n")[0]
return firstLine
}
return ""
}

func (ext *ExtractorHandle) handleResponseOrErrorKey(valueStr string) string {
if IsJSON(valueStr) {
var unmarshalledJSON interface{}
if err := json.Unmarshal([]byte(valueStr), &unmarshalledJSON); err != nil {
return valueStr
}
result := getErrorMessageFromResponse(unmarshalledJSON, ext.ErrorMessageKeys)
return result
}

lowerStr := strings.ToLower(valueStr)
if strings.Contains(lowerStr, "<body") && strings.Contains(lowerStr, "</body>") {
result := getHTMLErrorMessage(valueStr)
return result
}

return valueStr
}

func (ext *ExtractorHandle) handleWarehouseError(value interface{}, key string) string {
valAsMap, isMap := value.(map[string]interface{})
if !isMap {
ext.log.Debugn("Failed type assertion to map[string]interface{} for warehouse error key", logger.NewStringField("key", key), logger.NewField("value", value))
return ""
}
return getErrorFromWarehouse(valAsMap)
}

func getHTMLErrorMessage(erResStr string) string {
return html2text.HTML2Text(erResStr)
}
Expand Down Expand Up @@ -302,10 +327,21 @@ func (ext *ExtractorHandle) CleanUpErrorMessage(errMsg string) string {
return regexdMsg
}

func (ext *ExtractorHandle) GetErrorCode(errorMessage string) string {
// version deprecation logic
func getErrorCodeFromStatTags(statTags map[string]string) string {
var errorCodeParts []string
if len(statTags) > 0 {
if errorCategory, ok := statTags["errorCategory"]; ok {
errorCodeParts = append(errorCodeParts, errorCategory)
}
if errorType, ok := statTags["errorType"]; ok {
errorCodeParts = append(errorCodeParts, errorType)
}
}
return strings.Join(errorCodeParts, ":")
}

func (ext *ExtractorHandle) isVersionDeprecationError(errorMessage string) bool {
var score int
var errorCode string

errorMessage = strings.ToLower(errorMessage)
for keyword, s := range lowercasedDeprecationKeywords {
Expand All @@ -314,8 +350,15 @@ func (ext *ExtractorHandle) GetErrorCode(errorMessage string) string {
}
}

if score > ext.versionDeprecationThresholdScore.Load() {
errorCode = "deprecation"
return score > ext.versionDeprecationThresholdScore.Load()
}

func (ext *ExtractorHandle) GetErrorCode(errorMessage string, statTags map[string]string) string {
if errorCode := getErrorCodeFromStatTags(statTags); errorCode != "" {
return errorCode
}
return errorCode
if ext.isVersionDeprecationError(errorMessage) {
return "deprecation"
}
return ""
}
8 changes: 4 additions & 4 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
edr.log.Debugn("DestinationId & DestDetail details", obskit.DestinationID(metric.ConnectionDetails.DestinationID), logger.NewField("destinationDetail", destinationDetail))

// extract error-message & error-code
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse)
errDets := edr.extractErrorDetails(metric.StatusDetail.SampleResponse, metric.StatusDetail.StatTags)

edr.stats.NewTaggedStat("error_detail_reporting_failures", stats.CountType, stats.Tags{
"errorCode": errDets.ErrorCode,
Expand All @@ -250,7 +250,7 @@ func (edr *ErrorDetailReporter) Report(ctx context.Context, metrics []*types.PUR
workspaceID,
edr.namespace,
edr.instanceID,
metric.ConnectionDetails.SourceDefinitionId,
metric.ConnectionDetails.SourceDefinitionID,
metric.ConnectionDetails.SourceID,
destinationDetail.destinationDefinitionID,
metric.ConnectionDetails.DestinationID,
Expand Down Expand Up @@ -309,10 +309,10 @@ func (edr *ErrorDetailReporter) migrate(c types.SyncerConfig) (*sql.DB, error) {
return dbHandle, nil
}

func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string) errorDetails {
func (edr *ErrorDetailReporter) extractErrorDetails(sampleResponse string, statTags map[string]string) errorDetails {
errMsg := edr.errorDetailExtractor.GetErrorMessage(sampleResponse)
cleanedErrMsg := edr.errorDetailExtractor.CleanUpErrorMessage(errMsg)
errorCode := edr.errorDetailExtractor.GetErrorCode(cleanedErrMsg)
errorCode := edr.errorDetailExtractor.GetErrorCode(cleanedErrMsg, statTags)
return errorDetails{
ErrorMessage: cleanedErrMsg,
ErrorCode: errorCode,
Expand Down
12 changes: 6 additions & 6 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ func (r *DefaultReporter) getReports(currentMs int64, syncerKey string) (reports
metricReport := types.ReportByStatus{StatusDetail: &types.StatusDetail{}}
err = rows.Scan(
&metricReport.InstanceDetails.WorkspaceID, &metricReport.InstanceDetails.Namespace, &metricReport.InstanceDetails.InstanceID,
&metricReport.ConnectionDetails.SourceDefinitionId,
&metricReport.ConnectionDetails.SourceDefinitionID,
&metricReport.ConnectionDetails.SourceCategory,
&metricReport.ConnectionDetails.SourceID,
&metricReport.ConnectionDetails.DestinationDefinitionId,
&metricReport.ConnectionDetails.DestinationDefinitionID,
&metricReport.ConnectionDetails.DestinationID,
&metricReport.ConnectionDetails.SourceTaskRunID,
&metricReport.ConnectionDetails.SourceJobID,
Expand Down Expand Up @@ -299,10 +299,10 @@ func (*DefaultReporter) getAggregatedReports(reports []*types.ReportByStatus) []
InstanceID: report.InstanceID,
},
ConnectionDetails: types.ConnectionDetails{
SourceDefinitionId: report.SourceDefinitionId,
SourceDefinitionID: report.SourceDefinitionID,
SourceCategory: report.SourceCategory,
SourceID: report.SourceID,
DestinationDefinitionId: report.DestinationDefinitionId,
DestinationDefinitionID: report.DestinationDefinitionID,
DestinationID: report.DestinationID,
SourceTaskRunID: report.SourceTaskRunID,
SourceJobID: report.SourceJobID,
Expand Down Expand Up @@ -665,10 +665,10 @@ func (r *DefaultReporter) Report(ctx context.Context, metrics []*types.PUReporte

_, err = stmt.Exec(
workspaceID, r.namespace, r.instanceID,
metric.ConnectionDetails.SourceDefinitionId,
metric.ConnectionDetails.SourceDefinitionID,
metric.ConnectionDetails.SourceCategory,
metric.ConnectionDetails.SourceID,
metric.ConnectionDetails.DestinationDefinitionId,
metric.ConnectionDetails.DestinationDefinitionID,
metric.ConnectionDetails.DestinationID,
metric.ConnectionDetails.SourceTaskRunID,
metric.ConnectionDetails.SourceJobID,
Expand Down
15 changes: 14 additions & 1 deletion enterprise/reporting/reporting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func TestExtractErrorDetails(t *testing.T) {
caseDescription string
inputErrMsg string
output depTcOutput
statTags map[string]string
}
testCases := []depTc{
{
Expand All @@ -422,12 +423,24 @@ func TestExtractErrorDetails(t *testing.T) {
errorCode: "deprecation",
},
},
{
caseDescription: "should use statTags to compute errorCode",
statTags: map[string]string{
"errorCategory": "dataValidation",
"errorType": "configuration",
},
inputErrMsg: "Some error",
output: depTcOutput{
errorMsg: "Some error",
errorCode: "dataValidation:configuration",
},
},
}

edr := NewErrorDetailReporter(context.Background(), &configSubscriber{}, stats.NOP, config.Default)
for _, tc := range testCases {
t.Run(tc.caseDescription, func(t *testing.T) {
errorDetails := edr.extractErrorDetails(tc.inputErrMsg)
errorDetails := edr.extractErrorDetails(tc.inputErrMsg, tc.statTags)

require.Equal(t, tc.output.errorMsg, errorDetails.ErrorMessage)
require.Equal(t, tc.output.errorCode, errorDetails.ErrorCode)
Expand Down
Loading

0 comments on commit da76f4e

Please sign in to comment.