Skip to content

Commit

Permalink
0.2.0.rc1
Browse files Browse the repository at this point in the history
  • Loading branch information
harbby committed Oct 10, 2018
1 parent 340835e commit 5b462ce
Show file tree
Hide file tree
Showing 32 changed files with 322 additions and 241 deletions.
6 changes: 4 additions & 2 deletions ideal-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ dependencies {
exclude(module: 'guice')
exclude(module: 'guava')
exclude(module: "guice-multibindings")
exclude(module: 'commons-lang3')
}
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.8.1'


compile (group: 'com.google.inject.extensions', name: 'guice-multibindings', version: deps.guice){
exclude(module: "guava")
}
Expand All @@ -14,6 +18,4 @@ dependencies {
compile group: 'com.google.guava', name: 'guava', version: deps.guava

compile group: 'org.slf4j', name: 'slf4j-log4j12', version: deps.log4j12

testCompile group: 'org.javassist', name: 'javassist', version: '3.22.0-GA'
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.text.DateFormat;
import java.util.Date;

import static org.apache.commons.lang3.ObjectUtils.firstNonNull;
import static com.google.common.base.MoreObjects.firstNonNull;

public class DirClassLoader
extends URLClassLoader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Name("mysql")
@Description("this is mysql Sink, if table not execit ze create table")
Expand All @@ -41,10 +45,20 @@ public class MysqlSink
private Connection connection;
private PreparedStatement statement;
private int num = 0;
private final String prepareStatementQuery;
private final String[] keys;

public MysqlSink(MysqlConfig mysqlConfig)
{
this.config = mysqlConfig;
this.prepareStatementQuery = config.saveQuery.replaceAll("\\$\\{.*?}", "?");
// parser sql query ${key}
Matcher matcher = Pattern.compile("(?<=\\$\\{)(.+?)(?=\\})").matcher(config.saveQuery);
List<String> builder = new ArrayList<>();
while (matcher.find()) {
builder.add(matcher.group());
}
this.keys = builder.toArray(new String[0]);
}

@Override
Expand All @@ -53,7 +67,7 @@ public boolean open(long partitionId, long version)
try {
Class.forName("com.mysql.jdbc.Driver");
this.connection = DriverManager.getConnection(config.jdbcUrl, config.user, config.password);
this.statement = connection.prepareStatement(config.insertQuery);
this.statement = connection.prepareStatement(prepareStatementQuery);
}
catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException("MysqlSink open fail", e);
Expand All @@ -62,15 +76,18 @@ public boolean open(long partitionId, long version)
}

@Override
public void process(Row value)
public void process(Row row)
{
try {
for (int i = 0; i < value.size(); i++) {
statement.setObject(i + 1, value.getAs(i));
int i = 1;
for (String key : keys) {
Object value = isNumeric(key) ? row.getAs(Integer.parseInt(key)) : row.getAs(key);
statement.setObject(i, value);
i += 1;
}
statement.addBatch();
// submit batch
if (num >= 5) {
if (num >= 50) {
statement.executeBatch();
num = 0;
}
Expand Down Expand Up @@ -106,18 +123,32 @@ public static class MysqlConfig
{
@Name("url")
@Description("this is mysql jdbc url")
private final String jdbcUrl = "jdbc:mysql://localhost:3306/pop?characterEncoding=utf-8&useSSL=false";
private String jdbcUrl = "jdbc:mysql://localhost:3306/pop?characterEncoding=utf-8&useSSL=false";

@Name("userName")
@Description("this is mysql userName")
private final String user = "demo";
private String user = "demo";

@Name("password")
@Description("this is mysql password")
private final String password = "demo";
private String password = "demo";

@Name("query")
@Description("this is mysql save query")
private String saveQuery = "insert into your_table values(${0},${1},${2})";
/*
* demo: insert into your_table values(${0},${1},${2})
* demo: replace into table select '${0}', ifnull((select cnt from table where id = '${0}'),0)+{1};
* */
}

@Name("insert.query")
@Description("this is mysql insert.query")
private final String insertQuery = "insert into mysql_table_sink values(?,?,?)";
private static boolean isNumeric(String str)
{
for (int i = str.length(); --i >= 0; ) {
if (!Character.isDigit(str.charAt(i))) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,19 @@ public static class KafkaSourceConfig

@Name("kafka_topic")
@Description("this is kafka topic list")
private final String topics = "test1";
private String topics = "test1";

@Name("kafka_broker")
@Description("this is kafka broker list")
private final String brokers = "localhost:9092";
private String brokers = "localhost:9092";

@Name("kafka_group_id")
@Description("this is kafka_group_id")
private final String groupid = "sylph_streamSql_test1";
private String groupid = "sylph_streamSql_test1";

@Name("auto.offset.reset")
@Description("this is auto.offset.reset mode")
private final String offsetMode = "latest";
private String offsetMode = "latest";

private KafkaSourceConfig() {}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import scala.util.parsing.json.JSONObject
/**
* test source
**/
@Name("test")
@Name("flink_test_source")
@Description("this flink test source inputStream")
@Version("1.0.0")
@SerialVersionUID(2L) //使用注解来制定序列化id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,18 @@
*/
package ideal.sylph.plugins.flink.transform

import ideal.sylph.annotation.Description
import ideal.sylph.etl.api.TransForm
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.types.Row

@Description("use test")
@SerialVersionUID(2L) //使用注解来制定序列化id
class SqlWindow extends TransForm[DataStream[Row]] {
class TestSqlWindow extends TransForm[DataStream[Row]] {

override def transform(stream: DataStream[Row]): DataStream[Row] = {

// val tb = stream.map(row => {
// val value = row.getField(1).asInstanceOf[String]
// val json = new JSONObject(value.replaceAll("\\}\u0001\\{", ","))
// (
// json.getString("user_id"),
// json.getString("client_type"),
// new Timestamp(json.getLong("server_time"))
// )
// })

// println(tb.dataType)
// tableEnv.registerDataStream("tp", tb, 'user_id, 'client_type,'rowtime.rowtime)
// val result2:Table = tableEnv.sql(
// """SELECT user_id, count(1) FROM tp
// | GROUP BY HOP(proctime, INTERVAL '5' SECOND, INTERVAL '5' MINUTE),
// | user_id""".stripMargin
// )
val execEnv: StreamExecutionEnvironment = stream.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
val result2: Table = tableEnv.sqlQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,18 @@ class MyKafkaSource(@transient private val ssc: StreamingContext, private val co
@SerialVersionUID(2L)
private[this] class KafkaSourceConfig extends PluginConfig {
@Name("kafka_topic")
@Description("this is kafka topic list") val topics: String = null
@Description("this is kafka topic list")
var topics: String = "test1,test2"

@Name("kafka_broker")
@Description("this is kafka broker list") val brokers: String = null
@Description("this is kafka broker list")
var brokers: String = "localhost:9092"

@Name("kafka_group_id")
@Description("this is kafka_group_id") val groupid: String = null
@Description("this is kafka_group_id")
var groupid: String = "streamEtl1"

@Name("auto.offset.reset")
@Description("this is auto.offset.reset mode") var offsetMode = "latest"
@Description("this is auto.offset.reset mode")
var offsetMode = "latest"
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,6 @@ class SocketSource(@transient private val ssc: StreamingContext, private val con
@SerialVersionUID(2L)
private[this] class SocketSourceConfig extends PluginConfig {
@Name("socket_hosts")
@Description("this is socket_hosts list") val hosts: String = null
@Description("this is socket_hosts list")
var hosts: String = "localhost:9999"
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,15 @@ public EtlResource(
@Path("save")
@Consumes({MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
public Map saveJob(@Context HttpServletRequest request)
public Map saveJob(@Context HttpServletRequest request, @QueryParam("actuator") String actuator)
{
requireNonNull(actuator, "actuator is null");
try {
String jobId = requireNonNull(request.getParameter("jobId"), "job jobId 不能为空");
String flow = request.getParameter("graph");
String configString = request.getParameter("config");

sylphContext.saveJob(jobId, flow, ImmutableMap.of("type", "StreamETL", "config", parserJobConfig(configString)));
sylphContext.saveJob(jobId, flow, ImmutableMap.of("type", actuator, "config", parserJobConfig(configString)));
Map out = ImmutableMap.of(
"jobId", jobId,
"type", "save",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package ideal.sylph.controller.action;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import ideal.sylph.spi.SylphContext;
import ideal.sylph.spi.job.JobActuator;
Expand All @@ -32,7 +33,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;
import static com.google.common.base.Preconditions.checkArgument;

@javax.inject.Singleton
@Path("/plugin")
Expand All @@ -54,7 +55,7 @@ public PluginMangerResource(
@Path("actuators")
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public List<String> getAllActuators(@QueryParam("type") String type)
public List<String> getETLActuators()
{
List<String> names = sylphContext.getAllActuatorsInfo()
.stream()
Expand All @@ -69,8 +70,7 @@ public List<String> getAllActuators(@QueryParam("type") String type)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Map getAllPlugins(@QueryParam("actuator") String actuator)
{
//test Object a1 = uriInfo.getQueryParameters();
requireNonNull(actuator, "actuator is null");
checkArgument(!Strings.isNullOrEmpty(actuator), "actuator not setting");
Map plugins = sylphContext.getPlugins(actuator).stream().map(pluginInfo -> {
Map config = pluginInfo.getPluginConfig().stream()
.collect(Collectors.toMap(
Expand Down
14 changes: 8 additions & 6 deletions sylph-controller/src/main/webapp/app/etl.html
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,16 @@
<link rel="stylesheet" href="libs/codemirror/lib/codemirror.css">
<script src="libs/codemirror/lib/codemirror.js"></script>
<script src="libs/codemirror/mode/properties/properties.js"></script>
<script src="libs/codemirror/mode/javascript/javascript.js"></script>
<script src="libs/codemirror/addon/selection/active-line.js"></script>
<script src="libs/codemirror/addon/edit/matchbrackets.js"></script>

<!-- user -->
<script type="text/javascript" src="js/public.js"></script>
<script type="text/javascript" src="js/etl.js"></script>
</head>

<style>
.window {
background-color: #eeeeef;
text-align: center;
line-height: 30px;
line-height: 800%;
z-index: 24;
cursor: pointer;
box-shadow: 2px 2px 19px #aaa;
Expand Down Expand Up @@ -228,7 +225,7 @@ <h4 class="modal-title">警告</h4>


var nodeTextEditor = CodeMirror.fromTextArea(document.getElementById("nodeText"), {
mode: 'properties',
mode: 'javascript',
lineNumbers: true,
styleActiveLine: true,
matchBrackets: true
Expand All @@ -241,4 +238,9 @@ <h4 class="modal-title">警告</h4>
});
};
</script>

<!-- user -->
<script type="text/javascript" src="js/public.js"></script>
<script type="text/javascript" src="js/etl.js"></script>

</html>
Loading

0 comments on commit 5b462ce

Please sign in to comment.