Skip to content

Commit

Permalink
print detail error log when async side load data
Browse files Browse the repository at this point in the history
  • Loading branch information
todd5167 committed Jul 20, 2019
1 parent 62ee4f9 commit 67990fa
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ plugins/
lib/
.vertx/
bin/nohup.out

.DS_Store
bin/sideSql.txt
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void onFailure(Throwable t) {
t.getMessage());
System.out.println("Failed to retrieve the data: " + t.getMessage());
cluster.closeAsync();
resultFuture.complete(null);
resultFuture.completeExceptionally(t);
}
});
}
Expand Down
17 changes: 16 additions & 1 deletion core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/



package com.dtstack.flink.sql.side;

Expand All @@ -28,9 +28,12 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
import org.apache.flink.types.Row;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;

/**
* All interfaces inherit naming rules: type + "AsyncReqRow" such as == "MysqlAsyncReqRow
Expand All @@ -50,6 +53,18 @@ public AsyncReqRow(SideInfo sideInfo){
this.sideInfo = sideInfo;
}

@Override
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
try {
if (null == future.get()) {
new TimeoutException("Async function call has timed out.");
}
} catch (Exception e) {
throw new Exception(e);
}
}

private void initCache(){
SideTableInfo sideTableInfo = sideInfo.getSideTableInfo();
if(sideTableInfo.getCacheType() == null || ECacheType.NONE.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
Expand Down
1 change: 1 addition & 0 deletions hbase/hbase-side/hbase-async-side/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<artifactSet>
<excludes>
<exclude>org.apache.hadoop:hadoop-common</exclude>
<exclude>org.apache.hadoop:hadoop-yarn-common</exclude>
<exclude>org.apache.hadoop:hadoop-auth</exclude>
<exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude>
<exclude>org.slf4j:*</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFu
}
}
}catch (Exception e){
resultFuture.complete(null);
resultFuture.completeExceptionally(e);
LOG.error("record:" + input);
LOG.error("get side record exception:", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
}
resultFuture.complete(rowList);
} else {
throw new RuntimeException("not support cache obj type " + val.getType());
resultFuture.completeExceptionally(new RuntimeException("not support cache obj type " + val.getType()));
}
return;
}
Expand All @@ -110,7 +110,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
connection.queryWithParams(sqlCondition, inputParams, rs -> {
if (rs.failed()) {
LOG.error("Cannot retrieve the data from the database", rs.cause());
resultFuture.complete(null);
resultFuture.completeExceptionally(rs.cause());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
}
resultFuture.complete(rowList);
}else{
throw new RuntimeException("not support cache obj type " + val.getType());
RuntimeException exception = new RuntimeException("not support cache obj type " + val.getType());
resultFuture.completeExceptionally(exception);
}
return;
}
Expand Down

0 comments on commit 67990fa

Please sign in to comment.