Skip to content

Commit

Permalink
Improve pipeline job item error message persistence (apache#28466)
Browse files Browse the repository at this point in the history
* Re-impl cleanJobItemErrorMessage

* Update persistJobItemErrorMessage impl to updateJobItemErrorMessage

* Remove assertStartWithGetEstimatedRowsFailure

* Update cleanJobItemErrorMessage impl
  • Loading branch information
sandynz authored Sep 19, 2023
1 parent 6e79deb commit fe5096b
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ public interface GovernanceRepositoryAPI {
*/
void persist(String key, String value);

/**
 * Update data for the specified key, overwriting the stored value.
 * Counterpart of {@code persist}; presumably requires the node to already exist — TODO confirm against repository impl.
 *
 * @param key key of data
 * @param value value of data
 */
void update(String key, String value);

/**
* Get sharding items of job.
*
Expand Down Expand Up @@ -218,12 +226,4 @@ public interface GovernanceRepositoryAPI {
* @return error msg
*/
String getJobItemErrorMessage(String jobId, int shardingItem);

/**
 * Clean the persisted error message of one sharding item of a job.
 *
 * @param jobId job id
 * @param shardingItem sharding item
 */
void cleanJobItemErrorMessage(String jobId, int shardingItem);
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ public void persist(final String key, final String value) {
repository.persist(key, value);
}

@Override
public void update(final String key, final String value) {
// Delegate straight to the underlying cluster persist repository.
repository.update(key, value);
}

@Override
public List<Integer> getShardingItems(final String jobId) {
List<String> result = getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
Expand Down Expand Up @@ -185,9 +190,4 @@ public void persistMetaDataProcessConfiguration(final JobType jobType, final Str
// Reads the raw error message stored under the job item's error-message path; may be null if never written.
public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
return repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem));
}

@Override
public void cleanJobItemErrorMessage(final String jobId, final int shardingItem) {
// Removes the error-message node entirely (delete, not overwrite-with-empty).
repository.delete(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected void prepare(final PipelineJobItemContext jobItemContext) {
/**
 * Handle job preparation failure: record the error for the sharding item, then stop the job.
 *
 * @param jobItemContext job item context of the failed sharding item
 * @param ex preparation failure cause
 */
protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
    String jobId = jobItemContext.getJobId();
    log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
    // Diff residue fix: only the renamed updateJobItemErrorMessage call belongs here; the stale
    // persistJobItemErrorMessage invocation was a leftover removed-line and would double-write.
    jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
    jobAPI.stop(jobId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,13 @@ public interface PipelineJobAPI extends TypedSPI {
String getJobItemErrorMessage(String jobId, int shardingItem);

/**
* Persist job item error message.
* Update job item error message.
*
* @param jobId job id
* @param shardingItem sharding item
* @param error error
*/
void persistJobItemErrorMessage(String jobId, int shardingItem, Object error);
void updateJobItemErrorMessage(String jobId, int shardingItem, Object error);

/**
* Clean job item error message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,18 @@ public String getJobItemErrorMessage(final String jobId, final int shardingItem)
}

@Override
public void persistJobItemErrorMessage(final String jobId, final int shardingItem, final Object error) {
public void updateJobItemErrorMessage(final String jobId, final int shardingItem, final Object error) {
String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
String value = "";
if (null != error) {
value = error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
}
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key, value);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key, value);
}

@Override
public void cleanJobItemErrorMessage(final String jobId, final int shardingItem) {
    String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
    // Reset the node to an empty value instead of deleting it, so the path stays present.
    PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ protected void inventorySuccessCallback() {
/**
 * Handle inventory task failure: record the error for this sharding item and stop the job.
 *
 * @param throwable inventory task failure cause
 */
protected void inventoryFailureCallback(final Throwable throwable) {
    log.error("onFailure, inventory task execute failed.", throwable);
    String jobId = jobItemContext.getJobId();
    // Only the renamed updateJobItemErrorMessage call belongs here (stale diff line removed).
    jobAPI.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
    jobAPI.stop(jobId);
}

Expand Down Expand Up @@ -169,7 +169,7 @@ public void onSuccess() {
// Incremental task failure callback: persist the error message for the sharding item, then stop the job.
public void onFailure(final Throwable throwable) {
    log.error("onFailure, incremental task execute failed.", throwable);
    String jobId = jobItemContext.getJobId();
    // Only the renamed updateJobItemErrorMessage call belongs here (stale diff line removed).
    jobAPI.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
    jobAPI.stop(jobId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
/**
 * Handle CDC job preparation failure: record the error, stop the job, and disable its configuration.
 *
 * @param jobItemContext job item context of the failed sharding item
 * @param ex preparation failure cause
 */
protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
    String jobId = jobItemContext.getJobId();
    log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
    // Only the renamed updateJobItemErrorMessage call belongs here (stale diff line removed).
    jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
    PipelineJobCenter.stop(jobId);
    jobAPI.updateJobConfigurationDisabled(jobId, true);
}
Expand Down Expand Up @@ -197,7 +197,7 @@ public void onSuccess() {
// CDC task failure callback: persist the error message, stop the job, and mark its configuration disabled.
public void onFailure(final Throwable throwable) {
    log.error("onFailure, {} task execute failed.", identifier, throwable);
    String jobId = jobItemContext.getJobId();
    // Only the renamed updateJobItemErrorMessage call belongs here (stale diff line removed).
    jobAPI.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
    PipelineJobCenter.stop(jobId);
    jobAPI.updateJobConfigurationDisabled(jobId, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void onFailure(final Throwable throwable) {
return;
}
log.info("onFailure, check job id: {}, parent job id: {}", checkJobId, parentJobId, throwable);
checkJobAPI.persistJobItemErrorMessage(checkJobId, 0, throwable);
checkJobAPI.updateJobItemErrorMessage(checkJobId, 0, throwable);
checkJobAPI.stop(checkJobId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,12 @@

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.dumper.InventoryDumper;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
Expand All @@ -52,7 +48,6 @@

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;

class InventoryTaskTest {
Expand All @@ -76,19 +71,6 @@ void setUp() {
taskConfig = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
}

@Test
void assertStartWithGetEstimatedRowsFailure() {
// Dumper is pointed at a non-existent table, so starting the inventory task is expected to fail;
// the failure surfaces as an ExecutionException from the task's CompletableFuture.
InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_non_exist", "t_non_exist");
AtomicReference<IngestPosition> position = new AtomicReference<>(inventoryDumperConfig.getPosition());
PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(PipelineContextUtils.getPipelineChannelCreator(), 100, position);
PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
InventoryTask inventoryTask = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperConfig),
PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(),
new InventoryDumper(inventoryDumperConfig, channel, dataSource, metaDataLoader), mock(Importer.class), position);
// 10-second cap keeps the test bounded; importer is mocked since only the dumper path is under test.
assertThrows(ExecutionException.class, () -> CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS));
}

@Test
void assertGetProgress() throws SQLException, ExecutionException, InterruptedException, TimeoutException {
initTableData(taskConfig.getDumperConfig());
Expand Down

0 comments on commit fe5096b

Please sign in to comment.