Skip to content

Commit

Permalink
add spark and batch runner
Browse files Browse the repository at this point in the history
  • Loading branch information
ideal committed Jul 22, 2018
1 parent 919a1a6 commit 66eeecc
Show file tree
Hide file tree
Showing 116 changed files with 3,439 additions and 867 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ sylph builds use Maven and requires Java 10 or higher.
# Clean the build
./gradlew clean
```
vm
-Dconfig=etc/sylph/sylph.properties
-Dlog4j.file=etc/sylph/sylph-log4j.properties

env
FLINK_HOME=/ideal/hadoop/flink
HADOOP_CONF_DIR=/ideal/hadoop/hadoop/etc/hadoop

## Useful mailing lists
1. [email protected] - For discussions about code, design and features
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ allprojects{
}

ext.deps = [
flink: '1.5.0',
flink: '1.5.1',
jetty: "9.4.6.v20170531", //8.1.17.v20150415 "9.4.6.v20170531"
hadoop: "2.7.4",
plugin_core: "1.0.4",
spark: "2.3.1",
scala: '2.11.8',
joda_time:'2.9.3',
log4j12:'1.7.21',
Expand Down
6 changes: 3 additions & 3 deletions build.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#!/bin/bash

#JAVA10_HOME
echo JAVA10_HOME=$JAVA10_HOME
export JAVA_HOME=$JAVA10_HOME
export PATH=$JAVA10_HOME/bin:$PATH
#echo JAVA10_HOME=$JAVA10_HOME
#export JAVA_HOME=$JAVA10_HOME
#export PATH=$JAVA10_HOME/bin:$PATH
java -version

./gradlew -v
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip
17 changes: 17 additions & 0 deletions ideal-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

dependencies {
compile ('io.airlift:configuration:0.171'){
exclude(module: 'guice')
exclude(module: 'guava')
exclude(module: "guice-multibindings")
}
compile (group: 'com.google.inject.extensions', name: 'guice-multibindings', version: deps.guice){
exclude(module: "guava")
}
compile (group: 'com.google.inject', name: 'guice', version: deps.guice){
exclude(module: 'guava')
}
compile group: 'com.google.guava', name: 'guava', version: deps.guava

compile group: 'org.slf4j', name: 'slf4j-log4j12', version: deps.log4j12 //1.8.0-beta2
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package ideal.sylph.common.base;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectStreamClass;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

public class ObjectInputStreamProxy
extends java.io.ObjectInputStream
{
private ClassLoader classLoader;

public ObjectInputStreamProxy(InputStream in)
throws IOException
{
super(in);
}

/**
* ObjectInputStreamProxy used by user classLoader
* <p>
*
* @param classLoader used by loadObject
*/
public ObjectInputStreamProxy(InputStream in, ClassLoader classLoader)
throws IOException
{
super(in);
this.classLoader = classLoader;
}

/**
* get Method LatestUserDefinedLoader with java.io.ObjectInputStreamProxy
* with jdk.internal.misc.VM.latestUserDefinedLoader()
*/
public static ClassLoader getLatestUserDefinedLoader()
{
//super.latestUserDefinedLoader();
Class<?> class1 = java.io.ObjectInputStream.class;
try {
Method method = class1.getDeclaredMethod("latestUserDefinedLoader");
method.setAccessible(true); //必须要加这个才能
return (ClassLoader) method.invoke(null);
}
catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException("Not compatible with java version");
}
}

/**
* get field primClasses with java.io.ObjectInputStreamProxy
*/
private static Map<String, Class<?>> getPrimClasses()
{
Class<?> class1 = java.io.ObjectInputStream.class;
Map<String, Class<?>> primClasses = null;
try {
Field field = class1.getDeclaredField("primClasses");
field.setAccessible(true);
primClasses = (Map<String, Class<?>>) field.get(class1);
return primClasses;
}
catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Not compatible with java version");
}
}

@Override
protected Class<?> resolveClass(ObjectStreamClass desc)
throws IOException, ClassNotFoundException
{
if (classLoader == null) {
return super.resolveClass(desc);
}

//return super.resolveClass(desc);
String name = desc.getName();
try {
return Class.forName(name, false, classLoader);
}
catch (ClassNotFoundException ex) {
Class<?> cl = getPrimClasses().get(name);
if (cl != null) {
return cl;
}
else {
throw ex;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static Object byteToObject(byte[] bytes)
throws IOException, ClassNotFoundException
{
try (ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
ObjectInputStream oi = new ObjectInputStream(bi)
ObjectInputStreamProxy oi = new ObjectInputStreamProxy(bi)
) {
return oi.readObject();
}
Expand All @@ -35,7 +35,7 @@ public static Object byteToObject(byte[] bytes, ClassLoader classLoader)
throws IOException, ClassNotFoundException
{
try (ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
ObjectInputStream oi = new ObjectInputStream(bi, classLoader)
ObjectInputStreamProxy oi = new ObjectInputStreamProxy(bi, classLoader)
) {
return oi.readObject();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ideal.sylph.spi.bootstrap;
package ideal.sylph.common.bootstrap;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
Expand All @@ -12,7 +12,6 @@
import io.airlift.configuration.ConfigurationLoader;
import io.airlift.configuration.ConfigurationModule;
import io.airlift.configuration.ValidationErrorModule;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -83,9 +82,8 @@ public Bootstrap requireExplicitBindings(boolean requireExplicitBindings)
public Injector initialize()
throws Exception
{
logger.info("=========Bootstrap initialize...========");
ConfigurationLoader loader = new ConfigurationLoader();
String log4j = System.getProperty("log4j.file");
PropertyConfigurator.configure(log4j);

Map<String, String> requiredProperties = new TreeMap<>();
if (requiredConfigurationProperties == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

import ideal.sylph.common.graph.Node;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static java.util.Objects.requireNonNull;

Expand All @@ -14,14 +15,14 @@ public class DagNode<T>
{
private final String id;
private List<Node<T>> nextNodes = new ArrayList<>();
private T tempData;
private transient T tempData;

private Function<T, T> function;
private Serializable nodeFunc;

public DagNode(String id, Function<T, T> function)
public DagNode(String id, UnaryOperator<T> nodeFunc)
{
this.id = id;
this.function = function;
this.nodeFunc = (UnaryOperator<T> & Serializable) nodeFunc;
}

@Override
Expand Down Expand Up @@ -51,11 +52,11 @@ public void addNextNode(Node<T> node)
@Override
public void action(Node<T> parentNode)
{
UnaryOperator<T> function = (UnaryOperator<T>) this.nodeFunc;
if (parentNode == null) { //根节点
this.tempData = function.apply(null); //进行变换
}
else { //叶子节点
//System.out.println("我是:" + toString() + "来自:" + parentNode.toString() + "-->" + toString());
T parentOutput = requireNonNull(parentNode.getOutput(), parentNode.getId() + " return is null");
this.tempData = function.apply(parentOutput); //进行变换
}
Expand Down
54 changes: 54 additions & 0 deletions ideal-common/src/main/java/ideal/sylph/common/io/IOUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package ideal.sylph.common.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;

public class IOUtils
{
private IOUtils() {}

/**
* Copies from one stream to another.
*
* @param in InputStrem to read from
* @param out OutputStream to write to
* @param buffSize the size of the buffer
* @param close whether or not close the InputStream and
* OutputStream at the end. The streams are closed in the finally clause.
*/
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
throws IOException
{
if (close) {
try (InputStream input = in; OutputStream output = out) {
copyBytes(in, out, buffSize);
}
}
else {
copyBytes(in, out, buffSize);
}
}

/**
* Copies from one stream to another.
*
* @param in InputStrem to read from
* @param out OutputStream to write to
* @param buffSize the size of the buffer
*/
public static void copyBytes(InputStream in, OutputStream out, int buffSize)
throws IOException
{
PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
byte[] buf = new byte[buffSize];
int bytesRead = -1;
while ((bytesRead = in.read(buf)) >= 0) {
out.write(buf, 0, bytesRead);
if ((ps != null) && ps.checkError()) {
throw new IOException("Unable to write to output stream.");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ideal.sylph.common.jvm;

import ideal.sylph.common.base.ObjectInputStream;
import ideal.sylph.common.base.ObjectInputStreamProxy;
import ideal.sylph.common.base.Serializables;
import ideal.sylph.common.base.Throwables;

Expand All @@ -21,6 +21,8 @@
import java.util.List;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;

public final class JVMLauncher<R extends Serializable>
{
private VmCallable<R> callable;
Expand Down Expand Up @@ -59,7 +61,7 @@ public byte[] startAndGetByte()
try (OutputStream os = new BufferedOutputStream(process.getOutputStream())) {
os.write(Serializables.serialize(callable)); //把当前对象 发送到编译进程
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
System.err.println(line); //打印编译进程的控制台输出
Expand Down Expand Up @@ -107,7 +109,7 @@ public static void main(String[] args)
{
System.out.println("vm start ok ...");
VmCallable<? extends Serializable> callable;
try (ObjectInputStream ois = new ObjectInputStream(System.in)) {
try (ObjectInputStreamProxy ois = new ObjectInputStreamProxy(System.in)) {
callable = (VmCallable<? extends Serializable>) ois.readObject();
}
System.out.println("vm start init ok ...");
Expand Down
26 changes: 26 additions & 0 deletions ideal-common/src/main/java/ideal/sylph/common/jvm/JVMUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ideal.sylph.common.jvm;

import java.io.File;
import java.util.Arrays;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class JVMUtil
{
private JVMUtil() {}

/**
* 当前class.path里面所有的jar
*/
public static Set<File> systemJars()
{
String[] jars = System.getProperty("java.class.path")
.split(Pattern.quote(File.pathSeparator));
Set<File> res = Arrays.stream(jars).map(File::new).filter(File::isFile)
.collect(Collectors.toSet());
//res.forEach(x -> logger.info("systemJars: {}", x));
//logger.info("flink job systemJars size: {}", res.size());
return res;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ideal.sylph.common.memory;

public interface MemoryAllocator
{
/**
* Whether to fill newly allocated and deallocated memory with 0xa5 and 0x5a bytes respectively.
* This helps catch misuse of uninitialized or freed memory, but imposes some overhead.
*/
boolean MEMORY_DEBUG_FILL_ENABLED = Boolean.parseBoolean(
System.getProperty("spark.memory.debugFill", "false"));

// Same as jemalloc's debug fill values.
byte MEMORY_DEBUG_FILL_CLEAN_VALUE = (byte) 0xa5;
byte MEMORY_DEBUG_FILL_FREED_VALUE = (byte) 0x5a;

/**
* Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
* to be zeroed out (call `fill(0)` on the result if this is necessary).
*/
MemoryBlock allocate(long size)
throws OutOfMemoryError;

void free(MemoryBlock memory);

MemoryAllocator UNSAFE = new UnsafeMemoryAllocator();
}
Loading

0 comments on commit 66eeecc

Please sign in to comment.