Skip to content

Commit

Permalink
[hotfix][tests] Raise logging level in TestJobExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed May 26, 2022
1 parent 9bf10eb commit 0b139a1
Showing 1 changed file with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,28 +77,28 @@ private TestJobExecutor(
public static TestJobExecutor execute(
TestJobWithDescription testJob, MiniClusterWithClientResource miniClusterResource)
throws Exception {
LOG.debug("submitGraph: {}", testJob.jobGraph);
LOG.info("submitGraph: {}", testJob.jobGraph);
JobID job = miniClusterResource.getClusterClient().submitJob(testJob.jobGraph).get();
waitForAllTaskRunning(miniClusterResource.getMiniCluster(), job, false);
return new TestJobExecutor(testJob, job, miniClusterResource);
}

public TestJobExecutor waitForAllRunning() throws Exception {
LOG.debug("waitForAllRunning in {}", jobID);
LOG.info("waitForAllRunning in {}", jobID);
waitForAllTaskRunning(miniClusterResource.getMiniCluster(), jobID, true);
return this;
}

public TestJobExecutor waitForEvent(Class<? extends TestEvent> eventClass) throws Exception {
LOG.debug("waitForEvent: {}", eventClass.getSimpleName());
LOG.info("waitForEvent: {}", eventClass.getSimpleName());
testJob.eventQueue.withHandler(
e -> eventClass.isAssignableFrom(e.getClass()) ? STOP : CONTINUE);
return this;
}

public TestJobExecutor stopWithSavepoint(TemporaryFolder folder, boolean withDrain)
throws Exception {
LOG.debug("stopWithSavepoint: {} (withDrain: {})", folder, withDrain);
LOG.info("stopWithSavepoint: {} (withDrain: {})", folder, withDrain);
ClusterClient<?> client = miniClusterResource.getClusterClient();
client.stopWithSavepoint(
jobID,
Expand All @@ -111,13 +111,13 @@ public TestJobExecutor stopWithSavepoint(TemporaryFolder folder, boolean withDra

public TestJobExecutor sendOperatorCommand(
String operatorID, TestCommand command, TestCommandScope scope) {
LOG.debug("send command: {} to {}/{}", command, operatorID, scope);
LOG.info("send command: {} to {}/{}", command, operatorID, scope);
testJob.commandQueue.dispatch(command, scope, operatorID);
return this;
}

public void triggerFailover(String operatorID) throws Exception {
LOG.debug("sendCommand: {}", FAIL);
LOG.info("sendCommand: {}", FAIL);
BlockingQueue<TestEvent> queue = new LinkedBlockingQueue<>();
Consumer<TestEvent> listener = queue::add;
testJob.eventQueue.addListener(listener);
Expand Down Expand Up @@ -183,13 +183,13 @@ private void handleFailoverTimeout(TimeoutException e) throws Exception {
}

public TestJobExecutor sendBroadcastCommand(TestCommand command, TestCommandScope scope) {
LOG.debug("sendCommand: {}", command);
LOG.info("sendCommand: {}", command);
testJob.commandQueue.broadcast(command, scope);
return this;
}

public TestJobExecutor waitForTermination() throws Exception {
LOG.debug("waitForTermination");
LOG.info("waitForTermination");
while (!miniClusterResource
.getClusterClient()
.getJobStatus(jobID)
Expand All @@ -201,7 +201,7 @@ public TestJobExecutor waitForTermination() throws Exception {
}

public TestJobExecutor assertFinishedSuccessfully() throws Exception {
LOG.debug("assertFinishedSuccessfully");
LOG.info("assertFinishedSuccessfully");
JobStatus jobStatus = miniClusterResource.getClusterClient().getJobStatus(jobID).get();
if (!jobStatus.equals(FINISHED)) {
String message = String.format("Job didn't finish successfully, status: %s", jobStatus);
Expand All @@ -222,7 +222,7 @@ public TestJobExecutor assertFinishedSuccessfully() throws Exception {

public TestJobExecutor waitForSubtasksToFinish(JobVertexID id, TestCommandScope scope)
throws Exception {
LOG.debug("waitForSubtasksToFinish vertex {}, all subtasks: {}", id, scope);
LOG.info("waitForSubtasksToFinish vertex {}, all subtasks: {}", id, scope);
CommonTestUtils.waitForSubtasksToFinish(
miniClusterResource.getMiniCluster(), jobID, id, scope == ALL_SUBTASKS);
return this;
Expand Down

0 comments on commit 0b139a1

Please sign in to comment.