diff --git a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java index 5a9d7c14a7856..bcefae87d575d 100644 --- a/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java +++ b/nephele/nephele-common/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java @@ -35,8 +35,8 @@ public abstract class AbstractEvent implements IOReadableWritable { /** * Static variable that points to the current global sequence number */ - private static final AtomicLong globalSequenceNumber = new AtomicLong(0); - + private static final AtomicLong GLOBAL_SEQUENCE_NUMBER = new AtomicLong(0); + /** * Auxiliary object which helps to convert a {@link Date} object to the given string representation. */ @@ -51,7 +51,7 @@ public abstract class AbstractEvent implements IOReadableWritable { * The sequence number of the event. */ private long sequenceNumber = -1; - + /** * Constructs a new abstract event object. * @@ -60,10 +60,10 @@ public abstract class AbstractEvent implements IOReadableWritable { */ public AbstractEvent(final long timestamp) { this.timestamp = timestamp; - this.sequenceNumber = globalSequenceNumber.incrementAndGet(); + this.sequenceNumber = GLOBAL_SEQUENCE_NUMBER.incrementAndGet(); } - - public long getSequenceNumber(){ + + public long getSequenceNumber() { return this.sequenceNumber; } diff --git a/nephele/nephele-management/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java b/nephele/nephele-management/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java index 27edc1f3d4f8d..89bf7b3354554 100644 --- a/nephele/nephele-management/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java +++ b/nephele/nephele-management/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java @@ -51,6 +51,11 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve */ private boolean isProfilingEnabled; + /** + * The time stamp of the job submission. + */ + private long submissionTimestamp; + /** * Constructs a new event. * @@ -62,11 +67,13 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve * the status of the job * @param isProfilingEnabled * true if profiling is enabled for this job, false otherwise + * @param submissionTimestamp + * the time stamp of the job submission * @param timestamp * the time stamp of the event */ public RecentJobEvent(final JobID jobID, final String jobName, final JobStatus jobStatus, - final boolean isProfilingEnabled, final long timestamp) { + final boolean isProfilingEnabled, final long submissionTimestamp, final long timestamp) { super(timestamp); if (jobStatus == null) { @@ -77,6 +84,7 @@ public RecentJobEvent(final JobID jobID, final String jobName, final JobStatus j this.jobName = jobName; this.jobStatus = jobStatus; this.isProfilingEnabled = isProfilingEnabled; + this.submissionTimestamp = submissionTimestamp; } /** @@ -122,6 +130,16 @@ public JobStatus getJobStatus() { return this.jobStatus; } + /** + * Returns the time stamp of the job submission. + * + * @return the time stamp of the job submission + */ + public long getSubmissionTimestamp() { + + return this.submissionTimestamp; + } + /** * {@inheritDoc} */ @@ -141,6 +159,9 @@ public void read(final DataInput in) throws IOException { // Read if profiling is enabled this.isProfilingEnabled = in.readBoolean(); + + // Read the submission time stamp + this.submissionTimestamp = in.readLong(); } /** @@ -161,6 +182,9 @@ public void write(final DataOutput out) throws IOException { // Write out if profiling is enabled out.writeBoolean(this.isProfilingEnabled); + + // Write out the submission time stamp + out.writeLong(this.submissionTimestamp); } /** @@ -191,6 +215,10 @@ public boolean equals(final Object obj) { return false; } + if (this.submissionTimestamp != newJobEvent.getSubmissionTimestamp()) { + return false; + } + return true; } diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java index cbdbd0f1e38c1..39760dbfecb04 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/EventCollector.java @@ -179,6 +179,11 @@ private static final class JobStatusListenerWrapper implements JobStatusListener */ private final boolean isProfilingAvailable; + /** + * The time stamp of the job submission + */ + private final long submissionTimestamp; + /** * Constructs a new job status listener wrapper. * @@ -188,12 +193,16 @@ private static final class JobStatusListenerWrapper implements JobStatusListener * the name of the job * @param isProfilingAvailable * true if profiling events are collected for the job, false otherwise + * @param submissionTimestamp + * the submission time stamp of the job */ public JobStatusListenerWrapper(final EventCollector eventCollector, final String jobName, - final boolean isProfilingAvailable) { + final boolean isProfilingAvailable, final long submissionTimestamp) { + this.eventCollector = eventCollector; this.jobName = jobName; this.isProfilingAvailable = isProfilingAvailable; + this.submissionTimestamp = submissionTimestamp; } /** @@ -214,7 +223,8 @@ public void jobStatusHasChanged(final ExecutionGraph executionGraph, final Inter // Update recent job event final JobStatus jobStatus = InternalJobStatus.toJobStatus(newJobStatus); if (jobStatus != null) { - this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable, jobStatus); + this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable, + this.submissionTimestamp, jobStatus); this.eventCollector.addEvent(jobID, new JobEvent(System.currentTimeMillis(), jobStatus, optionalMessage)); @@ -423,17 +433,20 @@ private void addEvent(JobID jobID, AbstractEvent event) { * the name of the new job * @param isProfilingEnabled * true if profiling events are collected for the job, false otherwise - * @param initialJobStatus - * the initial status of the job + * @param submissionTimestamp + * the submission time stamp of the job + * @param jobStatus + * the status of the job */ private void updateRecentJobEvent(final JobID jobID, final String jobName, final boolean isProfilingEnabled, - final JobStatus jobStatus) { + final long submissionTimestamp, final JobStatus jobStatus) { final long currentTime = System.currentTimeMillis(); final RecentJobEvent recentJobEvent = new RecentJobEvent(jobID, jobName, jobStatus, isProfilingEnabled, - currentTime); + submissionTimestamp, currentTime); synchronized (this.recentJobs) { + this.recentJobs.put(jobID, recentJobEvent); } } @@ -449,8 +462,11 @@ private void updateRecentJobEvent(final JobID jobID, final String jobName, final * the execution graph representing the job * @param profilingAvailable * indicates if profiling data is available for this job + * @param submissionTimestamp + * the submission time stamp of the job */ - public void registerJob(final ExecutionGraph executionGraph, final boolean profilingAvailable) { + public void registerJob(final ExecutionGraph executionGraph, final boolean profilingAvailable, + final long submissionTimestamp) { final Iterator it = new ExecutionGraphIterator(executionGraph, true); @@ -470,7 +486,7 @@ public void registerJob(final ExecutionGraph executionGraph, final boolean profi // Register one job status listener wrapper for the entire job executionGraph.registerJobStatusListener(new JobStatusListenerWrapper(this, executionGraph.getJobName(), - profilingAvailable)); + profilingAvailable, submissionTimestamp)); } diff --git a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java index 5e0131504a954..6c509120eb317 100644 --- a/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java +++ b/nephele/nephele-server/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java @@ -537,7 +537,7 @@ public JobSubmissionResult submitJob(JobGraph job) throws IOException { // Register job with the progress collector if (this.eventCollector != null) { - this.eventCollector.registerJob(eg, jobRunsWithProfiling); + this.eventCollector.registerJob(eg, jobRunsWithProfiling, System.currentTimeMillis()); } // Check if profiling should be enabled for this job diff --git a/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/GraphVisualizationData.java b/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/GraphVisualizationData.java index 0026140ace6f5..e65160a7cf079 100644 --- a/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/GraphVisualizationData.java +++ b/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/GraphVisualizationData.java @@ -38,12 +38,15 @@ public class GraphVisualizationData { private final boolean profilingEnabledForJob; + private final long submissionTimestamp; + public GraphVisualizationData(JobID jobID, String jobName, boolean profilingEnabledForJob, - ManagementGraph managementGraph, NetworkTopology networkTopology) { + long submissionTimestamp, ManagementGraph managementGraph, NetworkTopology networkTopology) { this.jobID = jobID; this.jobName = jobName; this.profilingEnabledForJob = profilingEnabledForJob; + this.submissionTimestamp = submissionTimestamp; this.managementGraph = managementGraph; this.networkTopology = networkTopology; } @@ -69,6 +72,10 @@ public boolean isProfilingAvailableForJob() { return this.profilingEnabledForJob; } + public long getSubmissionTimestamp() { + return this.submissionTimestamp; + } + public void detectBottlenecks() { // Detect CPU bottlenecks diff --git a/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/SWTToolTip.java b/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/SWTToolTip.java index 462d9f0e78c49..3ff98da43307a 100644 --- a/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/SWTToolTip.java +++ b/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/SWTToolTip.java @@ -68,11 +68,11 @@ protected void finishInstantiation(int x, int y, int width, boolean pack) { this.shell.setLocation(x, y + OFFSET); - this.shell.setVisible(true); - if (pack) { this.shell.pack(); } + + this.shell.setVisible(true); } public void dispose() { diff --git a/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/SWTVisualizationGUI.java b/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/SWTVisualizationGUI.java index 229e53c7441b6..cfedf75ea4f01 100644 --- a/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/SWTVisualizationGUI.java +++ b/nephele/nephele-visualization/src/main/java/eu/stratosphere/nephele/visualization/swt/SWTVisualizationGUI.java @@ -16,6 +16,8 @@ package eu.stratosphere.nephele.visualization.swt; import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -193,8 +195,9 @@ public void handleEvent(final Event event) { final String jobName = gvi.getJobName(); final JobID jobID = gvi.getJobID(); + final long submissionTimestamp = gvi.getSubmissionTimestamp(); - this.jobToolTip = new SWTJobToolTip(shell, jobName, jobID, 0L, pt.x, pt.y); + this.jobToolTip = new SWTJobToolTip(shell, jobName, jobID, submissionTimestamp, pt.x, pt.y); break; } @@ -504,12 +507,23 @@ public void run() { // Check for new jobs final List newJobs = this.jobManager.getRecentJobs(); + + // Sort jobs according to submission time stamps + Collections.sort(newJobs, new Comparator() { + + @Override + public int compare(final RecentJobEvent o1, final RecentJobEvent o2) { + + return (int) (o1.getSubmissionTimestamp() - o2.getSubmissionTimestamp()); + } + }); + if (!newJobs.isEmpty()) { final Iterator it = newJobs.iterator(); while (it.hasNext()) { final RecentJobEvent newJobEvent = it.next(); addJob(newJobEvent.getJobID(), newJobEvent.getJobName(), newJobEvent.isProfilingAvailable(), - newJobEvent.getTimestamp()); + newJobEvent.getSubmissionTimestamp(), newJobEvent.getTimestamp()); } } @@ -584,8 +598,8 @@ private void updateView() { ((SWTJobTabItem) control).updateView(); } - private void addJob(JobID jobID, String jobName, boolean isProfilingAvailable, final long referenceTime) - throws IOException { + private void addJob(JobID jobID, String jobName, boolean isProfilingAvailable, final long submissionTimestamp, + final long referenceTime) throws IOException { synchronized (this.recentJobs) { @@ -600,7 +614,7 @@ private void addJob(JobID jobID, String jobName, boolean isProfilingAvailable, f // Create graph visualization object final GraphVisualizationData graphVisualizationData = new GraphVisualizationData(jobID, jobName, - isProfilingAvailable, managementGraph, networkTopology); + isProfilingAvailable, submissionTimestamp, managementGraph, networkTopology); managementGraph.setAttachment(graphVisualizationData); final Iterator it = new ManagementGraphIterator(managementGraph, true);