cd /home/data/app/hadoop/spark-2.1.1-bin-hadoop2.7/bin
./spark-shell --master spark://shulaibao2:7077 --executor-memory 512m --driver-memory 4540m
初始化sc->SparkContext spark->SparkSession
Hadoop fs - mkdir -p /home/hadoop/upload/test
2.2 上传数据源到hdfs
Hadoop fs -put /home/data/app/hadoop/hadoop-2.8.0/etc/hadoop/core-site.xml
/home/hadoop/upload/test
2.3 验证hdfs文件列表
Hadoop fs -ls /home/hadoop/upload/test
Scala-> spark shell:
scala>val rdd=sc.textFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/core-site.xml")
scala>rdd.cache()
scala>val wordcount=rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
scala>wordcount.take(10)
scala>val wordsort=wordcount.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
scala>wordsort.take(10)
Python -> spark submit:
spark = SparkSession.builder.appName("WordCountAPP").getOrCreate()
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
print(lines.collect())
counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
spark.stop()
java version:
备注:楼主也是java程序员,但使用sprak确实不适合使用java开发。
例如:
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章