Skip to content

Commit

Permalink
Adjust the test source to generate data with a delay for easy warkmar…
Browse files Browse the repository at this point in the history
…k test
  • Loading branch information
ideal committed Aug 28, 2018
1 parent 41ac012 commit 00004f3
Showing 1 changed file with 9 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package ideal.sylph.plugins.flink.source

import java.util
import java.util.Date
import java.util.Random
import java.util.concurrent.TimeUnit

import ideal.sylph.annotation.{Description, Name, Version}
Expand Down Expand Up @@ -68,23 +68,19 @@ class TestSource extends Source[StreamTableEnvironment, DataStream[Row]] {

@throws[Exception]
override def run(sourceContext: SourceFunction.SourceContext[Row]): Unit = {
val startTime = System.currentTimeMillis
val numElements = 20000000
val random = new Random
val numKeys = 10
var value = 1L
var count = 0L
var count = 1L
while (running) {
val user_id = "uid:" + value
val msg = new JSONObject(Map[String, String]("user_id" -> user_id, "ip" -> "127.0.0.1")).toString()
val serverTime: java.lang.Long = new Date().getTime()
val row = Row.of("key" + value, msg, serverTime)
val eventTime: java.lang.Long = System.currentTimeMillis - random.nextInt(10 * 1000) //表示数据已经产生了 但是会有10秒以内的延迟
val user_id = "uid:" + count
val msg = JSONObject(Map[String, String]("user_id" -> user_id, "ip" -> "127.0.0.1")).toString()
val row = Row.of("key" + count, msg, eventTime)
sourceContext.collect(row)
count += 1
value += 1
if (value > numKeys) value = 1L
if (count > numKeys) count = 1L
TimeUnit.MILLISECONDS.sleep(100)
}
val endTime = System.currentTimeMillis
}

override def getProducedType: TypeInformation[Row] = {
Expand All @@ -93,7 +89,7 @@ class TestSource extends Source[StreamTableEnvironment, DataStream[Row]] {
TypeExtractor.createTypeInfo(classOf[String]),
TypeExtractor.createTypeInfo(classOf[Long]) //createTypeInformation[String]
)
val rowTypeInfo = new RowTypeInfo(types, Array("key", "value", "server_time"))
val rowTypeInfo = new RowTypeInfo(types, Array("key", "value", "event_time"))
//createTypeInformation[Row]
rowTypeInfo
}
Expand Down

0 comments on commit 00004f3

Please sign in to comment.