Skip to content

Commit

Permalink
[FLINK-4734] [gelly] Remove use of Tuple setField for fixed position
Browse files Browse the repository at this point in the history
This closes apache#2590
  • Loading branch information
greghogan committed Oct 5, 2016
1 parent bb34133 commit 1577e89
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2206,7 +2206,7 @@ public ApplyNeighborReduceFunction(ReduceNeighborsFunction<VV> fun) {

@Override
public Tuple2<K, VV> reduce(Tuple2<K, VV> first, Tuple2<K, VV> second) throws Exception {
first.setField(function.reduceNeighbors(first.f1, second.f1), 1);
first.f1 = function.reduceNeighbors(first.f1, second.f1);
return first;
}
}
Expand Down Expand Up @@ -2256,7 +2256,7 @@ public ApplyReduceFunction(ReduceEdgesFunction<EV> fun) {

@Override
public Tuple2<K, EV> reduce(Tuple2<K, EV> first, Tuple2<K, EV> second) throws Exception {
first.setField(function.reduceEdges(first.f1, second.f1), 1);
first.f1 = function.reduceEdges(first.f1, second.f1);
return first;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ public final Iterable<Edge<K, EV>> getEdges() {
*/
public final void sendMessageToAllNeighbors(Message m) {
verifyEdgeUsage();
outMsg.setField(m, 1);
outMsg.f1 = m;
while (edges.hasNext()) {
Tuple next = edges.next();
outMsg.setField(next.getField(1), 0);
outMsg.f0 = next.getField(1);
out.collect(Either.Right(outMsg));
}
}
Expand All @@ -115,8 +115,8 @@ public final void sendMessageToAllNeighbors(Message m) {
*/
public final void sendMessageTo(K target, Message m) {

outMsg.setField(target, 0);
outMsg.setField(m, 1);
outMsg.f0 = target;
outMsg.f1 = m;

out.collect(Either.Right(outMsg));
}
Expand All @@ -134,7 +134,7 @@ public final void setNewVertexValue(VV newValue) {
}
setNewVertexValueCalled = true;

outVertex.setField(newValue, 1);
outVertex.f1 = newValue;

out.collect(Either.Left(outVertex));
}
Expand Down Expand Up @@ -213,7 +213,7 @@ void init(IterationRuntimeContext context) {
void set(K vertexId, Iterator<Edge<K, EV>> edges,
Collector<Either<Vertex<K, VV>, Tuple2<K, Message>>> out) {

this.outVertex.setField(vertexId, 0);
this.outVertex.f0 = vertexId;
this.edges = edges;
this.out = (Collector<Either<?, ?>>) (Collector<?>) out;
this.edgesUsed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class MessageCombiner<K, Message> implements Serializable {
void set(K target, Collector<Tuple2<K, Either<NullValue, Message>>> collector) {
this.out = collector;
this.outValue = new Tuple2<>();
outValue.setField(target, 0);
outValue.f0 = target;
}

/**
Expand All @@ -62,7 +62,7 @@ void set(K target, Collector<Tuple2<K, Either<NullValue, Message>>> collector) {
* @throws Exception
*/
public final void sendCombinedMessage(Message combinedMessage) {
outValue.setField(Either.Right(combinedMessage), 1);
outValue.f1 = Either.Right(combinedMessage);
out.collect(outValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,11 @@ private static class InitializeWorkSet<K, VV, Message> extends
public void open(Configuration parameters) {
outTuple = new Tuple2<>();
nullMessage = Either.Left(NullValue.getInstance());
outTuple.setField(nullMessage, 1);
outTuple.f1 = nullMessage;
}

public Tuple2<K, Either<NullValue, Message>> map(Vertex<K, VV> vertex) {
outTuple.setField(vertex.getId(), 0);
outTuple.f0 = vertex.getId();
return outTuple;
}
}
Expand Down Expand Up @@ -474,8 +474,8 @@ private static final class AppendVertexState<K, VV, Message> implements
public Tuple2<Vertex<K, VV>, Either<NullValue, Message>> join(
Vertex<K, VV> vertex, Tuple2<K, Either<NullValue, Message>> message) {

outTuple.setField(vertex, 0);
outTuple.setField(message.f1, 1);
outTuple.f0 = vertex;
outTuple.f1 = message.f1;
return outTuple;
}
}
Expand Down Expand Up @@ -504,8 +504,8 @@ public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> value,

if (value.isRight()) {
Tuple2<K, Message> message = value.right();
outTuple.setField(message.f0, 0);
outTuple.setField(Either.Right(message.f1), 1);
outTuple.f0 = message.f0;
outTuple.f1 = Either.Right(message.f1);
out.collect(outTuple);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, L
if (stateIter.hasNext()) {
Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next();

nextVertex.setField(vertexWithDegrees.f0, 0);
nextVertex.setField(vertexWithDegrees.f1.f0, 1);
nextVertex.f0 = vertexWithDegrees.f0;
nextVertex.f1 = vertexWithDegrees.f1.f0;

scatterFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
scatterFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
Expand Down

0 comments on commit 1577e89

Please sign in to comment.