Skip to content

Commit

Permalink
[FLINK-3877] [gelly] Create TranslateFunction interface for Graph tra…
Browse files Browse the repository at this point in the history
…nslators

The TranslateFunction interface is similar to MapFunction but may be
called multiple times before serialization.

This closes apache#1968
  • Loading branch information
greghogan committed May 9, 2016
1 parent 70181b2 commit a27208d
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 118 deletions.
6 changes: 3 additions & 3 deletions docs/apis/batch/libs/gelly.md
Original file line number Diff line number Diff line change
Expand Up @@ -2193,7 +2193,7 @@ DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = graph
<tr>
<td>translate.<br/><strong>TranslateGraphIds</strong></td>
<td>
<p>Translate vertex and edge IDs using the given <code>MapFunction</code>.</p>
<p>Translate vertex and edge IDs using the given <code>TranslateFunction</code>.</p>
{% highlight java %}
graph.run(new TranslateGraphIds(new LongValueToStringValue()));
{% endhighlight %}
Expand All @@ -2203,7 +2203,7 @@ graph.run(new TranslateGraphIds(new LongValueToStringValue()));
<tr>
<td>translate.<br/><strong>TranslateVertexValues</strong></td>
<td>
<p>Translate vertex values using the given <code>MapFunction</code>.</p>
<p>Translate vertex values using the given <code>TranslateFunction</code>.</p>
{% highlight java %}
graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));
{% endhighlight %}
Expand All @@ -2213,7 +2213,7 @@ graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));
<tr>
<td>translate.<br/><strong>TranslateEdgeValues</strong></td>
<td>
<p>Translate edge values using the given <code>MapFunction</code>.</p>
<p>Translate edge values using the given <code>TranslateFunction</code>.</p>
{% highlight java %}
graph.run(new TranslateEdgeValues(new Nullify()));
{% endhighlight %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{tuple => jtuple}
import org.apache.flink.api.scala._
import org.apache.flink.graph._
import org.apache.flink.graph.asm.translate.TranslateFunction
import org.apache.flink.graph.validation.GraphValidator
import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction}
import org.apache.flink.{graph => jg}

import _root_.scala.collection.JavaConverters._
import _root_.scala.reflect.ClassTag
import org.apache.flink.types.NullValue
Expand Down Expand Up @@ -412,7 +414,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @param translator implements conversion from K to NEW
* @return graph with translated vertex and edge IDs
*/
def translateGraphIds[NEW: TypeInformation : ClassTag](translator: MapFunction[K, NEW]):
def translateGraphIds[NEW: TypeInformation : ClassTag](translator: TranslateFunction[K, NEW]):
Graph[NEW, VV, EV] = {
// Delegate the ID translation to the wrapped Java graph, then wrap the result in a Scala Graph.
new Graph[NEW, VV, EV](jgraph.translateGraphIds(translator))
}
Expand All @@ -423,15 +425,15 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @param fun implements conversion from K to NEW
* @return graph with translated vertex and edge IDs
*/
def translateGraphIds[NEW: TypeInformation : ClassTag](fun: K => NEW):
def translateGraphIds[NEW: TypeInformation : ClassTag](fun: (K, NEW) => NEW):
Graph[NEW, VV, EV] = {
val mapper: MapFunction[K, NEW] = new MapFunction[K, NEW] {
val translator: TranslateFunction[K, NEW] = new TranslateFunction[K, NEW] {
val cleanFun = clean(fun)

def map(in: K): NEW = cleanFun(in)
def translate(in: K, reuse: NEW): NEW = cleanFun(in, reuse)
}

new Graph[NEW, VV, EV](jgraph.translateGraphIds(mapper))
new Graph[NEW, VV, EV](jgraph.translateGraphIds(translator))
}

/**
Expand All @@ -440,8 +442,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @param translator implements conversion from VV to NEW
* @return graph with translated vertex values
*/
def translateVertexValues[NEW: TypeInformation : ClassTag](translator: MapFunction[VV, NEW]):
Graph[K, NEW, EV] = {
def translateVertexValues[NEW: TypeInformation : ClassTag](translator:
TranslateFunction[VV, NEW]): Graph[K, NEW, EV] = {
// Delegate the vertex value translation to the wrapped Java graph, then wrap the result in a
// Scala Graph.
new Graph[K, NEW, EV](jgraph.translateVertexValues(translator))
}

Expand All @@ -451,15 +453,15 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @param fun implements conversion from VV to NEW
* @return graph with translated vertex values
*/
def translateVertexValues[NEW: TypeInformation : ClassTag](fun: VV => NEW):
def translateVertexValues[NEW: TypeInformation : ClassTag](fun: (VV, NEW) => NEW):
Graph[K, NEW, EV] = {
val mapper: MapFunction[VV, NEW] = new MapFunction[VV, NEW] {
val translator: TranslateFunction[VV, NEW] = new TranslateFunction[VV, NEW] {
val cleanFun = clean(fun)

def map(in: VV): NEW = cleanFun(in)
def translate(in: VV, reuse: NEW): NEW = cleanFun(in, reuse)
}

new Graph[K, NEW, EV](jgraph.translateVertexValues(mapper))
new Graph[K, NEW, EV](jgraph.translateVertexValues(translator))
}

/**
Expand All @@ -468,7 +470,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @param translator implements conversion from EV to NEW
* @return graph with translated edge values
*/
def translateEdgeValues[NEW: TypeInformation : ClassTag](translator: MapFunction[EV, NEW]):
def translateEdgeValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[EV, NEW]):
Graph[K, VV, NEW] = {
// Delegate the edge value translation to the wrapped Java graph, then wrap the result in a
// Scala Graph.
new Graph[K, VV, NEW](jgraph.translateEdgeValues(translator))
}
Expand All @@ -479,15 +481,15 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @param fun implements conversion from EV to NEW
* @return graph with translated edge values
*/
def translateEdgeValues[NEW: TypeInformation : ClassTag](fun: EV => NEW):
def translateEdgeValues[NEW: TypeInformation : ClassTag](fun: (EV, NEW) => NEW):
Graph[K, VV, NEW] = {
val mapper: MapFunction[EV, NEW] = new MapFunction[EV, NEW] {
val translator: TranslateFunction[EV, NEW] = new TranslateFunction[EV, NEW] {
val cleanFun = clean(fun)

def map(in: EV): NEW = cleanFun(in)
def translate(in: EV, reuse: NEW): NEW = cleanFun(in, reuse)
}

new Graph[K, VV, NEW](jgraph.translateEdgeValues(mapper))
new Graph[K, VV, NEW](jgraph.translateEdgeValues(translator))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.graph.asm.translate.TranslateEdgeValues;
import org.apache.flink.graph.asm.translate.TranslateFunction;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.asm.translate.TranslateVertexValues;
import org.apache.flink.graph.gsa.ApplyFunction;
Expand Down Expand Up @@ -557,7 +558,7 @@ public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper)
* @return graph with translated vertex and edge IDs
* @throws Exception
*/
public <NEW> Graph<NEW, VV, EV> translateGraphIds(MapFunction<K, NEW> translator) throws Exception {
public <NEW> Graph<NEW, VV, EV> translateGraphIds(TranslateFunction<K, NEW> translator) throws Exception {
// Hand the translator to the TranslateGraphIds algorithm and run it on this graph.
return run(new TranslateGraphIds<K, NEW, VV, EV>(translator));
}

Expand All @@ -569,7 +570,7 @@ public <NEW> Graph<NEW, VV, EV> translateGraphIds(MapFunction<K, NEW> translator
* @return graph with translated vertex values
* @throws Exception
*/
public <NEW> Graph<K, NEW, EV> translateVertexValues(MapFunction<VV, NEW> translator) throws Exception {
public <NEW> Graph<K, NEW, EV> translateVertexValues(TranslateFunction<VV, NEW> translator) throws Exception {
// Hand the translator to the TranslateVertexValues algorithm and run it on this graph.
return run(new TranslateVertexValues<K, VV, NEW, EV>(translator));
}

Expand All @@ -581,7 +582,7 @@ public <NEW> Graph<K, NEW, EV> translateVertexValues(MapFunction<VV, NEW> transl
* @return graph with translated edge values
* @throws Exception
*/
public <NEW> Graph<K, VV, NEW> translateEdgeValues(MapFunction<EV, NEW> translator) throws Exception {
public <NEW> Graph<K, VV, NEW> translateEdgeValues(TranslateFunction<EV, NEW> translator) throws Exception {
// Hand the translator to the TranslateEdgeValues algorithm and run it on this graph.
return run(new TranslateEdgeValues<K, VV, EV, NEW>(translator));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@

package org.apache.flink.graph.asm.translate;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.LongValue;

/**
* Translate {@link LongValue} by adding a constant offset value.
*/
public class LongValueAddOffset
implements MapFunction<LongValue, LongValue> {
implements TranslateFunction<LongValue, LongValue> {

private final long offset;

private LongValue output = new LongValue();

/**
* Translate {@link LongValue} by adding a constant offset value.
*
Expand All @@ -41,9 +38,13 @@ public LongValueAddOffset(long offset) {
}

@Override
public LongValue map(LongValue value)
public LongValue translate(LongValue value, LongValue reuse)
throws Exception {
output.setValue(offset + value.getValue());
return output;
if (reuse == null) {
reuse = new LongValue();
}

reuse.setValue(offset + value.getValue());
return reuse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.graph.asm.translate;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;

Expand All @@ -28,20 +27,22 @@
* Throws {@link RuntimeException} for integer overflow.
*/
public class LongValueToIntValue
implements MapFunction<LongValue, IntValue> {

private IntValue output = new IntValue();
implements TranslateFunction<LongValue, IntValue> {

@Override
public IntValue map(LongValue value)
public IntValue translate(LongValue value, IntValue reuse)
throws Exception {
long val = value.getValue();

// Reject values outside the int range in BOTH directions: the narrowing (int) cast below
// silently wraps, so a value smaller than Integer.MIN_VALUE must fail just like one larger
// than Integer.MAX_VALUE.
if (val > Integer.MAX_VALUE || val < Integer.MIN_VALUE) {
throw new RuntimeException("LongValue input overflows IntValue output");
}

output.setValue((int) val);
return output;
if (reuse == null) {
reuse = new IntValue();
}

reuse.setValue((int) val);
return reuse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@

package org.apache.flink.graph.asm.translate;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;

/**
* Translate {@link LongValue} to {@link StringValue}.
*/
public class LongValueToStringValue
implements MapFunction<LongValue, StringValue> {

private StringValue output = new StringValue();
implements TranslateFunction<LongValue, StringValue> {

@Override
public StringValue map(LongValue value)
public StringValue translate(LongValue value, StringValue reuse)
throws Exception {
output.setValue(Long.toString(value.getValue()));
return output;
if (reuse == null) {
reuse = new StringValue();
}

reuse.setValue(Long.toString(value.getValue()));
return reuse;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.asm.translate;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.RichFunction;

/**
* Rich variant of the {@link TranslateFunction}. As a {@link RichFunction}, it gives access to the
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
*
* @param <IN> Type of the input elements.
* @param <OUT> Type of the returned elements.
*/
public abstract class RichTranslateFunction<IN, OUT> extends AbstractRichFunction implements TranslateFunction<IN, OUT> {

// Explicit serialization version identifier for this class.
private static final long serialVersionUID = 1L;

/**
* Translates the given input value into an output value. Re-declared as abstract so that
* concrete subclasses must supply the translation.
*
* @param value the input element to translate
* @param reuse output object offered for reuse. NOTE(review): the translators in this change
*              allocate a new output when this is null; confirm against the
*              TranslateFunction contract
* @return the translated element
* @throws Exception declared so that implementations may propagate failures
*/
@Override
public abstract OUT translate(IN value, OUT reuse) throws Exception;
}
Loading

0 comments on commit a27208d

Please sign in to comment.