From c4107d4c336ed8dbadc03a7018eb255f4df3d1cc Mon Sep 17 00:00:00 2001 From: zentol Date: Tue, 21 Nov 2017 14:15:24 +0100 Subject: [PATCH] [FLINK-8123][py] Bundle python scripts in jar --- flink-dist/src/main/assemblies/bin.xml | 9 --- flink-libraries/flink-python/pom.xml | 66 ++++++++++++++----- .../flink-python/src/assembly/python.xml | 37 +++++++++++ .../flink/python/api/PythonPlanBinder.java | 55 ++++++++++++---- 4 files changed, 128 insertions(+), 39 deletions(-) create mode 100644 flink-libraries/flink-python/src/assembly/python.xml diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index eb6867dd3e3d9..4415d25e822c8 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -193,15 +193,6 @@ under the License. - - - ../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api - resources/python - 0755 - - **/example/** - - ../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example diff --git a/flink-libraries/flink-python/pom.xml b/flink-libraries/flink-python/pom.xml index 310be721c558e..eaa5b871bcd0b 100644 --- a/flink-libraries/flink-python/pom.xml +++ b/flink-libraries/flink-python/pom.xml @@ -30,24 +30,58 @@ under the License. flink-python_${scala.binary.version} flink-python jar + + + + + + target + + python-source.zip + + + - - - org.apache.maven.plugins - maven-jar-plugin - - - jar-with-dependencies - - - - true - org.apache.flink.python.api.PythonPlanBinder - - - - + + + org.apache.maven.plugins + maven-assembly-plugin + + src/assembly/python.xml + python-source + false + + + + generate-resources + + single + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + + + org.apache.flink.python.api.PythonPlanBinder + + + + + + diff --git a/flink-libraries/flink-python/src/assembly/python.xml b/flink-libraries/flink-python/src/assembly/python.xml new file mode 100644 index 0000000000000..4487b7c771d6d --- /dev/null +++ b/flink-libraries/flink-python/src/assembly/python.xml @@ -0,0 +1,37 @@ + + + python + + zip + + + false + + + + src/main/python/org/apache/flink/python/api/flink + flink + + + + diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index e0c8215fa276f..e4aa518113500 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -31,6 +31,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.python.api.PythonOperationInfo.DatasizeHint; @@ -45,6 +46,7 @@ import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer; import org.apache.flink.python.api.util.SetCache; import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +56,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.UUID; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE; import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE; @@ -72,12 +76,8 @@ public class PythonPlanBinder { public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_"; public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments"; - private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python"; - private final Configuration operatorConfig; - private final String pythonLibraryPath; - private final String tmpPlanFilesDir; private Path tmpDistributedDir; @@ -110,13 +110,6 @@ public PythonPlanBinder(Configuration globalConfig) { tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR)); - String flinkRootDir = System.getenv("FLINK_ROOT_DIR"); - pythonLibraryPath = flinkRootDir != null - //command-line - ? flinkRootDir + FLINK_PYTHON_REL_LOCAL_PATH - //testing - : new File(System.getProperty("user.dir"), "src/main/python/org/apache/flink/python/api").getAbsolutePath(); - operatorConfig = new Configuration(); operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH)); String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR); @@ -163,10 +156,16 @@ void runPlan(String[] args) throws Exception { } } - // copy flink library, plan file and additional files to temporary location + // setup temporary local directory for flink python library and user files + Path targetDir = new Path(tmpPlanFilesDir); + deleteIfExists(targetDir); + targetDir.getFileSystem().mkdirs(targetDir); + + // extract and unzip flink library to temporary location + unzipPythonLibrary(new Path(tmpPlanFilesDir)); + + // copy user files to temporary location Path tmpPlanFilesPath = new Path(tmpPlanFilesDir); - deleteIfExists(tmpPlanFilesPath); - FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false); copyFile(planPath, tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME); for (String file : filesToCopy) { Path source = new Path(file); @@ -213,6 +212,34 @@ void runPlan(String[] args) throws Exception { } } + private static void unzipPythonLibrary(Path targetDir) throws IOException { + FileSystem targetFs = targetDir.getFileSystem(); + ClassLoader classLoader = PythonPlanBinder.class.getClassLoader(); + ZipInputStream zis = new ZipInputStream(classLoader.getResourceAsStream("python-source.zip")); + ZipEntry entry = zis.getNextEntry(); + while (entry != null) { + String fileName = entry.getName(); + Path newFile = new Path(targetDir, fileName); + if (entry.isDirectory()) { + targetFs.mkdirs(newFile); + } else { + try { + LOG.debug("Unzipping to {}.", newFile); + FSDataOutputStream fsDataOutputStream = targetFs.create(newFile, FileSystem.WriteMode.NO_OVERWRITE); + IOUtils.copyBytes(zis, fsDataOutputStream, false); + } catch (Exception e) { + zis.closeEntry(); + zis.close(); + throw new IOException("Failed to unzip flink python library.", e); + } + } + + zis.closeEntry(); + entry = zis.getNextEntry(); + } + zis.closeEntry(); + } + //=====Setup======================================================================================================== private static void deleteIfExists(Path path) throws IOException {