Skip to content

Commit

Permalink
[FLINK-2501] [py] Remove the need to specify types for transformations
Browse files Browse the repository at this point in the history
Full changelog:

Major changes
===============
- Users no longer have to supply information about types
- Values are now stored as byte arrays on the Java side in
* a plain byte[] most of the time,
* a T2<b[],b[]> within a join/cross
* a T2<TX<b[]...b[]>, b[]> within keyed operations.
- Every value contains information about its type at the beginning of
each byte array.
- Implemented KeySelectors

Minor
===============
- improved error messages in several places
- defaultable operations now use a "usesUDF" flag
- reshuffled type IDs; tuple type encoded as 1-25
- broadcast variables are now sent via the tcp socket
- ProjectJoin/-Cross now executes projection on python side

Java
---------------
- Sort field now stored as String, continuation of FLINK-2431
- object->byte[] serializer code moved into separate utility class

Python
---------------
- Fixed NullSerializer not taking a read method argument
- Serializer/Deserializer interface added
- Refactored DataSet structure
* Set and ReduceSet merged into DataSet
- configure() now takes an OperationInfo argument
- Simplified GroupReduce tests
- removed unused Function._open()
- simplified chaining setup
- most functions now use super.configure()
  • Loading branch information
supermegaciaccount committed Jan 20, 2016
1 parent 54b52c9 commit ab84707
Show file tree
Hide file tree
Showing 39 changed files with 1,426 additions and 1,283 deletions.
20 changes: 10 additions & 10 deletions docs/apis/batch/dataset_transformations.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ val intSums = intPairs.map { pair => pair._1 + pair._2 }
<div data-lang="python" markdown="1">

~~~python
intSums = intPairs.map(lambda x: sum(x), INT)
intSums = intPairs.map(lambda x: sum(x))
~~~

</div>
Expand Down Expand Up @@ -115,7 +115,7 @@ val words = textLines.flatMap { _.split(" ") }
<div data-lang="python" markdown="1">

~~~python
words = lines.flat_map(lambda x,c: [line.split() for line in x], STRING)
words = lines.flat_map(lambda x,c: [line.split() for line in x])
~~~

</div>
Expand Down Expand Up @@ -163,7 +163,7 @@ val counts = texLines.mapPartition { in => Some(in.size) }
<div data-lang="python" markdown="1">

~~~python
counts = lines.map_partition(lambda x,c: [sum(1 for _ in x)], INT)
counts = lines.map_partition(lambda x,c: [sum(1 for _ in x)])
~~~

</div>
Expand Down Expand Up @@ -459,7 +459,7 @@ Works analogous to grouping by Case Class fields in *Reduce* transformations.
for key in dic.keys():
collector.collect(key)

output = data.group_by(0).reduce_group(DistinctReduce(), STRING)
output = data.group_by(0).reduce_group(DistinctReduce())
~~~


Expand Down Expand Up @@ -539,7 +539,7 @@ val output = input.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup {
for key in dic.keys():
collector.collect(key)

output = data.group_by(0).sort_group(1, Order.ASCENDING).reduce_group(DistinctReduce(), STRING)
output = data.group_by(0).sort_group(1, Order.ASCENDING).reduce_group(DistinctReduce())
~~~


Expand Down Expand Up @@ -644,7 +644,7 @@ class MyCombinableGroupReducer
def combine(self, iterator, collector):
return self.reduce(iterator, collector)

data.reduce_group(GroupReduce(), (STRING, INT, FLOAT), combinable=True)
data.reduce_group(GroupReduce(), combinable=True)
~~~

</div>
Expand Down Expand Up @@ -864,7 +864,7 @@ val output = input.reduceGroup(new MyGroupReducer())
<div data-lang="python" markdown="1">

~~~python
output = data.reduce_group(MyGroupReducer(), ... )
output = data.reduce_group(MyGroupReducer())
~~~

</div>
Expand Down Expand Up @@ -1223,7 +1223,7 @@ val weightedRatings = ratings.join(weights).where("category").equalTo(0) {

weightedRatings =
ratings.join(weights).where(0).equal_to(0). \
with(new PointWeighter(), (STRING, FLOAT));
with(new PointWeighter());
~~~

</div>
Expand Down Expand Up @@ -1719,7 +1719,7 @@ val distances = coords1.cross(coords2) {
def cross(self, c1, c2):
return (c1[0], c2[0], sqrt(pow(c1[1] - c2[1], 2) + pow(c1[2] - c2[2], 2)))

distances = coords1.cross(coords2).using(Euclid(), (INT,INT,FLOAT))
distances = coords1.cross(coords2).using(Euclid())
~~~

#### Cross with Projection
Expand Down Expand Up @@ -1878,7 +1878,7 @@ val output = iVals.coGroup(dVals).where(0).equalTo(0) {
collector.collect(value[1] * i)


output = ivals.co_group(dvals).where(0).equal_to(0).using(CoGroup(), DOUBLE)
output = ivals.co_group(dvals).where(0).equal_to(0).using(CoGroup())
~~~

</div>
Expand Down
82 changes: 44 additions & 38 deletions docs/apis/batch/python.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ to run it locally.

{% highlight python %}
from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING
from flink.functions.GroupReduceFunction import GroupReduceFunction

class Adder(GroupReduceFunction):
Expand All @@ -59,27 +58,26 @@ class Adder(GroupReduceFunction):
count += sum([x[0] for x in iterator])
collector.collect((count, word))

if __name__ == "__main__":
env = get_environment()
data = env.from_elements("Who's there?",
"I think I hear them. Stand, ho! Who's there?")
env = get_environment()
data = env.from_elements("Who's there?",
"I think I hear them. Stand, ho! Who's there?")

data \
.flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
.group_by(1) \
.reduce_group(Adder(), (INT, STRING), combinable=True) \
.output()
data \
.flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
.group_by(1) \
.reduce_group(Adder(), combinable=True) \
.output()

env.execute(local=True)
env.execute(local=True)
{% endhighlight %}

{% top %}

Program Skeleton
----------------

As we already saw in the example, Flink programs look like regular python
programs with a `if __name__ == "__main__":` block. Each program consists of the same basic parts:
As we already saw in the example, Flink programs look like regular python programs.
Each program consists of the same basic parts:

1. Obtain an `Environment`,
2. Load/create the initial data,
Expand Down Expand Up @@ -117,7 +115,7 @@ methods on DataSet with your own custom transformation function. For example,
a map transformation looks like this:

{% highlight python %}
data.map(lambda x: x*2, INT)
data.map(lambda x: x*2)
{% endhighlight %}

This will create a new DataSet by doubling every value in the original DataSet.
Expand Down Expand Up @@ -197,7 +195,7 @@ examples.
<td>
<p>Takes one element and produces one element.</p>
{% highlight python %}
data.map(lambda x: x * 2, INT)
data.map(lambda x: x * 2)
{% endhighlight %}
</td>
</tr>
Expand All @@ -208,8 +206,7 @@ data.map(lambda x: x * 2, INT)
<p>Takes one element and produces zero, one, or more elements. </p>
{% highlight python %}
data.flat_map(
lambda x,c: [(1,word) for word in line.lower().split() for line in x],
(INT, STRING))
lambda x,c: [(1,word) for word in line.lower().split() for line in x])
{% endhighlight %}
</td>
</tr>
Expand All @@ -221,7 +218,7 @@ data.flat_map(
as an `Iterator` and can produce an arbitrary number of result values. The number of
elements in each partition depends on the degree-of-parallelism and previous operations.</p>
{% highlight python %}
data.map_partition(lambda x,c: [value * 2 for value in x], INT)
data.map_partition(lambda x,c: [value * 2 for value in x])
{% endhighlight %}
</td>
</tr>
Expand Down Expand Up @@ -260,7 +257,7 @@ class Adder(GroupReduceFunction):
count += sum([x[0] for x in iterator])
collector.collect((count, word))

data.reduce_group(Adder(), (INT, STRING))
data.reduce_group(Adder())
{% endhighlight %}
</td>
</tr>
Expand Down Expand Up @@ -392,24 +389,33 @@ They are also the only way to define an optional `combine` function for a reduce
Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return
an iterable if the operation can return multiple values (this applies to all functions receiving a collector argument).

Flink requires type information at the time when it prepares the program for execution
(when the main method of the program is called). This is done by passing an exemplary
object that has the desired type. This holds also for tuples.
{% top %}

Data Types
----------

Flink's Python API currently only offers native support for primitive python types (int, float, bool, string) and byte arrays.

The type support can be extended by passing a serializer, deserializer and type class to the environment.
{% highlight python %}
(INT, STRING)
{% endhighlight %}
class MyObj(object):
def __init__(self, i):
self.value = i

Would denote a tuple containing an int and a string. Note that for Operations that work strictly on tuples (like cross), no braces are required.

There are a few Constants defined in flink.plan.Constants that allow this in a more readable fashion.
class MySerializer(object):
def serialize(self, value):
return struct.pack(">i", value.value)

{% top %}

Data Types
----------
class MyDeserializer(object):
def _deserialize(self, read):
i = struct.unpack(">i", read(4))[0]
return MyObj(i)


Flink's Python API currently only supports primitive python types (int, float, bool, string) and byte arrays.
env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
{% endhighlight %}

#### Tuples/Lists

Expand All @@ -419,7 +425,7 @@ a fix number of fields of various types (up to 25). Every field of a tuple can b
{% highlight python %}
word_counts = env.from_elements(("hello", 1), ("world",2))

counts = word_counts.map(lambda x: x[1], INT)
counts = word_counts.map(lambda x: x[1])
{% endhighlight %}

When working with operators that require a Key for grouping or matching records,
Expand Down Expand Up @@ -455,16 +461,16 @@ Collection-based:
{% highlight python %}
env = get_environment()

# read text file from local file system
\# read text file from local file system
localLines = env.read_text("file:///path/to/my/textfile")

read text file from a HDFS running at nnHost:nnPort
\# read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")

read a CSV file with three fields
\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants
csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))

create a set from some given elements
\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")
{% endhighlight %}

Expand Down Expand Up @@ -530,7 +536,7 @@ toBroadcast = env.from_elements(1, 2, 3)
data = env.from_elements("a", "b")

# 2. Broadcast the DataSet
data.map(MapperBcv(), INT).with_broadcast_set("bcv", toBroadcast)
data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)
{% endhighlight %}

Make sure that the names (`bcv` in the previous example) match when registering and
Expand Down Expand Up @@ -568,9 +574,9 @@ execution environment as follows:
env = get_environment()
env.set_degree_of_parallelism(3)

text.flat_map(lambda x,c: x.lower().split(), (INT, STRING)) \
text.flat_map(lambda x,c: x.lower().split()) \
.group_by(1) \
.reduce_group(Adder(), (INT, STRING), combinable=True) \
.reduce_group(Adder(), combinable=True) \
.output()

env.execute()
Expand Down
Loading

0 comments on commit ab84707

Please sign in to comment.