
Commit

add spark job log4j.properties
Optimize module code dependencies and packaging
ideal committed Aug 28, 2018
1 parent 39d3ccb commit d6428fc
Showing 6 changed files with 217 additions and 153 deletions.
9 changes: 8 additions & 1 deletion sylph-runners/spark/build.gradle
@@ -10,14 +10,21 @@ tasks.compileJava.dependsOn compileScala
tasks.compileScala.dependsOn.remove("compileJava")

dependencies {
runtime(project(':sylph-spi')){
exclude(module: '*')
}

runtime(project(':ideal-common')){
exclude(module: '*')
}

compileOnly group: 'org.apache.spark', name: 'spark-sql_2.11', version: deps.spark
compileOnly group: 'org.apache.spark', name: 'spark-streaming_2.11', version: deps.spark
compileOnly "org.apache.spark:spark-yarn_2.11:$deps.spark"

compileOnly (group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: deps.spark){
//exclude(group: '*')
}

compile group: 'org.fusesource.jansi', name: 'jansi', version: '1.17.1'

//--- add scala class
22 changes: 22 additions & 0 deletions sylph-runners/spark/conf/log4j.properties
@@ -0,0 +1,22 @@
log4j.rootLogger=INFO, console


## Set log levels for specific packages
log4j.logger.org.apache.kafka=WARN
log4j.logger.org.apache.spark=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.apache.parquet=WARN


# Set everything to be logged to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy-MM-dd HH:mm:ss} %p[%F:%L]-%m%n
# %d{yy-MM-dd HH:mm:ss} %p[%F:%L]-%m%n
# %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

log4j.logger.org.apache.spark.sql.execution.datasources.parquet=WARN
log4j.logger.org.apache.spark.sql.execution.datasources.FileScanRDD=WARN
log4j.logger.org.apache.hadoop.io.compress.CodecPool=WARN
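
A log4j 1.x file like this is only picked up if it reaches the driver and executor classpath (for example via spark-submit's --files option together with -Dlog4j.configuration in the driver/executor extra Java options). As a minimal, illustrative sketch only — not part of this commit — the snippet below applies such a file programmatically with log4j's standard PropertyConfigurator; the relative path "conf/log4j.properties" is an assumption based on this module's layout.

// Hypothetical helper (not in this commit): apply the runner's log4j.properties
// at startup. Assumes log4j 1.x is on the classpath and the file is resolvable
// from the working directory.
import org.apache.log4j.PropertyConfigurator;

public final class LoggingBootstrap
{
    private LoggingBootstrap() {}

    public static void configure()
    {
        // Standard log4j 1.x entry point for properties-based configuration
        PropertyConfigurator.configure("conf/log4j.properties");
    }
}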

@@ -0,0 +1,170 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.runner.spark;

import ideal.common.jvm.JVMException;
import ideal.common.jvm.JVMLauncher;
import ideal.common.jvm.JVMLaunchers;
import ideal.sylph.runner.spark.etl.sparkstreaming.StreamNodeLoader;
import ideal.sylph.runner.spark.etl.structured.StructuredNodeLoader;
import ideal.sylph.spi.App;
import ideal.sylph.spi.EtlFlow;
import ideal.sylph.spi.GraphApp;
import ideal.sylph.spi.NodeLoader;
import ideal.sylph.spi.exception.SylphException;
import ideal.sylph.spi.model.PipelinePluginManager;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.fusesource.jansi.Ansi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

import static ideal.sylph.spi.exception.StandardErrorCode.JOB_BUILD_ERROR;
import static org.fusesource.jansi.Ansi.Color.GREEN;
import static org.fusesource.jansi.Ansi.Color.YELLOW;

/**
 * SparkJobHandle is serialized and shipped to the YARN cluster, where it is read back in {@link ideal.sylph.runner.spark.SparkAppMain#main}.
 * The purpose of this helper class is to reduce the dependencies required when SparkJobHandle is serialized:
 * serializing SparkJobHandle only needs the classes imported above. Most importantly, guava and other unrelated dependencies are no longer pulled in.
 */
final class JobHelper
{
private JobHelper() {}

private static final Logger logger = LoggerFactory.getLogger(JobHelper.class);

static SparkJobHandle<App<SparkSession>> build2xJob(String jobId, EtlFlow flow, URLClassLoader jobClassLoader, PipelinePluginManager pluginManager)
{
final AtomicBoolean isComplic = new AtomicBoolean(true);
Supplier<App<SparkSession>> appGetter = (Supplier<App<SparkSession>> & Serializable) () -> new GraphApp<SparkSession, Dataset<Row>>()
{
private final SparkSession spark = getSparkSession();

private SparkSession getSparkSession()
{
logger.info("========create spark SparkSession mode isComplic = " + isComplic.get() + "============");
return isComplic.get() ? SparkSession.builder()
.appName("streamLoadTest")
.master("local[*]")
.getOrCreate()
: SparkSession.builder().getOrCreate();
}

@Override
public NodeLoader<SparkSession, Dataset<Row>> getNodeLoader()
{
return new StructuredNodeLoader(pluginManager)
{
@Override
public UnaryOperator<Dataset<Row>> loadSink(Map<String, Object> config)
{
return isComplic.get() ? (stream) -> {
super.loadSinkWithComplic(config).apply(stream);
return null;
} : super.loadSink(config);
}
};
}

@Override
public SparkSession getContext()
{
return spark;
}

@Override
public void build()
throws Exception
{
this.buildGraph(jobId, flow).run();
}
};

try {
JVMLauncher<Integer> launcher = JVMLaunchers.<Integer>newJvm()
.setCallable(() -> {
appGetter.get().build();
return 1;
})
.setConsole((line) -> System.out.println(new Ansi().fg(YELLOW).a("[" + jobId + "] ").fg(GREEN).a(line).reset()))
.addUserURLClassLoader(jobClassLoader)
.build();
launcher.startAndGet(jobClassLoader);
isComplic.set(false);
return new SparkJobHandle<>(appGetter);
}
catch (IOException | ClassNotFoundException | JVMException e) {
throw new SylphException(JOB_BUILD_ERROR, "JOB_BUILD_ERROR", e);
}
}

static SparkJobHandle<App<StreamingContext>> build1xJob(String jobId, EtlFlow flow, URLClassLoader jobClassLoader, PipelinePluginManager pluginManager)
{
final Supplier<App<StreamingContext>> appGetter = (Supplier<App<StreamingContext>> & Serializable) () -> new GraphApp<StreamingContext, DStream<Row>>()
{
private final StreamingContext spark = new StreamingContext(new SparkConf(), Seconds.apply(5));

@Override
public NodeLoader<StreamingContext, DStream<Row>> getNodeLoader()
{
return new StreamNodeLoader(pluginManager);
}

@Override
public StreamingContext getContext()
{
return spark;
}

@Override
public void build()
throws Exception
{
this.buildGraph(jobId, flow).run();
}
};

try {
JVMLauncher<Integer> launcher = JVMLaunchers.<Integer>newJvm()
.setCallable(() -> {
appGetter.get().build();
return 1;
})
.setConsole((line) -> System.out.println(new Ansi().fg(YELLOW).a("[" + jobId + "] ").fg(GREEN).a(line).reset()))
.addUserURLClassLoader(jobClassLoader)
.build();
launcher.startAndGet(jobClassLoader);
return new SparkJobHandle<>(appGetter);
}
catch (IOException | ClassNotFoundException | JVMException e) {
throw new SylphException(JOB_BUILD_ERROR, "JOB_BUILD_ERROR", e);
}
}
}
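
As the Javadoc above notes, the Supplier wrapped in SparkJobHandle is serialized with plain Java serialization and shipped to the YARN cluster, where SparkAppMain (below) reads it back from job_handle.byt. A minimal sketch of the write side this implies — the helper name and call site are assumptions, since the producing code is not part of this diff:

// Hypothetical sketch: write a job handle to the file that SparkAppMain reads.
// Only java.io is needed, which is exactly why JobHelper keeps the handle's
// dependency footprint small (note the (Supplier<...> & Serializable) casts above).
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class JobHandleWriter
{
    private JobHandleWriter() {}

    static void write(Serializable jobHandle, String path)
            throws IOException
    {
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(path))) {
            // Everything reachable from the handle (the app Supplier, jobId, flow,
            // pluginManager) must itself be Serializable.
            out.writeObject(jobHandle);
        }
    }
}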
@@ -15,14 +15,14 @@
*/
package ideal.sylph.runner.spark;

import ideal.common.base.Serializables;
import ideal.sylph.spi.App;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.StreamingContext;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
@@ -39,9 +39,8 @@ public static void main(String[] args)
{
System.out.println("spark on yarn app starting...");

byte[] bytes = Files.readAllBytes(Paths.get(new File("job_handle.byt").toURI()));
@SuppressWarnings("unchecked")
SparkJobHandle<App<?>> sparkJobHandle = (SparkJobHandle<App<?>>) Serializables.byteToObject(bytes);
SparkJobHandle<App<?>> sparkJobHandle = (SparkJobHandle<App<?>>) byteToObject(new FileInputStream("job_handle.byt"));

App<?> app = requireNonNull(sparkJobHandle, "sparkJobHandle is null").getApp().get();
app.build();
@@ -55,4 +54,13 @@ else if (appContext instanceof StreamingContext) {
((StreamingContext) appContext).awaitTermination();
}
}

private static Object byteToObject(InputStream inputStream)
throws IOException, ClassNotFoundException
{
try (ObjectInputStream oi = new ObjectInputStream(inputStream)
) {
return oi.readObject();
}
}
}
@@ -18,19 +18,12 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import ideal.common.jvm.JVMException;
import ideal.common.jvm.JVMLauncher;
import ideal.common.jvm.JVMLaunchers;
import ideal.common.proxy.DynamicProxy;
import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.runner.spark.etl.structured.StructuredNodeLoader;
import ideal.sylph.runner.spark.yarn.SparkAppLauncher;
import ideal.sylph.runner.spark.yarn.YarnJobContainer;
import ideal.sylph.spi.App;
import ideal.sylph.spi.EtlFlow;
import ideal.sylph.spi.GraphApp;
import ideal.sylph.spi.NodeLoader;
import ideal.sylph.spi.classloader.DirClassLoader;
import ideal.sylph.spi.classloader.ThreadContextClassLoader;
import ideal.sylph.spi.exception.SylphException;
@@ -46,22 +39,14 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import javax.validation.constraints.NotNull;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

import static ideal.sylph.spi.exception.StandardErrorCode.JOB_BUILD_ERROR;
import static java.util.Objects.requireNonNull;
@@ -95,7 +80,7 @@ public JobHandle formJob(String jobId, Flow inFlow, DirClassLoader jobClassLoade
jobClassLoader.addJarFiles(builder.build());

try {
return new SparkJobHandle<>(buildJob(jobId, flow, jobClassLoader, pluginManager));
return JobHelper.build2xJob(jobId, flow, jobClassLoader, pluginManager);
}
catch (Exception e) {
throw new SylphException(JOB_BUILD_ERROR, e);
@@ -136,75 +121,4 @@ public Object invoke(Object proxy, Method method, Object[] args)

return (JobContainer) invocationHandler.getProxy(JobContainer.class);
}

private static Supplier<App<SparkSession>> buildJob(String jobId, EtlFlow flow, URLClassLoader jobClassLoader, PipelinePluginManager pluginManager)
{
final AtomicBoolean isComplic = new AtomicBoolean(true);
Supplier<App<SparkSession>> appGetter = (Supplier<App<SparkSession>> & Serializable) () -> new GraphApp<SparkSession, Dataset<Row>>()
{
private final SparkSession spark = getSparkSession();

private SparkSession getSparkSession()
{
System.out.println("========create spark SparkSession mode isComplic = " + isComplic.get() + "============");
return isComplic.get() ? SparkSession.builder()
.appName("streamLoadTest")
.master("local[*]")
.getOrCreate()
: SparkSession.builder().getOrCreate();
}

@Override
public NodeLoader<SparkSession, Dataset<Row>> getNodeLoader()
{
return new StructuredNodeLoader(pluginManager)
{
@Override
public UnaryOperator<Dataset<Row>> loadSink(Map<String, Object> config)
{
return isComplic.get() ? (stream) -> {
super.loadSinkWithComplic(config).apply(stream);
return null;
} : super.loadSink(config);
}
};
}

@Override
public SparkSession getContext()
{
return spark;
}

@Override
public void build()
throws Exception
{
this.buildGraph(jobId, flow).run();
}
};

try {
//AnsiConsole.systemInstall();
JVMLauncher<Integer> launcher = JVMLaunchers.<Integer>newJvm()
.setCallable(() -> {
appGetter.get().build();
return 1;
})
.setConsole(System.err::println)
//.setConsole((line) -> System.out.println(ansi().eraseScreen().render("@|green "+line+"|@").reset()))
.addUserURLClassLoader(jobClassLoader)
.build();
launcher.startAndGet(jobClassLoader);

isComplic.set(false);
return appGetter;
}
catch (IOException | ClassNotFoundException | JVMException e) {
throw new SylphException(JOB_BUILD_ERROR, "JOB_BUILD_ERROR", e);
}
finally {
//AnsiConsole.systemUninstall();
}
}
}
