Batch-processing template, with the core processing logic as an inner method
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer

def batchProces(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {
  //Custom RDD; this is just a demo
  val dataRDD = sc.makeRDD(List(1, 2), numPartitions)
  dataRDD.mapPartitions(iterator => {
    val lstT = new ListBuffer[(Int, Int)]()      //current batch (at most 50 records)
    val results = new ListBuffer[(Int, Int)]()   //everything handed downstream

    //batch-processing logic
    def procesData() = {
      //core processing logic
      // doProcess
      results ++= lstT
      //important: clear the batch so the buffer does not keep growing
      lstT.clear()
    }

    iterator.foreach(v => {
      lstT.append((v, 1))
      //process once every 50 records
      if (lstT.size >= 50) {
        procesData()
      }
    })
    //process the remainder
    if (lstT.nonEmpty) {
      procesData()
    }
    results.iterator
  }).map((_, 1)).reduceByKey(_ + _).sortByKey().saveAsTextFile("hdfs://hdfscluster/tmp/logs/")
}
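For context, a minimal driver that invokes this template might look like the sketch below. The object name, app name, master setting, and argument values are placeholders and not part of the original code; in practice the master is usually supplied by spark-submit.
import org.apache.spark.{SparkConf, SparkContext}

object BatchProcesDemo {
  def main(args: Array[String]): Unit = {
    //placeholder configuration for a local test run
    val conf = new SparkConf().setAppName("batchProcesDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    //argument values are illustrative only
    batchProces(sc, locationFlag = 0, minid = 0, maxid = 100, numPartitions = 4)
    sc.stop()
  }
}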
Batch-processing template, with the core processing logic as an external method
def process_outer(lst: List[(Int, Int)]) = {
  //External core processing logic, e.g. an HTTP request
  RequestUtil.postJson("http://xxx", "{paraData}", 1000)
}
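RequestUtil is a project-specific helper, so its implementation is not shown here. As a rough, purely illustrative sketch of a postJson(url, body, timeoutMillis) helper of that shape, using only java.net.HttpURLConnection (not the actual RequestUtil):
import java.io.OutputStreamWriter
import java.net.{HttpURLConnection, URL}

object RequestUtilSketch {
  //hypothetical stand-in for the project's RequestUtil.postJson
  def postJson(url: String, jsonBody: String, timeoutMillis: Int): Int = {
    val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setConnectTimeout(timeoutMillis)
    conn.setReadTimeout(timeoutMillis)
    conn.setDoOutput(true)
    conn.setRequestProperty("Content-Type", "application/json")
    val writer = new OutputStreamWriter(conn.getOutputStream, "UTF-8")
    try writer.write(jsonBody) finally writer.close()
    val code = conn.getResponseCode   //sends the request
    conn.disconnect()
    code
  }
}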
def batchProces_processOuter(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {
  val fooCount = sc.longAccumulator("fooCount")
  //Custom RDD; this is just a demo
  val dataRDD = sc.makeRDD(List(1, 2), numPartitions)
  dataRDD.foreachPartition(iterator => {
    val lstT = new ListBuffer[(Int, Int)]()
    iterator.foreach(v => {
      lstT.append((v, 1))
      //process once every 50 records
      if (lstT.size >= 50) {
        process_outer(lstT.toList)
        fooCount.add(lstT.size)
        lstT.clear()
      }
    })
    //process the remainder
    if (lstT.size > 0) {
      process_outer(lstT.toList)
      fooCount.add(lstT.size)
      lstT.clear()
    }
  })
  println("total =>" + fooCount.value)
}
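The manual 50-element buffer can also be expressed with Iterator.grouped, which slices the partition into fixed-size batches and yields the trailing partial batch automatically. A sketch of the same job in that style (method name is illustrative):
def batchProces_grouped(sc: SparkContext, numPartitions: Int) = {
  val fooCount = sc.longAccumulator("fooCount")
  val dataRDD = sc.makeRDD(List(1, 2), numPartitions)
  dataRDD.foreachPartition(iterator => {
    //grouped(50) batches the partition, including the final partial batch,
    //so no manual buffer bookkeeping is needed
    iterator.map((_, 1)).grouped(50).foreach(batch => {
      process_outer(batch.toList)
      fooCount.add(batch.size)
    })
  })
  println("total =>" + fooCount.value)
}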
Some processing patterns for text-file RDDs:
//For a single file with extremely long lines: split each line first, then repartition so the records are handed to multiple executors
def bigLine(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {
  val fileRDD = sc.textFile("hdfs://hdfscluster/tmp/logs/abc.txt", numPartitions)
  //For long text, split first and then repartition: this raises cluster utilization and shortens the job
  fileRDD.flatMap(_.split(",")).repartition(24).foreach(println(_))
}
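A quick way to see why the repartition matters is to print the partition counts before and after the split; the sketch below assumes the same demo file, and the method name is illustrative.
def bigLine_check(sc: SparkContext) = {
  //a single small file usually arrives as only one or two partitions
  val fileRDD = sc.textFile("hdfs://hdfscluster/tmp/logs/abc.txt")
  println("partitions before: " + fileRDD.getNumPartitions)
  //after splitting and repartitioning, the records are spread across 24 tasks
  val spread = fileRDD.flatMap(_.split(",")).repartition(24)
  println("partitions after: " + spread.getNumPartitions)
  spread.foreach(println(_))
}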
//For scattered, irregular paths: use sc inside the loop over the paths
def handlerPath_lingsan(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {
  val rawPath: List[String] = List("hdfs://hdfscluster/tmp1/path1", "hdfs://hdfscluster/tmp2/path2", "hdfs://hdfscluster/tmp3/path3")
  val lsResult = rawPath.flatMap(v => {
    sc.textFile(v).map((_, 1)).collect().toList
  })
  lsResult.foreach(println(_))
}
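When the per-path results do not need to be collected to the driver one by one, sc.textFile also accepts a comma-separated list of paths (and sc.union over per-path RDDs works as well), which keeps the work on the executors. A sketch with an illustrative method name:
def handlerPath_union(sc: SparkContext) = {
  val rawPath = List("hdfs://hdfscluster/tmp1/path1", "hdfs://hdfscluster/tmp2/path2", "hdfs://hdfscluster/tmp3/path3")
  //one RDD over all paths; alternatively: sc.union(rawPath.map(sc.textFile(_)))
  val merged = sc.textFile(rawPath.mkString(","))
  merged.map((_, 1)).collect().toList.foreach(println(_))
}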
//For a directory
def handlerPath_directroy(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {
  //Reads every file under the directory line by line; the partition hint is honored
  val txtRDD = sc.textFile("hdfs://hdfscluster/tmp1/*", numPartitions)
  //Repartition so the result is written as a single output file
  txtRDD.map((_, 1)).repartition(1)
    .saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3")
}
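If the only goal of the repartition(1) is a single output file, coalesce(1) can be swapped in: it avoids the full shuffle that repartition(1) triggers, at the cost of running the upstream work in one task. A sketch (method name and output path are illustrative):
def handlerPath_directroy_coalesce(sc: SparkContext, numPartitions: Int) = {
  val txtRDD = sc.textFile("hdfs://hdfscluster/tmp1/*", numPartitions)
  //coalesce(1) skips the shuffle, but folds upstream work into a single task
  txtRDD.map((_, 1)).coalesce(1)
    .saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3_coalesce")
}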
//For a directory that contains many small files
def handlerPath_directroy_smallFiles(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {
  //Each record is key = file path, value = file content; very large contents can easily cause OOM
  val dirRDD = sc.wholeTextFiles("hdfs://hdfscluster/tmp1/*", numPartitions)
  dirRDD.flatMap(v => {
    v._2.split(System.lineSeparator()).map((_, 1))
  }).repartition(1).saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3")
}
//Java/Scala collection conversion
def java_scala_collection_convert = {
  val lstT = new ListBuffer[Int]()
  //Note the Java <-> Scala conversion
  import scala.collection.JavaConverters._
  val lstBack = SensitiveDevice.batchDecrypt(lstT.toList.asJava).asScala
}
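SensitiveDevice is an external helper, so as a self-contained illustration of the same asJava/asScala round trip, a plain java.util.List can stand in for its input and output:
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

def java_scala_roundtrip(): Unit = {
  val lstT = ListBuffer(1, 2, 3)
  //Scala -> Java: asJava wraps the Scala list for a Java API
  val javaList: java.util.List[Int] = lstT.toList.asJava
  //Java -> Scala: asScala wraps the Java list back as a Scala collection (no copy)
  val backToScala = javaList.asScala
  println(backToScala.mkString(","))
}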