
Commit

kafkaSource supports json parser
harbby committed Jan 7, 2019
1 parent 0e3a71e commit 68006df
Showing 8 changed files with 221 additions and 84 deletions.
KafkaSourceConfig.java — new file (package ideal.sylph.plugins.kafka)
@@ -0,0 +1,73 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.kafka;

import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.etl.PluginConfig;

public class KafkaSourceConfig
extends PluginConfig
{
private static final long serialVersionUID = 2L;

@Name("kafka_topic")
@Description("this is kafka topic list")
private String topics = "test1";

@Name("kafka_broker")
@Description("this is kafka broker list")
private String brokers = "localhost:9092";

@Name("kafka_group_id")
@Description("this is kafka_group_id")
private String groupid = "sylph_streamSql_test1";

@Name("auto.offset.reset")
@Description("this is auto.offset.reset mode")
private String offsetMode = "latest";

@Name("value_type")
@Description("this is kafka String value Type, use json")
private String valueType;

public String getTopics()
{
return topics;
}

public String getBrokers()
{
return brokers;
}

public String getGroupid()
{
return groupid;
}

public String getOffsetMode()
{
return offsetMode;
}

public String getValueType()
{
return valueType;
}

private KafkaSourceConfig() {}
}
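
For reference, a minimal sketch — not part of this commit — of how the values above end up as Kafka consumer properties. The literals are simply the defaults declared in KafkaSourceConfig; the real plugin reads them through config.getBrokers(), config.getGroupid() and config.getOffsetMode():

import java.util.Properties;

public class KafkaSourceConfigDefaultsDemo
{
    public static void main(String[] args)
    {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092"); // kafka_broker: must be reachable from the job
        properties.put("group.id", "sylph_streamSql_test1");   // kafka_group_id: each stream needs its own group
        properties.put("auto.offset.reset", "latest");         // auto.offset.reset: "latest" or "earliest"
        System.out.println(properties);
    }
}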
JsonSchema.java — new file (package ideal.sylph.plugins.kafka.flink)
@@ -0,0 +1,69 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.kafka.flink;

import com.fasterxml.jackson.databind.ObjectMapper;
import ideal.sylph.etl.SourceContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.util.Map;

public class JsonSchema
implements KeyedDeserializationSchema<Row>
{
private static final ObjectMapper MAPPER = new ObjectMapper();
private final RowTypeInfo rowTypeInfo;

public JsonSchema(SourceContext context)
{
ideal.sylph.etl.Row.Schema schema = context.getSchema();

TypeInformation<?>[] types = schema.getFieldTypes().stream().map(TypeExtractor::createTypeInfo).toArray(TypeInformation<?>[]::new);
String[] names = schema.getFieldNames().toArray(new String[0]);
this.rowTypeInfo = new RowTypeInfo(types, names);
}

@Override
public Row deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> map = MAPPER.readValue(message, Map.class);
String[] names = rowTypeInfo.getFieldNames();
Row row = new Row(names.length);
for (int i = 0; i < names.length; i++) {
row.setField(i, map.get(names[i]));
}
return row;
}

@Override
public boolean isEndOfStream(Row nextElement)
{
return false;
}

@Override
public TypeInformation<Row> getProducedType()
{
return rowTypeInfo;
}
}
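
For illustration, a minimal self-contained sketch (the field names and sample payload are invented) of the mapping JsonSchema.deserialize performs: the message bytes are parsed into a Map and copied into a Flink Row positionally by schema field name, so JSON keys outside the schema are dropped and schema fields missing from the payload become null.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class JsonRowMappingDemo
{
    public static void main(String[] args)
            throws Exception
    {
        // Field names as they would come from SourceContext.getSchema()
        String[] names = {"user_id", "event", "ts"};
        byte[] message = "{\"user_id\": 1, \"event\": \"click\", \"extra\": true}"
                .getBytes(StandardCharsets.UTF_8);

        // Same strategy as JsonSchema.deserialize(): JSON -> Map -> Row by field name
        @SuppressWarnings("unchecked")
        Map<String, Object> map = new ObjectMapper().readValue(message, Map.class);
        Row row = new Row(names.length);
        for (int i = 0; i < names.length; i++) {
            row.setField(i, map.get(names[i])); // unknown keys are ignored, missing keys become null
        }
        System.out.println(row); // 1,click,null
    }
}

Note that field values keep whatever Java types Jackson produces (Integer, String, Boolean, ...), so the declared schema types need to line up with the JSON payload.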
KafkaSource.java — Flink source plugin
@@ -18,8 +18,9 @@
import ideal.sylph.annotation.Description;
import ideal.sylph.annotation.Name;
import ideal.sylph.annotation.Version;
import ideal.sylph.etl.PluginConfig;
import ideal.sylph.etl.SourceContext;
import ideal.sylph.etl.api.Source;
import ideal.sylph.plugins.kafka.KafkaSourceConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -53,29 +54,31 @@ public class KafkaSource
/**
* Initialization (executed in the driver stage)
**/
public KafkaSource(StreamExecutionEnvironment execEnv, KafkaSourceConfig config)
public KafkaSource(StreamExecutionEnvironment execEnv, KafkaSourceConfig config, SourceContext context)
{
requireNonNull(execEnv, "execEnv is null");
requireNonNull(config, "config is null");
loadStream = Suppliers.memoize(() -> {
String topics = config.topics;
String brokers = config.brokers; // the broker hosts must be resolvable from the machine running this program
String groupid = config.groupid; // consumer group name
String offset = config.offsetMode; //latest earliest
String topics = config.getTopics();
String groupId = config.getGroupid(); // consumer group name
String offsetMode = config.getOffsetMode(); //latest earliest

Properties properties = new Properties();
properties.put("bootstrap.servers", brokers);
properties.put("bootstrap.servers", config.getBrokers()); // the broker hosts must be resolvable from the machine running this program
//"enable.auto.commit" -> (false: java.lang.Boolean), // do not auto-commit offsets
// "session.timeout.ms" -> "30000", // session defaults to 30s; an error is thrown if offsets go uncommitted too long
// "heartbeat.interval.ms" -> "5000", // heartbeat period, committed once every 10s
properties.put("group.id", groupid); // note: each stream must use a distinct group.id, otherwise offset commits fail
properties.put("auto.offset.reset", offset); //latest earliest
properties.put("group.id", groupId); // note: each stream must use a distinct group.id, otherwise offset commits fail
properties.put("auto.offset.reset", offsetMode); //latest earliest

KeyedDeserializationSchema<Row> deserializationSchema = "json".equals(config.getValueType()) ?
new JsonSchema(context) : new RowDeserializer();

List<String> topicSets = Arrays.asList(topics.split(","));
//org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
DataStream<Row> stream = execEnv.addSource(new FlinkKafkaConsumer010<Row>(
topicSets,
new RowDeserializer(),
deserializationSchema,
properties)
);
return stream;
@@ -109,6 +112,7 @@ public Row deserialize(byte[] messageKey, byte[] message, String topic, int part
);
}

@Override
public TypeInformation<Row> getProducedType()
{
TypeInformation<?>[] types = new TypeInformation<?>[] {
@@ -121,28 +125,4 @@ public TypeInformation<Row> getProducedType()
return new RowTypeInfo(types, KAFKA_COLUMNS);
}
}

public static class KafkaSourceConfig
extends PluginConfig
{
private static final long serialVersionUID = 2L;

@Name("kafka_topic")
@Description("this is kafka topic list")
private String topics = "test1";

@Name("kafka_broker")
@Description("this is kafka broker list")
private String brokers = "localhost:9092";

@Name("kafka_group_id")
@Description("this is kafka_group_id")
private String groupid = "sylph_streamSql_test1";

@Name("auto.offset.reset")
@Description("this is auto.offset.reset mode")
private String offsetMode = "latest";

private KafkaSourceConfig() {}
}
}
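
Both deserializers report their output layout through getProducedType(); on the JSON path that RowTypeInfo is assembled in JsonSchema's constructor from the Sylph schema. A standalone sketch of that construction, with invented field names and classes:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class RowTypeInfoDemo
{
    public static void main(String[] args)
    {
        // Mirrors JsonSchema's constructor: field classes become Flink TypeInformation,
        // then are paired with the field names to form the produced RowTypeInfo.
        Class<?>[] fieldClasses = {String.class, Long.class, Double.class};
        String[] fieldNames = {"event", "ts", "value"};

        TypeInformation<?>[] types = new TypeInformation<?>[fieldClasses.length];
        for (int i = 0; i < fieldClasses.length; i++) {
            types[i] = TypeExtractor.createTypeInfo(fieldClasses[i]);
        }
        System.out.println(new RowTypeInfo(types, fieldNames)); // Row(event: String, ts: Long, value: Double)
    }
}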
MyKafkaSource.scala — Spark source plugin (package ideal.sylph.plugins.kafka.spark)
@@ -18,13 +18,13 @@ package ideal.sylph.plugins.kafka.spark
import ideal.sylph.annotation.{Description, Name, Version}
import ideal.sylph.etl.PluginConfig
import ideal.sylph.etl.api.Source
import org.apache.kafka.clients.consumer.ConsumerRecord
import ideal.sylph.plugins.kafka.KafkaSourceConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
@@ -38,60 +38,42 @@ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
@Description("this spark kafka source inputStream")
@SerialVersionUID(1L)
class MyKafkaSource(@transient private val ssc: StreamingContext, private val config: KafkaSourceConfig) extends Source[DStream[Row]] {

/**
* load stream
**/
private lazy val kafkaStream: InputDStream[ConsumerRecord[String, String]] = {
val topics = config.topics
val brokers = config.brokers // the broker hosts must be resolvable from the machine running this program
val groupid = config.groupid // consumer group name
val offset = config.offsetMode //
private lazy val kafkaStream: DStream[Row] = {
val topics = config.getTopics
val brokers = config.getBrokers // the broker hosts must be resolvable from the machine running this program
val groupid = config.getGroupid // consumer group name
val offsetMode = config.getOffsetMode

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"enable.auto.commit" -> (false: java.lang.Boolean), // do not auto-commit offsets
// "session.timeout.ms" -> "30000", // session defaults to 30s; an error is thrown if offsets go uncommitted too long
// "session.timeout.ms" -> "30000", // session defaults to 30s
// "heartbeat.interval.ms" -> "5000", // heartbeat period, committed once every 10s
"group.id" -> groupid, // note: each stream must use a distinct group.id, otherwise offset commits fail
"auto.offset.reset" -> offset //latest earliest
"auto.offset.reset" -> offsetMode //latest earliest
)

val topicSets = topics.split(",")
KafkaUtils.createDirectStream[String, String](
ssc, PreferConsistent, Subscribe[String, String](topicSets, kafkaParams))
}

override def getSource: DStream[Row] = {
val schema: StructType = StructType(Array(
StructField("topic", StringType, nullable = true),
StructField("value", StringType, true),
StructField("key", StringType, true)
StructField("_topic", StringType, nullable = true),
StructField("_key", StringType, true),
StructField("_message", StringType, true),
StructField("_partition", IntegerType, true),
StructField("_offset", LongType, true)
))

kafkaStream.map(record =>
new GenericRowWithSchema(Array(record.topic(), record.value(), record.key()), schema)
val topicSets = topics.split(",")
val inputStream = KafkaUtils.createDirectStream[String, String](
ssc, PreferConsistent, Subscribe[String, String](topicSets, kafkaParams))

inputStream.map(record =>
new GenericRowWithSchema(Array(record.topic(), record.key(), record.value(), record.partition(), record.offset()), schema)
).asInstanceOf[DStream[Row]] //.window(Duration(10 * 1000))
}
}

@SerialVersionUID(2L)
private[this] class KafkaSourceConfig extends PluginConfig {
@Name("kafka_topic")
@Description("this is kafka topic list")
var topics: String = "test1,test2"

@Name("kafka_broker")
@Description("this is kafka broker list")
var brokers: String = "localhost:9092"

@Name("kafka_group_id")
@Description("this is kafka_group_id")
var groupid: String = "streamEtl1"

@Name("auto.offset.reset")
@Description("this is auto.offset.reset mode")
var offsetMode = "latest"
override def getSource: DStream[Row] = kafkaStream
}
SocketSource.scala — Spark source plugin
@@ -53,16 +53,6 @@ class SocketSource(@transient private val ssc: StreamingContext, private val con
}).reduce((x, y) => x.union(y))
}

def addSink(sink: Sink[JavaRDD[Row]], transForms: List[TransForm[DStream[Row]]]): Unit = {

var transStream = loadStream
transForms.foreach(transForm => {
transStream = transForm.transform(transStream)
})

transStream.foreachRDD(rdd => sink.run(rdd))
}

override def getSource: DStream[Row] = loadStream
}

26 changes: 26 additions & 0 deletions sylph-etl-api/src/main/java/ideal/sylph/etl/SourceContext.java
@@ -0,0 +1,26 @@
/*
* Copyright (C) 2018 The Sylph Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.etl;

import java.io.Serializable;

public interface SourceContext
extends Serializable
{
public Row.Schema getSchema();

public String getSinkTable();
}