Skip to content

Commit

Permalink
fix flink 1.11 join sql build
Browse files — browse the repository at this point in the history
  • Loading branch information
harbby committed Sep 22, 2020
1 parent 3a4dfab commit 48f7eb1
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 189 deletions.
12 changes: 2 additions & 10 deletions sylph-connectors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,8 @@ subprojects {
def plugins = project(':sylph-dist').buildDir.path + "/plugins/${name}"
task buildPlugins(type: Copy) {

if (project.name == 'sylph-elasticsearch6') {
println(project)
from(project.files("build/libs")) {
include '*-shaded.jar' //只要这个包
}
}
else {
from(configurations.runtime)
from(jar)
}
from(configurations.runtime)
from(jar)

into plugins
//include '*.jar'
Expand Down
2 changes: 1 addition & 1 deletion sylph-connectors/sylph-elasticsearch5/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

dependencies {
compile 'com.github.harbby:es-shaded:5.6.0-1'
compile 'com.github.harbby:es-client:5.6.0-1:shaded'
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ public class Elasticsearch5Sink
private BulkRequestBuilder bulkBuilder;

@Autowired
public Elasticsearch5Sink(SinkContext context, ClientFactory clientFactory)
public Elasticsearch5Sink(SinkContext context, ElasticsearchSinkConfig config)
{
this.config = clientFactory.getConfig();
this.config = config;
this.schema = context.getSchema();
this.clientFactory = clientFactory;
this.clientFactory = new ClientFactory(config);

if (!Strings.isNullOrEmpty(config.getIdField())) {
int fieldIndex = schema.getFieldIndex(config.getIdField());
Expand Down
5 changes: 1 addition & 4 deletions sylph-connectors/sylph-elasticsearch6/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@

dependencies {
compile 'com.github.harbby:es-shaded:6.4.0-1'

compile 'org.apache.flink:flink-connector-elasticsearch6_2.12:1.7.2'

compile 'com.github.harbby:es-client:6.4.0-1:shaded'
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ public class Elasticsearch6Sink
private BulkRequestBuilder bulkBuilder;

@Autowired
public Elasticsearch6Sink(SinkContext context, ClientFactory clientFactory)
public Elasticsearch6Sink(SinkContext context, ElasticsearchSinkConfig config)
{
this.config = clientFactory.getConfig();
this.config = config;
this.schema = context.getSchema();
this.clientFactory = clientFactory;
this.clientFactory = new ClientFactory(config);

if (!Strings.isNullOrEmpty(config.getIdField())) {
int fieldIndex = schema.getFieldIndex(config.getIdField());
Expand Down

This file was deleted.

6 changes: 3 additions & 3 deletions sylph-dist/src/etc/sylph/sylph-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
#export JAVA_HOME=/opt/cloudera/parcels/jdk8

# set your HADOOP_CONF_DIR
export HADOOP_CONF_DIR=/ideal/hadoop/hadoop/etc/hadoop
export HADOOP_CONF_DIR=/data/hadoop/hadoop/etc/hadoop

# set your FLINK_HOME
export FLINK_HOME=/ideal/hadoop/flink
export FLINK_HOME=/data/hadoop/flink
export FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
export FLINK_CONF_DIR=$FLINK_HOME/conf


# set your SPARK_HOME
export SPARK_HOME=/ideal/hadoop/spark
export SPARK_HOME=/data/hadoop/spark
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,14 @@ private static JobGraph compile(
StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) StreamTableEnvironment.create(execEnv, settings);
StreamSqlBuilder streamSqlBuilder = new StreamSqlBuilder(tableEnv, connectorStore, new AntlrSqlParser());
Arrays.stream(sqlSplit).forEach(streamSqlBuilder::buildStreamBySql);
StreamGraph streamGraph;
try {
streamGraph = (StreamGraph) tableEnv.getPipeline(jobId);
}
catch (IllegalStateException e) {
streamGraph = execEnv.getStreamGraph(jobId);
}

StreamGraph streamGraph = (StreamGraph) tableEnv.getPipeline(jobId);
streamGraph.setJobName(jobId);
return streamGraph.getJobGraph();
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private static Serializable compile(String jobId, SqlFlow sqlFlow, ConnectorStor
};

JVMLauncher<Boolean> launcher = JVMLaunchers.<Boolean>newJvm()
.setConsole((line) -> System.out.println(new Ansi().fg(YELLOW).a("[" + jobId + "] ").fg(GREEN).a(line).reset()))
.setConsole((line) -> logger.info(new Ansi().fg(YELLOW).a("[" + jobId + "] ").fg(GREEN).a(line).reset().toString()))
.setCallable(() -> {
System.out.println("************ job start ***************");
appGetter.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private static Serializable compile(String jobId, SqlFlow sqlFlow, ConnectorStor
};

JVMLauncher<Boolean> launcher = JVMLaunchers.<Boolean>newJvm()
.setConsole((line) -> System.out.println(new Ansi().fg(YELLOW).a("[" + jobId + "] ").fg(GREEN).a(line).reset()))
.setConsole((line) -> logger.info(new Ansi().fg(YELLOW).a("[" + jobId + "] ").fg(GREEN).a(line).reset().toString()))
.setCallable(() -> {
System.out.println("************ job start ***************");
URL path = JavaVersion.class.getProtectionDomain().getCodeSource().getLocation();
Expand Down

0 comments on commit 48f7eb1

Please sign in to comment.