
Commit

init
harbby committed Mar 26, 2019
1 parent 0f54474 commit ef8e6a8
Showing 49 changed files with 853 additions and 266 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -31,7 +31,7 @@ allprojects {
     joda_time: '2.9.3',
     log4j12 : '1.7.21',
     guice   : '4.2.1',
-    gadtry  : '1.4.0-rc2',
+    gadtry  : '1.4.1-rc1',
     guava   : '25.1-jre',
     jackson : '2.9.5',
     jersey  : '2.27'
2 changes: 2 additions & 0 deletions settings.gradle
@@ -25,6 +25,7 @@ include 'sylph-connectors'
 include 'sylph-connectors:sylph-kafka'
 include 'sylph-connectors:sylph-mysql'
 include 'sylph-connectors:sylph-hdfs'
+include 'sylph-connectors:sylph-kafka08'
 include 'sylph-connectors:sylph-kafka09'
 include 'sylph-connectors:sylph-hbase'
 include 'sylph-connectors:sylph-elasticsearch6'
@@ -38,3 +39,4 @@ include 'sylph-yarn'
 //include 'sylph-clickhouse'
 //include 'sylph-elasticsearch5'
 
+
ClickHouseSink.java
@@ -19,6 +19,7 @@
 import ideal.sylph.annotation.Name;
 import ideal.sylph.etl.PluginConfig;
 import ideal.sylph.etl.Row;
+import ideal.sylph.etl.Schema;
 import ideal.sylph.etl.SinkContext;
 import ideal.sylph.etl.api.RealTimeSink;
 import org.slf4j.Logger;
@@ -43,7 +44,7 @@ public class ClickHouseSink
 
     private final ClickHouseSinkConfig config;
     private final String prepareStatementQuery;
-    private final Row.Schema schema;
+    private final Schema schema;
     private int idIndex = -1;
     private transient Connection connection;
     private transient PreparedStatement statement;
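
The change above is the first instance of a refactor that repeats across the sink connectors in this commit: the schema type moves from the nested Row.Schema to the new top-level ideal.sylph.etl.Schema, with a matching import. A minimal sketch of the before/after pattern (ExampleSink is a hypothetical class used only to show the shape of the change):

    import ideal.sylph.etl.Schema;

    public class ExampleSink
    {
        // before: private final Row.Schema schema;
        private final Schema schema;

        public ExampleSink(Schema schema)
        {
            this.schema = schema;
        }
    }

The same one-line field change appears below in the Elasticsearch, HBase, and HDFS sinks.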
Elasticsearch5Sink.java
@@ -19,6 +19,7 @@
 import ideal.sylph.annotation.Name;
 import ideal.sylph.etl.PluginConfig;
 import ideal.sylph.etl.Row;
+import ideal.sylph.etl.Schema;
 import ideal.sylph.etl.SinkContext;
 import ideal.sylph.etl.api.RealTimeSink;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -43,7 +44,7 @@ public class Elasticsearch5Sink
         implements RealTimeSink
 {
     private static final int MAX_BATCH_BULK = 50;
-    private final Row.Schema schema;
+    private final Schema schema;
     private final ElasticsearchSinkConfig config;
 
     private TransportClient client;
Elasticsearch6Sink.java
@@ -19,6 +19,7 @@
 import ideal.sylph.annotation.Name;
 import ideal.sylph.etl.PluginConfig;
 import ideal.sylph.etl.Row;
+import ideal.sylph.etl.Schema;
 import ideal.sylph.etl.SinkContext;
 import ideal.sylph.etl.api.RealTimeSink;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -43,7 +44,7 @@ public class Elasticsearch6Sink
         implements RealTimeSink
 {
     private static final int MAX_BATCH_BULK = 50;
-    private final Row.Schema schema;
+    private final Schema schema;
     private final ElasticsearchSinkConfig config;
 
     private TransportClient client;
HbaseSink.java
@@ -19,6 +19,7 @@
 import ideal.sylph.annotation.Name;
 import ideal.sylph.etl.PluginConfig;
 import ideal.sylph.etl.Row;
+import ideal.sylph.etl.Schema;
 import ideal.sylph.etl.SinkContext;
 import ideal.sylph.etl.api.RealTimeSink;
 import ideal.sylph.plugins.hbase.tuple.Tuple2;
@@ -44,7 +45,7 @@ public class HbaseSink
     private String tableName;
     private transient HbaseHelper hbaseHelper;
     private int rowkeyIndex = -1;
-    private final Row.Schema schema;
+    private final Schema schema;
     private final HbaseConfig config;
     private Map<String, Tuple2<String, String>> columMapping;
     private static final Logger logger = LoggerFactory.getLogger(HbaseSink.class);
BytesUtil.java
@@ -24,7 +24,7 @@
 
 public class BytesUtil
 {
-    private BytesUtil(){}
+    private BytesUtil() {}
 
     private static final Logger logger = LoggerFactory.getLogger(HbaseSink.class);
 
ColumUtil.java
@@ -15,7 +15,7 @@
  */
 package ideal.sylph.plugins.hbase.util;
 
-import ideal.sylph.etl.Row;
+import ideal.sylph.etl.Schema;
 import ideal.sylph.plugins.hbase.HbaseSink;
 import ideal.sylph.plugins.hbase.exception.ColumMappingException;
 import ideal.sylph.plugins.hbase.tuple.Tuple2;
@@ -27,7 +27,7 @@
 
 public class ColumUtil
 {
-    private ColumUtil(){}
+    private ColumUtil() {}
 
     private static final String FAMILY_DEFAULT = "0";
     private static final Logger log = LoggerFactory.getLogger(HbaseSink.class);
@@ -39,7 +39,7 @@ private ColumUtil(){}
      * @param columnMappingStr Field information to be mapped.
      * @return Table field mapping result.
      */
-    public static Map<String, Tuple2<String, String>> mapping(Row.Schema schema, String columnMappingStr)
+    public static Map<String, Tuple2<String, String>> mapping(Schema schema, String columnMappingStr)
             throws Exception
     {
         Map<String, Tuple2<String, String>> columnMapping = new HashMap<>();
HdfsSink.java
@@ -20,6 +20,7 @@
 import ideal.sylph.annotation.Version;
 import ideal.sylph.etl.PluginConfig;
 import ideal.sylph.etl.Row;
+import ideal.sylph.etl.Schema;
 import ideal.sylph.etl.SinkContext;
 import ideal.sylph.etl.api.RealTimeSink;
 import ideal.sylph.plugins.hdfs.factory.HDFSFactorys;
@@ -41,7 +42,7 @@ public class HdfsSink
     private static final Logger logger = LoggerFactory.getLogger(HdfsSink.class);
     private final HdfsSinkConfig config;
     private final String sinkTable;
-    private final Row.Schema schema;
+    private final Schema schema;
     private int eventTimeIndex = -1;
 
     private HDFSFactory hdfsFactory;
HDFSFactorys.java
@@ -15,7 +15,7 @@
  */
 package ideal.sylph.plugins.hdfs.factory;
 
-import ideal.sylph.etl.Row;
+import ideal.sylph.etl.Schema;
 import ideal.sylph.plugins.hdfs.parquet.HDFSFactory;
 import ideal.sylph.plugins.hdfs.parquet.ParquetFactory;
 import ideal.sylph.plugins.hdfs.txt.TextFileFactory;
@@ -73,7 +73,7 @@ public abstract static class Builder
     {
         protected String tableName;
        protected String writeTableDir;
-        protected Row.Schema schema;
+        protected Schema schema;
 
        /**
         * Note: the two-level key here is only used to distinguish different tables, nothing more
@@ -91,7 +91,7 @@ public Builder writeTableDir(String writeTableDir)
            return this;
        }
 
-        public Builder schema(Row.Schema schema)
+        public Builder schema(Schema schema)
        {
            this.schema = schema;
            return this;
TextFileFactory.java
@@ -16,6 +16,7 @@
 package ideal.sylph.plugins.hdfs.txt;
 
 import ideal.sylph.etl.Row;
+import ideal.sylph.etl.Schema;
 import ideal.sylph.plugins.hdfs.parquet.HDFSFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,14 +53,14 @@ public class TextFileFactory
 
     private final String writeTableDir;
     private final String table;
-    private final Row.Schema schema;
+    private final Schema schema;
 
     private volatile boolean closed = false;
 
     public TextFileFactory(
             final String writeTableDir,
             final String table,
-            final Row.Schema schema)
+            final Schema schema)
     {
         requireNonNull(writeTableDir, "writeTableDir is null");
         this.writeTableDir = writeTableDir.endsWith("/") ? writeTableDir : writeTableDir + "/";
ParquetUtil.java
@@ -15,7 +15,7 @@
  */
 package ideal.sylph.plugins.hdfs.utils;
 
-import ideal.sylph.etl.Row;
+import ideal.sylph.etl.Field;
 
 import java.util.List;
 
@@ -29,13 +29,13 @@ private ParquetUtil() {}
      * @param fields the set of fields actually written to Parquet
      * @return String the generated schema string
      */
-    public static String buildSchema(List<Row.Field> fields)
+    public static String buildSchema(List<Field> fields)
     {
         StringBuilder sb = new StringBuilder("message row { ");
 
-        for (Row.Field field : fields) {
+        for (Field field : fields) {
             String fieldName = field.getName();
-            Class<?> type = field.getJavaType();
+            Class<?> type = field.getJavaTypeClass();
             switch (type.getSimpleName()) {
                 case "String":
                     sb.append("optional binary ").append(fieldName).append(" (UTF8); ");
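
For reference, the String branch above emits standard Parquet message-type syntax. As a hedged example (the column name user_id is hypothetical, and constructing the Field list is omitted because the ideal.sylph.etl.Field API is not part of this hunk), calling buildSchema on a schema with a single String column should return roughly:

    message row { optional binary user_id (UTF8); }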
JsonSchema.java
@@ -15,15 +15,20 @@
  */
 package ideal.sylph.plugins.kafka.flink;
 
-import ideal.sylph.etl.SourceContext;
+import ideal.sylph.etl.Schema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.types.Row;
 
 import java.io.IOException;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.List;
 import java.util.Map;
 
 public class JsonSchema
@@ -32,13 +37,39 @@ public class JsonSchema
     private static final ObjectMapper MAPPER = new ObjectMapper();
     private final RowTypeInfo rowTypeInfo;
 
-    public JsonSchema(SourceContext context)
+    public JsonSchema(Schema schema)
     {
-        ideal.sylph.etl.Row.Schema schema = context.getSchema();
+        this.rowTypeInfo = schemaToRowTypeInfo(schema);
+    }
 
-        TypeInformation<?>[] types = schema.getFieldTypes().stream().map(TypeExtractor::createTypeInfo).toArray(TypeInformation<?>[]::new);
+    public static RowTypeInfo schemaToRowTypeInfo(Schema schema)
+    {
+        TypeInformation<?>[] types = schema.getFieldTypes().stream().map(JsonSchema::getFlinkType)
+                .toArray(TypeInformation<?>[]::new);
         String[] names = schema.getFieldNames().toArray(new String[0]);
-        this.rowTypeInfo = new RowTypeInfo(types, names);
+        return new RowTypeInfo(types, names);
+    }
+
+    private static TypeInformation<?> getFlinkType(Type type)
+    {
+        if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType() == Map.class) {
+            Type[] arguments = ((ParameterizedType) type).getActualTypeArguments();
+            Type valueType = arguments[1];
+            TypeInformation<?> valueInfo = getFlinkType(valueType);
+            return new MapTypeInfo<>(TypeExtractor.createTypeInfo(arguments[0]), valueInfo);
+        }
+        else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType() == List.class) {
+            TypeInformation<?> typeInformation = getFlinkType(((ParameterizedType) type).getActualTypeArguments()[0]);
+            if (typeInformation.isBasicType() && typeInformation != Types.STRING) {
+                return Types.PRIMITIVE_ARRAY(typeInformation);
+            }
+            else {
+                return Types.OBJECT_ARRAY(typeInformation);
+            }
+        }
+        else {
+            return TypeExtractor.createTypeInfo(type);
+        }
     }
 
     @Override
@@ -50,7 +81,14 @@ public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
         String[] names = rowTypeInfo.getFieldNames();
         Row row = new Row(names.length);
         for (int i = 0; i < names.length; i++) {
-            row.setField(i, map.get(names[i]));
+            Object value = map.get(names[i]);
+            Class<?> aClass = rowTypeInfo.getTypeAt(i).getTypeClass();
+            if (aClass.isArray()) {
+                row.setField(i, MAPPER.convertValue(value, aClass));
+            }
+            else {
+                row.setField(i, value);
+            }
         }
         return row;
     }
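
A quick reference for what the new getFlinkType mapping is expected to produce for common generic field types. This is a hedged sketch rather than sylph code: getFlinkType itself is private, so the snippet constructs the equivalent Flink TypeInformation values directly, and the example field types are assumptions chosen for illustration.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.MapTypeInfo;

    public class TypeMappingSketch
    {
        public static void main(String[] args)
        {
            // A Map<String, Long> field maps to a Flink MapTypeInfo
            TypeInformation<?> mapInfo = new MapTypeInfo<>(Types.STRING, Types.LONG);

            // A List<Long> field maps to a primitive array (long[]),
            // because LONG is a basic type other than STRING
            TypeInformation<?> longArray = Types.PRIMITIVE_ARRAY(Types.LONG);

            // A List<String> field maps to an object array (String[])
            TypeInformation<?> stringArray = Types.OBJECT_ARRAY(Types.STRING);

            System.out.println(mapInfo + " / " + longArray + " / " + stringArray);
        }
    }

The array branches matter for deserialize above: JSON arrays arrive from Jackson as java.util.List, so fields whose TypeInformation resolves to an array class are converted with MAPPER.convertValue before being set on the Row.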
KafkaSource.java
@@ -72,7 +72,7 @@ public KafkaSource(StreamExecutionEnvironment execEnv, KafkaSourceConfig config,
         properties.put("auto.offset.reset", offsetMode); //latest earliest
 
         KeyedDeserializationSchema<Row> deserializationSchema = "json".equals(config.getValueType()) ?
-                new JsonSchema(context) : new RowDeserializer();
+                new JsonSchema(context.getSchema()) : new RowDeserializer();
 
         List<String> topicSets = Arrays.asList(topics.split(","));
         //org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
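
For context, a sketch of how a KeyedDeserializationSchema like this one is typically wired into a Flink Kafka consumer. This is illustrative only: FlinkKafkaConsumer08 comes from the flink-connector-kafka-0.8 dependency added in the new sylph-kafka08 module below, and the topic list here is a placeholder rather than sylph's actual job configuration.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    import org.apache.flink.types.Row;

    public class KafkaWiringSketch
    {
        public static DataStream<Row> source(StreamExecutionEnvironment execEnv,
                KeyedDeserializationSchema<Row> deserializationSchema, Properties properties)
        {
            // Placeholder topic; sylph reads the real list from its job config.
            List<String> topics = Arrays.asList("example-topic");
            FlinkKafkaConsumer08<Row> consumer =
                    new FlinkKafkaConsumer08<>(topics, deserializationSchema, properties);
            return execEnv.addSource(consumer);
        }
    }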
8 changes: 8 additions & 0 deletions sylph-connectors/sylph-kafka08/build.gradle
@@ -0,0 +1,8 @@
+
+dependencies {
+    compileOnly(group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: deps.flink) {
+        exclude(module: 'flink-shaded-hadoop2')
+    }
+
+    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.8_2.11', version: deps.flink
+}
3 binary files not shown.
2 changes: 2 additions & 0 deletions sylph-connectors/sylph-kafka08/build/tmp/jar/MANIFEST.MF
@@ -0,0 +1,2 @@
+Manifest-Version: 1.0
+
(file name not shown)
@@ -0,0 +1,2 @@
+Manifest-Version: 1.0
+

(file name not shown)
@@ -0,0 +1,2 @@
+Manifest-Version: 1.0
+