Commit

init

zhonglangjiang committed Dec 5, 2019
1 parent f706b7f commit a420d46
Showing 26 changed files with 46,197 additions and 11 deletions.
54 changes: 54 additions & 0 deletions recommender/ContentRecommender/pom.xml
@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>recommender</artifactId>
<groupId>com.bigdata</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>ContentRecommender</artifactId>
<dependencies>

<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>

<!-- Spark dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<!-- Scala library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>

<!-- MongoDB drivers -->
<!-- Casbah, for connecting to MongoDB from code -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- Connector between Spark and MongoDB -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
</project>
4 changes: 4 additions & 0 deletions recommender/ContentRecommender/src/main/resources/log4j.properties
@@ -0,0 +1,4 @@
log4j.rootLogger=warn, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
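
For reference, this conversion pattern prints the timestamp (%d), the log level right-padded to width 5 (%5p), the thread name in width 50 (%50t), the logger category left-justified to width 80 (%-80c) with the source line number (%L), and finally the message and a newline (%m%n).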
116 changes: 116 additions & 0 deletions recommender/ContentRecommender/src/main/scala/com/atguigu/content/ContentRecommender.scala
@@ -0,0 +1,116 @@
package com.atguigu.content

import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

/**
* Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
*
* Project: ECommerceRecommendSystem
* Package: com.atguigu.content
* Version: 1.0
*
* Created by wushengran on 2019/4/29 9:08
*/

case class Product( productId: Int, name: String, imageUrl: String, categories: String, tags: String )
case class MongoConfig( uri: String, db: String )

// A standard recommendation entry
case class Recommendation( productId: Int, score: Double )

// A product's list of similar products
case class ProductRecs( productId: Int, recs: Seq[Recommendation] )

object ContentRecommender {
// MongoDB collection names
val MONGODB_PRODUCT_COLLECTION = "Product"
val CONTENT_PRODUCT_RECS = "ContentBasedProductRecs"

def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
// Create a Spark config
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")
// Create a SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._
implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )

// Load the data and preprocess it: turn "a|b|c"-style tags into "a b c" so the tokenizer can split on spaces
val productTagsDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Product]
.map(
x => ( x.productId, x.name, x.tags.map(c=> if(c=='|') ' ' else c) )
)
.toDF("productId", "name", "tags")
.cache()

// TODO: extract product feature vectors with TF-IDF
// 1. Instantiate a tokenizer; by default it splits on whitespace
val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")
// Transform with the tokenizer to get a DF with a new "words" column
val wordsDataDF = tokenizer.transform(productTagsDF)

// 2. Define a HashingTF transformer to compute term frequencies
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(800)
val featurizedDataDF = hashingTF.transform(wordsDataDF)
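// Note: HashingTF hashes each term into one of numFeatures buckets, so distinct
// tags can collide. 800 buckets suits a small tag vocabulary; Spark's default
// is 2^18, which trades memory for fewer collisions.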

// 3. Define an IDF estimator to compute TF-IDF
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// Fit an IDF model
val idfModel = idf.fit(featurizedDataDF)
// Get a DF with the new "features" column
val rescaledDataDF = idfModel.transform(featurizedDataDF)

// Convert the data to an RDD of (productId, feature array)
val productFeatures = rescaledDataDF.map{
row => ( row.getAs[Int]("productId"), row.getAs[SparseVector]("features").toArray )
}
.rdd
.map{
case (productId, features) => ( productId, new DoubleMatrix(features) )
}

// Pair up products and compute pairwise cosine similarity
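// Note: cartesian produces on the order of n^2 pairs, which is fine for a small
// catalog but would call for LSH or candidate blocking at larger scale.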
val productRecs = productFeatures.cartesian(productFeatures)
.filter{
case (a, b) => a._1 != b._1
}
// Compute cosine similarity
.map{
case (a, b) =>
val simScore = cosineSim( a._2, b._2 )
( a._1, ( b._1, simScore ) )
}
.filter(_._2._2 > 0.4)
.groupByKey()
.map{
case (productId, recs) =>
ProductRecs( productId, recs.toList.sortWith(_._2>_._2).map(x=>Recommendation(x._1,x._2)) )
}
.toDF()
productRecs.write
.option("uri", mongoConfig.uri)
.option("collection", CONTENT_PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()

spark.stop()
}
def cosineSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
product1.dot(product2) / (product1.norm2() * product2.norm2())
}
}
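
For reference, here is a minimal standalone sketch of how the cosineSim helper behaves, using toy vectors with hypothetical values (not taken from the dataset): parallel vectors score 1.0 and orthogonal ones 0.0, which is what the > 0.4 filter above relies on.

import org.jblas.DoubleMatrix

object CosineSimDemo extends App {
  // Toy feature vectors, for illustration only
  val a = new DoubleMatrix(Array(1.0, 2.0, 3.0))
  val b = new DoubleMatrix(Array(2.0, 4.0, 6.0))  // same direction as a
  val c = new DoubleMatrix(Array(3.0, 0.0, -1.0)) // a dot c = 3 + 0 - 3 = 0

  def cosineSim(p1: DoubleMatrix, p2: DoubleMatrix): Double =
    p1.dot(p2) / (p1.norm2() * p2.norm2())

  println(cosineSim(a, b)) // ~1.0 (parallel)
  println(cosineSim(a, c)) // 0.0 (orthogonal)
}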
41 changes: 41 additions & 0 deletions recommender/DataLoader/pom.xml
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>recommender</artifactId>
<groupId>com.bigdata</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>DataLoader</artifactId>
<dependencies>
<!-- Spark dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- Scala library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- MongoDB drivers -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>

</project>
4 changes: 4 additions & 0 deletions recommender/DataLoader/src/main/resources/log4j.properties
@@ -0,0 +1,4 @@
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
(The remaining 21 of the 26 changed files are not shown.)
