Skip to content

Commit 8694176

Browse files
authored
[Improve] Improve doris sink to random use be (apache#6132)
1 parent 6822b9c commit 8694176

File tree

2 files changed

+26
-22
lines changed

2 files changed

+26
-22
lines changed

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java

+6-13
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@
3535
import org.apache.seatunnel.connectors.doris.util.HttpUtil;
3636
import org.apache.seatunnel.connectors.doris.util.UnsupportedTypeConverterUtils;
3737

38-
import com.google.common.annotations.VisibleForTesting;
3938
import com.google.common.util.concurrent.ThreadFactoryBuilder;
40-
import lombok.NonNull;
4139
import lombok.extern.slf4j.Slf4j;
4240

4341
import java.io.IOException;
@@ -76,7 +74,6 @@ public class DorisSinkWriter
7674
private transient Thread executorThread;
7775
private transient volatile Exception loadException = null;
7876
private List<BackendV2.BackendRowV2> backends;
79-
private long pos;
8077

8178
public DorisSinkWriter(
8279
SinkWriter.Context context,
@@ -156,7 +153,7 @@ public void write(SeaTunnelRow element) throws IOException {
156153
@Override
157154
public Optional<DorisCommitInfo> prepareCommit() throws IOException {
158155
RespContent respContent = flush();
159-
if (!dorisConfig.getEnable2PC()) {
156+
if (!dorisConfig.getEnable2PC() || respContent == null) {
160157
return Optional.empty();
161158
}
162159
long txnId = respContent.getTxnId();
@@ -165,12 +162,12 @@ public Optional<DorisCommitInfo> prepareCommit() throws IOException {
165162
new DorisCommitInfo(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
166163
}
167164

168-
@NonNull private RespContent flush() throws IOException {
165+
private RespContent flush() throws IOException {
169166
// disable exception checker before stop load.
170167
loading = false;
171168
checkState(dorisStreamLoad != null);
172169
RespContent respContent = dorisStreamLoad.stopLoad();
173-
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
170+
if (respContent != null && !DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
174171
String errMsg =
175172
String.format(
176173
"stream load error: %s, see more in %s",
@@ -253,14 +250,11 @@ public void close() throws IOException {
253250
}
254251
}
255252

256-
@VisibleForTesting
257-
public String getAvailableBackend() {
258-
long tmp = pos + backends.size();
259-
while (pos < tmp) {
260-
BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size()));
253+
private String getAvailableBackend() {
254+
Collections.shuffle(backends);
255+
for (BackendV2.BackendRowV2 backend : backends) {
261256
String res = backend.toBackendString();
262257
if (tryHttpConnection(res)) {
263-
pos++;
264258
return res;
265259
}
266260
}
@@ -279,7 +273,6 @@ public boolean tryHttpConnection(String backend) {
279273
return true;
280274
} catch (Exception ex) {
281275
log.warn("Failed to connect to backend:{}", backend, ex);
282-
pos++;
283276
return false;
284277
}
285278
}

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java

+20-9
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public class DorisStreamLoad implements Serializable {
7878
private Future<CloseableHttpResponse> pendingLoadFuture;
7979
private final CloseableHttpClient httpClient;
8080
private final ExecutorService executorService;
81-
private boolean loadBatchFirstRecord;
81+
private volatile boolean loadBatchFirstRecord;
82+
private String label;
8283
private long recordCount = 0;
8384

8485
public DorisStreamLoad(
@@ -191,6 +192,8 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
191192
public void writeRecord(byte[] record) throws IOException {
192193
if (loadBatchFirstRecord) {
193194
loadBatchFirstRecord = false;
195+
recordStream.startInput();
196+
startStreamLoad();
194197
} else {
195198
recordStream.write(lineDelimiter);
196199
}
@@ -214,21 +217,29 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw
214217
}
215218

216219
public RespContent stopLoad() throws IOException {
217-
recordStream.endInput();
218-
log.info("stream load stopped.");
219-
checkState(pendingLoadFuture != null);
220-
try {
221-
return handlePreCommitResponse(pendingLoadFuture.get());
222-
} catch (Exception e) {
223-
throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
220+
if (pendingLoadFuture != null) {
221+
log.info("stream load stopped.");
222+
recordStream.endInput();
223+
try {
224+
return handlePreCommitResponse(pendingLoadFuture.get());
225+
} catch (Exception e) {
226+
throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
227+
} finally {
228+
pendingLoadFuture = null;
229+
}
230+
} else {
231+
return null;
224232
}
225233
}
226234

227235
public void startLoad(String label) throws IOException {
228236
loadBatchFirstRecord = true;
229237
recordCount = 0;
238+
this.label = label;
239+
}
240+
241+
private void startStreamLoad() {
230242
HttpPutBuilder putBuilder = new HttpPutBuilder();
231-
recordStream.startInput();
232243
log.info("stream load started for {}", label);
233244
try {
234245
InputStreamEntity entity = new InputStreamEntity(recordStream);

0 commit comments

Comments
 (0)