iQiyi real-time streaming statistics
linwt authored Sep 19, 2018
1 parent d33a3c2 commit 062fef5
Showing 25 changed files with 1,422 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Aiqiyi_SparkStreaming/Aiqiyi_SparkStreaming.iml
@@ -0,0 +1,2 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4" />
113 changes: 113 additions & 0 deletions Aiqiyi_SparkStreaming/pom.xml
@@ -0,0 +1,113 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Aiqiyi_SparkStreaming</groupId>
<artifactId>Aiqiyi_SparkStreaming</artifactId>
<version>1.0-SNAPSHOT</version>
<inceptionYear>2008</inceptionYear>
<properties>
<scala.version>2.11.8</scala.version>
<kafka.version>0.10.0.0</kafka.version>
<spark.version>2.1.0</spark.version>
<hadoop.version>2.6.4</hadoop.version>
<hbase.version>0.99.2</hbase.version>
</properties>

<dependencies>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.8</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>
3 changes: 3 additions & 0 deletions Aiqiyi_SparkStreaming/src/main/scala/caseclass/ClickLog.scala
@@ -0,0 +1,3 @@
package caseclass

case class ClickLog(ip: String, time:String, id: Int, url: String, statusCode: Int)
3 changes: 3 additions & 0 deletions Aiqiyi_SparkStreaming/src/main/scala/caseclass/Search.scala
@@ -0,0 +1,3 @@
package caseclass

case class Search(searchId: String, searchCount: Long)
3 changes: 3 additions & 0 deletions Aiqiyi_SparkStreaming/src/main/scala/caseclass/Type.scala
@@ -0,0 +1,3 @@
package caseclass

case class Type(typeId: String, typeCount: Long)
22 changes: 22 additions & 0 deletions Aiqiyi_SparkStreaming/src/main/scala/dao/SearchDao.scala
@@ -0,0 +1,22 @@
package dao

import caseclass.Search
import org.apache.hadoop.hbase.util.Bytes
import utils.HBaseUtil

import scala.collection.mutable.ListBuffer

object SearchDao {

val tableName = "search"
val cf = "info"
val column = "searchCount"

def save(list: ListBuffer[Search]): Unit = {
val table = HBaseUtil.getTable(tableName)
for(ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.searchId),Bytes.toBytes(cf),Bytes.toBytes(column),ele.searchCount)
}
}
}
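
A minimal smoke test for SearchDao, mirroring TypeDao.main further down, might look like the sketch below. It could live in the same file (so the imports above apply); the row keys are hypothetical examples of the time_host_id keys produced by StreamingApp, and the "search" table with column family "info" is assumed to already exist.

object SearchDaoCheck {
  // Hypothetical smoke test, not part of this commit: increments a few
  // example counters; assumes the "search"/"info" table already exists.
  def main(args: Array[String]): Unit = {
    val list = new ListBuffer[Search]
    list.append(Search("20180917_search.yahoo.com_6", 20))
    list.append(Search("20180917_cn.bing.com_4", 40))
    SearchDao.save(list)
  }
}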

44 changes: 44 additions & 0 deletions Aiqiyi_SparkStreaming/src/main/scala/dao/TypeDao.scala
@@ -0,0 +1,44 @@
package dao

import caseclass.Type
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import utils.HBaseUtil
import scala.collection.mutable.ListBuffer

object TypeDao {

val tableName = "type"
val cf = "info"
val column = "typeCount"

def save(list: ListBuffer[Type]): Unit = {
val table = HBaseUtil.getTable(tableName)
for (ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.typeId), Bytes.toBytes(cf), Bytes.toBytes(column), ele.typeCount)
}
}

def count(typeId: String): Long = {
val table = HBaseUtil.getTable(tableName)
val get = new Get(Bytes.toBytes(typeId))
val value = table.get(get).getValue(Bytes.toBytes(cf), Bytes.toBytes(column))
if (value == null) {
0L
} else {
Bytes.toLong(value)
}
}

def main(args: Array[String]): Unit = {
val list = new ListBuffer[Type]
list.append(Type("20171122_8",20))
list.append(Type("20171122_9", 40))
list.append(Type("20171122_10", 60))
save(list)
print(count("20171122_8")+"-->"+count("20171122_9")+"-->"+count("20171122_10"))
// First run:  20-->40-->60
// Second run: 40-->80-->120
}

}
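
Both DAOs increment counters with incrementColumnValue and therefore assume the "type" and "search" tables already exist. A minimal setup sketch using the same 0.99-era HBase client API as HBaseUtil could look like the following; the CreateTables object is an assumption for illustration, not part of this commit.

package utils

import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin

object CreateTables {
  // Hypothetical one-off setup: creates the "type" and "search" tables
  // with column family "info" that TypeDao and SearchDao write to.
  def main(args: Array[String]): Unit = {
    val admin = new HBaseAdmin(HBaseUtil.getConf())
    for (name <- Seq("type", "search")) {
      if (!admin.tableExists(name)) {
        val desc = new HTableDescriptor(TableName.valueOf(name))
        desc.addFamily(new HColumnDescriptor("info"))
        admin.createTable(desc)
      }
    }
    admin.close()
  }
}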
95 changes: 95 additions & 0 deletions Aiqiyi_SparkStreaming/src/main/scala/main/StreamingApp.scala
@@ -0,0 +1,95 @@
package main

import caseclass._
import dao.{SearchDao, TypeDao}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.DateUtil

import scala.collection.mutable.ListBuffer

object StreamingApp {

def main(args: Array[String]): Unit = {
val ssc = new StreamingContext("local[*]", "StreamingApp", Seconds(5))

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "mini1:9092,mini1:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("flumeTopic")
val logs = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
).map(_.value())
// logs.print()
//156.187.29.132 2017-11-20 00:39:26 "GET www/2 HTTP/1.0" https://search.yahoo.com/search?p=猎场 200

// Clean and parse the raw access logs
val cleanLog = logs.map(line => {
val splits = line.split("\t")
val ip = splits(0)
val time = DateUtil.parse(splits(1))
val str = splits(2).split(" ")(1)
var id = 0
if(str.startsWith("www")) {
id = str.split("/")(1).toInt
}
val url = splits(3)
val statusCode = splits(4).toInt
ClickLog(ip, time, id, url, statusCode)
}).filter(log => log.id != 0)
// cleanLog.print()
// ClickLog(187.29.10.124,20180917,6,https://search.yahoo.com/search?p=猎场,404)
// ClickLog(124.143.132.30,20180917,4,-,302)

// Count page views per category and save them to HBase
cleanLog.map(log => {
(log.time+"_"+log.id, 1)
// (20180917_6, 1)
}).reduceByKey(_+_).foreachRDD(rdd => {
rdd.foreachPartition(partition => {
val list = new ListBuffer[Type]
partition.foreach(record => {
list.append(Type(record._1, record._2))
})
TypeDao.save(list)
})
})

// Count page views per category referred from search engines and save them to HBase
cleanLog.map(log => {
val url = log.url.replace("//", "/")
val splits = url.split("/")
var host = ""
if(splits.length > 2){
host = splits(1)
}
(host, log.time, log.id)
// (search.yahoo.com,20180917,6)
}).filter(x => x._1 != "").map(x =>
(x._2+"_"+x._1+"_"+x._3, 1)
// (20180917_search.yahoo.com_6, 1)
).reduceByKey(_+_).foreachRDD(rdd => {
rdd.foreachPartition(partition => {
val list = new ListBuffer[Search]
partition.foreach(record => {
list.append(Search(record._1, record._2))
})
SearchDao.save(list)
})
})

ssc.start()
ssc.awaitTermination()
}
}
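
The parsing steps above can be checked outside Spark against the sample line shown in the comments. The sketch below is a hypothetical standalone check, not part of this commit; the tab-separated line is an assumed example of the generator's output.

package main

import utils.DateUtil

object ParseCheck {
  // Hypothetical check of the same string handling StreamingApp applies per record.
  def main(args: Array[String]): Unit = {
    val line = "156.187.29.132\t2017-11-20 00:39:26\t\"GET www/2 HTTP/1.0\"\thttps://search.yahoo.com/search?p=猎场\t200"
    val splits = line.split("\t")
    val time = DateUtil.parse(splits(1))                   // 20171120
    val id = splits(2).split(" ")(1).split("/")(1)         // 2
    val host = splits(3).replace("//", "/").split("/")(1)  // search.yahoo.com
    println(s"${time}_${host}_$id")                        // 20171120_search.yahoo.com_2
  }
}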
23 changes: 23 additions & 0 deletions Aiqiyi_SparkStreaming/src/main/scala/utils/DateUtil.scala
@@ -0,0 +1,23 @@
package utils

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat

object DateUtil {

val source = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
val target = FastDateFormat.getInstance("yyyyMMdd")

def parse(time: String): String = {
target.format(new Date(getTime(time)))
}

def getTime(time: String): Long = {
source.parse(time).getTime
}

def main(args: Array[String]): Unit = {
print(parse("2017-05-11 11:20:54"))
}
}

50 changes: 50 additions & 0 deletions Aiqiyi_SparkStreaming/src/main/scala/utils/HBaseUtil.scala
@@ -0,0 +1,50 @@
package utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

object HBaseUtil {

def getConf(): Configuration = {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "mini1:2181")
conf.set("hbase.rootdir", "hdfs://mini1:9000/hbase")
conf
}

def getTable(tableName: String): HTable = {
var table: HTable = null
try {
table = new HTable(getConf(), tableName)
}
catch {
case e: Exception => e.printStackTrace()
}
table
}

def put(tableName: String, rowKey: String, cf: String, col: String, value: Long): Unit = {
val table = getTable(tableName)
val put = new Put(Bytes.toBytes(rowKey))
put.add(Bytes.toBytes(cf), Bytes.toBytes(col), Bytes.toBytes(value))
try {
table.put(put)
}
catch {
case e: Exception => e.printStackTrace()
}
}

def main(args: Array[String]): Unit = {
val tableName = "type"
val rowkey = "20180919_2"
val cf = "info"
val colum = "typeCount"
val value = 592
HBaseUtil.put(tableName, rowkey, cf, colum, value)
}
}
