Commit 241c1ca
Replaced the JobClient by an actor.
tillrohrmann committed Dec 18, 2014
1 parent: ac94253
Showing 62 changed files with 1,256 additions and 1,416 deletions.

@@ -20,6 +20,8 @@
 package org.apache.flink.hadoopcompatibility.mapred.wrapper;

 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;

 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.memory.DataInputView;
@@ -87,6 +89,30 @@ public void read(DataInputView in) throws IOException {

 	}

+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeInt(splitNumber);
+		out.writeUTF(hadoopInputSplitTypeName);
+		hadoopInputSplit.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		this.splitNumber = in.readInt();
+		this.hadoopInputSplitTypeName = in.readUTF();
+		if (hadoopInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+					Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
+				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(inputSplit);
+			}
+			catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		this.hadoopInputSplit.readFields(in);
+	}
+
 	@Override
 	public int getSplitNumber() {
 		return this.splitNumber;
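
The writeObject/readObject pair added above is the standard hook mechanism for making a class with non-serializable members usable with Java serialization: ObjectOutputStream and ObjectInputStream look these private methods up reflectively and call them in place of the default field-by-field copy, which is what lets the wrapper ship a Hadoop split (a Writable, not a Serializable) through an ObjectOutputStream. A minimal, self-contained sketch of the same pattern, with a hypothetical NonSerializablePayload standing in for the Hadoop split; note the sketch uses a transient field plus defaultWriteObject, a slight variant of the fully manual field writing in the hunk above:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

	// Hypothetical stand-in for a type (like a Hadoop InputSplit) that does
	// not implement java.io.Serializable.
	static class NonSerializablePayload {
		final String data;
		NonSerializablePayload(String data) { this.data = data; }
	}

	static class PayloadWrapper implements Serializable {
		private static final long serialVersionUID = 1L;

		// transient: skipped by default serialization, handled manually below
		private transient NonSerializablePayload payload;

		PayloadWrapper(NonSerializablePayload payload) { this.payload = payload; }

		// Invoked reflectively by ObjectOutputStream.writeObject().
		private void writeObject(ObjectOutputStream out) throws IOException {
			out.defaultWriteObject();    // writes the non-transient fields (none here)
			out.writeUTF(payload.data);  // writes the payload by hand
		}

		// Invoked reflectively by ObjectInputStream.readObject().
		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
			in.defaultReadObject();
			this.payload = new NonSerializablePayload(in.readUTF());
		}
	}

	public static void main(String[] args) throws Exception {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		ObjectOutputStream oos = new ObjectOutputStream(bos);
		oos.writeObject(new PayloadWrapper(new NonSerializablePayload("split-0")));
		oos.flush();

		ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
		PayloadWrapper copy = (PayloadWrapper) ois.readObject();
		System.out.println(copy.payload.data);  // prints: split-0
	}
}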

@@ -20,6 +20,8 @@
 package org.apache.flink.hadoopcompatibility.mapreduce.wrapper;

 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;

 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.memory.DataInputView;
@@ -78,6 +80,31 @@ public void read(DataInputView in) throws IOException {
 		}
 		((Writable) this.mapreduceInputSplit).readFields(in);
 	}

+	private void writeObject(ObjectOutputStream out) throws IOException {
+		out.writeInt(this.splitNumber);
+		out.writeUTF(this.mapreduceInputSplit.getClass().getName());
+		Writable w = (Writable) this.mapreduceInputSplit;
+		w.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		this.splitNumber = in.readInt();
+		String className = in.readUTF();
+
+		if (this.mapreduceInputSplit == null) {
+			try {
+				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
+					Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
+				this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
+			} catch (Exception e) {
+				throw new RuntimeException("Unable to create InputSplit", e);
+			}
+		}
+		((Writable) this.mapreduceInputSplit).readFields(in);
+	}
+
 	@Override
 	public int getSplitNumber() {
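
The mapreduce-side wrapper repeats the same pattern. The detail worth noting in both readObject paths is the reflective rebuild: Hadoop splits implement org.apache.hadoop.io.Writable rather than java.io.Serializable, so the wrapper records the concrete class name as a UTF string and recreates an empty instance before handing the stream to readFields. That step in isolation, as a sketch (the class and method names are mine; the calls mirror the hunks above):

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;

public final class WritableRebuild {

	// WritableFactories.newInstance consults a factory registered for the type
	// and otherwise instantiates the class reflectively.
	public static Writable newWritable(String className) {
		try {
			Class<? extends Writable> clazz =
					Class.forName(className).asSubclass(Writable.class);
			return WritableFactories.newInstance(clazz);
		} catch (ClassNotFoundException e) {
			throw new RuntimeException("Unable to create InputSplit", e);
		}
	}
}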

@@ -35,6 +35,7 @@
 import java.util.Properties;

 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -54,12 +55,13 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.event.job.RecentJobEvent;
+import org.apache.flink.runtime.jobmanager.RunningJob;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.EventCollectorMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestRunningJobs$;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsResponse;
 import org.apache.flink.util.StringUtils;

 /**
@@ -508,34 +510,34 @@ protected int list(String[] args) {
 			return 1;
 		}

-		List<RecentJobEvent> recentJobs = AkkaUtils.<EventCollectorMessages.RecentJobs>ask(jobManager,
-				EventCollectorMessages.RequestRecentJobEvents$.MODULE$).asJavaList();
+		List<RunningJob> jobs = AkkaUtils.<RunningJobsResponse>ask(jobManager,
+				RequestRunningJobs$.MODULE$).asJavaList();

-		ArrayList<RecentJobEvent> runningJobs = null;
-		ArrayList<RecentJobEvent> scheduledJobs = null;
+		ArrayList<RunningJob> runningJobs = null;
+		ArrayList<RunningJob> scheduledJobs = null;
 		if (running) {
-			runningJobs = new ArrayList<RecentJobEvent>();
+			runningJobs = new ArrayList<RunningJob>();
 		}
 		if (scheduled) {
-			scheduledJobs = new ArrayList<RecentJobEvent>();
+			scheduledJobs = new ArrayList<RunningJob>();
 		}

-		for (RecentJobEvent rje : recentJobs) {
+		for (RunningJob rj : jobs) {

-			if (running && rje.getJobStatus().equals(JobStatus.RUNNING)) {
-				runningJobs.add(rje);
+			if (running && rj.jobStatus().equals(JobStatus.RUNNING)) {
+				runningJobs.add(rj);
 			}
-			if (scheduled && rje.getJobStatus().equals(JobStatus.CREATED)) {
-				scheduledJobs.add(rje);
+			if (scheduled && rj.jobStatus().equals(JobStatus.CREATED)) {
+				scheduledJobs.add(rj);
 			}
 		}

 		SimpleDateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
-		Comparator<RecentJobEvent> njec = new Comparator<RecentJobEvent>() {
+		Comparator<RunningJob> njec = new Comparator<RunningJob>() {

 			@Override
-			public int compare(RecentJobEvent o1, RecentJobEvent o2) {
-				return (int) (o1.getTimestamp() - o2.getTimestamp());
+			public int compare(RunningJob o1, RunningJob o2) {
+				return (int) (o1.timestamp() - o2.timestamp());
 			}
 		};

@@ -546,8 +548,9 @@ public int compare(RecentJobEvent o1, RecentJobEvent o2) {
 			Collections.sort(runningJobs, njec);

 			System.out.println("------------------------ Running Jobs ------------------------");
-			for (RecentJobEvent je : runningJobs) {
-				System.out.println(df.format(new Date(je.getTimestamp())) + " : " + je.getJobID().toString() + " : " + je.getJobName());
+			for (RunningJob rj : runningJobs) {
+				System.out.println(df.format(new Date(rj.timestamp())) + " : "
+						+ rj.jobID().toString() + " : " + rj.jobName());
 			}
 			System.out.println("--------------------------------------------------------------");
 		}
@@ -559,8 +562,9 @@ public int compare(RecentJobEvent o1, RecentJobEvent o2) {
 			Collections.sort(scheduledJobs, njec);

 			System.out.println("----------------------- Scheduled Jobs -----------------------");
-			for (RecentJobEvent je : scheduledJobs) {
-				System.out.println(df.format(new Date(je.getTimestamp())) + " : " + je.getJobID().toString() + " : " + je.getJobName());
+			for (RunningJob rj : scheduledJobs) {
+				System.out.println(df.format(new Date(rj.timestamp())) + " : "
+						+ rj.jobID().toString() + " : " + rj.jobName());
 			}
 			System.out.println("--------------------------------------------------------------");
 		}
@@ -627,7 +631,7 @@ protected int cancel(String[] args) {
 			return 1;
 		}

-		AkkaUtils.ask(jobManager, new JobManagerMessages.CancelJob(jobId));
+		AkkaUtils.ask(jobManager, new CancelJob(jobId));
 		return 0;
 	}
 	catch (Throwable t) {
@@ -750,7 +754,8 @@ protected ActorRef getJobManager(CommandLine line) throws IOException {
 			return null;
 		}

-		return JobManager.getJobManager(jobManagerAddress);
+		return JobManager.getJobManager(jobManagerAddress,
+				ActorSystem.create("CliFrontendActorSystem", AkkaUtils.getDefaultActorSystemConfig()));
 	}
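
With the EventCollector messages gone, the CLI now talks to the JobManager actor exclusively through blocking request-reply calls: RequestRunningJobs$ yields a RunningJobsResponse for listing, and CancelJob cancels a job. A sketch of what such an ask helper plausibly looks like; the real AkkaUtils.ask lives in flink-runtime, and the timeout value and error handling here are assumptions:

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import java.util.concurrent.TimeUnit;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public final class AskSketch {

	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS); // assumed value

	// Send a message to an actor and block until the typed reply arrives
	// (or the timeout expires).
	@SuppressWarnings("unchecked")
	public static <T> T ask(ActorRef actor, Object message) throws Exception {
		Future<Object> future = Patterns.ask(actor, message, TIMEOUT.toMillis());
		return (T) Await.result(future, TIMEOUT);
	}
}

One small wrinkle in the listing code itself: the comparator returns (int) (o1.timestamp() - o2.timestamp()), and casting a long difference to int can wrap around when timestamps are far apart; Long.compare(o1.timestamp(), o2.timestamp()) would be the overflow-safe form.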

@@ -21,11 +21,13 @@

 import java.util.List;

+import akka.actor.ActorRef;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.common.Program;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -54,22 +56,13 @@ public class LocalExecutor extends PlanExecutor {

 	// ---------------------------------- config options ------------------------------------------

-	private int jobManagerRpcPort = -1;
-
-	private int taskManagerRpcPort = -1;
-
-	private int taskManagerDataPort = -1;
-
 	private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;

 	private String configDir;

-	private String hdfsConfigFile;
-
 	private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;

-	private boolean defaultAlwaysCreateDirectory = false;
-
 	// --------------------------------------------------------------------------------------------

 	public LocalExecutor() {
@@ -78,45 +71,7 @@ public LocalExecutor() {
 		}
 	}

-	public int getJobManagerRpcPort() {
-		return jobManagerRpcPort;
-	}
-
-	public void setJobManagerRpcPort(int jobManagerRpcPort) {
-		this.jobManagerRpcPort = jobManagerRpcPort;
-	}
-
-	public int getTaskManagerRpcPort() {
-		return taskManagerRpcPort;
-	}
-
-	public void setTaskManagerRpcPort(int taskManagerRpcPort) {
-		this.taskManagerRpcPort = taskManagerRpcPort;
-	}
-
-	public int getTaskManagerDataPort() {
-		return taskManagerDataPort;
-	}
-
-	public void setTaskManagerDataPort(int taskManagerDataPort) {
-		this.taskManagerDataPort = taskManagerDataPort;
-	}
-
-	public String getConfigDir() {
-		return configDir;
-	}
-
-	public void setConfigDir(String configDir) {
-		this.configDir = configDir;
-	}
-
-	public String getHdfsConfig() {
-		return hdfsConfigFile;
-	}
-
-	public void setHdfsConfig(String hdfsConfig) {
-		this.hdfsConfigFile = hdfsConfig;
-	}

 	public boolean isDefaultOverwriteFiles() {
 		return defaultOverwriteFiles;
@@ -126,14 +81,6 @@ public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) {
 		this.defaultOverwriteFiles = defaultOverwriteFiles;
 	}

-	public boolean isDefaultAlwaysCreateDirectory() {
-		return defaultAlwaysCreateDirectory;
-	}
-
-	public void setDefaultAlwaysCreateDirectory(boolean defaultAlwaysCreateDirectory) {
-		this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
-	}
-
 	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }

 	public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; }
@@ -147,7 +94,8 @@ public void start() throws Exception {
 			// create the embedded runtime
 			this.flink = new LocalFlinkMiniCluster(configDir);
 			Configuration configuration = new Configuration();
-
+			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
+			configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
 			// start it up
 			this.flink.start(configuration);
 		} else {
@@ -216,10 +164,10 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {

 			NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
 			JobGraph jobGraph = jgg.compileJobGraph(op);
-			JobClient jobClient = this.flink.getJobClient(jobGraph);
-			JobExecutionResult result = jobClient.submitJobAndWait();
-			return result;
-
+
+			ActorRef jobClient = flink.getJobClient();
+
+			return JobClient.submitJobAndWait(jobGraph, true, jobClient);
 		}
 		finally {
 			if (shutDownAtEnd) {
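
The executor no longer holds a per-job JobClient instance: LocalFlinkMiniCluster hands back an ActorRef, and the now-static JobClient.submitJobAndWait drives submission by exchanging messages with that actor. A hedged usage sketch built from the calls visible in the hunks above; the import path for LocalFlinkMiniCluster, the JobGraph constructor, flink.stop(), and the meaning of the boolean flag are assumptions:

import akka.actor.ActorRef;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; // package assumed

public class SubmitSketch {
	public static void main(String[] args) throws Exception {
		Configuration config = new Configuration();
		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

		LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(null); // null: default config dir
		flink.start(config);
		try {
			JobGraph jobGraph = new JobGraph("example"); // normally compiled from a Plan; constructor assumed
			ActorRef jobClient = flink.getJobClient();   // an actor, not an RPC stub

			// The boolean presumably toggles progress reporting while waiting.
			JobExecutionResult result = JobClient.submitJobAndWait(jobGraph, true, jobClient);
			System.out.println("Net runtime: " + result.getNetRuntime());
		} finally {
			flink.stop(); // assumed shutdown hook, mirroring LocalExecutor's cleanup
		}
	}
}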