Skip to content

Commit

Permalink
Merge pull request linkedin#589 from linkedin/tony-check-intermediate…
Browse files Browse the repository at this point in the history
…-master

TonyFetcher should also check intermediate directory if job not found in finished directory linkedin#587
  • Loading branch information
erwa authored Jun 4, 2019
2 parents 20fb8d2 + a59de92 commit 7f648c9
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 10 deletions.
7 changes: 7 additions & 0 deletions app/com/linkedin/drelephant/tony/fetchers/TonyFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

public class TonyFetcher implements ElephantFetcher<TonyApplicationData> {
private static final Logger _LOGGER = Logger.getLogger(TonyFetcher.class);
private final Path _intermediateDir;
private final Path _finishedDir;
private final FileSystem _fs;

Expand All @@ -52,6 +53,7 @@ public TonyFetcher(FetcherConfigurationData fetcherConfig) throws IOException {
_LOGGER.info("Using TonY conf dir: " + tonyConfDir);

conf.addResource(new Path(tonyConfDir + Path.SEPARATOR + Constants.TONY_SITE_CONF));
_intermediateDir = new Path(conf.get(TonyConfigurationKeys.TONY_HISTORY_INTERMEDIATE));
_finishedDir = new Path(conf.get(TonyConfigurationKeys.TONY_HISTORY_FINISHED));
_fs = _finishedDir.getFileSystem(conf);
}
Expand All @@ -69,6 +71,11 @@ public TonyApplicationData fetchData(AnalyticJob job) throws Exception {
// In case we don't find the history files in yyyy/MM/dd, we should check the previous day as well.
String yearMonthDay = ParserUtils.getYearMonthDayDirectory(date);
Path jobDir = new Path(_finishedDir, yearMonthDay + Path.SEPARATOR + job.getAppId());
if (!_fs.exists(jobDir)) {
// check intermediate dir
jobDir = new Path(_intermediateDir, job.getAppId());
}

_LOGGER.debug("Job directory for " + job.getAppId() + ": " + jobDir);

// parse config
Expand Down
39 changes: 29 additions & 10 deletions test/com/linkedin/drelephant/tony/fetchers/TonyFetcherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
Expand All @@ -48,42 +47,52 @@


public class TonyFetcherTest {
private static final String APPLICATION_ID = "application_123_456";
private static final String APPLICATION_ID_1 = "application_123_456";
private static final String APPLICATION_ID_2 = "application_789_101";
private static File _intermediateDir;
private static File _finishedDir;
private static String _tonyConfDir;
private static Date _endDate;

@BeforeClass
public static void setup() throws IOException, ParseException {
setupFinishedApplicationDir();
setupTestData();
setupTestTonyConfDir();
}

private static void setupFinishedApplicationDir() throws IOException, ParseException {
private static void setupTestData() throws IOException, ParseException {
String yearMonthDay = "2019/05/02";
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
_endDate = sdf.parse(yearMonthDay);

File tempDir = Files.createTempDir();
_intermediateDir = new File(tempDir, "intermediate");
_finishedDir = new File(tempDir, "finished");
File appDir = new File(_finishedDir, yearMonthDay + Path.SEPARATOR + APPLICATION_ID);

createAppHistoryDir(new File(_intermediateDir, APPLICATION_ID_1));
createAppHistoryDir(new File(_finishedDir, yearMonthDay + Path.SEPARATOR + APPLICATION_ID_2));
}

private static void createAppHistoryDir(File appDir) throws IOException {
appDir.mkdirs();

// write fake config history file
Configuration conf = new Configuration(false);
conf.set("foo", "bar");

File configFile = new File(appDir, Constants.TONY_FINAL_XML);
conf.writeXml(new FileOutputStream(configFile));

// create fake events
Event event0 = new Event(EventType.TASK_FINISHED, new TaskFinished("worker", 0, "SUCCEEDED",
ImmutableList.of(new Metric("my_metric", 0.0))), System.currentTimeMillis());
Event event1 = new Event(EventType.TASK_FINISHED, new TaskFinished("worker", 1, "SUCCEEDED",
ImmutableList.of(new Metric("my_metric", 1.0))), System.currentTimeMillis());
Event event2 = new Event(EventType.TASK_FINISHED, new TaskFinished("ps", 0, "SUCCEEDED",
ImmutableList.of(new Metric("my_metric", 0.0))), System.currentTimeMillis());

// write fake events history file
File eventFile = new File(appDir,
APPLICATION_ID + "-0-" + _endDate.getTime() + "-user1-SUCCEEDED." + Constants.HISTFILE_SUFFIX);
APPLICATION_ID_1 + "-0-" + _endDate.getTime() + "-user1-SUCCEEDED." + Constants.HISTFILE_SUFFIX);
DatumWriter<Event> userDatumWriter = new SpecificDatumWriter<>(Event.class);
DataFileWriter<Event> dataFileWriter = new DataFileWriter<>(userDatumWriter);
dataFileWriter.create(event0.getSchema(), eventFile);
Expand All @@ -95,6 +104,7 @@ private static void setupFinishedApplicationDir() throws IOException, ParseExcep

private static void setupTestTonyConfDir() throws IOException {
Configuration testTonyConf = new Configuration(false);
testTonyConf.set(TonyConfigurationKeys.TONY_HISTORY_INTERMEDIATE, _intermediateDir.getPath());
testTonyConf.set(TonyConfigurationKeys.TONY_HISTORY_FINISHED, _finishedDir.getPath());

File confDir = Files.createTempDir();
Expand All @@ -104,19 +114,28 @@ private static void setupTestTonyConfDir() throws IOException {
}

@Test
public void testFetchData() throws Exception {
public void testFetchDataIntermediateDir() throws Exception {
testHelper(APPLICATION_ID_1);
}

@Test
public void testFetchDataFinishedDir() throws Exception {
testHelper(APPLICATION_ID_2);
}

private static void testHelper(String appId) throws Exception {
FetcherConfigurationData configData = new FetcherConfigurationData(null, null,
ImmutableMap.of(Constants.TONY_CONF_DIR, _tonyConfDir));
TonyFetcher tonyFetcher = new TonyFetcher(configData);

AnalyticJob job = new AnalyticJob();
ApplicationType tonyAppType = new ApplicationType(Constants.APP_TYPE);
job.setFinishTime(_endDate.getTime());
job.setAppId(APPLICATION_ID);
job.setAppId(appId);
job.setAppType(tonyAppType);
TonyApplicationData appData = tonyFetcher.fetchData(job);

Assert.assertEquals(APPLICATION_ID, appData.getAppId());
Assert.assertEquals(appId, appData.getAppId());
Assert.assertEquals(tonyAppType, appData.getApplicationType());
Assert.assertEquals("bar", appData.getConf().get("foo"));
Map<String, Map<Integer, TonyTaskData>> metricsMap = appData.getTaskMap();
Expand Down

0 comments on commit 7f648c9

Please sign in to comment.