算法设计与实现 基于物品的协同过滤又称Item-Based CF. 基于Spark的Item-Based CF算法其实现原理和步骤与经典方法基本一致,不同的地方主要在于具体步骤内的并行化计算。
相似度算法 在Spark MLlib中提供了余弦相似度的分布式实现,org.apache.spark.mllib.linalg.distributed包中的IndexedRowMatrix是一个分布式矩阵类,其中提供了一个columnSimilarities方法用于计算该矩阵各列之间的余弦相似度。
预测值计算 采用加权求和的方法计算预测值.
实现步骤: 步骤分解 Step 1:读取用户评分数据,设用户数为U,物品数目为I,将数据转换为以用户为行,物品为列的二维矩阵R,R维度为(U×I)。矩阵的每一个数据表示某个用户对特定物品的评分。
Step 2:计算R每列之间的相似度,可以得到维度(I×I)的矩阵S。S(i,j)表示物品i和物品j之间的相似度。
Step 3:使用预测值计算公式计算用户对未评分物品的预测评分。得到预测评分矩阵P,P维度为(U×I),P(i,j) 表示用户i对物品j的预测评分。
Step 4:对用户i进行推荐。找出P的第i行中评分最高的前K个物品推荐给用户。K是需要推荐的物品数量。
基于Spark的实现 鉴于从Spark 2.0.0开始基于Dataset的API成为了主要编程API,本文采用Dataset的API进行实现,使用的语言为Scala。基于Spark平台Item-Based CF可以分为4步实现:
Step 1:读取评分数据集,这里是从Hive数据仓库读取数据,udata是表名,userId、itemId、rating是3个字段,分别是[用户ID,物品ID,评分]。这里按照8:2的比例将数据集分为训练集和测试集。
1 2 3 4 5 6 7 8 9 def dataSet (): (DataFrame , DataFrame ) = { val table = " udata" val df = spark.sql(s"select userId,itemId,rating from test.$table " ) val Array (training, test) = df.randomSplit(Array (0.8 , 0.2 )) training.cache() test.cache() (training, test) }
Step 2:将数据集转换成以用户为行、物品为列的二维评分矩阵,矩阵的每一个行是一个用户对所有物品的评分。然后求出评分矩阵每列之间的相似度,得到一个以物品ID为行和列,以相似度为数据的矩阵,这个矩阵就是物品相似度矩阵。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def parseToMatrix (data: DataFrame ): CoordinateMatrix = { val parsedData = data.rdd.map { case Row (user: Int , item: Int , rate: Int ) => MatrixEntry (user, item, rate.toDouble) } new CoordinateMatrix (parsedData) } def standardCosine (matrix: CoordinateMatrix ): RDD [MatrixEntry ] = { val similarity = matrix.toIndexedRowMatrix().columnSimilarities() val sim = similarity.entries sim }
Step 3:计算测试集相似物品表。将测试集与训练集、物品相似度表进行左联接,得到测试集相似物品表,字段为[用户ID,物品ID,实际评分,相似物品评分,相似度]。然后将该表注册成临时表testAndSim,并且缓存起来,供下一步使用。
1 2 3 4 5 6 7 8 9 10 11 val testItemSim = spark.sql( """ |select test.userId,test.itemId,test.rating testRating,training.rating,sim.sim |from test | left join training on test.userId=training.userId | left join itemSim sim on test.itemId=sim.itemX and training.itemId=sim.itemY """ .stripMargin ) testItemSim.cache() testItemSim.createOrReplaceTempView("testAndSim" )
Step 4:预测测试集中用户对物品的评分。对上一步得到的测试集相似物品表testAndSim进行计算,将该表数据按照(userId,itemId)进行分组,取每组相似度前K个物品的评分。最后依照预测值计算公式求出预测值,得到预测评分表testAndPre,其中pre字段就是对应的预测评分值。
1 2 3 4 5 6 7 8 9 10 11 12 13 val sqlRank = "select userId,itemId,testRating,rating,sim," + "rank() over (partition by userId,itemId order by sim desc) rank\n" + "from testAndSim" val testAndPre = spark.sql( "select userId,itemId,first(testRating) rate,nvl(sum(rating*sim)/sum(abs(sim)),0) pre\n" + "from( " + " select *" + " from (" + sqlRank + ") t " + s" where rank <= $k " + ") w " + "group by userId,itemId" )