Skip to content

Commit

Permalink
Merge pull request linkedin#573 from linkedin/fix-negative-task-duration
Browse files (browse the repository at this point in the history)
Fix linkedin#572: MysqlDataTruncation error when no TASK_FINISHED event for a TonY task
  • Loading branch information
erwa authored May 21, 2019
2 parents a045b64 + ba3b68b commit 4a7e873
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 5 deletions.
6 changes: 5 additions & 1 deletion app/com/linkedin/drelephant/tony/TonyMetricsAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ public void aggregate(HadoopApplicationData data) {

for (TonyTaskData taskData : entry.getValue().values()) {
long taskDurationSec = (taskData.getTaskEndTime() - taskData.getTaskStartTime()) / Statistics.SECOND_IN_MS;
if (taskDurationSec < 0) {
// Most likely TASK_FINISHED and APPLICATION_FINISHED events are missing for the task.
continue;
}
mbSecUsed += mbRequested * taskDurationSec;

if (maxMemoryMBUsed <= 0) {
if (maxMemoryMBUsed == 0) {
// If we don't have max memory metrics, don't calculate wasted memory.
continue;
}
Expand Down
14 changes: 14 additions & 0 deletions app/com/linkedin/drelephant/tony/data/TonyApplicationData.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private void initTaskMap(String taskType, int taskIndex) {
}

private void processEvents(List<Event> events) {
long appFinishedTime = 0;
for (Event event : events) {
if (event.getType().equals(EventType.TASK_STARTED)) {
TaskStarted taskStartedEvent = (TaskStarted) event.getEvent();
Expand All @@ -117,6 +118,19 @@ private void processEvents(List<Event> events) {
initTaskMap(taskType, taskIndex);
_taskMap.get(taskType).get(taskIndex).setTaskEndTime(event.getTimestamp());
_taskMap.get(taskType).get(taskIndex).setMetrics(taskFinishedEvent.getMetrics());
} else if (event.getType().equals(EventType.APPLICATION_FINISHED)) {
appFinishedTime = event.getTimestamp();
}
}

// Set end time for any tasks that don't have end times to application finish time
if (appFinishedTime > 0) {
for (Map<Integer, TonyTaskData> taskDataMap : _taskMap.values()) {
for (TonyTaskData taskData : taskDataMap.values()) {
if (taskData.getTaskEndTime() == 0) {
taskData.setTaskEndTime(appFinishedTime);
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/com/linkedin/drelephant/tony/util/TonyUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static double getMaxMemoryBytesUsedForTaskType(Map<String, Map<Integer, T
for (TonyTaskData taskData : taskMap.get(taskType).values()) {
List<Metric> metrics = taskData.getMetrics();
if (metrics == null) {
return -1;
continue;
}
for (Metric metric : metrics) {
if (metric.getName().equals(Constants.MAX_MEMORY_BYTES)) {
Expand Down
69 changes: 66 additions & 3 deletions test/com/linkedin/drelephant/tony/TonyMetricsAggregatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.linkedin.tony.events.TaskStarted;
import com.linkedin.tony.rpc.impl.TaskStatus;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -39,9 +40,6 @@


public class TonyMetricsAggregatorTest {
/**
* Low memory utilization but default container size, so pass.
*/
@Test
public void testMetricsAggregator() {
Configuration conf = new Configuration(false);
Expand Down Expand Up @@ -80,4 +78,69 @@ public void testMetricsAggregator() {
Assert.assertEquals(expectedResourcesUsed, result.getResourceUsed());
Assert.assertEquals(expectedResourcesWasted, result.getResourceWasted());
}

/**
 * Checks the aggregator when every task finishes with an empty metrics list: used resources are
 * still derived from the task durations, while wasted resources are expected to be 0.
 */
@Test
public void testNullMetrics() {
  final String succeeded = TaskStatus.SUCCEEDED.toString();
  final long tenSecondsMs = 10L * Statistics.SECOND_IN_MS;
  final long twentySecondsMs = 20L * Statistics.SECOND_IN_MS;

  // Two workers and one parameter server, each requesting "4g" of memory.
  Configuration tonyConf = new Configuration(false);
  tonyConf.set(TonyConfigurationKeys.getResourceKey(Constants.WORKER_JOB_NAME, Constants.MEMORY), "4g");
  tonyConf.setInt(TonyConfigurationKeys.getInstancesKey(Constants.WORKER_JOB_NAME), 2);
  tonyConf.set(TonyConfigurationKeys.getResourceKey(Constants.PS_JOB_NAME, Constants.MEMORY), "4g");
  tonyConf.setInt(TonyConfigurationKeys.getInstancesKey(Constants.PS_JOB_NAME), 1);

  List<Event> taskEvents = new ArrayList<>();

  // All three tasks start at timestamp 0.
  taskEvents.add(new Event(EventType.TASK_STARTED, new TaskStarted(Constants.WORKER_JOB_NAME, 0, null), 0L));
  taskEvents.add(new Event(EventType.TASK_STARTED, new TaskStarted(Constants.WORKER_JOB_NAME, 1, null), 0L));
  taskEvents.add(new Event(EventType.TASK_STARTED, new TaskStarted(Constants.PS_JOB_NAME, 0, null), 0L));

  // Worker 0 finishes after 10 seconds; worker 1 and the ps finish after 20 seconds.
  // Every finish event carries an empty metrics list.
  taskEvents.add(new Event(
      EventType.TASK_FINISHED,
      new TaskFinished(Constants.WORKER_JOB_NAME, 0, succeeded, Collections.emptyList()),
      tenSecondsMs));
  taskEvents.add(new Event(
      EventType.TASK_FINISHED,
      new TaskFinished(Constants.WORKER_JOB_NAME, 1, succeeded, Collections.emptyList()),
      twentySecondsMs));
  taskEvents.add(new Event(
      EventType.TASK_FINISHED,
      new TaskFinished(Constants.PS_JOB_NAME, 0, succeeded, Collections.emptyList()),
      twentySecondsMs));

  // 10s + 20s + 20s of task time, each task at 4 * 1024 (the configured "4g").
  long expectedUsed = (10 + 20 + 20) * 4 * 1024;
  long expectedWasted = 0;

  TonyApplicationData appData =
      new TonyApplicationData("application_123_456", new ApplicationType(Constants.APP_TYPE), tonyConf, taskEvents);
  TonyMetricsAggregator aggregator = new TonyMetricsAggregator(null);
  aggregator.aggregate(appData);

  HadoopAggregatedData aggregated = aggregator.getResult();
  Assert.assertEquals(expectedUsed, aggregated.getResourceUsed());
  Assert.assertEquals(expectedWasted, aggregated.getResourceWasted());
}

/**
 * With only TASK_STARTED events in the history (no TASK_FINISHED and no APPLICATION_FINISHED),
 * both used and wasted resources are expected to be 0.
 */
@Test
public void testNoEndEvents() {
  // Two workers and one parameter server, each requesting "4g" of memory.
  Configuration tonyConf = new Configuration(false);
  tonyConf.set(TonyConfigurationKeys.getResourceKey(Constants.WORKER_JOB_NAME, Constants.MEMORY), "4g");
  tonyConf.setInt(TonyConfigurationKeys.getInstancesKey(Constants.WORKER_JOB_NAME), 2);
  tonyConf.set(TonyConfigurationKeys.getResourceKey(Constants.PS_JOB_NAME, Constants.MEMORY), "4g");
  tonyConf.setInt(TonyConfigurationKeys.getInstancesKey(Constants.PS_JOB_NAME), 1);

  // Start events only: nothing in this list ever supplies an end time for a task.
  List<Event> startOnlyEvents = new ArrayList<>();
  startOnlyEvents.add(
      new Event(EventType.TASK_STARTED, new TaskStarted(Constants.WORKER_JOB_NAME, 0, null), 0L));
  startOnlyEvents.add(
      new Event(EventType.TASK_STARTED, new TaskStarted(Constants.WORKER_JOB_NAME, 1, null), 0L));
  startOnlyEvents.add(
      new Event(EventType.TASK_STARTED, new TaskStarted(Constants.PS_JOB_NAME, 0, null), 0L));

  long expectedUsed = 0;
  long expectedWasted = 0;

  TonyApplicationData appData = new TonyApplicationData(
      "application_123_456", new ApplicationType(Constants.APP_TYPE), tonyConf, startOnlyEvents);
  TonyMetricsAggregator aggregator = new TonyMetricsAggregator(null);
  aggregator.aggregate(appData);

  HadoopAggregatedData aggregated = aggregator.getResult();
  Assert.assertEquals(expectedUsed, aggregated.getResourceUsed());
  Assert.assertEquals(expectedWasted, aggregated.getResourceWasted());
}
}
33 changes: 33 additions & 0 deletions test/com/linkedin/drelephant/tony/util/TonyUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.linkedin.drelephant.tony.util;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.linkedin.drelephant.tony.data.TonyTaskData;
import com.linkedin.tony.Constants;
import com.linkedin.tony.events.Metric;
import java.util.Map;
import java.util.TreeMap;
import org.junit.Assert;
import org.junit.Test;


public class TonyUtilsTest {
  /**
   * One worker has no metrics set while another reports a max-memory metric; the lookup for the
   * "worker" task type should return the value from the worker that has metrics.
   */
  @Test
  public void testGetMaxMemorySomeTasksMissingMetrics() {
    final double expectedMaxMemoryBytes = 123d;

    // Worker 0 is created without any metrics being assigned.
    TonyTaskData taskWithoutMetrics = new TonyTaskData("worker", 0);

    // Worker 1 carries a single max-memory metric.
    TonyTaskData taskWithMetrics = new TonyTaskData("worker", 1);
    taskWithMetrics.setMetrics(
        ImmutableList.of(new Metric(Constants.MAX_MEMORY_BYTES, expectedMaxMemoryBytes)));

    Map<Integer, TonyTaskData> workersByIndex = new TreeMap<>();
    workersByIndex.put(0, taskWithoutMetrics);
    workersByIndex.put(1, taskWithMetrics);

    double actualMaxMemoryBytes =
        TonyUtils.getMaxMemoryBytesUsedForTaskType(ImmutableMap.of("worker", workersByIndex), "worker");

    Assert.assertEquals(expectedMaxMemoryBytes, actualMaxMemoryBytes, 0);
  }
}

0 comments on commit 4a7e873

Please sign in to comment.