Skip to content

Commit

Permalink
[FLINK-2527] [gelly] Ensure that VertexUpdateFunction.setNewVertexVal…
Browse files Browse the repository at this point in the history
…ue is called at most once per updateVertex

This closes apache#1027
  • Loading branch information
ggevay authored and StephanEwen committed Aug 17, 2015
1 parent e4e44be commit 0ea0bc1
Showing 1 changed file with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void setOptDegrees(boolean optDegrees) {

/**
* This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
* the incoming messages. It may set a new vertex state via {@link #setNewVV(Object)}. If the vertex
* the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
* state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
*
* @param vertex The vertex.
Expand All @@ -105,10 +105,16 @@ public void postSuperstep() throws Exception {}

/**
* Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
*
* This should be called at most once per updateVertex.
*
* @param newValue The new vertex value.
*/
public void setNewVertexValue(VV newValue) {
if(setNewVertexValueCalled) {
throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
}
setNewVertexValueCalled = true;
if(isOptDegrees()) {
outValWithDegrees.f1.f0 = newValue;
outWithDegrees.collect(outValWithDegrees);
Expand Down Expand Up @@ -178,20 +184,24 @@ public <T> Collection<T> getBroadcastSet(String name) {

private long outDegree = -1;

private boolean setNewVertexValueCalled;

void init(IterationRuntimeContext context) {
this.runtimeContext = context;
}

void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
this.outVal = outVal;
this.out = out;
setNewVertexValueCalled = false;
}

@SuppressWarnings({ "unchecked", "rawtypes" })
<ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
Collector out) {
this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
this.outWithDegrees = out;
setNewVertexValueCalled = false;
}

/**
Expand Down

0 comments on commit 0ea0bc1

Please sign in to comment.