Skip to content

Commit

Permalink
Fix linkedin#572: MysqlDataTruncation error when no TASK_FINISHED event for a TonY task
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Hsu committed May 20, 2019
1 parent a045b64 commit 224ab65
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 5 deletions.
6 changes: 5 additions & 1 deletion app/com/linkedin/drelephant/tony/TonyMetricsAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ public void aggregate(HadoopApplicationData data) {

for (TonyTaskData taskData : entry.getValue().values()) {
long taskDurationSec = (taskData.getTaskEndTime() - taskData.getTaskStartTime()) / Statistics.SECOND_IN_MS;
if (taskDurationSec < 0) {
// Most likely TASK_FINISHED and APPLICATION_FINISHED events are missing for the task.
continue;
}
mbSecUsed += mbRequested * taskDurationSec;

if (maxMemoryMBUsed <= 0) {
if (maxMemoryMBUsed == 0) {
// If we don't have max memory metrics, don't calculate wasted memory.
continue;
}
Expand Down
15 changes: 15 additions & 0 deletions app/com/linkedin/drelephant/tony/data/TonyApplicationData.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.linkedin.drelephant.analysis.ApplicationType;
import com.linkedin.drelephant.analysis.HadoopApplicationData;
import com.linkedin.tony.events.ApplicationFinished;
import com.linkedin.tony.events.Event;
import com.linkedin.tony.events.EventType;
import com.linkedin.tony.events.TaskFinished;
Expand Down Expand Up @@ -103,6 +104,7 @@ private void initTaskMap(String taskType, int taskIndex) {
}

private void processEvents(List<Event> events) {
long appFinishedTime = 0;
for (Event event : events) {
if (event.getType().equals(EventType.TASK_STARTED)) {
TaskStarted taskStartedEvent = (TaskStarted) event.getEvent();
Expand All @@ -117,6 +119,19 @@ private void processEvents(List<Event> events) {
initTaskMap(taskType, taskIndex);
_taskMap.get(taskType).get(taskIndex).setTaskEndTime(event.getTimestamp());
_taskMap.get(taskType).get(taskIndex).setMetrics(taskFinishedEvent.getMetrics());
} else if (event.getType().equals(EventType.APPLICATION_FINISHED)) {
appFinishedTime = event.getTimestamp();
}
}

// Set end time for any tasks that don't have end times to application finish time
if (appFinishedTime > 0) {
for (Map<Integer, TonyTaskData> taskDataMap : _taskMap.values()) {
for (TonyTaskData taskData : taskDataMap.values()) {
if (taskData.getTaskEndTime() == 0) {
taskData.setTaskEndTime(appFinishedTime);
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/com/linkedin/drelephant/tony/util/TonyUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static double getMaxMemoryBytesUsedForTaskType(Map<String, Map<Integer, T
for (TonyTaskData taskData : taskMap.get(taskType).values()) {
List<Metric> metrics = taskData.getMetrics();
if (metrics == null) {
return -1;
break;
}
for (Metric metric : metrics) {
if (metric.getName().equals(Constants.MAX_MEMORY_BYTES)) {
Expand Down
69 changes: 66 additions & 3 deletions test/com/linkedin/drelephant/tony/TonyMetricsAggregatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.linkedin.tony.events.TaskStarted;
import com.linkedin.tony.rpc.impl.TaskStatus;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -39,9 +40,6 @@


public class TonyMetricsAggregatorTest {
/**
* Low memory utilization but default container size, so pass.
*/
@Test
public void testMetricsAggregator() {
Configuration conf = new Configuration(false);
Expand Down Expand Up @@ -80,4 +78,69 @@ public void testMetricsAggregator() {
Assert.assertEquals(expectedResourcesUsed, result.getResourceUsed());
Assert.assertEquals(expectedResourcesWasted, result.getResourceWasted());
}

/**
 * Checks aggregation when every TASK_FINISHED event carries an empty metrics list: resources used is
 * expected to equal the requested memory (4 GB, in MB) multiplied by each task's duration in seconds,
 * and resources wasted is expected to be 0.
 */
@Test
public void testNullMetrics() {
  // Two workers and one parameter server, each requesting 4 GB of memory.
  Configuration tonyConf = new Configuration(false);
  tonyConf.set(TonyConfigurationKeys.getResourceKey(Constants.WORKER_JOB_NAME, Constants.MEMORY), "4g");
  tonyConf.setInt(TonyConfigurationKeys.getInstancesKey(Constants.WORKER_JOB_NAME), 2);
  tonyConf.set(TonyConfigurationKeys.getResourceKey(Constants.PS_JOB_NAME, Constants.MEMORY), "4g");
  tonyConf.setInt(TonyConfigurationKeys.getInstancesKey(Constants.PS_JOB_NAME), 1);

  String succeeded = TaskStatus.SUCCEEDED.toString();
  long tenSecondsMs = 10L * Statistics.SECOND_IN_MS;
  long twentySecondsMs = 20L * Statistics.SECOND_IN_MS;

  // All tasks start at timestamp 0; worker 0 ends at 10 s, worker 1 and the ps task end at 20 s.
  List<Event> eventLog = new ArrayList<>();
  for (int workerIndex = 0; workerIndex < 2; workerIndex++) {
    eventLog.add(new Event(EventType.TASK_STARTED, new TaskStarted(Constants.WORKER_JOB_NAME, workerIndex, null), 0L));
  }
  eventLog.add(new Event(EventType.TASK_STARTED, new TaskStarted(Constants.PS_JOB_NAME, 0, null), 0L));
  eventLog.add(new Event(EventType.TASK_FINISHED,
      new TaskFinished(Constants.WORKER_JOB_NAME, 0, succeeded, Collections.emptyList()), tenSecondsMs));
  eventLog.add(new Event(EventType.TASK_FINISHED,
      new TaskFinished(Constants.WORKER_JOB_NAME, 1, succeeded, Collections.emptyList()), twentySecondsMs));
  eventLog.add(new Event(EventType.TASK_FINISHED,
      new TaskFinished(Constants.PS_JOB_NAME, 0, succeeded, Collections.emptyList()), twentySecondsMs));

  // seconds * GB * (MB per GB), summed over the three tasks.
  long wantUsed = 10 * 4 * 1024 + 20 * 4 * 1024 + 20 * 4 * 1024;
  long wantWasted = 0;

  TonyApplicationData appData =
      new TonyApplicationData("application_123_456", new ApplicationType(Constants.APP_TYPE), tonyConf, eventLog);
  TonyMetricsAggregator aggregator = new TonyMetricsAggregator(null);
  aggregator.aggregate(appData);
  HadoopAggregatedData aggregated = aggregator.getResult();

  Assert.assertEquals(wantUsed, aggregated.getResourceUsed());
  Assert.assertEquals(wantWasted, aggregated.getResourceWasted());
}

/**
 * Checks aggregation when the event list contains only TASK_STARTED events (no TASK_FINISHED and no
 * APPLICATION_FINISHED): both resources used and resources wasted are expected to be 0.
 */
@Test
public void testNoEndEvents() {
  // Two workers and one parameter server, each requesting 4 GB of memory.
  Configuration tonyConf = new Configuration(false);
  tonyConf.set(TonyConfigurationKeys.getResourceKey(Constants.WORKER_JOB_NAME, Constants.MEMORY), "4g");
  tonyConf.setInt(TonyConfigurationKeys.getInstancesKey(Constants.WORKER_JOB_NAME), 2);
  tonyConf.set(TonyConfigurationKeys.getResourceKey(Constants.PS_JOB_NAME, Constants.MEMORY), "4g");
  tonyConf.setInt(TonyConfigurationKeys.getInstancesKey(Constants.PS_JOB_NAME), 1);

  // Only start events are recorded, all at timestamp 0.
  List<Event> eventLog = new ArrayList<>();
  for (int workerIndex = 0; workerIndex < 2; workerIndex++) {
    eventLog.add(new Event(EventType.TASK_STARTED, new TaskStarted(Constants.WORKER_JOB_NAME, workerIndex, null), 0L));
  }
  eventLog.add(new Event(EventType.TASK_STARTED, new TaskStarted(Constants.PS_JOB_NAME, 0, null), 0L));

  long wantUsed = 0;
  long wantWasted = 0;

  TonyApplicationData appData =
      new TonyApplicationData("application_123_456", new ApplicationType(Constants.APP_TYPE), tonyConf, eventLog);
  TonyMetricsAggregator aggregator = new TonyMetricsAggregator(null);
  aggregator.aggregate(appData);
  HadoopAggregatedData aggregated = aggregator.getResult();

  Assert.assertEquals(wantUsed, aggregated.getResourceUsed());
  Assert.assertEquals(wantWasted, aggregated.getResourceWasted());
}
}

0 comments on commit 224ab65

Please sign in to comment.