栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark中combinByKey实现reduceByKey和groupByKey

大数据系统 更新时间:发布时间: 百科书网 趣学号

Spark中combinByKey实现reduceByKey和groupByKey具体的实现函数

  def combineByKeyToReduceBy(sc:SparkContext): Unit ={
    val rdd = sc.parallelize(List(("a",1),("b",2),("c",3),("a",2),("a",3),("b",3),("c",2)))
    rdd.reduceByKey(_+_).foreach(println)
    println("这个是reduceByKey的示例,下面为使用combineByKey做ReduceByKey")
    val reducevalue: RDD[(String, Int)] = rdd.combineByKey(
      createCombiner1,
      mergevalue1,
      mergeCombiners1
    )
  reducevalue.foreach(println)
  }

  //这里的返回值成为下面的merValue1的c,参数v为传入rdd的键值对的值
  def createCombiner1(v:Int): Int ={
    v
  }
  //这里的参数c为一个迭代值,c+v的值成为c,不断迭代,
  // 参数v为键值对中的值,这里为同分区进行迭代
  def mergevalue1(c:Int,v:Int): Int ={
    c+v
  }
  //这里为不同分区的迭代,不同分区分别计算之后,再根据key将对应的value值相加
  // ,这里的c1和c2类似于上面函数那个的参数的作用
  def mergeCombiners1(c1:Int,c2:Int): Int ={
    c1+c2
  }


  
  def combineByKeyToGroupBy(sc:SparkContext): Unit ={
    val rdd = sc.parallelize(List(("a",1),("b",2),("c",3),("a",2),("a",3),("b",3),("c",2)))
    rdd.groupByKey().foreach(println)
    val groupByValue: RDD[(String, ListBuffer[String])] = rdd.combineByKey(
      createCombiner,
      mergevalue,
      mergeCombiners
    )
    groupByValue.foreach(println)
  }
  def createCombiner(v:Int):ListBuffer[String] = {
    println("======== this is createCombiner ==========")
    val list = new ListBuffer[String];
    list.append(v.toString)
    list
  }
  def mergevalue(ls:ListBuffer[String],v:Int):ListBuffer[String]={
    println("======== this is mergevalue ==========")
    ls.append(v.toString)
    ls
  }
  def mergeCombiners(ls1:ListBuffer[String],ls2:ListBuffer[String]):ListBuffer[String]={
    ls1++ls2
  }

main函数

def main(args: Array[String]): Unit = {
    val sc:SparkContext = SparkUtil.getDefaultSparkContest(SparkRDD.getClass.getName)
    //此处写函数测试

  }

util工具类

object SparkUtil {
  def main(args: Array[String]): Unit = {

  }

  val DEFAULT_MASTER_URL = "local[*]";

  def getSparkContext(appName:String,master:String):SparkContext
  = new SparkContext(new SparkConf().setAppName(appName).setMaster(master));

  def getDefaultSparkContest(appName:String):SparkContext = getSparkContext(appName,DEFAULT_MASTER_URL);

  def closeSparkContext(sc:SparkContext):Unit = if(sc != null) sc.close();
}

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/460618.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号