文件格式:
访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击URL
数据文件越大越好,至少100万行
./spark-shell --master spark://shulaibao2:7077 --executor-memory 512m --driver-memory 4540m
内存根据服务器内存大小
加载hdfs数据源到SparkContext->HaddopRDD
val rdd1 = sc.textFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/sougou/SogouQ1.txt")
MappedRDD->FilterRdd
val rdd1 = sc.textFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/sougou/SogouQ1.txt")
val rdd2=rdd1.map(.split("\t")).filter(.length==6)
数据结构: Array[Array[String]] = Array(Array(20111230000005, 57375476989eea12893c0c3811607bcf, wolf, 1, 1, http://www.qiyi.com/), Array(20111230000005, 66c5bb7774e31d0a22278249b26bc83a, json, 3, 1, http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1))
Shuffle
val rdd3 = rdd2.map(x=>(x(1),1))
**数据结构:**Array([(String, Int)]),数组从0开始索引,Tupple从1开始索引
Array:array(0)元素同类型 元素值可变
List:list(1) 元素同类型 元素值不可变
Tupple:t._1 元素可不同类型 元素值不可变
Val rdd4 = rdd3.reduceByKey(_+_).map(x=>(x._2,x._1)). sortByKey(false).map(x=>(x._2,x._1))
保存结果
rdd4.saveAsTextFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/sougou/result1.out")
hadoop fs -ls /home/hadoop/upload/test/sougou
合并节点
hdfs dfs -getmerge hdfs://shulaibao2:9010/home/hadoop/upload/test/sougou/result1.out /home/hadoop/result1.out
手机扫一扫
移动阅读更方便
你可能感兴趣的文章