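This post follows Chapter 4, "Working with Key/Value Pairs", of 《Spark快速大数据分析》 (Learning Spark, the O'Reilly animal book). Because pair RDDs have some operations of their own, their examples are kept separate from the API summaries in the previous two posts.
The first group of APIs operates on a single pair RDD: reduceByKey(), foldByKey(), groupByKey(), combineByKey(), mapValues(), flatMapValues(), keys, values and sortByKey().
The second group operates on two pair RDDs: subtractByKey(), join(), rightOuterJoin(), leftOuterJoin() and cogroup().
Summary of pair RDD action APIs: https://www.cnblogs.com/kuluo/p/12567221.html
Environment: IntelliJ IDEA + Spark 2.4.5 + Scala 2.11.12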
// reduceByKey(): merge the values of each key with the given function (here, a sum).
val testList = List("a a a b b b", "b b c c c", "c d d")
val testRdd = sc.parallelize(testList)
testRdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(pair => print(s"$pair "))
(d,2) (a,3) (b,5) (c,4)
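// foldByKey(): like reduceByKey(), but each key's fold starts from the given zero value (here, 0).
val testList = List("a a a b b b", "b b c c c", "c d d")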
val testRdd = sc.parallelize(testList)
testRdd.flatMap(_.split(" ")).map((_, 1)).foldByKey(0)(_ + _).foreach(pair => print(s"$pair "))
(d,2) (a,3) (b,5) (c,4)
mapValues(): applies a function to each value of a pair RDD without changing the key.
val testList = List("a a a b b b", "b b c c c", "c d d")
val testRdd = sc.parallelize(testList)
testRdd.flatMap(_.split(" ")).map((_, 1))
  .reduceByKey(_ + _)
  .mapValues(_ * 2)
  .foreach(pair => print(s"$pair "))
(d,4) (a,6) (b,10) (c,8)
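flatMapValues(): applies a function that returns an iterator to each value of a pair RDD and, for every element returned, emits a key/value record with the original key.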
val testList = List("a a a b b b", "b b c c c", "c d d")
val testRdd = sc.parallelize(testList)
testRdd.flatMap(_.split(" ")).map((_, 1))
  .reduceByKey(_ + _)
  .flatMapValues(_ to 5)
  .foreach(pair => print(s"$pair "))
(d,2) (d,3) (d,4) (d,5) (a,3) (a,4) (a,5) (b,5) (c,4) (c,5)
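To see why the output above has four records for d but only one for b, it can help to model flatMapValues on an ordinary Scala collection. The sketch below is not Spark code: `flatMapValuesLocal` is a hypothetical helper that mimics the behaviour described above on a `Seq` of pairs.

```scala
object FlatMapValuesSketch {
  // Hypothetical local stand-in for flatMapValues: expand each value with f
  // and pair every produced element with the original key.
  def flatMapValuesLocal[K, V, U](pairs: Seq[(K, V)])(f: V => Iterable[U]): Seq[(K, U)] =
    pairs.flatMap { case (k, v) => f(v).map(u => (k, u)) }

  def main(args: Array[String]): Unit = {
    // The word counts produced by reduceByKey in the example above.
    val counts = Seq(("a", 3), ("b", 5), ("c", 4), ("d", 2))
    // `v to 5` is the range v, v+1, ..., 5; it is empty when v > 5.
    val expanded = flatMapValuesLocal(counts)(v => v to 5)
    println(expanded.mkString(" "))
    // prints: (a,3) (a,4) (a,5) (b,5) (c,4) (c,5) (d,2) (d,3) (d,4) (d,5)
  }
}
```

A key whose value is greater than 5 would produce an empty range and therefore disappear from the result entirely.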
// groupByKey(): group the values that share a key.
val testList = List("a a a b b b", "b b c c c", "c d d")
val testRdd = sc.parallelize(testList)
val grouped = testRdd.flatMap(_.split(" ")).map((_, 1)).groupByKey()
// Print the grouped values.
grouped.foreach(pair => print(s"$pair "))
(d,CompactBuffer(1, 1)) (a,CompactBuffer(1, 1, 1)) (b,CompactBuffer(1, 1, 1, 1, 1)) (c,CompactBuffer(1, 1, 1, 1))
// Sum each group to get the same word counts as reduceByKey().
grouped.mapValues(_.sum).foreach(pair => print(s"$pair "))
(d,2) (a,3) (b,5) (c,4)
This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` or `PairRDDFunctions.reduceByKey` will provide much better performance.
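The note above names aggregateByKey as a cheaper alternative to groupByKey for per-key aggregation. As a minimal sketch, assuming Spark 2.4.x is on the classpath, the same word count can be written with it. The object name, the `wordCounts` helper, the local master and the app name are illustrative choices, not part of the original post.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeySketch {
  // Word count with aggregateByKey: values are summed inside each partition
  // before the shuffle, instead of shipping every (word, 1) pair as groupByKey does.
  def wordCounts(sc: SparkContext, lines: List[String]): Array[(String, Int)] =
    sc.parallelize(lines)
      .flatMap(_.split(" "))
      .map((_, 1))
      // zero value 0; the first function adds a value to a partition-local total,
      // the second adds the totals of two partitions.
      .aggregateByKey(0)(_ + _, _ + _)
      .collect()
      .sortBy(_._1) // sort on the driver so the printed order is stable

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions made for this sketch.
    val conf = new SparkConf().setMaster("local[*]").setAppName("AggregateByKeySketch")
    val sc = new SparkContext(conf)
    try {
      val testList = List("a a a b b b", "b b c c c", "c d d")
      println(wordCounts(sc, testList).mkString(" "))
      // prints: (a,3) (b,5) (c,4) (d,2)
    } finally {
      sc.stop()
    }
  }
}
```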
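// combineByKey(): takes createCombiner, mergeValue and mergeCombiners; here all three just sum, giving the word counts.
val testList = List("a a a b b b", "b b c c c", "c d d")
val testRdd = sc.parallelize(testList)
testRdd.flatMap(_.split(" ")).map((_, 1))
  .combineByKey(x => x,
    (x: Int, y: Int) => x + y,
    (x: Int, y: Int) => x + y)
  .foreach(pair => print(s"$pair "))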
(d,2) (a,3) (b,5) (c,4)
// combineByKey() used to compute a per-key average over two word-count RDDs.
// Word counts of testList1: (a,3) (b,5) (c,4) (d,2)
val testList1 = List("a a a b b b", "b b c c c", "c d d")
// Word counts of testList2: (a,5) (b,4) (c,6)
val testList2 = List("a a a a a b b", "b b c c c c c", "c")
val testRdd1 = sc.parallelize(testList1)
val testRdd2 = sc.parallelize(testList2)
// union() concatenates the two count RDDs: (a,3) (a,5) (b,5) (b,4) (c,4) (c,6) (d,2)
val testRdd3 = testRdd1.flatMap(_.split(" ")).map((_, 1))
  .combineByKey(x => x,
    (x: Int, y: Int) => x + y,
    (x: Int, y: Int) => x + y)
  .union(
    testRdd2.flatMap(_.split(" ")).map((_, 1))
      .combineByKey(x => x,
        (x: Int, y: Int) => x + y,
        (x: Int, y: Int) => x + y))
// Accumulate (sum, count) per key, then divide to get the average.
testRdd3.combineByKey(
  x => (x, 1),
  (x: (Int, Int), y) => (x._1 + y, x._2 + 1),
  (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
).mapValues(value => 1.0 * value._1 / value._2).foreach(pair => print(s"$pair "))
(d,2.0) (a,4.0) (b,4.5) (c,5.0)
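The three arguments of combineByKey are easier to follow when the partitions are made explicit. The following is a plain-Scala model, not Spark code: `combineByKeyLocal` and the two-partition layout are hypothetical, chosen only to show when each function fires. createCombiner runs the first time a key is seen within a partition, mergeValue handles later values of that key in the same partition, and mergeCombiners merges the per-partition results.

```scala
object CombineByKeySketch {
  // Hypothetical local model of combineByKey: each inner Seq stands for one partition.
  def combineByKeyLocal[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Step 1: combine the values inside each partition.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))  // key already seen in this partition
          case None    => acc.updated(k, createCombiner(v)) // first value for this key here
        }
      }
    }
    // Step 2: merge the per-partition combiners key by key.
    perPartition.foldLeft(Map.empty[K, C]) { (merged, partResult) =>
      partResult.foldLeft(merged) { case (acc, (k, c)) =>
        acc.get(k) match {
          case Some(existing) => acc.updated(k, mergeCombiners(existing, c))
          case None           => acc.updated(k, c)
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // The same (key, count) records as the unioned RDD above, split arbitrarily in two:
    // a and c exercise mergeValue, b exercises mergeCombiners, d only createCombiner.
    val partitions = Seq(
      Seq(("a", 3), ("a", 5), ("b", 5)),
      Seq(("b", 4), ("c", 4), ("c", 6), ("d", 2)))
    val sumAndCount = combineByKeyLocal[String, Int, (Int, Int)](partitions)(
      v => (v, 1),
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (x, y) => (x._1 + y._1, x._2 + y._2))
    val averages = sumAndCount.map { case (k, (sum, n)) => (k, sum.toDouble / n) }
    println(averages.toSeq.sortBy(_._1).mkString(" "))
    // prints: (a,4.0) (b,4.5) (c,5.0) (d,2.0)
  }
}
```

The result does not depend on how the records are split, which is why mergeValue and mergeCombiners have to agree with each other.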
// keys: return an RDD containing only the keys.
val testList = List("a a a b b b", "b b c c c", "c d d")
val testRdd = sc.parallelize(testList)
testRdd.flatMap(_.split(" ")).map((_, 1))
  .reduceByKey(_ + _)
  .keys
  .foreach(key => print(s"$key "))
d a b c
// values: return an RDD containing only the values.
val testList = List("a a a b b b", "b b c c c", "c d d")
val testRdd = sc.parallelize(testList)
testRdd.flatMap(_.split(" ")).map((_, 1))
  .reduceByKey(_ + _)
  .values
  .foreach(value => print(s"$value "))
2 3 5 4
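// sortByKey(): sort the pairs by key; passing false sorts in descending order, as in the output below.
val testList = List("a a a b b b", "b b c c c", "c d d")
val testRdd = sc.parallelize(testList)
testRdd.flatMap(_.split(" ")).map((_, 1))
  .reduceByKey(_ + _)
  .sortByKey(false)
  .foreach(pair => print(s"$pair "))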
(d,2) (c,4) (b,5) (a,3)
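// subtractByKey(): keep the pairs of the first RDD whose key does not appear in the second.
// Word counts of testList1: (a,3) (b,5) (c,4) (d,2)
val testList1 = List("a a a b b b", "b b c c c", "c d d")
// Word counts of testList2: (a,5) (b,4) (c,6)
val testList2 = List("a a a a a b b", "b b c c c c c", "c")
val testRdd1 = sc.parallelize(testList1)
val testRdd2 = sc.parallelize(testList2)
testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  .subtractByKey(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
  .foreach(pair => print(s"$pair "))
// d is the only key absent from the second RDD, so this prints: (d,2)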
// join(): inner join; keep only the keys present in both RDDs.
// Word counts of testList1: (a,3) (b,5) (c,4) (d,2)
val testList1 = List("a a a b b b", "b b c c c", "c d d")
// Word counts of testList2: (a,5) (b,4) (c,6)
val testList2 = List("a a a a a b b", "b b c c c c c", "c")
val testRdd1 = sc.parallelize(testList1)
val testRdd2 = sc.parallelize(testList2)
testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  .join(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
  .foreach(pair => print(s"$pair "))
(a,(3,5)) (b,(5,4)) (c,(4,6))
// rightOuterJoin(): keep every key of the second (right) RDD; the left value is wrapped in an Option.
// Word counts of testList1: (a,3) (b,5) (c,4) (d,2)
val testList1 = List("a a a b b b", "b b c c c", "c d d")
// Word counts of testList2: (a,5) (b,4) (c,6)
val testList2 = List("a a a a a b b", "b b c c c c c", "c")
val testRdd1 = sc.parallelize(testList1)
val testRdd2 = sc.parallelize(testList2)
testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  .rightOuterJoin(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
  .foreach(pair => print(s"$pair "))
(a,(Some(3),5)) (b,(Some(5),4)) (c,(Some(4),6))
// leftOuterJoin(): keep every key of the first (left) RDD; the right value is wrapped in an Option.
// Word counts of testList1: (a,3) (b,5) (c,4) (d,2)
val testList1 = List("a a a b b b", "b b c c c", "c d d")
// Word counts of testList2: (a,5) (b,4) (c,6)
val testList2 = List("a a a a a b b", "b b c c c c c", "c")
val testRdd1 = sc.parallelize(testList1)
val testRdd2 = sc.parallelize(testList2)
testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  .leftOuterJoin(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
  .foreach(pair => print(s"$pair "))
(d,(2,None)) (a,(3,Some(5))) (b,(5,Some(4))) (c,(4,Some(6)))
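The Option on the right-hand side of a leftOuterJoin result usually has to be unwrapped before the values can be used. The sketch below, assuming Spark 2.4.x on the classpath, adds the two counts per key and treats a key missing on the right as 0. The object, the `wordCounts` and `totals` helpers, the local master and the app name are hypothetical names introduced for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object LeftOuterJoinSketch {
  // Word count per key, built the same way as in the examples above.
  def wordCounts(sc: SparkContext, lines: List[String]): RDD[(String, Int)] =
    sc.parallelize(lines).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

  // Add the right-hand count when present; a key missing on the right contributes 0.
  def totals(left: RDD[(String, Int)], right: RDD[(String, Int)]): Array[(String, Int)] =
    left.leftOuterJoin(right)
      .mapValues { case (l, r) => l + r.getOrElse(0) }
      .collect()
      .sortBy(_._1) // sort on the driver so the printed order is stable

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions made for this sketch.
    val conf = new SparkConf().setMaster("local[*]").setAppName("LeftOuterJoinSketch")
    val sc = new SparkContext(conf)
    try {
      val left = wordCounts(sc, List("a a a b b b", "b b c c c", "c d d"))
      val right = wordCounts(sc, List("a a a a a b b", "b b c c c c c", "c"))
      println(totals(left, right).mkString(" "))
      // prints: (a,8) (b,9) (c,10) (d,2)
    } finally {
      sc.stop()
    }
  }
}
```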
// cogroup(): for each key in either RDD, collect the values from both RDDs into a pair of collections.
// Word counts of testList1: (a,3) (b,5) (c,4) (d,2)
val testList1 = List("a a a b b b", "b b c c c", "c d d")
// Word counts of testList2: (a,5) (b,4) (c,6)
val testList2 = List("a a a a a b b", "b b c c c c c", "c")
val testRdd1 = sc.parallelize(testList1)
val testRdd2 = sc.parallelize(testList2)
testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  .cogroup(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
  .foreach(pair => print(s"$pair "))
(d,(CompactBuffer(2),CompactBuffer())) (a,(CompactBuffer(3),CompactBuffer(5)))
(b,(CompactBuffer(5),CompactBuffer(4))) (c,(CompactBuffer(4),CompactBuffer(6)))
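The cogroup output above already contains everything an inner join needs: a key survives the join only when both collections are non-empty. As a rough sketch, assuming Spark 2.4.x on the classpath, the join result can be derived from cogroup with flatMapValues. `joinViaCogroup`, `wordCounts`, the object name, the local master and the app name are hypothetical and only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CogroupJoinSketch {
  // Word count per key, built the same way as in the examples above.
  def wordCounts(sc: SparkContext, lines: List[String]): RDD[(String, Int)] =
    sc.parallelize(lines).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

  // Pair every left value with every right value of the same key.
  // If either side is empty for a key, the loop yields nothing and the key is dropped.
  def joinViaCogroup(left: RDD[(String, Int)],
                     right: RDD[(String, Int)]): RDD[(String, (Int, Int))] =
    left.cogroup(right).flatMapValues { case (ls, rs) =>
      for (l <- ls; r <- rs) yield (l, r)
    }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions made for this sketch.
    val conf = new SparkConf().setMaster("local[*]").setAppName("CogroupJoinSketch")
    val sc = new SparkContext(conf)
    try {
      val left = wordCounts(sc, List("a a a b b b", "b b c c c", "c d d"))
      val right = wordCounts(sc, List("a a a a a b b", "b b c c c c c", "c"))
      val viaCogroup = joinViaCogroup(left, right).collect().sortBy(_._1)
      val viaJoin = left.join(right).collect().sortBy(_._1)
      println(viaCogroup.mkString(" "))
      // prints: (a,(3,5)) (b,(5,4)) (c,(4,6))
      println(viaCogroup.sameElements(viaJoin))
      // prints: true
    } finally {
      sc.stop()
    }
  }
}
```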