Skip to content

Commit

Permalink
KYLO-2818: fix for bad duration in Merge Table processor
Browse files Browse the repository at this point in the history
  • Loading branch information
harschware committed Oct 23, 2018
1 parent 2e38184 commit f42197f
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
mergeStrategyValue = STRATEGY_DEDUPE_MERGE;
}

logger.info(
"Merge strategy: " + mergeStrategyValue + " Using Source: " + sourceTable + " Target: " + targetTable + " feed partition:" + feedPartitionValue + " partSpec: " + partitionSpecString);
logger.info( "Merge strategy: " + mergeStrategyValue
+ " Using Source: " + sourceTable
+ " Target: " + targetTable
+ " feed partition:" + feedPartitionValue
+ " partSpec: " + partitionSpecString);

final StopWatch stopWatch = new StopWatch(true);

Expand Down Expand Up @@ -347,10 +350,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
throw new UnsupportedOperationException("Failed to resolve the merge strategy");
}

stopWatch.stop();
session.getProvenanceReporter().modifyContent(flowFile, "Execution completed", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
flowFile = session.putAttribute(flowFile, PROVENANCE_EXECUTION_STATUS_KEY, "Successful");
release(blockingValue);
logger.info( "Execution completed: " + stopWatch.getElapsed(TimeUnit.MILLISECONDS)
+ " Merge strategy: " + mergeStrategyValue
+ " Using Source: " + sourceTable
+ " Target: " + targetTable
+ " feed partition:" + feedPartitionValue
+ " partSpec: " + partitionSpecString);
session.transfer(flowFile, REL_SUCCESS);

} catch (final Exception e) {
Expand Down

0 comments on commit f42197f

Please sign in to comment.