Skip to content

Commit

Permalink
[FLINK-8123][py] Bundle python scripts in jar
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 21, 2017
1 parent 44c603d commit c4107d4
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 39 deletions.
9 changes: 0 additions & 9 deletions flink-dist/src/main/assemblies/bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,6 @@ under the License.
</includes>
</fileSet>

<!-- copy python package -->
<fileSet>
<directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api</directory>
<outputDirectory>resources/python</outputDirectory>
<fileMode>0755</fileMode>
<excludes>
<exclude>**/example/**</exclude>
</excludes>
</fileSet>
<!-- copy python example to examples of dist -->
<fileSet>
<directory>../flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example</directory>
Expand Down
66 changes: 50 additions & 16 deletions flink-libraries/flink-python/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,58 @@ under the License.
<artifactId>flink-python_${scala.binary.version}</artifactId>
<name>flink-python</name>
<packaging>jar</packaging>

<build>
<resources>
<resource>
<!-- include the zip generated by the assembly-plugin in the jar as a resource -->
<directory>target</directory>
<includes>
<include>python-source.zip</include>
</includes>
</resource>
</resources>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<!-- generate zip containing the flink python library -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptor>src/assembly/python.xml</descriptor>
<finalName>python-source</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<phase>generate-resources</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.python.api.PythonPlanBinder</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
37 changes: 37 additions & 0 deletions flink-libraries/flink-python/src/assembly/python.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!--
Assembly descriptor that packages the flink python library sources
into a single zip archive.
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<!-- identifier of this assembly -->
<id>python</id>
<!-- produce exactly one artifact, in zip format -->
<formats>
<format>zip</format>
</formats>

<!-- place the entries directly at the archive root instead of wrapping them in a base directory -->
<includeBaseDirectory>false</includeBaseDirectory>

<fileSets>
<!-- copy the contents of the python api "flink" directory into a "flink" directory inside the archive -->
<fileSet>
<directory>src/main/python/org/apache/flink/python/api/flink</directory>
<outputDirectory>flink</outputDirectory>
</fileSet>
</fileSets>

</assembly>
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.python.api.PythonOperationInfo.DatasizeHint;
Expand All @@ -45,6 +46,7 @@
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
import org.apache.flink.python.api.util.SetCache;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.util.IOUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -54,6 +56,8 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
Expand All @@ -72,12 +76,8 @@ public class PythonPlanBinder {
public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments";

private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python";

private final Configuration operatorConfig;

private final String pythonLibraryPath;

private final String tmpPlanFilesDir;
private Path tmpDistributedDir;

Expand Down Expand Up @@ -110,13 +110,6 @@ public PythonPlanBinder(Configuration globalConfig) {

tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));

String flinkRootDir = System.getenv("FLINK_ROOT_DIR");
pythonLibraryPath = flinkRootDir != null
//command-line
? flinkRootDir + FLINK_PYTHON_REL_LOCAL_PATH
//testing
: new File(System.getProperty("user.dir"), "src/main/python/org/apache/flink/python/api").getAbsolutePath();

operatorConfig = new Configuration();
operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH));
String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR);
Expand Down Expand Up @@ -163,10 +156,16 @@ void runPlan(String[] args) throws Exception {
}
}

// copy flink library, plan file and additional files to temporary location
// setup temporary local directory for flink python library and user files
Path targetDir = new Path(tmpPlanFilesDir);
deleteIfExists(targetDir);
targetDir.getFileSystem().mkdirs(targetDir);

// extract and unzip flink library to temporary location
unzipPythonLibrary(new Path(tmpPlanFilesDir));

// copy user files to temporary location
Path tmpPlanFilesPath = new Path(tmpPlanFilesDir);
deleteIfExists(tmpPlanFilesPath);
FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false);
copyFile(planPath, tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
for (String file : filesToCopy) {
Path source = new Path(file);
Expand Down Expand Up @@ -213,6 +212,34 @@ void runPlan(String[] args) throws Exception {
}
}

/**
 * Extracts the bundled flink python library into the given directory.
 *
 * <p>The library is read from the {@code python-source.zip} classpath resource. Every archive
 * entry is recreated below {@code targetDir} through the file system of that directory; files are
 * created with {@link FileSystem.WriteMode#NO_OVERWRITE}.
 *
 * @param targetDir directory into which the archive contents are extracted
 * @throws IOException if the archive is not available on the classpath, or if reading the archive
 *     or writing one of its entries fails
 */
private static void unzipPythonLibrary(Path targetDir) throws IOException {
	FileSystem targetFs = targetDir.getFileSystem();
	ClassLoader classLoader = PythonPlanBinder.class.getClassLoader();

	// Type is fully qualified so this method does not depend on an additional import.
	java.io.InputStream zipResource = classLoader.getResourceAsStream("python-source.zip");
	if (zipResource == null) {
		// getResourceAsStream signals a missing resource with null; fail with a clear message
		// instead of an obscure error from the zip stream later on.
		throw new IOException("Failed to unzip flink python library: resource python-source.zip was not found on the classpath.");
	}

	// try-with-resources guarantees the archive stream is closed on success and on every failure path
	try (ZipInputStream zis = new ZipInputStream(zipResource)) {
		for (ZipEntry entry = zis.getNextEntry(); entry != null; entry = zis.getNextEntry()) {
			Path newFile = new Path(targetDir, entry.getName());
			if (entry.isDirectory()) {
				targetFs.mkdirs(newFile);
			} else {
				LOG.debug("Unzipping to {}.", newFile);
				// The output stream must be closed per entry; copyBytes is told not to close
				// anything because the zip stream is still needed for the following entries.
				try (FSDataOutputStream out = targetFs.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
					IOUtils.copyBytes(zis, out, false);
				} catch (Exception e) {
					throw new IOException("Failed to unzip flink python library.", e);
				}
			}
			zis.closeEntry();
		}
	}
}

//=====Setup========================================================================================================

private static void deleteIfExists(Path path) throws IOException {
Expand Down

0 comments on commit c4107d4

Please sign in to comment.