Skip to content

Commit

Permalink
[hotfix] [py] Code cleanup - PythonPlanBinder
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 6, 2017
1 parent bba49d6 commit 940d16c
Showing 1 changed file with 22 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@
*/
package org.apache.flink.python.api;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Random;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
Expand All @@ -40,24 +33,32 @@
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.python.api.functions.util.NestedKeyDiscarder;
import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
import org.apache.flink.python.api.PythonOperationInfo.DatasizeHint;
import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.TINY;
import org.apache.flink.python.api.functions.PythonCoGroup;
import org.apache.flink.python.api.functions.util.IdentityGroupReduce;
import org.apache.flink.python.api.functions.PythonMapPartition;
import org.apache.flink.python.api.functions.util.IdentityGroupReduce;
import org.apache.flink.python.api.functions.util.KeyDiscarder;
import org.apache.flink.python.api.functions.util.NestedKeyDiscarder;
import org.apache.flink.python.api.functions.util.SerializerMap;
import org.apache.flink.python.api.functions.util.StringDeserializerMap;
import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
import org.apache.flink.python.api.util.SetCache;
import org.apache.flink.runtime.filecache.FileCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Random;

import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.TINY;

/**
* This class allows the execution of a Flink plan written in python.
*/
Expand Down Expand Up @@ -116,7 +117,7 @@ public static void main(String[] args) throws Exception {
binder.runPlan(Arrays.copyOfRange(args, 1, args.length));
}

public PythonPlanBinder() throws IOException {
public PythonPlanBinder() {
Configuration conf = GlobalConfiguration.loadConfiguration();
FLINK_PYTHON2_BINARY_PATH = conf.getString(FLINK_PYTHON2_BINARY_KEY, "python");
FLINK_PYTHON3_BINARY_PATH = conf.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
Expand Down Expand Up @@ -164,6 +165,7 @@ private void runPlan(String[] args) throws Exception {
}

//=====Setup========================================================================================================

/**
* Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). This allows us to distribute it as one big
* package, and resolves PYTHONPATH issues.
Expand All @@ -186,7 +188,7 @@ private void prepareFiles(String tempFilePath, String... filePaths) throws IOExc
}
}

private static void clearPath(String path) throws IOException, URISyntaxException {
private static void clearPath(String path) throws IOException {
FileSystem fs = FileSystem.get(new Path(path).toUri());
if (fs.exists(new Path(path))) {
fs.delete(new Path(path), true);
Expand Down Expand Up @@ -232,9 +234,9 @@ private void close() {
local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
local.delete(new Path(FLINK_TMP_DATA_DIR), true);
streamer.close();
} catch (NullPointerException npe) {
} catch (NullPointerException ignored) {
} catch (IOException ioe) {
LOG.error("PythonAPI file cleanup failed. " + ioe.getMessage());
LOG.error("PythonAPI file cleanup failed. {}", ioe.getMessage());
} catch (URISyntaxException use) { // can't occur
}
}
Expand All @@ -249,6 +251,7 @@ private void receivePlan() throws IOException {
}

//====Environment===================================================================================================

/**
* This enum contains the identifiers for all supported environment parameters.
*/
Expand Down Expand Up @@ -285,6 +288,7 @@ private void receiveParameters() throws IOException {
}

//====Operations====================================================================================================

/**
* This enum contains the identifiers for all supported DataSet operations.
*/
Expand All @@ -300,12 +304,7 @@ private void receiveOperations() throws IOException {
Integer operationCount = (Integer) streamer.getRecord(true);
for (int x = 0; x < operationCount; x++) {
PythonOperationInfo info = new PythonOperationInfo(streamer, currentEnvironmentID);
Operation op;
try {
op = Operation.valueOf(info.identifier.toUpperCase());
} catch (IllegalArgumentException iae) {
throw new IllegalArgumentException("Invalid operation specified: " + info.identifier);
}
Operation op = Operation.valueOf(info.identifier.toUpperCase());
switch (op) {
case SOURCE_CSV:
createCsvSource(info);
Expand Down

0 comments on commit 940d16c

Please sign in to comment.