Skip to content

Commit

Permalink
Removed execution service from execution graph and replaced by akka's…
Browse files Browse the repository at this point in the history
… futures.
  • Loading branch information
tillrohrmann committed Dec 18, 2014
1 parent b1cb66f commit ac94253
Show file tree
Hide file tree
Showing 70 changed files with 1,128 additions and 2,530 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@ public class ClusterUtil {
*
* @param jobGraph
* jobGraph
* @param degreeOfPrallelism
* @param degreeOfParallelism
* numberOfTaskTrackers
* @param memorySize
* memorySize
*/
public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfPrallelism, long memorySize)
throws Exception {
public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism,
long memorySize) throws Exception {

Configuration configuration = jobGraph.getJobConfiguration();

LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(null);

configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_SLOTS, degreeOfParallelism);
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, degreeOfParallelism);
if (LOG.isInfoEnabled()) {
LOG.info("Running on mini cluster");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

package org.apache.flink.api.common.accumulators;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.flink.core.io.IOReadableWritable;

/**
* Interface for custom accumulator objects. Data are written to in a UDF and
Expand All @@ -39,7 +41,7 @@
* Type of the accumulator result as it will be reported to the
* client
*/
public interface Accumulator<V, R> extends IOReadableWritable, Serializable {
public interface Accumulator<V, R> extends Serializable, Cloneable{

/**
* @param value
Expand All @@ -65,4 +67,21 @@ public interface Accumulator<V, R> extends IOReadableWritable, Serializable {
*/
void merge(Accumulator<V, R> other);

/**
* Serialization method of accumulators
*
* @param oos
* @throws IOException
*/
void write(ObjectOutputStream oos) throws IOException;

/**
* Deserialization method of accumulators
*
* @param ois
* @throws IOException
*/
void read(ObjectInputStream ois) throws IOException;

Accumulator<V, R> clone();
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,15 @@ public static void resetAndClearAccumulators(
accumulators.clear();
}

public static Map<String, Accumulator<?, ?>> copy(final Map<String, Accumulator<?,
?>> accumulators) {
Map<String, Accumulator<?, ?>> result = new HashMap<String, Accumulator<?, ?>>();

for(Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){
result.put(entry.getKey(), entry.getValue().clone());
}

return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
package org.apache.flink.api.common.accumulators;

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class DoubleCounter implements SimpleAccumulator<Double> {

Expand Down Expand Up @@ -51,15 +50,23 @@ public void resetLocal() {
}

@Override
public void write(DataOutputView out) throws IOException {
public void write(ObjectOutputStream out) throws IOException {
out.writeDouble(localValue);
}

@Override
public void read(DataInputView in) throws IOException {
public void read(ObjectInputStream in) throws IOException {
this.localValue = in.readDouble();
}

@Override
public Accumulator<Double, Double> clone() {
DoubleCounter result = new DoubleCounter();
result.localValue = localValue;

return result;
}

@Override
public String toString() {
return "DoubleCounter object. Local value: " + this.localValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
package org.apache.flink.api.common.accumulators;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import java.util.TreeMap;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

/**
* Histogram for discrete-data. Let's you populate a histogram distributedly.
* Implemented as a Integer->Integer TreeMap, so that the entries are sorted
Expand Down Expand Up @@ -77,7 +76,7 @@ public String toString() {
}

@Override
public void write(DataOutputView out) throws IOException {
public void write(ObjectOutputStream out) throws IOException {
out.writeInt(treeMap.size());
for (Map.Entry<Integer, Integer> entry : treeMap.entrySet()) {
out.writeInt(entry.getKey());
Expand All @@ -86,11 +85,20 @@ public void write(DataOutputView out) throws IOException {
}

@Override
public void read(DataInputView in) throws IOException {
public void read(ObjectInputStream in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; ++i) {
treeMap.put(in.readInt(), in.readInt());
}
}

@Override
public Accumulator<Integer, Map<Integer, Integer>> clone() {
Histogram result = new Histogram();

result.treeMap = new TreeMap<Integer, Integer>(treeMap);

return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
package org.apache.flink.api.common.accumulators;

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class IntCounter implements SimpleAccumulator<Integer> {

Expand Down Expand Up @@ -51,15 +50,23 @@ public void resetLocal() {
}

@Override
public void write(DataOutputView out) throws IOException {
public void write(ObjectOutputStream out) throws IOException {
out.writeInt(localValue);
}

@Override
public void read(DataInputView in) throws IOException {
public void read(ObjectInputStream in) throws IOException {
localValue = in.readInt();
}

@Override
public Accumulator<Integer, Integer> clone() {
IntCounter result = new IntCounter();
result.localValue = localValue;

return result;
}

@Override
public String toString() {
return "IntCounter object. Local value: " + this.localValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@
package org.apache.flink.api.common.accumulators;

import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;


import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class LongCounter implements SimpleAccumulator<Long> {

Expand Down Expand Up @@ -53,15 +50,23 @@ public void resetLocal() {
}

@Override
public void write(DataOutputView out) throws IOException {
public void write(ObjectOutputStream out) throws IOException {
out.writeLong(this.localValue);
}

@Override
public void read(DataInputView in) throws IOException {
public void read(ObjectInputStream in) throws IOException {
this.localValue = in.readLong();
}


@Override
public Accumulator<Long, Long> clone() {
LongCounter result = new LongCounter();
result.localValue = localValue;

return result;
}

@Override
public String toString() {
return "LongCounter object. Local value: " + this.localValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.core.io;

import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
Expand All @@ -31,7 +30,7 @@
* to a binary representation.
* When implementing this Interface make sure that the implementing class has a default (zero-argument) constructor!
*/
public interface IOReadableWritable extends Serializable {
public interface IOReadableWritable {

/**
* Writes the object's internal data to the given data output view.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.flink.examples.java.relational;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -30,9 +32,6 @@
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;


/**
* This program filters lines from a CSV file with empty fields. In doing so, it counts the number of empty fields per
Expand Down Expand Up @@ -193,7 +192,15 @@ public boolean filter(final Tuple t) {
public static class VectorAccumulator implements Accumulator<Integer, List<Integer>> {

/** Stores the accumulated vector components. */
private final List<Integer> resultVector = new ArrayList<Integer>();
private final List<Integer> resultVector;

public VectorAccumulator(){
this(new ArrayList<Integer>());
}

public VectorAccumulator(List<Integer> resultVector){
this.resultVector = resultVector;
}

/**
* Increases the result vector component at the specified position by 1.
Expand Down Expand Up @@ -238,7 +245,7 @@ public void merge(final Accumulator<Integer, List<Integer>> other) {
}

@Override
public void write(final DataOutputView out) throws IOException {
public void write(final ObjectOutputStream out) throws IOException {
// binary serialization of the result vector:
// [number of components, component 0, component 1, ...]
out.writeInt(this.resultVector.size());
Expand All @@ -248,7 +255,7 @@ public void write(final DataOutputView out) throws IOException {
}

@Override
public void read(final DataInputView in) throws IOException {
public void read(final ObjectInputStream in) throws IOException {
// binary deserialization of the result vector
final int size = in.readInt();
for (int numReadComponents = 0; numReadComponents < size; numReadComponents++) {
Expand All @@ -257,5 +264,12 @@ public void read(final DataInputView in) throws IOException {
}
}

@Override
public Accumulator<Integer, List<Integer>> clone() {
VectorAccumulator result = new VectorAccumulator(new ArrayList<Integer>(resultVector));

return result;
}

}
}
12 changes: 8 additions & 4 deletions flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ under the License.
<artifactId>flink-runtime</artifactId>
<name>flink-runtime</name>

<properties>
<akka.version>2.3.6</akka.version>
</properties>

<packaging>jar</packaging>

<dependencies>
Expand Down Expand Up @@ -106,25 +110,25 @@ under the License.
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.10</artifactId>
<version>2.3.5</version>
<version>${akka.version}</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.10</artifactId>
<version>2.3.5</version>
<version>${akka.version}</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.10</artifactId>
<version>2.3.5</version>
<version>${akka.version}</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.10</artifactId>
<version>2.3.5</version>
<version>${akka.version}</version>
<scope>test</scope>
</dependency>

Expand Down
Loading

0 comments on commit ac94253

Please sign in to comment.