2.2 RDD:计算 transform->action
x = sc.parallelize([2,3,4], 2)[Task不能跨分片,task数为2] neutral_zero_value = (0,1) # sum: x+0 = x, product: 1*x = x seqOp = (lambda aggregated, el: (aggregated[0] + el, aggregated[1] * el)) combOp = (lambda aggregated, el: (aggregated[0] + el[0], aggregated[1] * el[1])) y = x.aggregate(neutral_zero_value,seqOp,combOp)[seqOp:各分片聚合计算 combOp:聚合各个分片 neutral_zero_value:初始化设定值,都要参与计算] # computes (cumulative sum, cumulative product) print(x.collect()) print(y) [2, 3, 4] (9, 24)
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)]) zeroValue = [] # empty list is 'zero value' for append operation mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) mergeComb = (lambda agg1,agg2: agg1 + agg2 ) y = x.aggregateByKey(zeroValue,mergeVal,mergeComb) print(x.collect()) print(y.collect()) [('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)] [('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
2.2.3 cache(persistent)
2.2.4 cartesian(笛卡儿积)
# cartesian x = sc.parallelize(['A','B']) y = sc.parallelize(['C','D']) z = x.cartesian(y) print(x.collect()) print(y.collect()) print(z.collect()) ['A', 'B'] ['C', 'D'] [('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]
# coalesce x = sc.parallelize([1,2,3,4,5],2) y = x.coalesce(numPartitions=1) print(x.glom().collect()) print(y.glom().collect()) [[1, 2], [3, 4, 5]] [[1, 2, 3, 4, 5]]
# cogroup x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))]) y = sc.parallelize([('A',8),('B',7),('A',6),('D',(5,5))]) z = x.cogroup(y) print(x.collect()) print(y.collect()) for key,val in list(z.collect()): print(key, [list(i) for i in val]) [('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))] [('A', 8), ('B', 7), ('A', 6), ('D', (5, 5))] A [[2, (1, 1)], [8, 6]] C [[4], []] B [[(3, 3)], [7]] D [[], [(5, 5)]]
# combineByKey x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)]) createCombiner = (lambda el: [(el,el**2)]) mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated mergeComb = (lambda agg1,agg2: agg1 + agg2 ) # append agg1 with agg2 y = x.combineByKey(createCombiner,mergeVal,mergeComb) print(x.collect()) print(y.collect()) [('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)] [('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
# filter x = sc.parallelize([1,2,3]) y = x.filter(lambda x: x%2 == 1) # filters out even elements print(x.collect()) print(y.collect()) [1, 2, 3] [1, 3] 2.2.14 first 备注:Return the first element of this RDD
# flatMap x = sc.parallelize([1,2,3]) y = x.flatMap(lambda x: (x, 100*x, x**2)) print(x.collect()) print(y.collect()) [1, 2, 3] [1, 100, 1, 2, 200, 4, 3, 300, 9]
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) def f(x): return x x.flatMapValues(f).collect() [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
# fold x = sc.parallelize([1,2,3]) neutral_zero_value = 0 # 0 for sum, 1 for multiplication y = x.fold(neutral_zero_value,lambda obj, accumulated: accumulated + obj) # computes cumulative sum print(x.collect()) print(y) [1, 2, 3] 6
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> from operator import add[python标准库:常用运算函数库] >>> sorted(rdd.foldByKey(0, add).collect()) [('a', 2), ('b', 1)]
>>> def f(x): print(x) >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)[循环rdd的所有元素,无返回值]
# foreachPartition from __future__ import print_function x = sc.parallelize([1,2,3],5) def f(partition): '''side effect: append the current RDD partition contents to a file''' f1=open("./foreachPartitionExample.txt", 'a+') print([el for el in partition],file=f1) # first clear the file contents open('./foreachPartitionExample.txt', 'w').close() y = x.foreachPartition(f) # writes into foreachPartitionExample.txt print(x.glom().collect()) print(y) # foreach returns 'None' # print the contents of foreachPartitionExample.txt with open("./foreachPartitionExample.txt", "r") as foreachExample: print (foreachExample.read()) [[], [1], [], [2], [3]] None [] [] [1] [2] [3]
2.2.21 fullOuterJoin
备注:Perform a full outer join of self and other: the result contains a pair for every key present in either RDD, with None filling in for the side where the key is missing.
2.2.22 glom()[合并rdd的分片的所有元素]
2.2.23 groupBy
# groupBy x = sc.parallelize([1,2,3]) y = x.groupBy(lambda x: 'A' if (x%2 == 1) else 'B' ) print(x.collect()) # y is nested, this iterates through it print([(j[0],[i for i in j[1]]) for j in y.collect()]) [1, 2, 3] [('A', [1, 3]), ('B', [2])]
# groupByKey x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)]) y = x.groupByKey() print(x.collect()) print([(j[0],[i for i in j[1]]) for j in y.collect()]) [('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)] [('A', [3, 2, 1]), ('B', [5, 4])]
# groupWith x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))]) y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))]) z = sc.parallelize([('D',9),('B',(8,8))]) a = x.groupWith(y,z) print(x.collect()) print(y.collect()) print(z.collect()) print("Result:") for key,val in list(a.collect()): print(key, [list(i) for i in val]) [('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))] [('B', (7, 7)), ('A', 6), ('D', (5, 5))] [('D', 9), ('B', (8, 8))] Result: D [[], [(5, 5)], [9]] C [[4], [], []] B [[(3, 3)], [(7, 7)], [(8, 8)]] A [[2, (1, 1)], [6], []]
rdd = sc.parallelize(range(51)) rdd.histogram(2) ([0, 25, 50], [25, 26]) rdd.histogram([0, 5, 25, 50]) ([0, 5, 25, 50], [5, 20, 26])[分组区间,每区间元素个数] rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets ([0, 15, 30, 45, 60], [15, 15, 15, 6]) rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"]) rdd.histogram(("a", "b", "c")) (('a', 'b', 'c'), [2, 2])
2.2.27 join
x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)]) z = x.join(y)
print(x.collect())
print(y.collect())
print(z.collect())
[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('A', 8), ('B', 7), ('A', 6), ('D', 5)]
[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7))]
2.2.28 leftOuterJoin
# leftOuterJoin x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)]) y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)]) z = x.leftOuterJoin(y) print(x.collect()) print(y.collect()) print(z.collect()) [('C', 4), ('B', 3), ('A', 2), ('A', 1)] [('A', 8), ('B', 7), ('A', 6), ('D', 5)] [('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('C', (4, None)), ('B', (3, 7))]
# rightOuterJoin x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)]) y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)]) z = x.rightOuterJoin(y) print(x.collect()) print(y.collect()) print(z.collect()) [('C', 4), ('B', 3), ('A', 2), ('A', 1)] [('A', 8), ('B', 7), ('A', 6), ('D', 5)] [('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7)), ('D', (None, 5))]
# keyBy x = sc.parallelize([1,2,3]) y = x.keyBy(lambda x: x**2) print(x.collect()) print(y.collect()) [1, 2, 3] [(1, 1), (4, 2), (9, 3)] 2.2.31 lookup(key)
# mapPartitions x = sc.parallelize([1,2,3], 2) def f(iterator): yield sum(iterator) y = x.mapPartitions(f) # glom() flattens elements on the same partition print(x.glom().collect()) print(y.glom().collect()) [[1], [2, 3]] [[1], [5]]
# mapPartitionsWithIndex x = sc.parallelize([1,2,3], 2) def f(partitionIndex, iterator): yield (partitionIndex,sum(iterator)) y = x.mapPartitionsWithIndex(f) # glom() flattens elements on the same partition print(x.glom().collect()) print(y.glom().collect()) [[1], [2, 3]] [[(0, 1)], [(1, 5)]]
>>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) >>> def f(x): return len(x) >>> x.mapValues(f).collect() [('a', 3), ('b', 1)]
# partitionBy x = sc.parallelize([(0,1),(1,2),(2,3)],2) y = x.partitionBy(numPartitions = 3, partitionFunc = lambda x: x) # only key is passed to paritionFunc print(x.glom().collect()) print(y.glom().collect()) [[(0, 1)], [(1, 2), (2, 3)]] [[(0, 1)], [(1, 2)], [(2, 3)]]
# rdd持久化,降低内存消耗 reqrdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_SER) Cache: Persist this RDD with the default storage level (MEMORY_ONLY).
# pipe x = sc.parallelize(['A', 'Ba', 'C', 'AD']) y = x.pipe('grep -i "A"') # calls out to grep, may fail under Windows print(x.collect()) print(y.collect()) ['A', 'Ba', 'C', 'AD'] ['A', 'Ba', 'AD']
# reduce x = sc.parallelize([1,2,3]) y = x.reduce(lambda obj, accumulated: obj + accumulated) # computes a cumulative sum print(x.collect()) print(y) [1, 2, 3] 6
# reduceByKey x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)]) y = x.reduceByKey(lambda agg, obj: agg + obj) print(x.collect()) print(y.collect()) [('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)] [('A', 12), ('B', 3)]
# reduceByKeyLocally x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)]) y = x.reduceByKeyLocally(lambda agg, obj: agg + obj) print(x.collect()) print(y) [('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)] {'A': 12, 'B': 3} 备注:This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
# repartition x = sc.parallelize([1,2,3,4,5],2) y = x.repartition(numPartitions=3) print(x.glom().collect()) print(y.glom().collect()) [[1, 2], [3, 4, 5]] [[], [1, 2, 3, 4], [5]]
rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)]) rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, True) rdd2.glom().collect() [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
# sample x = sc.parallelize(range(7)) # call 'sample' 5 times ylist = [x.sample(withReplacement=False, fraction=0.5) for i in range(5)] print('x = ' + str(x.collect())) for cnt,y in zip(range(len(ylist)), ylist): print('sample:' + str(cnt) + ' y = ' + str(y.collect())) x = [0, 1, 2, 3, 4, 5, 6] sample:0 y = [0, 2, 5, 6] sample:1 y = [2, 6] sample:2 y = [0, 4, 5, 6] sample:3 y = [0, 2, 6] sample:4 y = [0, 3, 4]
# sampleByKey x = sc.parallelize([('A',1),('B',2),('C',3),('B',4),('A',5)]) y = x.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2}) print(x.collect()) print(y.collect()) [('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)] [('B', 2), ('C', 3), ('B', 4)]
# sampleStdev x = sc.parallelize([1,3,2]) y = x.sampleStdev() # divides by N-1 print(x.collect()) print(y) [1, 3, 2] 1.0
# sampleVariance x = sc.parallelize([1,3,2]) y = x.sampleVariance() # divides by N-1 print(x.collect()) print(y) [1, 3, 2] 1.0
# sortByKey x = sc.parallelize([('B',1),('A',2),('C',3)]) y = x.sortByKey() print(x.collect()) print(y.collect()) [('B', 1), ('A', 2), ('C', 3)] [('A', 2), ('B', 1), ('C', 3)]
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] sc.parallelize(tmp).sortBy(lambda x: x[0]).collect() [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] sc.parallelize(tmp).sortBy(lambda x: x[1]).collect() [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) y = sc.parallelize([("a", 3), ("c", None)]) sorted(x.subtract(y).collect()) [('a', 1), ('b', 4), ('b', 5)] 2.2.54 subtractByKey x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) y = sc.parallelize([("a", 3), ("c", None)]) sorted(x.subtractByKey(y).collect()) [('b', 4), ('b', 5)]
# take x = sc.parallelize([1,3,1,2,3]) y = x.take(num = 3) print(x.collect()) print(y) [1, 3, 1, 2, 3] [1, 3, 1]
# takeOrdered x = sc.parallelize([1,3,1,2,3]) y = x.takeOrdered(num = 3) print(x.collect()) print(y) [1, 3, 1, 2, 3] [1, 1, 2]
# takeSample x = sc.parallelize(range(7)) # call 'sample' 5 times ylist = [x.takeSample(withReplacement=False, num=3) for i in range(5)] print('x = ' + str(x.collect())) for cnt,y in zip(range(len(ylist)), ylist): print('sample:' + str(cnt) + ' y = ' + str(y)) # no collect on y x = [0, 1, 2, 3, 4, 5, 6] sample:0 y = [0, 2, 6] sample:1 y = [6, 4, 2] sample:2 y = [2, 0, 4] sample:3 y = [5, 4, 1] sample:4 y = [3, 1, 4]
add = lambda x, y: x + y rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10) rdd.treeAggregate(0, add, add) -5 rdd.treeAggregate(0, add, add, 1) -5 rdd.treeAggregate(0, add, add, 2) -5 rdd.treeAggregate(0, add, add, 5) -5 rdd.treeAggregate(0, add, add, 10) -5
add = lambda x, y: x + y rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10) rdd.treeReduce(add) -5 rdd.treeReduce(add, 1) -5 rdd.treeReduce(add, 2) -5 rdd.treeReduce(add, 5) -5 rdd.treeReduce(add, 10) -5
# zip x = sc.parallelize(['B','A','A']) # zip expects x and y to have same #partitions and #elements/partition y = x.map(lambda x: ord(x)) z = x.zip(y) print(x.collect()) print(y.collect()) print(z.collect()) ['B', 'A', 'A'] [66, 65, 65] [('B', 66), ('A', 65), ('A', 65)]
# zipWithIndex x = sc.parallelize(['B','A','A'],2) y = x.zipWithIndex() print(x.glom().collect()) print(y.collect()) [['B'], ['A', 'A']] [('B', 0), ('A', 1), ('A', 2)]
# zipWithUniqueId x = sc.parallelize(['B','A','A'],2) y = x.zipWithUniqueId() print(x.glom().collect()) print(y.collect()) [['B'], ['A', 'A']] [('B', 0), ('A', 1), ('A', 3)]
手机扫一扫
移动阅读更方便
你可能感兴趣的文章