combineByKey算子求解平均值实例_scala combinebykey求平均值-程序员宅基地

技术标签: 实例  

不同场景平均值算法


求平均值系列之一:

val input = sc.parallelize(Seq(("t1", 1), ("t1", 2), ("t1", 3), ("t2", 2), ("t2", 5)))
val result = input.combineByKey( 
(v) => (v, 1), 
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().foreach(println(_)) 


-----------------测试运行结果:--------------------
scala> val input = sc.parallelize(Seq(("t1", 1), ("t1", 2), ("t1", 3), ("t2", 2), ("t2", 5)))
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:25


scala> val result = input.combineByKey( 
     | (v) => (v, 1), 
     | (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
     | (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 
     | ).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[19] at map at <console>:31


scala> result.collectAsMap().foreach(println(_)) 
(t1,2.0)
(t2,3.5)



求平均值系列之二:

val testData = sc.parallelize(Seq(("t1", (1,2)), ("t1", (2,4)), ("t1", (3,5)), ("t2", (2,1)), ("t2", (5,2))))
(t2,(6,3))(t1,(4,2))


val result = testData.combineByKey( 
(v) => (v._1, v._2), 
(acc: (Int, Int), v) => (acc._1 + v._1, acc._2 + v._2),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 
).map{ case (key, value) => (key, value._1 / value._2) }
result.collectAsMap().foreach(println(_)) 


val result = testData.combineByKey( 
(v) => (v._1, v._2), 
(acc: (Int, Int), v) => (acc._1 + v._1, acc._2 + v._2),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 
).map{ case (key, value) => (key, value._1 / value._2) }
result.collectAsMap().foreach(println(_)) 


求平均值算法验证,下面不能直接使用于生成环境


val testData = sc.parallelize(Seq(("t1", (1,2)), ("t1", (2,4)), ("t1", (3,5)), ("t2", (2,1)), ("t2", (5,2))))
 
 val tt = testData.combineByKey((_: String) => (0, 0),(pair: (Int, Int), value: String) =>(pair._1 + Integer.parseInt(value.split("#")(0)), pair._2 + Integer.parseInt(value.split("#")(1))),(pair1: (Int, Int), pair2: (Int, Int)) =>(pair1._1 + pair2._1, pair2._2 + pair2._2))


(t2,(5,2))(t1,(2,0))
val tt = testData.combineByKey((_: String) => (0, 0),(pair: (Int, Int), value: String) => (pair._1 + Integer.parseInt(value.split("#")(0)), pair._2 + Integer.parseInt(value.split("#")(1))),(pair1: (Int, Int), pair2: (Int, Int)) =>( pair1._1 + pair2._1, pair2._2 + pair2._2 ))




val averages: RDD[String, Double] = sumCountPairs.mapValues {
  case (sum, 0L) => 0D
  case (sum, count) => sum.toDouble / count
}


val sumCountPairs:RDD[(String, (Int, Long))] = testData.combineByKey(
  (_: Int) => (0, 0L),
  (pair: (Int, Long), value: Int) =>
    (pair._1 + value, pair._2 + 1L),
  (pair1: (Int, Long), pair2: (Int, Long)) =>
    (pair1._1 + pair2._1, pair2._2 + pair2._2)
)


val sumCountPairs = testData.combineByKey(
  (_: Int) => (0, 0L),
  (pair: (Int, Long), value: Int) =>
    (pair._1 + value, pair._2 + 1L),
  (pair1: (Int, Long), pair2: (Int, Long)) =>
    (pair1._1 + pair2._1, pair2._2 + pair2._2)
)


val averages = sumCountPairs.mapValues {
  case (sum, 0L) => 0D
  case (sum, count) => sum.toDouble / count
}



案例拾遗:


题主示例代码中 testData 这个 RDD 的类型是已经确定为 RDD[(String, Int)],然后通过 RDD.rddToRDDPairFunctions 这个隐式类型转换转为 PairRDDFunctions[String, Int],从而获得 reduceByKey 和 combineByKey 这两个 methods。然后来对比下二者的函数签名: class PairRDDFunctions[K, V](...) {
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]


  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)]
}
可以看到 reduceByKey 的 func 参数的类型只依赖于 PairRDDFunction 的类型参数 V,在这个例子里也就是 Int。于是 func 的类型已经确定为 (Int, Int) => Int,所以就不需要额外标识类型了。而 combineByKey 比 reduceByKey 更加通用,它允许各个 partition 在 shuffle 前先做 local reduce 得到一个类型为 C 的中间值,待 shuffle 后再做合并得到各个 key 对应的 C。以求均值为例,我们可以让每个 partiton 先求出单个 partition 内各个 key 对应的所有整数的和 sum 以及个数 count,然后返回一个 pair (sum, count)。在 shuffle 后累加各个 key 对应的所有 sum 和 count,再相除得到均值:val sumCountPairs: RDD[(String, (Int, Long))] = testData.combineByKey(
  (_: Int) => (0, 0L),


  (pair: (Int, Long), value: Int) =>
    (pair._1 + value, pair._2 + 1L),


  (pair1: (Int, Long), pair2: (Int, Long)) =>
    (pair1._1 + part2._1, pair2._2 + pair2._2)
)


val averages: RDD[String, Double] = sumCountPairs.mapValues {
  case (sum, 0L) => 0D
  case (sum, count) => sum.toDouble / count
}
由于 C 这个 类型参数是任意的,并不能从 testData 的类型直接推导出来,所以必须明确指定。只不过题主的例子是最简单的用 reduceByKey 就可以搞定的情况,也就是 V 和 C 完全相同,于是就看不出区别了。




val listRDD = sc.parallelize(List(1,2,3,4,4,5)).map(x => (x,1))
def combineByKey[C](createCombiner: Int => C,mergeValue: (C, Int) => C,mergeCombiners: (C, C) => C): org.apache.spark.rdd.RDD[(Int, C)]


val sumandcnt = listRDD.combineByKey((_: Int)=>(0, 0),(pair:(Int,Int),value:Int)=>(pair._1 + value, pair._2 + 1),(pair1:(Int, Int),pair2:(Int, Int))=>(pair1._1 + pair2._1, pair2._2 + pair2._2))
val ll =sumandcnt.mapValues {
  case (sum, 0) => 0D
  case (sum, count) => sum.toDouble / count
}


val rdd = List(1,2,3,4)
val input = sc.parallelize(rdd)
val result = input.aggregate((0,0))(
(acc,value) => (acc._1 + value, acc._2 + 1),
(acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)




result: (Int, Int) = (10, 4)
val avg = result._1 / result._2
avg: Int = 2.5




程序的详细过程大概如下:
首先定义一个初始值 (0, 0),即我们期待的返回类型的初始值。
(acc,value) => (acc._1 + value, acc._2 + 1), value是函数定义里面的T,这里是List里面的元素。所以acc._1 + value, acc._2 + 1的过程如下:
0+1, 0+1
1+2, 1+1
3+3, 2+1
6+4, 3+1
结果为 (10,4)。在实际Spark执行中是分布式计算,可能会把List分成多个分区,假如3个,p1(1,2), p2(3), p3(4),经过计算各分区的的结果 (3,2), (3,1), (4,1),这样,执行 (acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 就是 (3+3+4,2+1+1) 即 (10,4),然后再计算平均值。




版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/andyliuzhii/article/details/76851197

智能推荐

class和struct的区别-程序员宅基地

文章浏览阅读101次。4.class可以有⽆参的构造函数,struct不可以,必须是有参的构造函数,⽽且在有参的构造函数必须初始。2.Struct适⽤于作为经常使⽤的⼀些数据组合成的新类型,表示诸如点、矩形等主要⽤来存储数据的轻量。1.Class⽐较适合⼤的和复杂的数据,表现抽象和多级别的对象层次时。2.class允许继承、被继承,struct不允许,只能继承接⼝。3.Struct有性能优势,Class有⾯向对象的扩展优势。3.class可以初始化变量,struct不可以。1.class是引⽤类型,struct是值类型。

android使用json后闪退,应用闪退问题:从json信息的解析开始就会闪退-程序员宅基地

文章浏览阅读586次。想实现的功能是点击顶部按钮之后按关键字进行搜索,已经可以从服务器收到反馈的json信息,但从json信息的解析开始就会闪退,加载listview也不知道行不行public abstract class loadlistview{public ListView plv;public String js;public int listlength;public int listvisit;public..._rton转json为什么会闪退

如何使用wordnet词典,得到英文句子的同义句_get_synonyms wordnet-程序员宅基地

文章浏览阅读219次。如何使用wordnet词典,得到英文句子的同义句_get_synonyms wordnet

系统项目报表导出功能开发_积木报表 多线程-程序员宅基地

文章浏览阅读521次。系统项目报表导出 导出任务队列表 + 定时扫描 + 多线程_积木报表 多线程

ajax 如何从服务器上获取数据?_ajax 获取http数据-程序员宅基地

文章浏览阅读1.1k次,点赞9次,收藏9次。使用AJAX技术的好处之一是它能够提供更好的用户体验,因为它允许在不重新加载整个页面的情况下更新网页的某一部分。另外,AJAX还使得开发人员能够创建更复杂、更动态的Web应用程序,因为它们可以在后台与服务器进行通信,而不需要打断用户的浏览体验。在Web开发中,AJAX(Asynchronous JavaScript and XML)是一种常用的技术,用于在不重新加载整个页面的情况下,从服务器获取数据并更新网页的某一部分。使用AJAX,你可以创建异步请求,从而提供更快的响应和更好的用户体验。_ajax 获取http数据

Linux图形终端与字符终端-程序员宅基地

文章浏览阅读2.8k次。登录退出、修改密码、关机重启_字符终端

随便推点

Python与Arduino绘制超声波雷达扫描_超声波扫描建模 python库-程序员宅基地

文章浏览阅读3.8k次,点赞3次,收藏51次。前段时间看到一位发烧友制作的超声波雷达扫描神器,用到了Arduino和Processing,可惜啊,我不会Processing更看不懂人家的程序,咋办呢?嘿嘿,所以我就换了个思路解决,因为我会一点Python啊,那就动手吧!在做这个案例之前先要搞明白一个问题:怎么将Arduino通过超声波检测到的距离反馈到Python端?这个嘛,我首先想到了串行通信接口。没错!就是串口。只要Arduino将数据发送给COM口,然后Python能从COM口读取到这个数据就可以啦!我先写了一个测试程序试了一下,OK!搞定_超声波扫描建模 python库

凯撒加密方法介绍及实例说明-程序员宅基地

文章浏览阅读4.2k次。端—端加密指信息由发送端自动加密,并且由TCP/IP进行数据包封装,然后作为不可阅读和不可识别的数据穿过互联网,当这些信息到达目的地,将被自动重组、解密,而成为可读的数据。不可逆加密算法的特征是加密过程中不需要使用密钥,输入明文后由系统直接经过加密算法处理成密文,这种加密后的数据是无法被解密的,只有重新输入明文,并再次经过同样不可逆的加密算法处理,得到相同的加密密文并被系统重新识别后,才能真正解密。2.使用时,加密者查找明文字母表中需要加密的消息中的每一个字母所在位置,并且写下密文字母表中对应的字母。_凯撒加密

工控协议--cip--协议解析基本记录_cip协议embedded_service_error-程序员宅基地

文章浏览阅读5.7k次。CIP报文解析常用到的几个字段:普通类型服务类型:[0x00], CIP对象:[0x02 Message Router], ioi segments:[XX]PCCC(带cmd和func)服务类型:[0x00], CIP对象:[0x02 Message Router], cmd:[0x101], fnc:[0x101]..._cip协议embedded_service_error

如何在vs2019及以后版本(如vs2022)上添加 添加ActiveX控件中的MFC类_vs添加mfc库-程序员宅基地

文章浏览阅读2.4k次,点赞9次,收藏13次。有时候我们在MFC项目开发过程中,需要用到一些微软已经提供的功能,如VC++使用EXCEL功能,这时候我们就能直接通过VS2019到如EXCEL.EXE方式,生成对应的OLE头文件,然后直接使用功能,那么,我们上篇文章中介绍了vs2017及以前的版本如何来添加。但由于微软某些方面考虑,这种方式已被放弃。从上图中可以看出,这一功能,在从vs2017版本15.9开始,后续版本已经删除了此功能。那么我们如果仍需要此功能,我们如何在新版本中添加呢。_vs添加mfc库

frame_size (1536) was not respected for a non-last frame_frame_size (1024) was not respected for a non-last-程序员宅基地

文章浏览阅读785次。用ac3编码,执行编码函数时报错入如下:[ac3 @ 0x7fed7800f200] frame_size (1536) was not respected for anon-last frame (avcodec_encode_audio2)用ac3编码时每次送入编码器的音频采样数应该是1536个采样,不然就会报上述错误。这个数字并非刻意固定,而是跟ac3内部的编码算法原理相关。全网找不到,国内音视频之路还有很长的路,音视频人一起加油吧~......_frame_size (1024) was not respected for a non-last frame

Android移动应用开发入门_在安卓移动应用开发中要在活动类文件中声迷你一个复选框变量-程序员宅基地

文章浏览阅读230次,点赞2次,收藏2次。创建Android应用程序一个项目里面可以有很多模块,而每一个模块就对应了一个应用程序。项目结构介绍_在安卓移动应用开发中要在活动类文件中声迷你一个复选框变量

推荐文章

热门文章

相关标签