
Commit cab38b6
spark runner: add spark2.x (test)
ideal committed Jul 26, 2018
1 parent d596368 commit cab38b6
Showing 30 changed files with 237 additions and 112 deletions.
@@ -2,10 +2,7 @@

import ideal.sylph.common.graph.impl.DefaultGraph;

import java.io.Serializable;

public interface Graph<E>
extends Serializable
{
/**
* Create a node
@@ -19,7 +16,7 @@ public interface Graph<E>
*/
void addEdge(String in, String out);

void build()
void run()
throws Exception;

void build(boolean parallel)
2 changes: 0 additions & 2 deletions ideal-common/src/main/java/ideal/sylph/common/graph/Node.java
@@ -1,10 +1,8 @@
package ideal.sylph.common.graph;

import java.io.Serializable;
import java.util.Collection;

public interface Node<E>
extends Serializable
{
String getId();

@@ -2,7 +2,6 @@

import ideal.sylph.common.graph.Node;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -17,12 +16,12 @@ public class DagNode<T>
private List<Node<T>> nextNodes = new ArrayList<>();
private transient T tempData;

private Serializable nodeFunc;
private UnaryOperator<T> nodeFunc;

public DagNode(String id, UnaryOperator<T> nodeFunc)
{
this.id = id;
this.nodeFunc = (UnaryOperator<T> & Serializable) nodeFunc;
this.id = requireNonNull(id, "node id is null");
this.nodeFunc = requireNonNull(nodeFunc, "nodeFunc is null");
}

@Override
@@ -52,13 +51,12 @@ public void addNextNode(Node<T> node)
@Override
public void action(Node<T> parentNode)
{
UnaryOperator<T> function = (UnaryOperator<T>) this.nodeFunc;
if (parentNode == null) { //root node
this.tempData = function.apply(null); //apply the transform
this.tempData = nodeFunc.apply(null); //apply the transform
}
else { //child node
T parentOutput = requireNonNull(parentNode.getOutput(), parentNode.getId() + " return is null");
this.tempData = function.apply(parentOutput); //apply the transform
this.tempData = nodeFunc.apply(parentOutput); //apply the transform
}
}
}
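
For reference, a minimal standalone sketch of driving the reworked DagNode by hand (not part of this commit; only the constructor, addNextNode, action, getId and getOutput visible in the hunks above are assumed, and the manual wiring stands in for what a graph run would normally do):

// Hypothetical sketch: two nodes chained manually.
import ideal.sylph.common.graph.impl.DagNode;

public class DagNodeSketch
{
    public static void main(String[] args)
    {
        DagNode<String> source = new DagNode<>("source", ignored -> "hello");
        DagNode<String> upper = new DagNode<>("upper", String::toUpperCase);
        source.addNextNode(upper);

        source.action(null);   // root node: nodeFunc is applied to null
        upper.action(source);  // child node: nodeFunc is applied to the parent's output
        System.out.println(upper.getId() + " -> " + upper.getOutput());  // upper -> HELLO
    }
}
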
@@ -48,7 +48,7 @@ public void addEdge(String in, String out)
}

@Override
public void build()
public void run()
throws Exception
{
System.out.println("开始寻找轨迹");
@@ -108,13 +108,11 @@ public static void main(String[] args)
throws Exception
{
System.out.println("vm start ok ...");
VmCallable<? extends Serializable> callable;
try (ObjectInputStreamProxy ois = new ObjectInputStreamProxy(System.in)) {
callable = (VmCallable<? extends Serializable>) ois.readObject();
}
System.out.println("vm start init ok ...");
VmFuture<? extends Serializable> future = new VmFuture<>();
try {

try (ObjectInputStreamProxy ois = new ObjectInputStreamProxy(System.in)) {
VmCallable<? extends Serializable> callable = (VmCallable<? extends Serializable>) ois.readObject();
System.out.println("vm start init ok ...");
future.setResult(callable.call());
}
catch (Exception e) {
@@ -2,7 +2,6 @@

import org.junit.Test;

import java.io.BufferedOutputStream;
import java.nio.ByteBuffer;

public class MemoryAllocatorTest
@@ -142,7 +142,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
this.doGet1(req, resp);
}
catch (Exception e) {
resp.getWriter().println(Throwables.getRootCause(e));
resp.sendError(500, Throwables.getRootCause(e).toString());
}
}

@@ -156,10 +156,10 @@ protected void doGet1(HttpServletRequest req, HttpServletResponse resp)
String[] parts = pathInfo.split("/", 3);
checkArgument(parts.length >= 2, remoteUser + " gave an invalid proxy path " + pathInfo);
//parts[0] is empty because path info always starts with a /
String jobId = requireNonNull(parts[1], "jobId not setting");
String runId = requireNonNull(parts[1], "runId not setting");
String rest = parts.length > 2 ? parts[2] : "";

URI trackingUri = new URI(getJobUrl(jobId));
URI trackingUri = new URI(getJobUrl(runId));

// Append the user-provided path and query parameter to the original
// tracking url.
@@ -178,16 +178,18 @@ protected void doGet1(HttpServletRequest req, HttpServletResponse resp)
}
}

public String getJobUrl(String jobId)
public String getJobUrl(String id)
{
JobContainer container = sylphContext.getJobContainer(jobId)
.orElseThrow(() -> new SylphException(JOB_CONFIG_ERROR, "job " + jobId + " not Online"));
JobContainer container = sylphContext.getJobContainer(id)
.orElseGet(() -> sylphContext.getJobContainerWithRunId(id).orElseThrow(() ->
new SylphException(JOB_CONFIG_ERROR, "job " + id + " not Online"))
);
Job.Status status = container.getStatus();
if (status == Job.Status.RUNNING) {
return container.getJobUrl();
}
else {
throw new RuntimeException("job " + jobId + " Status " + status + ",is not RUNNING");
throw new RuntimeException("job " + id + " Status " + status + ",is not RUNNING");
}
}
}
2 changes: 1 addition & 1 deletion sylph-dist/src/bin/run.sh
@@ -28,4 +28,4 @@ exec java $GRAPHX_OPTS -cp lib/*: -Dconfig=etc/sylph/sylph.properties -Dlog4j.fi


#nohup $cmd > ${0%/*}/../logs/server.log 2>&1 &
#echo "Starting YseraServer,the pid is "$!
#echo "Starting $mainClass,the pid is "$!
@@ -1,4 +1,4 @@
package com.broadtech.streamingload.runtime.sparkstreaming.source
package ideal.sylph.plugins.spark.source

import ideal.sylph.api.etl.{Sink, Source, TransForm}
import org.apache.spark.rdd.RDD
@@ -1,9 +1,11 @@
package com.broadtech.streamingload.runtime.sparkstreaming.source
package ideal.sylph.plugins.spark.source

import ideal.sylph.api.etl.Source
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

/**
* Created by ideal on 17-4-25.
@@ -3,16 +3,17 @@
import com.google.inject.Inject;
import ideal.sylph.main.service.JobManager;
import ideal.sylph.main.service.RunnerManger;
import ideal.sylph.main.service.YamlFlow;
import ideal.sylph.spi.SylphContext;
import ideal.sylph.spi.exception.SylphException;
import ideal.sylph.spi.job.Job;
import ideal.sylph.spi.job.JobContainer;
import ideal.sylph.spi.job.YamlFlow;

import java.util.Collection;
import java.util.Optional;

import static ideal.sylph.spi.exception.StandardErrorCode.UNKNOWN_ERROR;
import static java.util.Objects.requireNonNull;

public class SylphContextImpl
implements SylphContext
@@ -27,13 +28,17 @@ public class SylphContextImpl
public void saveJob(String jobId, String flow, String actuatorName)
throws Exception
{
requireNonNull(jobId, "jobId is null");
requireNonNull(flow, "flow is null");
requireNonNull(actuatorName, "actuatorName is null");
Job job = runnerManger.formJobWithFlow(jobId, YamlFlow.load(flow), actuatorName);
jobManager.saveJob(job);
}

@Override
public void stopJob(String jobId)
{
requireNonNull(jobId, "jobId is null");
try {
jobManager.stopJob(jobId);
}
@@ -45,13 +50,13 @@ public void stopJob(String jobId)
@Override
public void startJob(String jobId)
{
jobManager.startJob(jobId);
jobManager.startJob(requireNonNull(jobId, "jobId is null"));
}

@Override
public void deleteJob(String jobId)
{
jobManager.removeJob(jobId);
jobManager.removeJob(requireNonNull(jobId, "jobId is null"));
}

@Override
@@ -63,12 +68,18 @@ public Collection<Job> getAllJobs()
@Override
public Optional<Job> getJob(String jobId)
{
return jobManager.getJob(jobId);
return jobManager.getJob(requireNonNull(jobId, "jobId is null"));
}

@Override
public Optional<JobContainer> getJobContainer(String jobId)
{
return jobManager.getJobContainer(jobId);
return jobManager.getJobContainer(requireNonNull(jobId, "jobId is null"));
}

@Override
public Optional<JobContainer> getJobContainerWithRunId(String runId)
{
return jobManager.getJobContainerWithRunId(requireNonNull(runId, "runId is null"));
}
}
@@ -165,8 +165,21 @@ public void start()
/**
* get running JobContainer
*/
public Optional<JobContainer> getJobContainer(String jobId)
public Optional<JobContainer> getJobContainer(@NotNull String jobId)
{
return Optional.ofNullable(runningContainers.get(jobId));
}

/**
* get running JobContainer with this runId(demo: yarnAppId)
*/
public Optional<JobContainer> getJobContainerWithRunId(@NotNull String runId)
{
for (JobContainer container : runningContainers.values()) {
if (runId.equals(container.getRunId())) {
return Optional.ofNullable(container);
}
}
return Optional.empty();
}
}
@@ -10,6 +10,7 @@
import ideal.sylph.spi.job.Job;
import ideal.sylph.spi.job.JobActuator;
import ideal.sylph.spi.job.JobContainer;
import ideal.sylph.spi.job.YamlFlow;

import javax.annotation.Nonnull;

@@ -127,7 +127,7 @@ public void execute(JobExecutionContext context)
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
final Graph graph = (Graph) requireNonNull(jobDataMap.get("graph"), "graph is null");
try {
graph.build();
graph.run();
}
catch (Exception e) {
throw new JobExecutionException(e);
@@ -1,10 +1,10 @@
package ideal.sylph.runner.flink;

import com.fasterxml.jackson.databind.ObjectMapper;
import ideal.sylph.api.NodeLoader;
import ideal.sylph.common.graph.Graph;
import ideal.sylph.common.graph.impl.DagNode;
import ideal.sylph.runner.flink.etl.FlinkPluginLoaderImpl;
import ideal.sylph.spi.NodeLoader;
import ideal.sylph.spi.exception.SylphException;
import ideal.sylph.spi.job.Flow;
import ideal.sylph.spi.model.EdgeInfo;
@@ -76,6 +76,6 @@ public void build(StreamExecutionEnvironment execEnv)
edgeInfo.getInNodeId().split("-")[0],
edgeInfo.getOutNodeId().split("-")[0]
));
graphx.build();
graphx.run();
}
}
@@ -1,19 +1,18 @@
package ideal.sylph.runner.flink.etl;

import ideal.sylph.api.NodeLoader;
import ideal.sylph.api.etl.RealTimeSink;
import ideal.sylph.api.etl.RealTimeTransForm;
import ideal.sylph.api.etl.Sink;
import ideal.sylph.api.etl.Source;
import ideal.sylph.api.etl.TransForm;
import ideal.sylph.spi.NodeLoader;
import ideal.sylph.spi.exception.SylphException;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.io.Serializable;
import java.util.Map;
import java.util.function.UnaryOperator;

@@ -33,7 +32,7 @@ public UnaryOperator<DataStream<Row>> loadSource(final StreamTableEnvironment ta

source.driverInit(tableEnv, config);
//source.getSource().getType(); //check the type
return (UnaryOperator<DataStream<Row>> & Serializable) (stream) -> source.getSource();
return (stream) -> source.getSource();
}
catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
throw new SylphException(JOB_BUILD_ERROR, e);
@@ -64,7 +63,7 @@ else if (driver instanceof Sink) {
}

sink.driverInit(config); //pass in the config parameters
return (UnaryOperator<DataStream<Row>> & Serializable) (stream) -> {
return (stream) -> {
sink.run(stream);
return null;
};
@@ -99,7 +98,7 @@ else if (driver instanceof TransForm) {
throw new SylphException(JOB_BUILD_ERROR, "NOT SUPPORTED TransForm:" + driver);
}
transform.driverInit(config);
return (UnaryOperator<DataStream<Row>> & Serializable) (stream) -> transform.transform(stream);
return (stream) -> transform.transform(stream);
}

private static Sink<DataStream<Row>> loadRealTimeSink(RealTimeSink realTimeSink)
18 changes: 15 additions & 3 deletions sylph-runners/spark/build.gradle
@@ -10,16 +10,28 @@ tasks.compileJava.dependsOn compileScala
tasks.compileScala.dependsOn.remove("compileJava")

dependencies {
compile group: 'org.javassist', name: 'javassist', version: '3.22.0-GA'

compileOnly group: 'org.apache.spark', name: 'spark-sql_2.11', version: deps.spark
compileOnly group: 'org.apache.spark', name: 'spark-streaming_2.11', version: deps.spark
compileOnly "org.apache.spark:spark-yarn_2.11:$deps.spark"


compileOnly (group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: deps.spark){
//exclude(group: '*')
}

dependencies {
compile files("$sourceSets.main.scala.outputDir")
runtime(project(':sylph-spi')){
exclude(group: '*')
}
runtime(project(':ideal-common')){
exclude(group: '*')
}

//--- add scala class
compileOnly files("$sourceSets.main.scala.outputDir")


testCompile group: 'org.apache.spark', name: 'spark-sql_2.11', version: deps.spark
testCompile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: deps.spark
testCompile "org.apache.spark:spark-yarn_2.11:$deps.spark"
}