
本文记录了在工作中使用swing算法实现i2i的相关代码内容,如果做相关工作可以邮件和我联系 liangz1996@hotmail.com
itemcf相关内容参考之前spark实现itemcf-附scala代码
package *
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import scala.math.{pow, sqrt}
import scala.util.Random
object CollaborativeFilteringUtils extends Serializable {
def getI2IMap(orgDf: DataFrame,
sign: Int = 1,
topN: Int = 100,
minSimScore: Double = -10000D,
whiteSet: Broadcast[Set[String]] = null,
blackSet: Broadcast[Set[String]] = null): RDD[(String, Map[String, Double])] = {
orgDf
.rdd
.flatMap(r => Array(
(r.getString(0), (r.getString(1), sign * r.getDouble(2))), // AB BA 都需要计算相似度
(r.getString(1), (r.getString(0), sign * r.getDouble(2)))
))
.filter(row => row._2._2 >= minSimScore)
.filter(row => {
var flag = true
if (whiteSet != null)
flag = whiteSet.value.contains(row._2._1)
flag
})
.filter(row => {
var flag = true
if (blackSet != null)
flag = !blackSet.value.contains(row._2._1)
flag
})
.distinct()
.groupByKey()
.map(row => {
val id = row._1
val map = row._2.toArray.sortBy(_._2).reverse.slice(0, topN).toMap
(id, map)
})
}
def metrics(sparkSession: SparkSession,
input: DataFrame,
output: DataFrame,
i2iType: String): Unit = {
println(s"当前i2i的计算方式是${i2iType}")
import sparkSession.implicits._
println(s"当前日志中共有${input.selectExpr("user").distinct().count()}个user")
println(s"当前日志中共有${input.selectExpr("item").distinct().count()}个item")
val temp1 = output
.rdd
.flatMap(row => Array(
(row.getString(0), row.getString(1), row.getDouble(2)),
(row.getString(1), row.getString(0), row.getDouble(2))
))
.toDF("item1", "item2", "sim")
println(s"有${temp1.selectExpr("item1").distinct().count()}个item可以进行i2i召回")
val temp2 = temp1.groupBy("item1").agg(countDistinct("item2").as("count"), min("sim").as("minSim"), max("sim").as("maxSim"))
val maxMinItem2: Array[(Long, Long)] = temp2.agg(max("count"), min("count"))
.collect()
.map(row => (row.getLong(0), row.getLong(1)))
println(s"一个item最多有${maxMinItem2(0)._1}个可召回结果,最少有${maxMinItem2(0)._2}个结果")
val maxMinSim: Array[(Double, Double)] = temp2.agg(max("maxSim"), min("minSim"))
.collect()
.map(row => (row.getDouble(0), row.getDouble(1)))
println(s"两个item之间最大的相似度是${maxMinSim(0)._1},最小的是${maxMinSim(0)._2}n")
}
def itemCF(spark: SparkSession,
orgDf: DataFrame,
version: String = "V1"): DataFrame = {
import spark.implicits._
val multiply2ColUDF: UserDefinedFunction = udf((a: Double, b: Double) => a * b)
val divide2ColUDF: UserDefinedFunction = udf((a: Double, b: Double, c: Double) => a / (b * c))
val divide1ColUDF: UserDefinedFunction = udf((a: Double, b: Double, c: Double) => a * b / math.sqrt(c))
val column = orgDf.columns
val userItemScore = orgDf
.selectExpr(s"cast(${column(0)} as string) as user", s"cast(${column(1)} as string) as item", s"cast(${column(2)} as double) as score")
.cache()
// item的向量模长
val itemLength = userItemScore
.rdd
.map(row => (row.getString(1), row.getDouble(2)))
.filter(row => row._2 != 0)
.groupByKey()
.mapValues(x => sqrt(x.toArray.map(ii => pow(ii, 2)).sum))
.toDF("item", "length")
var resultDf: DataFrame = null
version match {
case "V1" => {
// 计算cos距离的分子
val itemPairNumerator = userItemScore.selectExpr("user", "item as item1", "score as score1")
.join(userItemScore.selectExpr("user", "item as item2", "score as score2"), Seq("user"))
// 只要当一个用户V对两个商品都有行为时,在计算cos距离时,在第V个位置上才会计算出非0
.where("item1 < item2")
.withColumn("temp", multiply2ColUDF(col("score1"), col("score2")))
.groupBy("item1", "item2").agg(sum("temp").as("numerator"))
resultDf = itemPairNumerator
.join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
.join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item1"))
// 两个item之间的相似度,即用全局用户组成的向量之间的cos距离,cos距离见 https://blog.csdn.net/m0_37192554/article/details/107359291
.withColumn("sim", divide2ColUDF(col("numerator"), col("length1"), col("length2")))
.select("item1", "item2", "sim")
}
case "V2" => {
// 结合swing的方法,在计算两个item之间相似度时,对重度用户做了惩罚
val userItemSet = userItemScore
.rdd
.map(row => (row.getAs[String]("user"), row.getAs[String]("item")))
.groupByKey()
.map(row => (row._1, row._2.toSet.size))
.toDF("user", "setSize") // user,userSetSize
.where("setSize > 0")
.repartition(1000)
val itemPairNumerator = userItemScore.selectExpr("user", "item as item1", "score as score1")
.join(userItemScore.selectExpr("user", "item as item2", "score as score2"), Seq("user"))
.join(userItemSet, Seq("user"))
// 只要当一个用户V对两个商品都有行为时,在计算cos距离时,在第V个位置上才会计算出非0
.where("item1 < item2")
.withColumn("temp", divide1ColUDF(col("score1"), col("score2"), col("setSize")))
.groupBy(s"item1", s"item2").agg(sum("temp").as("numerator"))
resultDf = itemPairNumerator
.join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
.join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item1"))
// 两个item之间的相似度,即用全局用户组成的向量之间的cos距离,cos距离见 https://blog.csdn.net/m0_37192554/article/details/107359291
.withColumn("sim", divide2ColUDF(col("numerator"), col("length1"), col("length2")))
.select("item1", s"item2", "sim")
}
}
resultDf
}
def swing(sparkSession: SparkSession,
orgDf: DataFrame,
alpha: Int = 1,
minCoUser: Long = -1L,
topItemPair: Int = 100000,
isFilter: Boolean = false,
version: String = "V1"): DataFrame = {
import sparkSession.implicits._
val joinStringUDF: UserDefinedFunction = udf((a: String, b: String) => a + "_" + b)
val joinDoubleUDF: UserDefinedFunction = udf((a: Double, b: Double) => a.toString + "_" + b.toString)
def splitDouble(s: String): Array[Double] = s.split("_").map(_.toDouble)
val column = orgDf.columns
val userItemScore = orgDf
.selectExpr(s"cast(${column(0)} as string) as user", s"cast(${column(1)} as string) as item", s"cast(${column(2)} as double) as score") // user,item,score
.repartition(1000)
.cache()
val itemLength = userItemScore // 每个item的行为分模长
.rdd
.map(row => (row.getAs[String]("item"), row.getAs[Double]("score")))
.groupByKey()
.mapValues(row => sqrt(row.toArray.map(ii => pow(ii, 2)).sum))
.toDF(s"item", "length")
val userItemPair = userItemScore.selectExpr("item as item1", "user", "score as score1")
.join(userItemScore.selectExpr("item as item2", "user", "score as score2"), Seq("user"))
.where("item1 < item2") // AB BA 只要一次
.join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
.join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item2"))
.withColumn("itemPair", joinStringUDF(col("item1"), col("item2")))
.withColumn("scorePair", joinDoubleUDF(col("score1"), col("score2")))
.withColumn("lengthPair", joinDoubleUDF(col("length1"), col("length2")))
.selectExpr("user", "itemPair", "scorePair", "lengthPair") // user对两个item都有行为,其中scorePair是两个行为的分数
.distinct()
val itemPairCount = userItemPair
.groupBy("itemPair").count()
val minCount = // 每个item对儿之间,最少需要有多少个用户对这两个item都有过行为
if (minCoUser != -1L)
minCoUser
else itemPairCount
.sort(desc("count"))
.selectExpr("count")
.limit(topItemPair)
.rdd
.map(_.getLong(0))
.collect()(topItemPair - 1)
println(s"两个item必须有${minCount}个用户都有行为时,这两个item才会被计算相似度n")
val itemPairUser = userItemPair
.join(
itemPairCount.where(s"count >= $minCount"), Seq("itemPair") // 只要共现频次高的
)
.filter(row => { // 是否基于比例进行采样
var flag = true
if (isFilter) {
val ratio = minCount.toDouble / row.getAs[Long]("count").toDouble
flag = ratio > Random.nextDouble
}
flag
})
.repartition(1000)
val itemPairUserPair = itemPairUser.selectExpr("itemPair", "user as user1", "scorePair as scorePair1", "lengthPair")
.join(itemPairUser.selectExpr("itemPair", "user as user2", "scorePair as scorePair2", "lengthPair"), Seq("itemPair"))
.where("user1 <= user2")
.withColumn("userPair", joinStringUDF(col("user1"), col("user2")))
var itemSim: DataFrame = null
version match {
case "V1" => {
itemSim = itemPairUserPair
// userPair,itemPair,setSizePair,scorePair1,scorePair2
// 这两个用户,对于这两个item都有过正向行为,构成了一个swing结构,其中setSizePair是这两个用户的item集合大小,scorePair1是用户1对两个item的行为分
.rdd
.map(row => (row.getAs[String]("userPair"), row.getAs[String]("itemPair")))
.repartition(10000)
.groupByKey()
.filter(row => row._2.nonEmpty)
.flatMap(row => {
val swing = 1D / (alpha + row._2.size).toDouble // 基础swing分
row._2.map(ii => (ii, swing))
})
.repartition(10000)
.reduceByKey(_ + _) // 对于两个item而言,将所有的swing得分求和,即是两者之间的相似度
.map(row => {
val temp = row._1.split("_")
(temp(0), temp(1), row._2)
})
.toDF("item1", "item2", "sim")
}
case "V2" => {
val userItemSet = userItemScore
.rdd
.map(row => (row.getAs[String]("user"), row.getAs[String]("item")))
.groupByKey()
.map(row => (row._1, row._2.toSet.size))
.toDF("user", "setSize") // user,userSetSize
.where("setSize > 0")
.repartition(1000)
itemSim = itemPairUserPair
.join(userItemSet.selectExpr("user as user1", "setSize as setSize1"), Seq("user1"))
.join(userItemSet.selectExpr("user as user2", "setSize as setSize2"), Seq("user2"))
.withColumn("setSizePair", joinDoubleUDF(col("setSize1"), col("setSize2")))
.rdd
.map(row => ((row.getAs[String]("userPair"), row.getAs[String]("setSizePair")), row.getAs[String]("itemPair")))
.repartition(10000)
.groupByKey()
.filter(row => row._2.nonEmpty)
.flatMap(row => {
val Array(setSize1, setSize2) = splitDouble(row._1._2)
val W_uv = (1D / math.sqrt(setSize1)) * (1D / math.sqrt(setSize2)) // 用户权重
val swing = 1D / (alpha + row._2.size).toDouble // 基础swing分
row._2.map(ii => (ii, W_uv * swing))
})
.repartition(10000)
.reduceByKey(_ + _) // 对于两个item而言,将所有的swing得分求和,即是两者之间的相似度
.map(row => {
val temp = row._1.split("_")
(temp(0), temp(1), row._2)
})
.toDF("item1", "item2", "sim")
}
case "V3" => {
val userItemSet = userItemScore
.rdd
.map(row => (row.getAs[String]("user"), row.getAs[String]("item")))
.groupByKey()
.map(row => (row._1, row._2.toSet.size))
.toDF("user", "setSize") // user,userSetSize
.where("setSize > 0")
.repartition(1000)
itemSim = itemPairUserPair
.join(userItemSet.selectExpr("user as user1", "setSize as setSize1"), Seq("user1"))
.join(userItemSet.selectExpr("user as user2", "setSize as setSize2"), Seq("user2"))
.withColumn("setSizePair", joinDoubleUDF(col("setSize1"), col("setSize2")))
.rdd
.map(row =>
(
(row.getAs[String]("userPair"), row.getAs[String]("setSizePair")),
(row.getAs[String]("itemPair"), row.getAs[String]("scorePair1"), row.getAs[String]("scorePair2"), row.getAs[String]("lengthPair"))
)
)
.repartition(10000)
.groupByKey()
.filter(row => row._2.nonEmpty)
.flatMap(row => {
val Array(setSize1, setSize2) = splitDouble(row._1._2)
row._2.map(ii => {
val Array(user1Item1Score, user1Item2Score) = splitDouble(ii._2)
val Array(user2Item1Score, user2Item2Score) = splitDouble(ii._3)
val Array(length1, length2) = splitDouble(ii._4)
val W_uv = (1D / math.sqrt(setSize1)) * (1D / math.sqrt(setSize2)) // 用户权重
val swing = 1D / (alpha + row._2.size).toDouble // 基础swing分
val S_uvij = (user1Item1Score * user2Item1Score + user1Item2Score * user2Item2Score) / math.sqrt(length1 * length2)
(ii._1, S_uvij * W_uv * swing)
})
})
.repartition(10000)
.reduceByKey(_ + _) // 对于两个item而言,将所有的swing得分求和,即是两者之间的相似度
.map(row => {
val temp = row._1.split("_")
(temp(0), temp(1), row._2)
})
.toDF("item1", "item2", "sim")
}
}
// metrics(sparkSession, userItemScore, itemSim, s"swing_$version")
itemSim
}
}