Skip to content

Commit

Permalink
[fix][dingo-executor] Encode error messages uniformly as base64 for e…
Browse files Browse the repository at this point in the history
…asy storage and parsing
  • Loading branch information
guojn1 authored and ketor committed Sep 30, 2024
1 parent fa19c6f commit 0d00dc9
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 23 deletions.
14 changes: 14 additions & 0 deletions dingo-common/src/main/java/io/dingodb/common/ddl/DdlJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -270,6 +271,19 @@ public void addErrorCount(int nu) {
this.errorCount += nu;
}

public void encodeError(String error) {
if (error != null) {
this.error = Base64.getEncoder().encodeToString(error.getBytes());
}
}

public String decodeError() {
if (this.error == null) {
return null;
}
return new String(Base64.getDecoder().decode(this.error));
}

@Override
public String toString() {
return "DdlJob{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import io.dingodb.store.proxy.mapper.Mapper;
import lombok.extern.slf4j.Slf4j;

import java.util.Base64;

@Slf4j
public final class DdlRollBack {

Expand Down Expand Up @@ -64,14 +66,10 @@ public static Pair<Long, String> convertJob2RollbackJob(DdlWorker worker, DdlCon
}
if (error != null) {
if (job.getError() == null) {
job.setError(error);
job.encodeError(error);
}
job.addErrorCount(1);
if ("ErrCancelledDDLJob".equals(error)) {
if (!"ErrCancelledDDLJob".equals(job.getError())) {
job.setError(job.getError());
}
} else {
if (!"ErrCancelledDDLJob".equals(error)) {
if (job.getErrorCount() > DdlUtil.errorCountLimit) {
job.setState(JobState.jobStateCancelled);
//job.setError("[ddl] rollback DDL job error count exceed the limit");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
Expand Down Expand Up @@ -337,7 +338,7 @@ public static void waitSchemaChanged(
.ownerCheckAllVersions(job.getId(), latestSchemaVersion, job.mayNeedReorg());
if (error != null) {
if ("Lock wait timeout exceeded".equalsIgnoreCase(error)) {
job.setError(error);
job.encodeError(error);
ddlWorker.updateDDLJob(job, false);
}
LogUtils.error(log, "[ddl] wait latest schema version encounter error, latest version:{}, jobId:{}" , latestSchemaVersion, job.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public Pair<Long, String> runDdlJob(DdlContext dc, DdlJob job) {
}

private static String countForError(DdlJob job, String error) {
job.setError(Base64.getEncoder().encodeToString(error.getBytes()));
job.encodeError(error);
job.setErrorCount(job.getErrorCount() + 1);
if (job.getState() == JobState.jobStateCancelled) {
LogUtils.info(log, "[ddl] DDL job is cancelled normally");
Expand Down Expand Up @@ -399,9 +399,12 @@ public static Pair<Long, String> onTruncateTable(DdlContext dc, DdlJob job) {
job.setSchemaState(SchemaState.SCHEMA_GLOBAL_TXN_ONLY);
return updateSchemaVersion(dc, job);
}
if ("Lock wait timeout exceeded".equalsIgnoreCase(job.getError())) {
job.setState(JobState.jobStateCancelled);
return Pair.of(0L, job.getError());
if (job.getError() != null) {
String error = job.decodeError();
if ("Lock wait timeout exceeded".equalsIgnoreCase(error)) {
job.setState(JobState.jobStateCancelled);
return Pair.of(0L, error);
}
}
try {
ms.truncateTable(job.getTableName(), newTableId);
Expand All @@ -426,16 +429,18 @@ public static Pair<Long, String> onDropTable(DdlContext dc, DdlJob job) {
if (tableInfo == null) {
return Pair.of(0L, "table not exists");
}
if ("Lock wait timeout exceeded".equalsIgnoreCase(job.getError())
&& tableInfo.getTableDefinition().getSchemaState() != SCHEMA_PUBLIC) {
tableInfo.getTableDefinition().setSchemaState(SCHEMA_PUBLIC);
ActionType originType = job.getActionType();
job.setActionType(ActionType.ActionCreateTable);
job.setState(JobState.jobStateCancelling);
Pair<Long, String> res = TableUtil.updateVersionAndTableInfos(dc, job, tableInfo, true);
job.setActionType(originType);
DdlContext.INSTANCE.getSchemaSyncer().ownerUpdateExpVersion(res.getKey());
return res;
if (job.getError() != null) {
if ("Lock wait timeout exceeded".equalsIgnoreCase(job.decodeError())
&& tableInfo.getTableDefinition().getSchemaState() != SCHEMA_PUBLIC) {
tableInfo.getTableDefinition().setSchemaState(SCHEMA_PUBLIC);
ActionType originType = job.getActionType();
job.setActionType(ActionType.ActionCreateTable);
job.setState(JobState.jobStateCancelling);
Pair<Long, String> res = TableUtil.updateVersionAndTableInfos(dc, job, tableInfo, true);
job.setActionType(originType);
DdlContext.INSTANCE.getSchemaSyncer().ownerUpdateExpVersion(res.getKey());
return res;
}
}
SchemaState originalState = job.getSchemaState();
Pair<Long, String> res;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ public static Pair<Boolean, String> historyJob(long jobId) {
return Pair.of(true, null);
}
if (ddlJob.getError() != null) {
String error = new String(Base64.getDecoder().decode(ddlJob.getError()));
String error = ddlJob.decodeError();
return Pair.of(false, error);
}
return Pair.of(false, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public String ownerCheckAllVersions(long jobId, long latestVer, boolean reorg) {
if (!updateMap.isEmpty()) {
synced = false;
updateMap.keySet().forEach(str -> {
LogUtils.info(log, "[ddl] syncer check all versions, someone is not synced, instance:{}", str);
LogUtils.debug(log, "[ddl] syncer check all versions, someone is not synced, instance:{}", str);
});
}
} else {
Expand Down

0 comments on commit 0d00dc9

Please sign in to comment.