Skip to content

Commit

Permalink
[FLINK-1038] added RemoteCollectorOutputFormat
Browse files Browse the repository at this point in the history
This closes apache#94
  • Loading branch information
fatschi authored and StephanEwen committed Sep 21, 2014
1 parent 9463e27 commit 7f946ce
Show file tree
Hide file tree
Showing 5 changed files with 543 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.example.java.remotecollectoroutputformat;

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.RemoteCollectorConsumer;
import org.apache.flink.api.java.io.RemoteCollectorImpl;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
* Implements the "WordCount" program that computes a simple word occurrence
* histogram over some sample data and collects the results with an
* implementation of a {@link RemoteCollectorConsumer}.
*/
@SuppressWarnings("serial")
public class RemoteCollectorOutputFormatExample {

public static void main(String[] args) throws Exception {

/**
* We create a remote {@link ExecutionEnvironment} here, because this
* OutputFormat is designed for use in a distributed setting. For local
* use you should consider using the {@link LocalCollectionOutputFormat
* <T>}.
*/
final ExecutionEnvironment env = ExecutionEnvironment
.createRemoteEnvironment("<remote>", 6124,
"/path/to/your/file.jar");

// get input data
DataSet<String> text = env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,");

DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new LineSplitter())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0).aggregate(Aggregations.SUM, 1);

// emit result
RemoteCollectorImpl.collectLocal(counts,
new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
// user defined IRemoteCollectorConsumer
@Override
public void collect(Tuple2<String, Integer> element) {
System.out.println("word/occurrences:" + element);
}
});

// local collection to store results in
Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>();
// collect results from remote in local collection
RemoteCollectorImpl.collectLocal(counts, collection);

// execute program
env.execute("WordCount Example with RemoteCollectorOutputFormat");

System.out.println(collection);
}

//
// User Functions
//

/**
* Implements the string tokenizer that splits sentences into words as a
* user-defined FlatMapFunction. The function takes a line (String) and
* splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
* Integer>).
*/
public static final class LineSplitter implements
FlatMapFunction<String, Tuple2<String, Integer>> {

@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");

// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.io;

import java.rmi.Remote;
import java.rmi.RemoteException;

/**
* This interface is the counterpart to the {@link RemoteCollectorOutputFormat}
* and implementations will receive remote results through the collect function.
*
* @param <T>
* The type of the records the collector will receive
*/
public interface RemoteCollector<T> extends Remote {

public void collect(T element) throws RemoteException;

public RemoteCollectorConsumer<T> getConsumer() throws RemoteException;

public void setConsumer(RemoteCollectorConsumer<T> consumer)
throws RemoteException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.io;

/**
* This interface describes consumers of {@link RemoteCollector} implementations.
*/
public interface RemoteCollectorConsumer<T> {
public void collect(T element);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.io;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.rmi.AlreadyBoundException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.UUID;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSink;

/**
* This class provides a counterpart implementation for the
* {@link RemoteCollectorOutputFormat}.
*/

public class RemoteCollectorImpl<T> extends UnicastRemoteObject implements
RemoteCollector<T> {

private static final long serialVersionUID = 1L;

/**
* Instance of an implementation of a {@link RemoteCollectorConsumer}. This
* instance will get the records passed.
*/

private RemoteCollectorConsumer<T> consumer;

/**
* This factory method creates an instance of the
* {@link RemoteCollectorImpl} and binds it in the local RMI
* {@link Registry}.
*
* @param port
* The port where the local colector is listening.
* @param consumer
* The consumer instance.
* @param rmiId
* An ID to register the collector in the RMI registry.
* @return
*/
public static <T> void createAndBind(Integer port, RemoteCollectorConsumer<T> consumer, String rmiId) {
RemoteCollectorImpl<T> collectorInstance = null;

try {
collectorInstance = new RemoteCollectorImpl<T>();

Registry registry;

registry = LocateRegistry.createRegistry(port);
registry.bind(rmiId, collectorInstance);
} catch (RemoteException e) {
e.printStackTrace();
} catch (AlreadyBoundException e) {
e.printStackTrace();
}

collectorInstance.setConsumer(consumer);
}

/**
* Writes a DataSet to a {@link RemoteCollectorConsumer} through an
* {@link RemoteCollector} remotely called from the
* {@link RemoteCollectorOutputFormat}.<br/>
*
* @return The DataSink that writes the DataSet.
*/
public static <T> DataSink<T> collectLocal(DataSet<T> source,
RemoteCollectorConsumer<T> consumer) {
// if the RMI parameter was not set by the user make a "good guess"
String ip = System.getProperty("java.rmi.server.hostname");
if (ip == null) {
Enumeration<NetworkInterface> networkInterfaces = null;
try {
networkInterfaces = NetworkInterface.getNetworkInterfaces();
} catch (Throwable t) {
throw new RuntimeException(t);
}
while (networkInterfaces.hasMoreElements()) {
NetworkInterface networkInterface = (NetworkInterface) networkInterfaces
.nextElement();
Enumeration<InetAddress> inetAddresses = networkInterface
.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress inetAddress = (InetAddress) inetAddresses
.nextElement();
if (!inetAddress.isLoopbackAddress()
&& inetAddress instanceof Inet4Address) {
ip = inetAddress.getHostAddress();
System.setProperty("java.rmi.server.hostname", ip);
}
}
}
}

// get some random free port
Integer randomPort = 0;
try {
ServerSocket tmp = new ServerSocket(0);
randomPort = tmp.getLocalPort();
tmp.close();
} catch (Throwable t) {
throw new RuntimeException(t);
}

// create an ID for this output format instance
String rmiId = String.format("%s-%s", RemoteCollectorOutputFormat.class.getName(), UUID.randomUUID());

// create the local listening object and bind it to the RMI registry
RemoteCollectorImpl.createAndBind(randomPort, consumer, rmiId);

// create and configure the output format
OutputFormat<T> remoteCollectorOutputFormat = new RemoteCollectorOutputFormat<T>(ip, randomPort, rmiId);

// create sink
return source.output(remoteCollectorOutputFormat);
}

/**
* Writes a DataSet to a local {@link Collection} through an
* {@link RemoteCollector} and a standard {@link RemoteCollectorConsumer}
* implementation remotely called from the
* {@link RemoteCollectorOutputFormat}.<br/>
*
* @param local
* @param port
* @param collection
*/
public static <T> void collectLocal(DataSet<T> source,
Collection<T> collection) {
final Collection<T> synchronizedCollection = Collections
.synchronizedCollection(collection);
collectLocal(source, new RemoteCollectorConsumer<T>() {
@Override
public void collect(T element) {
synchronizedCollection.add(element);
}
});
}

/**
* Necessary private default constructor.
*
* @throws RemoteException
*/
private RemoteCollectorImpl() throws RemoteException {
super();
}

/**
* This method is called by the remote to collect records.
*/
@Override
public void collect(T element) throws RemoteException {
this.consumer.collect(element);
}

@Override
public RemoteCollectorConsumer<T> getConsumer() {
return this.consumer;
}

@Override
public void setConsumer(RemoteCollectorConsumer<T> consumer) {
this.consumer = consumer;
}
}
Loading

0 comments on commit 7f946ce

Please sign in to comment.