2 大数据实战系列-spark shell wordcount
阅读原文时间:2023年09月04日阅读:4
cd /home/data/app/hadoop/spark-2.1.1-bin-hadoop2.7/bin
./spark-shell --master spark://shulaibao2:7077 --executor-memory 512m --driver-memory 4540m

初始化sc->SparkContext   spark->SparkSession
  • 2.1创建hdfs文件夹

Hadoop fs - mkdir -p /home/hadoop/upload/test

  • 2.2 上传数据源到hdfs

    Hadoop fs -put /home/data/app/hadoop/hadoop-2.8.0/etc/hadoop/core-site.xml
    /home/hadoop/upload/test

  • 2.3 验证hdfs文件列表

    Hadoop fs -ls /home/hadoop/upload/test

Scala-> spark shell:

scala>val rdd=sc.textFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/core-site.xml")
scala>rdd.cache()
scala>val wordcount=rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
scala>wordcount.take(10)
scala>val wordsort=wordcount.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
scala>wordsort.take(10)

Python -> spark submit:

spark = SparkSession.builder.appName("WordCountAPP").getOrCreate()
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    print(lines.collect())

    counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)

    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()

java version: 
备注:楼主也是java程序员,但使用sprak确实不适合使用java开发。 
例如:

List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?,?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
}
  • 元组、列表都是scala封装jar不容易抓到本质的数据结构
  • Java做数据分析代码冗长_.split(” “)或者lambda函数,java需要实现FlatMapFunction接口