Skip to content

Commit

Permalink
[kuduwriter] Fix data lost without flushing cache wgzhao#294 (wgzhao#296
Browse files Browse the repository at this point in the history
)
  • Loading branch information
wgzhao authored Aug 17, 2021
1 parent 64bc8a7 commit 0a66221
Showing 1 changed file with 40 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,68 +45,57 @@
import java.util.concurrent.ThreadPoolExecutor;

public class KuduWriterTask
extends Writer
extends Writer
{
private static final Logger LOG = LoggerFactory.getLogger(KuduWriterTask.class);
public KuduClient kuduClient;
public KuduSession session;
private List<Configuration> columns;
private List<List<Configuration>> columnLists;
private List<Map<String, Type>> needColumns;
private ThreadPoolExecutor pool;
private String encoding;
private Double batchSize;
private Boolean isUpsert;
private Boolean isSkipFail;
private KuduTable table;
private Integer primaryKeyIndexUntil;
private Configuration configuration;
private final Double batchSize;
List<Configuration> columns;
private final Boolean isUpsert;
private final Boolean isSkipFail;
private final KuduTable table;
// private final Configuration configuration;

public KuduWriterTask(Configuration configuration)
{
this.configuration = configuration;
columns = configuration.getListConfiguration(KuduKey.COLUMN);
columnLists = KuduHelper.getColumnLists(columns);
pool = KuduHelper.createRowAddThreadPool(columnLists.size());
// this.configuration = configuration;
this.columns = configuration.getListConfiguration(KuduKey.COLUMN);

this.encoding = configuration.getString(KuduKey.ENCODING);
this.batchSize = configuration.getDouble(KuduKey.WRITE_BATCH_SIZE);
this.isUpsert = !"insert".equalsIgnoreCase(configuration.getString(KuduKey.WRITE_MODE));
this.isSkipFail = configuration.getBool(KuduKey.SKIP_FAIL);
long mutationBufferSpace = configuration.getLong(KuduKey.MUTATION_BUFFER_SPACE);

this.kuduClient = KuduHelper.getKuduClient(configuration);
this.table = KuduHelper.getKuduTable(this.kuduClient,
configuration.getString(KuduKey.KUDU_TABLE_NAME));
this.table = KuduHelper.getKuduTable(this.kuduClient, configuration.getString(KuduKey.KUDU_TABLE_NAME));

this.session = kuduClient.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
session.setMutationBufferSpace((int) mutationBufferSpace);
// this.primaryKeyIndexUntil = KuduHelper.getPrimaryKeyIndexUntil(columns);
// tableName = configuration.getString(Key.TABLE);
}

public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector)
{
LOG.info("kuduwriter begin to write!");
LOG.info("Begin to write");
Record record;
List<Configuration> columns = configuration.getListConfiguration(KuduKey.COLUMN);
int commit = 0;
List<String> columnNames = KuduHelper.getColumnNames(columns);
while ((record = lineReceiver.getFromReader()) != null) {
if (record.getColumnNumber() != columns.size()) {
throw AddaxException.asAddaxException(KuduWriterErrorCode.PARAMETER_NUM_ERROR,
" number of record fields:" + record.getColumnNumber()
+ " number of configuration fields:" + columns.size());
"The number of record fields (" + record.getColumnNumber()
+ ") is different from the number of configuration fields (" + columns.size() + ")");
}
Upsert upsert = table.newUpsert();
Insert insert = table.newInsert();
PartialRow row;
if (isUpsert) {
//覆盖更新
//override update
row = upsert.getRow();
}
else {
//增量更新
//incremental update
row = insert.getRow();
}
for (int i = 0; i < record.getColumnNumber(); i++) {
Expand Down Expand Up @@ -144,12 +133,11 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
row.addTimestamp(name, new Timestamp(column.asLong()));
break;
case DATE:
row.addDate(name,
(Date) new java.util.Date(column.asDate().getTime()));
row.addDate(name, (Date) new java.util.Date(column.asDate().getTime()));
break;
default:
throw AddaxException.asAddaxException(
KuduWriterErrorCode.ILLEGAL_VALUE, "The data type " + type + "is unsupported"
KuduWriterErrorCode.ILLEGAL_VALUE, "The data type " + type + " is unsupported"
);
}
} // end a row
Expand All @@ -160,17 +148,37 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
else {
session.apply(insert);
}
commit++;
if (commit % batchSize == 0) {
// flush
session.flush();
}
}
catch (Exception e) {
LOG.error("Record Write Failure!", e);
LOG.error("Failed to write a record: ", e);
if (isSkipFail) {
LOG.warn("Since you have configured \"skipFail\" to be true, this record will be skipped !");
LOG.warn("Since you have configured 'skipFail' to be true, this record will be skipped.");
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
}
else {
throw AddaxException.asAddaxException(KuduWriterErrorCode.PUT_KUDU_ERROR, e.getMessage());
}
}
}

try {
// try to flush last upsert/insert
session.flush();
}
catch (Exception e) {
LOG.error("Failed to write a record: ", e);
if (isSkipFail) {
LOG.warn("Since you have configured 'skipFail' to be true, this record will be skipped !");
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
}
else {
throw AddaxException.asAddaxException(KuduWriterErrorCode.PUT_KUDU_ERROR, e.getMessage());
}
}
}
}

0 comments on commit 0a66221

Please sign in to comment.