3 大数据实战系列-spark shell分析日志
阅读原文时间:2023年09月04日阅读:1

文件格式: 
访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击URL 
数据文件越大越好,至少100万行

./spark-shell --master spark://shulaibao2:7077 --executor-memory 512m --driver-memory 4540m

内存根据服务器内存大小

  • 加载hdfs数据源到SparkContext->HaddopRDD

    val rdd1 = sc.textFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/sougou/SogouQ1.txt")

  • MappedRDD->FilterRdd

    val rdd1 = sc.textFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/sougou/SogouQ1.txt")
    val rdd2=rdd1.map(.split("\t")).filter(.length==6)

数据结构: Array[Array[String]] = Array(Array(20111230000005, 57375476989eea12893c0c3811607bcf, wolf, 1, 1, http://www.qiyi.com/), Array(20111230000005, 66c5bb7774e31d0a22278249b26bc83a, json, 3, 1, http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1))

  • Shuffle

    val rdd3 = rdd2.map(x=>(x(1),1))

**数据结构:**Array([(String, Int)]),数组从0开始索引,Tupple从1开始索引 
Array:array(0)元素同类型 元素值可变 
List:list(1) 元素同类型 元素值不可变 
Tupple:t._1 元素可不同类型 元素值不可变

Val rdd4 = rdd3.reduceByKey(_+_).map(x=>(x._2,x._1)). sortByKey(false).map(x=>(x._2,x._1))
  • 保存结果

    rdd4.saveAsTextFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/sougou/result1.out")
    hadoop fs -ls /home/hadoop/upload/test/sougou

  • 合并节点

    hdfs dfs -getmerge hdfs://shulaibao2:9010/home/hadoop/upload/test/sougou/result1.out /home/hadoop/result1.out