栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 云计算 > 云平台

spark实现swing算法 -附Scala代码

云平台 更新时间:发布时间: 百科书网 趣学号

本文记录了在工作中使用swing算法实现i2i的相关代码内容,如果做相关工作可以邮件和我联系 liangz1996@hotmail.com
itemcf相关内容参考之前spark实现itemcf-附scala代码

package *

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

import scala.math.{pow, sqrt}
import scala.util.Random


object CollaborativeFilteringUtils extends Serializable {

    
    def getI2IMap(orgDf: DataFrame,
                  sign: Int = 1,
                  topN: Int = 100,
                  minSimScore: Double = -10000D,
                  whiteSet: Broadcast[Set[String]] = null,
                  blackSet: Broadcast[Set[String]] = null): RDD[(String, Map[String, Double])] = {
        orgDf
            .rdd
            .flatMap(r => Array(
                (r.getString(0), (r.getString(1), sign * r.getDouble(2))), // AB BA 都需要计算相似度
                (r.getString(1), (r.getString(0), sign * r.getDouble(2)))
            ))
            .filter(row => row._2._2 >= minSimScore)
            .filter(row => {
                var flag = true
                if (whiteSet != null)
                    flag = whiteSet.value.contains(row._2._1)
                flag
            })
            .filter(row => {
                var flag = true
                if (blackSet != null)
                    flag = !blackSet.value.contains(row._2._1)
                flag
            })
            .distinct()
            .groupByKey()
            .map(row => {
                val id = row._1
                val map = row._2.toArray.sortBy(_._2).reverse.slice(0, topN).toMap
                (id, map)
            })
    }


    
    def metrics(sparkSession: SparkSession,
                input: DataFrame,
                output: DataFrame,
                i2iType: String): Unit = {
        println(s"当前i2i的计算方式是${i2iType}")
        import sparkSession.implicits._
        println(s"当前日志中共有${input.selectExpr("user").distinct().count()}个user")
        println(s"当前日志中共有${input.selectExpr("item").distinct().count()}个item")
        val temp1 = output
            .rdd
            .flatMap(row => Array(
                (row.getString(0), row.getString(1), row.getDouble(2)),
                (row.getString(1), row.getString(0), row.getDouble(2))
            ))
            .toDF("item1", "item2", "sim")
        println(s"有${temp1.selectExpr("item1").distinct().count()}个item可以进行i2i召回")
        val temp2 = temp1.groupBy("item1").agg(countDistinct("item2").as("count"), min("sim").as("minSim"), max("sim").as("maxSim"))
        val maxMinItem2: Array[(Long, Long)] = temp2.agg(max("count"), min("count"))
            .collect()
            .map(row => (row.getLong(0), row.getLong(1)))
        println(s"一个item最多有${maxMinItem2(0)._1}个可召回结果,最少有${maxMinItem2(0)._2}个结果")
        val maxMinSim: Array[(Double, Double)] = temp2.agg(max("maxSim"), min("minSim"))
            .collect()
            .map(row => (row.getDouble(0), row.getDouble(1)))
        println(s"两个item之间最大的相似度是${maxMinSim(0)._1},最小的是${maxMinSim(0)._2}n")
    }


    
    def itemCF(spark: SparkSession,
               orgDf: DataFrame,
               version: String = "V1"): DataFrame = {
        import spark.implicits._
        val multiply2ColUDF: UserDefinedFunction = udf((a: Double, b: Double) => a * b)
        val divide2ColUDF: UserDefinedFunction = udf((a: Double, b: Double, c: Double) => a / (b * c))
        val divide1ColUDF: UserDefinedFunction = udf((a: Double, b: Double, c: Double) => a * b / math.sqrt(c))
        val column = orgDf.columns
        val userItemScore = orgDf
            .selectExpr(s"cast(${column(0)} as string) as user", s"cast(${column(1)} as string) as item", s"cast(${column(2)} as double) as score")
            .cache()

        // item的向量模长
        val itemLength = userItemScore
            .rdd
            .map(row => (row.getString(1), row.getDouble(2)))
            .filter(row => row._2 != 0)
            .groupByKey()
            .mapValues(x => sqrt(x.toArray.map(ii => pow(ii, 2)).sum))
            .toDF("item", "length")

        var resultDf: DataFrame = null
        version match {
            case "V1" => {
                // 计算cos距离的分子
                val itemPairNumerator = userItemScore.selectExpr("user", "item as item1", "score as score1")
                    .join(userItemScore.selectExpr("user", "item as item2", "score as score2"), Seq("user"))
                    // 只要当一个用户V对两个商品都有行为时,在计算cos距离时,在第V个位置上才会计算出非0
                    .where("item1 < item2")
                    .withColumn("temp", multiply2ColUDF(col("score1"), col("score2")))
                    .groupBy("item1", "item2").agg(sum("temp").as("numerator"))
                resultDf = itemPairNumerator
                    .join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
                    .join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item1"))
                    // 两个item之间的相似度,即用全局用户组成的向量之间的cos距离,cos距离见 https://blog.csdn.net/m0_37192554/article/details/107359291
                    .withColumn("sim", divide2ColUDF(col("numerator"), col("length1"), col("length2")))
                    .select("item1", "item2", "sim")
            }
            case "V2" => {
                // 结合swing的方法,在计算两个item之间相似度时,对重度用户做了惩罚
                val userItemSet = userItemScore
                    .rdd
                    .map(row => (row.getAs[String]("user"), row.getAs[String]("item")))
                    .groupByKey()
                    .map(row => (row._1, row._2.toSet.size))
                    .toDF("user", "setSize") // user,userSetSize
                    .where("setSize > 0")
                    .repartition(1000)
                val itemPairNumerator = userItemScore.selectExpr("user", "item as item1", "score as score1")
                    .join(userItemScore.selectExpr("user", "item as item2", "score as score2"), Seq("user"))
                    .join(userItemSet, Seq("user"))
                    // 只要当一个用户V对两个商品都有行为时,在计算cos距离时,在第V个位置上才会计算出非0
                    .where("item1 < item2")
                    .withColumn("temp", divide1ColUDF(col("score1"), col("score2"), col("setSize")))
                    .groupBy(s"item1", s"item2").agg(sum("temp").as("numerator"))
                resultDf = itemPairNumerator
                    .join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
                    .join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item1"))
                    // 两个item之间的相似度,即用全局用户组成的向量之间的cos距离,cos距离见 https://blog.csdn.net/m0_37192554/article/details/107359291
                    .withColumn("sim", divide2ColUDF(col("numerator"), col("length1"), col("length2")))
                    .select("item1", s"item2", "sim")
            }
        }
        resultDf
    }


    
    def swing(sparkSession: SparkSession,
              orgDf: DataFrame,
              alpha: Int = 1,
              minCoUser: Long = -1L,
              topItemPair: Int = 100000,
              isFilter: Boolean = false,
              version: String = "V1"): DataFrame = {
        import sparkSession.implicits._
        val joinStringUDF: UserDefinedFunction = udf((a: String, b: String) => a + "_" + b)
        val joinDoubleUDF: UserDefinedFunction = udf((a: Double, b: Double) => a.toString + "_" + b.toString)

        def splitDouble(s: String): Array[Double] = s.split("_").map(_.toDouble)

        val column = orgDf.columns
        val userItemScore = orgDf
            .selectExpr(s"cast(${column(0)} as string) as user", s"cast(${column(1)} as string) as item", s"cast(${column(2)} as double) as score") // user,item,score
            .repartition(1000)
            .cache()
        val itemLength = userItemScore // 每个item的行为分模长
            .rdd
            .map(row => (row.getAs[String]("item"), row.getAs[Double]("score")))
            .groupByKey()
            .mapValues(row => sqrt(row.toArray.map(ii => pow(ii, 2)).sum))
            .toDF(s"item", "length")
        val userItemPair = userItemScore.selectExpr("item as item1", "user", "score as score1")
            .join(userItemScore.selectExpr("item as item2", "user", "score as score2"), Seq("user"))
            .where("item1 < item2") // AB BA 只要一次
            .join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
            .join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item2"))
            .withColumn("itemPair", joinStringUDF(col("item1"), col("item2")))
            .withColumn("scorePair", joinDoubleUDF(col("score1"), col("score2")))
            .withColumn("lengthPair", joinDoubleUDF(col("length1"), col("length2")))
            .selectExpr("user", "itemPair", "scorePair", "lengthPair") // user对两个item都有行为,其中scorePair是两个行为的分数
            .distinct()

        val itemPairCount = userItemPair
            .groupBy("itemPair").count()
        val minCount = // 每个item对儿之间,最少需要有多少个用户对这两个item都有过行为
            if (minCoUser != -1L)
                minCoUser
            else itemPairCount
                .sort(desc("count"))
                .selectExpr("count")
                .limit(topItemPair)
                .rdd
                .map(_.getLong(0))
                .collect()(topItemPair - 1)
        println(s"两个item必须有${minCount}个用户都有行为时,这两个item才会被计算相似度n")

        val itemPairUser = userItemPair
            .join(
                itemPairCount.where(s"count >= $minCount"), Seq("itemPair") // 只要共现频次高的
            )
            .filter(row => { // 是否基于比例进行采样
                var flag = true
                if (isFilter) {
                    val ratio = minCount.toDouble / row.getAs[Long]("count").toDouble
                    flag = ratio > Random.nextDouble
                }
                flag
            })
            .repartition(1000)
        val itemPairUserPair = itemPairUser.selectExpr("itemPair", "user as user1", "scorePair as scorePair1", "lengthPair")
            .join(itemPairUser.selectExpr("itemPair", "user as user2", "scorePair as scorePair2", "lengthPair"), Seq("itemPair"))
            .where("user1 <= user2")
            .withColumn("userPair", joinStringUDF(col("user1"), col("user2")))

        var itemSim: DataFrame = null
        version match {
            case "V1" => {
                itemSim = itemPairUserPair
                    // userPair,itemPair,setSizePair,scorePair1,scorePair2
                    // 这两个用户,对于这两个item都有过正向行为,构成了一个swing结构,其中setSizePair是这两个用户的item集合大小,scorePair1是用户1对两个item的行为分
                    .rdd
                    .map(row => (row.getAs[String]("userPair"), row.getAs[String]("itemPair")))
                    .repartition(10000)
                    .groupByKey()
                    .filter(row => row._2.nonEmpty)
                    .flatMap(row => {
                        val swing = 1D / (alpha + row._2.size).toDouble // 基础swing分
                        row._2.map(ii => (ii, swing))
                    })
                    .repartition(10000)
                    .reduceByKey(_ + _) // 对于两个item而言,将所有的swing得分求和,即是两者之间的相似度
                    .map(row => {
                        val temp = row._1.split("_")
                        (temp(0), temp(1), row._2)
                    })
                    .toDF("item1", "item2", "sim")
            }
            case "V2" => {
                val userItemSet = userItemScore
                    .rdd
                    .map(row => (row.getAs[String]("user"), row.getAs[String]("item")))
                    .groupByKey()
                    .map(row => (row._1, row._2.toSet.size))
                    .toDF("user", "setSize") // user,userSetSize
                    .where("setSize > 0")
                    .repartition(1000)
                itemSim = itemPairUserPair
                    .join(userItemSet.selectExpr("user as user1", "setSize as setSize1"), Seq("user1"))
                    .join(userItemSet.selectExpr("user as user2", "setSize as setSize2"), Seq("user2"))
                    .withColumn("setSizePair", joinDoubleUDF(col("setSize1"), col("setSize2")))
                    .rdd
                    .map(row => ((row.getAs[String]("userPair"), row.getAs[String]("setSizePair")), row.getAs[String]("itemPair")))
                    .repartition(10000)
                    .groupByKey()
                    .filter(row => row._2.nonEmpty)
                    .flatMap(row => {
                        val Array(setSize1, setSize2) = splitDouble(row._1._2)
                        val W_uv = (1D / math.sqrt(setSize1)) * (1D / math.sqrt(setSize2)) // 用户权重
                        val swing = 1D / (alpha + row._2.size).toDouble // 基础swing分
                        row._2.map(ii => (ii, W_uv * swing))
                    })
                    .repartition(10000)
                    .reduceByKey(_ + _) // 对于两个item而言,将所有的swing得分求和,即是两者之间的相似度
                    .map(row => {
                        val temp = row._1.split("_")
                        (temp(0), temp(1), row._2)
                    })
                    .toDF("item1", "item2", "sim")
            }
            case "V3" => {
                val userItemSet = userItemScore
                    .rdd
                    .map(row => (row.getAs[String]("user"), row.getAs[String]("item")))
                    .groupByKey()
                    .map(row => (row._1, row._2.toSet.size))
                    .toDF("user", "setSize") // user,userSetSize
                    .where("setSize > 0")
                    .repartition(1000)
                itemSim = itemPairUserPair
                    .join(userItemSet.selectExpr("user as user1", "setSize as setSize1"), Seq("user1"))
                    .join(userItemSet.selectExpr("user as user2", "setSize as setSize2"), Seq("user2"))
                    .withColumn("setSizePair", joinDoubleUDF(col("setSize1"), col("setSize2")))
                    .rdd
                    .map(row =>
                        (
                            (row.getAs[String]("userPair"), row.getAs[String]("setSizePair")),
                            (row.getAs[String]("itemPair"), row.getAs[String]("scorePair1"), row.getAs[String]("scorePair2"), row.getAs[String]("lengthPair"))
                        )
                    )
                    .repartition(10000)
                    .groupByKey()
                    .filter(row => row._2.nonEmpty)
                    .flatMap(row => {
                        val Array(setSize1, setSize2) = splitDouble(row._1._2)
                        row._2.map(ii => {
                            val Array(user1Item1Score, user1Item2Score) = splitDouble(ii._2)
                            val Array(user2Item1Score, user2Item2Score) = splitDouble(ii._3)
                            val Array(length1, length2) = splitDouble(ii._4)
                            val W_uv = (1D / math.sqrt(setSize1)) * (1D / math.sqrt(setSize2)) // 用户权重
                            val swing = 1D / (alpha + row._2.size).toDouble // 基础swing分
                            
                            val S_uvij = (user1Item1Score * user2Item1Score + user1Item2Score * user2Item2Score) / math.sqrt(length1 * length2)
                            (ii._1, S_uvij * W_uv * swing)
                        })
                    })
                    .repartition(10000)
                    .reduceByKey(_ + _) // 对于两个item而言,将所有的swing得分求和,即是两者之间的相似度
                    .map(row => {
                        val temp = row._1.split("_")
                        (temp(0), temp(1), row._2)
                    })
                    .toDF("item1", "item2", "sim")
            }
        }
        //        metrics(sparkSession, userItemScore, itemSim, s"swing_$version")
        itemSim
    }

}

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/1071837.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号