Skip to content

Commit

Permalink
Merge pull request harbby#187 from harbby/dev
Browse files Browse the repository at this point in the history
merge for Dev
  • Loading branch information
harbby authored Aug 14, 2021
2 parents 4fd67fa + 91732aa commit 7d01127
Show file tree
Hide file tree
Showing 59 changed files with 1,073 additions and 978 deletions.
37 changes: 20 additions & 17 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,6 @@ allprojects {
group 'com.github.harbby'
version '0.8.0-SNAPSHOT' //SNAPSHOT

apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'checkstyle'
apply plugin: 'jacoco'

def jdk = project.hasProperty('jdk') ? project.jdk : 'java8' //or java11 use -Pjdk=java8
//def jdk = System.getProperty("jdk") ?: "java8" //or java11 use -Djdk=java8
apply from: "$rootProject.projectDir/profile-${jdk}.gradle"

tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
}
tasks.withType(ScalaCompile) {
options.encoding = 'UTF-8'
}

ext.deps = [
flink : '1.13.1',
jetty : "9.4.6.v20170531", //8.1.17.v20150415 "9.4.6.v20170531"
Expand All @@ -34,7 +18,7 @@ allprojects {
joda_time : '2.9.3',
slf4j : '1.7.25',
guice : '4.2.1',
gadtry : '1.9.1',
gadtry : '1.9.2-SNAPSHOT',
guava : '27.0-jre',
jackson : '2.9.8',
jersey : '2.28',
Expand All @@ -43,6 +27,22 @@ allprojects {
}

subprojects {
apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'checkstyle'
apply plugin: 'jacoco'

def jdk = project.hasProperty('jdk') ? project.jdk : 'java8' //or java11 use -Pjdk=java8
//def jdk = System.getProperty("jdk") ?: "java8" //or java11 use -Djdk=java8
apply from: "$rootProject.projectDir/profile-${jdk}.gradle"

tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
}
tasks.withType(ScalaCompile) {
options.encoding = 'UTF-8'
}

if (project != rootProject) {
apply plugin: 'com.github.hierynomus.license'
}
Expand All @@ -55,6 +55,9 @@ subprojects {
mavenLocal()
//maven { url 'http://maven.aliyun.com/nexus/content/groups/public' }
mavenCentral()
maven {
url 'https://oss.sonatype.org/service/local/repositories/snapshots/content/'
}
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ project(':sylph-runners:spark').name = 'sylph-spark'
//----
include 'sylph-api'
include 'sylph-connectors'
include 'sylph-connectors:sylph-example'
include 'sylph-connectors:flink-kafka'
include 'sylph-connectors:sylph-mysql'
include 'sylph-connectors:sylph-hdfs'
include 'sylph-connectors:sylph-kudu'

include 'sylph-connectors:spark-kafka'

//----
Expand Down
24 changes: 0 additions & 24 deletions sylph-api/src/main/java/ideal/sylph/etl/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,9 @@
*/
package ideal.sylph.etl;

import ideal.sylph.etl.api.Sink;
import ideal.sylph.etl.api.Source;
import ideal.sylph.etl.api.TransForm;

import java.io.Serializable;

public interface Operator
extends Serializable
{
public static enum PipelineType
{
source(Source.class),
transform(TransForm.class),
sink(Sink.class),
@Deprecated
batch_join(TransForm.class);

private final Class<? extends Operator> value;

PipelineType(Class<? extends Operator> value)
{
this.value = value;
}

public Class<? extends Operator> getValue()
{
return value;
}
}
}
41 changes: 41 additions & 0 deletions sylph-api/src/main/java/ideal/sylph/etl/OperatorType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.etl;

import ideal.sylph.etl.api.Sink;
import ideal.sylph.etl.api.Source;
import ideal.sylph.etl.api.TransForm;

public enum OperatorType
{
source(Source.class),
transform(TransForm.class),
sink(Sink.class),
@Deprecated
batch_join(TransForm.class);

private final Class<? extends Operator> value;

OperatorType(Class<? extends Operator> value)
{
this.value = value;
}

public Class<? extends Operator> getValue()
{
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public Set<Class<? extends Operator>> getConnectors()
{
return MutableSet.<Class<? extends Operator>>builder()
.add(KafkaSource.class)
.add(TestSource.class)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import ideal.sylph.parser.antlr.AntlrSqlParser;
import ideal.sylph.runner.flink.engines.StreamSqlBuilder;
import ideal.sylph.spi.ConnectorStore;
import ideal.sylph.spi.OperatorMetaData;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
Expand Down Expand Up @@ -55,7 +55,7 @@ public void createSource()
" kafka_group_id = 'streamload1'\n" +
")";

StreamSqlBuilder streamSqlBuilder = new StreamSqlBuilder(tableEnv, ConnectorStore.getDefault(), sqlParser);
StreamSqlBuilder streamSqlBuilder = new StreamSqlBuilder(tableEnv, OperatorMetaData.getDefault(), sqlParser);
streamSqlBuilder.buildStreamBySql(sql);

Table kafka = tableEnv.sqlQuery("select * from tb1");
Expand Down
2 changes: 2 additions & 0 deletions sylph-connectors/sylph-example/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
dependencies {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.example;

import com.github.harbby.gadtry.collection.MutableSet;
import ideal.sylph.etl.Operator;

import java.util.Set;

public class Plugin
implements ideal.sylph.etl.Plugin
{
@Override
public Set<Class<? extends Operator>> getConnectors()
{
return MutableSet.<Class<? extends Operator>>builder()
.add(PrintSink.class)
.add(TestTrans.class)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.mysql;
package ideal.sylph.plugins.example;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.mysql;
package ideal.sylph.plugins.example;

import ideal.sylph.etl.Collector;
import ideal.sylph.etl.Record;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import ideal.sylph.parser.antlr.tree.CreateTable;
import ideal.sylph.runner.flink.engines.StreamSqlBuilder;
import ideal.sylph.runner.flink.etl.FlinkNodeLoader;
import ideal.sylph.spi.ConnectorStore;
import ideal.sylph.spi.OperatorMetaData;
import ideal.sylph.spi.NodeLoader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -95,7 +95,7 @@ public Map<String, Object> withConfig()
return withConfig;
}
}));
NodeLoader<DataStream<Row>> loader = new FlinkNodeLoader(ConnectorStore.getDefault(), iocFactory);
NodeLoader<DataStream<Row>> loader = new FlinkNodeLoader(OperatorMetaData.getDefault(), iocFactory);

KuduSink kuduSink = (KuduSink) loader.getPluginInstance(Class.forName(driverClass), withConfig);
Assert.assertTrue(kuduSink != null);
Expand All @@ -107,7 +107,7 @@ public void createKuduSink()
{
StreamTableEnvironment tableEnv = getTableEnv();

StreamSqlBuilder streamSqlBuilder = new StreamSqlBuilder(tableEnv, ConnectorStore.getDefault(), sqlParser);
StreamSqlBuilder streamSqlBuilder = new StreamSqlBuilder(tableEnv, OperatorMetaData.getDefault(), sqlParser);
streamSqlBuilder.buildStreamBySql(kuduSinkSql);

tableEnv.sqlUpdate("insert into kudu select 'key' as key, '' as `value`");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ public Set<Class<? extends Operator>> getConnectors()
return MutableSet.<Class<? extends Operator>>builder()
.add(MysqlAsyncJoin.class)
.add(MysqlSink.class)
.add(PrintSink.class)
.add(TestTrans.class)
.build();
}
}
4 changes: 2 additions & 2 deletions sylph-main/src/main/java/ideal/sylph/main/SylphMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import ideal.sylph.main.server.SylphBean;
import ideal.sylph.main.service.JobEngineManager;
import ideal.sylph.main.service.JobManager;
import ideal.sylph.main.service.OperatorLoader;
import ideal.sylph.main.service.OperatorManager;
import ideal.sylph.main.util.PropertiesUtil;
import ideal.sylph.spi.job.JobStore;
import org.slf4j.Logger;
Expand Down Expand Up @@ -69,7 +69,7 @@ public static void main(String[] args)
//----analysis
logger.info("Analysis App dependencys {}", String.join("\n", app.analysis().printShow()));

app.getInstance(OperatorLoader.class).loadPlugins();
app.getInstance(OperatorManager.class).loadPlugins();
app.getInstance(JobEngineManager.class).loadRunners();
app.getInstance(JobStore.class).loadJobs();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.validation.constraints.NotNull;

import java.io.File;
import java.util.Map;
import java.util.Properties;

Expand All @@ -30,12 +31,13 @@ public class ServerMainConfig
private final String metadataPath;
private final String jobWorkDir;
private final String runMode;
private final Map<String, String> config;

@Autowired
public ServerMainConfig(Properties properties)
{
Map<String, String> config = PropertiesUtil.fromProperties(properties);

this.config = config;
this.metadataPath = config.get("server.metadata.path");
this.jobWorkDir = requireNonNull(config.get("server.jobstore.workpath"), "server.jobstore.workpath not setting");
this.runMode = config.getOrDefault("job.runtime.mode", "yarn");
Expand All @@ -57,4 +59,10 @@ public String getRunMode()
{
return runMode;
}

public File getPluginDir()
{
String dir = config.getOrDefault("plugin.operator.dir", "./plugins");
return new File(dir);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import ideal.sylph.controller.ServerConfig;
import ideal.sylph.main.service.JobEngineManager;
import ideal.sylph.main.service.JobManager;
import ideal.sylph.main.service.OperatorLoader;
import ideal.sylph.main.service.OperatorManager;
import ideal.sylph.main.service.SqliteDbJobStore;
import ideal.sylph.spi.SylphContext;
import ideal.sylph.spi.job.JobStore;
Expand All @@ -49,7 +49,7 @@ public void configure(Binder binder)
binder.bind(JobStore.class).by(SqliteDbJobStore.class).withSingle();

// --- Binding parameter
binder.bind(OperatorLoader.class).withSingle();
binder.bind(OperatorManager.class).withSingle();
binder.bind(JobEngineManager.class).withSingle();
binder.bind(JobManager.class).withSingle();

Expand All @@ -61,7 +61,7 @@ private static class SylphContextProvider
{
@Autowired private JobManager jobManager;
@Autowired private JobEngineManager runnerManger;
@Autowired private OperatorLoader pluginLoader;
@Autowired private OperatorManager pluginLoader;

@Override
public SylphContext get()
Expand Down
Loading

0 comments on commit 7d01127

Please sign in to comment.