Skip to content

Commit

Permalink
Make it configurable for OpenLineageService async executor. (MarquezP…
Browse files Browse the repository at this point in the history
…roject#1866)

Signed-off-by: Minkyu Park <[email protected]>
  • Loading branch information
fm100 authored Feb 7, 2022
1 parent d31149d commit a1106a0
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions api/src/main/java/marquez/service/OpenLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -44,17 +45,23 @@ public class OpenLineageService extends DelegatingDaos.DelegatingOpenLineageDao
private final DatasetVersionDao datasetVersionDao;
private final ObjectMapper mapper = Utils.newObjectMapper();

private final Executor executor;

public OpenLineageService(BaseDao baseDao, RunService runService) {
this(baseDao, runService, ForkJoinPool.commonPool());
}

public OpenLineageService(BaseDao baseDao, RunService runService, Executor executor) {
super(baseDao.createOpenLineageDao());
this.runService = runService;
this.datasetVersionDao = baseDao.createDatasetVersionDao();
this.executor = executor;
}

public CompletableFuture<Void> createAsync(LineageEvent event) {
CompletableFuture<Void> marquez =
CompletableFuture.supplyAsync(
withSentry(withMdc(() -> updateMarquezModel(event, mapper))),
ForkJoinPool.commonPool())
withSentry(withMdc(() -> updateMarquezModel(event, mapper))), executor)
.thenAccept(
(update) -> {
if (event.getEventType() != null) {
Expand All @@ -79,7 +86,7 @@ public CompletableFuture<Void> createAsync(LineageEvent event) {
event.getJob().getNamespace(),
createJsonArray(event, mapper),
event.getProducer()))),
ForkJoinPool.commonPool());
executor);

return CompletableFuture.allOf(marquez, openLineage);
}
Expand Down

0 comments on commit a1106a0

Please sign in to comment.