Skip to content

Commit

Permalink
Fixed apache#185
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Warneke committed Jul 19, 2012
1 parent 3d52529 commit bee443e
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public abstract class AbstractEvent implements IOReadableWritable {
/**
* Static variable that points to the current global sequence number
*/
private static final AtomicLong globalSequenceNumber = new AtomicLong(0);
private static final AtomicLong GLOBAL_SEQUENCE_NUMBER = new AtomicLong(0);

/**
* Auxiliary object which helps to convert a {@link Date} object to the given string representation.
*/
Expand All @@ -51,7 +51,7 @@ public abstract class AbstractEvent implements IOReadableWritable {
* The sequence number of the event.
*/
private long sequenceNumber = -1;

/**
* Constructs a new abstract event object.
*
Expand All @@ -60,10 +60,10 @@ public abstract class AbstractEvent implements IOReadableWritable {
*/
public AbstractEvent(final long timestamp) {
this.timestamp = timestamp;
this.sequenceNumber = globalSequenceNumber.incrementAndGet();
this.sequenceNumber = GLOBAL_SEQUENCE_NUMBER.incrementAndGet();
}
public long getSequenceNumber(){

public long getSequenceNumber() {
return this.sequenceNumber;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve
*/
private boolean isProfilingEnabled;

/**
* The time stamp of the job submission.
*/
private long submissionTimestamp;

/**
* Constructs a new event.
*
Expand All @@ -62,11 +67,13 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve
* the status of the job
* @param isProfilingEnabled
* <code>true</code> if profiling is enabled for this job, <code>false</code> otherwise
* @param submissionTimestamp
* the time stamp of the job submission
* @param timestamp
* the time stamp of the event
*/
public RecentJobEvent(final JobID jobID, final String jobName, final JobStatus jobStatus,
final boolean isProfilingEnabled, final long timestamp) {
final boolean isProfilingEnabled, final long submissionTimestamp, final long timestamp) {
super(timestamp);

if (jobStatus == null) {
Expand All @@ -77,6 +84,7 @@ public RecentJobEvent(final JobID jobID, final String jobName, final JobStatus j
this.jobName = jobName;
this.jobStatus = jobStatus;
this.isProfilingEnabled = isProfilingEnabled;
this.submissionTimestamp = submissionTimestamp;
}

/**
Expand Down Expand Up @@ -122,6 +130,16 @@ public JobStatus getJobStatus() {
return this.jobStatus;
}

/**
* Returns the time stamp of the job submission.
*
* @return the time stamp of the job submission
*/
public long getSubmissionTimestamp() {

return this.submissionTimestamp;
}

/**
* {@inheritDoc}
*/
Expand All @@ -141,6 +159,9 @@ public void read(final DataInput in) throws IOException {

// Read if profiling is enabled
this.isProfilingEnabled = in.readBoolean();

// Read the submission time stamp
this.submissionTimestamp = in.readLong();
}

/**
Expand All @@ -161,6 +182,9 @@ public void write(final DataOutput out) throws IOException {

// Write out if profiling is enabled
out.writeBoolean(this.isProfilingEnabled);

// Write out the submission time stamp
out.writeLong(this.submissionTimestamp);
}

/**
Expand Down Expand Up @@ -191,6 +215,10 @@ public boolean equals(final Object obj) {
return false;
}

if (this.submissionTimestamp != newJobEvent.getSubmissionTimestamp()) {
return false;
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ private static final class JobStatusListenerWrapper implements JobStatusListener
*/
private final boolean isProfilingAvailable;

/**
* The time stamp of the job submission
*/
private final long submissionTimestamp;

/**
* Constructs a new job status listener wrapper.
*
Expand All @@ -188,12 +193,16 @@ private static final class JobStatusListenerWrapper implements JobStatusListener
* the name of the job
* @param isProfilingAvailable
* <code>true</code> if profiling events are collected for the job, <code>false</code> otherwise
* @param submissionTimestamp
* the submission time stamp of the job
*/
public JobStatusListenerWrapper(final EventCollector eventCollector, final String jobName,
final boolean isProfilingAvailable) {
final boolean isProfilingAvailable, final long submissionTimestamp) {

this.eventCollector = eventCollector;
this.jobName = jobName;
this.isProfilingAvailable = isProfilingAvailable;
this.submissionTimestamp = submissionTimestamp;
}

/**
Expand All @@ -214,7 +223,8 @@ public void jobStatusHasChanged(final ExecutionGraph executionGraph, final Inter
// Update recent job event
final JobStatus jobStatus = InternalJobStatus.toJobStatus(newJobStatus);
if (jobStatus != null) {
this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable, jobStatus);
this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable,
this.submissionTimestamp, jobStatus);

this.eventCollector.addEvent(jobID,
new JobEvent(System.currentTimeMillis(), jobStatus, optionalMessage));
Expand Down Expand Up @@ -423,17 +433,20 @@ private void addEvent(JobID jobID, AbstractEvent event) {
* the name of the new job
* @param isProfilingEnabled
* <code>true</code> if profiling events are collected for the job, <code>false</code> otherwise
* @param initialJobStatus
* the initial status of the job
* @param submissionTimestamp
* the submission time stamp of the job
* @param jobStatus
* the status of the job
*/
private void updateRecentJobEvent(final JobID jobID, final String jobName, final boolean isProfilingEnabled,
final JobStatus jobStatus) {
final long submissionTimestamp, final JobStatus jobStatus) {

final long currentTime = System.currentTimeMillis();
final RecentJobEvent recentJobEvent = new RecentJobEvent(jobID, jobName, jobStatus, isProfilingEnabled,
currentTime);
submissionTimestamp, currentTime);

synchronized (this.recentJobs) {

this.recentJobs.put(jobID, recentJobEvent);
}
}
Expand All @@ -449,8 +462,11 @@ private void updateRecentJobEvent(final JobID jobID, final String jobName, final
* the execution graph representing the job
* @param profilingAvailable
* indicates if profiling data is available for this job
* @param submissionTimestamp
* the submission time stamp of the job
*/
public void registerJob(final ExecutionGraph executionGraph, final boolean profilingAvailable) {
public void registerJob(final ExecutionGraph executionGraph, final boolean profilingAvailable,
final long submissionTimestamp) {

final Iterator<ExecutionVertex> it = new ExecutionGraphIterator(executionGraph, true);

Expand All @@ -470,7 +486,7 @@ public void registerJob(final ExecutionGraph executionGraph, final boolean profi

// Register one job status listener wrapper for the entire job
executionGraph.registerJobStatusListener(new JobStatusListenerWrapper(this, executionGraph.getJobName(),
profilingAvailable));
profilingAvailable, submissionTimestamp));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ public JobSubmissionResult submitJob(JobGraph job) throws IOException {

// Register job with the progress collector
if (this.eventCollector != null) {
this.eventCollector.registerJob(eg, jobRunsWithProfiling);
this.eventCollector.registerJob(eg, jobRunsWithProfiling, System.currentTimeMillis());
}

// Check if profiling should be enabled for this job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ public class GraphVisualizationData {

private final boolean profilingEnabledForJob;

private final long submissionTimestamp;

public GraphVisualizationData(JobID jobID, String jobName, boolean profilingEnabledForJob,
ManagementGraph managementGraph, NetworkTopology networkTopology) {
long submissionTimestamp, ManagementGraph managementGraph, NetworkTopology networkTopology) {

this.jobID = jobID;
this.jobName = jobName;
this.profilingEnabledForJob = profilingEnabledForJob;
this.submissionTimestamp = submissionTimestamp;
this.managementGraph = managementGraph;
this.networkTopology = networkTopology;
}
Expand All @@ -69,6 +72,10 @@ public boolean isProfilingAvailableForJob() {
return this.profilingEnabledForJob;
}

public long getSubmissionTimestamp() {
return this.submissionTimestamp;
}

public void detectBottlenecks() {

// Detect CPU bottlenecks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ protected void finishInstantiation(int x, int y, int width, boolean pack) {

this.shell.setLocation(x, y + OFFSET);

this.shell.setVisible(true);

if (pack) {
this.shell.pack();
}

this.shell.setVisible(true);
}

public void dispose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package eu.stratosphere.nephele.visualization.swt;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -193,8 +195,9 @@ public void handleEvent(final Event event) {

final String jobName = gvi.getJobName();
final JobID jobID = gvi.getJobID();
final long submissionTimestamp = gvi.getSubmissionTimestamp();

this.jobToolTip = new SWTJobToolTip(shell, jobName, jobID, 0L, pt.x, pt.y);
this.jobToolTip = new SWTJobToolTip(shell, jobName, jobID, submissionTimestamp, pt.x, pt.y);
break;
}

Expand Down Expand Up @@ -504,12 +507,23 @@ public void run() {

// Check for new jobs
final List<RecentJobEvent> newJobs = this.jobManager.getRecentJobs();

// Sort jobs according to submission time stamps
Collections.sort(newJobs, new Comparator<RecentJobEvent>() {

@Override
public int compare(final RecentJobEvent o1, final RecentJobEvent o2) {

return (int) (o1.getSubmissionTimestamp() - o2.getSubmissionTimestamp());
}
});

if (!newJobs.isEmpty()) {
final Iterator<RecentJobEvent> it = newJobs.iterator();
while (it.hasNext()) {
final RecentJobEvent newJobEvent = it.next();
addJob(newJobEvent.getJobID(), newJobEvent.getJobName(), newJobEvent.isProfilingAvailable(),
newJobEvent.getTimestamp());
newJobEvent.getSubmissionTimestamp(), newJobEvent.getTimestamp());
}
}

Expand Down Expand Up @@ -584,8 +598,8 @@ private void updateView() {
((SWTJobTabItem) control).updateView();
}

private void addJob(JobID jobID, String jobName, boolean isProfilingAvailable, final long referenceTime)
throws IOException {
private void addJob(JobID jobID, String jobName, boolean isProfilingAvailable, final long submissionTimestamp,
final long referenceTime) throws IOException {

synchronized (this.recentJobs) {

Expand All @@ -600,7 +614,7 @@ private void addJob(JobID jobID, String jobName, boolean isProfilingAvailable, f

// Create graph visualization object
final GraphVisualizationData graphVisualizationData = new GraphVisualizationData(jobID, jobName,
isProfilingAvailable, managementGraph, networkTopology);
isProfilingAvailable, submissionTimestamp, managementGraph, networkTopology);

managementGraph.setAttachment(graphVisualizationData);
final Iterator<ManagementVertex> it = new ManagementGraphIterator(managementGraph, true);
Expand Down

0 comments on commit bee443e

Please sign in to comment.