Spark textFile RDD notes

Batch-processing template method; the core processing logic is an inner method.

import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer

def batchProces(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {

  // custom RDD; hard-coded demo data here
  val dataRDD = sc.makeRDD(List(1, 2), numPartitions)
  dataRDD.mapPartitions(iterator => {

    val rawData = iterator.toList
    val lstT = new ListBuffer[(Int, Int)]()

    // batch-processing logic
    def procesData() = {
      // core processing logic
      // doProcess
      // important: clear the buffer after each batch
      lstT.clear()
    }

    rawData.foreach(v => {
      lstT.append((v, 1))
      if (lstT.size >= 50) {
        // process once every 50 records
        procesData()
      }
    })

    // process whatever is left over
    procesData()

    // every batch has already been handled and cleared above; this is just the template's placeholder return
    lstT.iterator

  }).map((_, 1)).reduceByKey(_ + _).sortByKey().saveAsTextFile("hdfs://hdfscluster/tmp/logs/")

}
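
For context, a minimal driver-side sketch of how a template like this might be submitted; the master URL, app name, and argument values below are placeholders, not from the original, and batchProces is assumed to be in scope.

import org.apache.spark.{SparkConf, SparkContext}

object BatchProcesDemo {
  def main(args: Array[String]): Unit = {
    // placeholder master / app name; adjust for the real cluster
    val conf = new SparkConf().setAppName("batchProcesDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // locationFlag / minid / maxid are placeholder values here
    batchProces(sc, 0, 0, 100, 4)
    sc.stop()
  }
}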

Batch-processing template method; the core processing logic is an external method.

def process_outer(lst: List[(Int, Int)]) = {
  // external core processing logic, e.g. posting an HTTP request
  RequestUtil.postJson("http://xxx", "{paraData}", 1000)
}

def batchProces_processOuter(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {
  val fooCount = sc.longAccumulator("fooCount")
  // custom RDD; hard-coded demo data here
  val dataRDD = sc.makeRDD(List(1, 2), numPartitions)
  dataRDD.foreachPartition(iterator => {

    val rawData = iterator.toList
    val lstT = new ListBuffer[(Int, Int)]()

    rawData.foreach(v => {
      lstT.append((v, 1))
      if (lstT.size >= 50) {
        // process once every 50 records
        process_outer(lstT.toList)
        fooCount.add(lstT.size)
        lstT.clear()
      }
    })

    // process whatever is left over
    if (lstT.size > 0) {
      process_outer(lstT.toList)
      fooCount.add(lstT.size)
      lstT.clear()
    }
  })
  println("total =>" + fooCount.value)

}
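
The manual append/size/clear bookkeeping can also be written with Iterator.grouped, which yields fixed-size batches directly; a sketch of the same foreachPartition body under that assumption (same dataRDD, fooCount and process_outer as above):

dataRDD.foreachPartition(iterator => {
  // grouped(50) emits batches of at most 50 elements; the last batch may be smaller
  iterator.map((_, 1)).grouped(50).foreach(batch => {
    process_outer(batch.toList)
    fooCount.add(batch.size)
  })
})

Since foreachPartition is an action, accumulator updates are applied exactly once per successful task, and fooCount.value is ready as soon as the call returns.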

Some processing patterns for text-file RDDs:

// For a single file whose lines are extremely long: split the lines first, then repartition,
// so the work is handed to multiple executors
def bigLine(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int) = {
  val fileRDD = sc.textFile("hdfs://hdfscluster/tmp/logs/abc.txt", numPartitions)

  // for long lines: split first, then repartition to raise cluster utilization and shorten the job
  // note: println runs on the executors, so the output lands in the executor logs
  fileRDD.flatMap(_.split(",")).repartition(24).foreach(println(_))

}
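
To confirm the repartition took effect, getNumPartitions can be compared before and after; a small sketch reusing fileRDD from the method above, with take() bringing a sample back to the driver instead of printing on the executors:

val splitRDD = fileRDD.flatMap(_.split(",")).repartition(24)
println(s"partitions before: ${fileRDD.getNumPartitions}, after: ${splitRDD.getNumPartitions}")
splitRDD.take(10).foreach(println(_))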

// For scattered, irregular paths: sc is used inside the loop (on the driver)
def handlerPath_lingsan(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {
  val rawPath: List[String] = List("hdfs://hdfscluster/tmp1/path1", "hdfs://hdfscluster/tmp2/path2", "hdfs://hdfscluster/tmp3/path3")
  val lsResult = rawPath.flatMap(v => {
    // collect() pulls each path's data back to the driver
    sc.textFile(v).map((_, 1)).collect().toList
  })
  lsResult.foreach(println(_))
}
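
Calling collect() per path pulls everything to the driver one path at a time. When the paths only need to end up in one RDD, SparkContext.union (or a comma-separated path list passed to textFile) keeps the work distributed; a sketch using the same rawPath:

// one RDD per path, unioned into a single RDD
val unionRDD = sc.union(rawPath.map(p => sc.textFile(p)))
unionRDD.map((_, 1)).reduceByKey(_ + _).take(20).foreach(println(_))

// textFile also accepts a comma-separated list of paths
val joinedRDD = sc.textFile(rawPath.mkString(","))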

// For a directory
def handlerPath_directroy(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {
  // reads every file under the directory line by line; the partition setting is honored
  val txtRDD = sc.textFile("hdfs://hdfscluster/tmp1/*", numPartitions)
  // repartition to a single partition so the result comes out as one file
  txtRDD.map((_, 1)).repartition(1)
    .saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3")
}
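
When shrinking to a single output file, coalesce(1) avoids the full shuffle that repartition(1) triggers, as long as the data fits comfortably in one task; a sketch of the same write (the output path suffix is a placeholder so it does not clash with the path above):

txtRDD.map((_, 1)).coalesce(1)
  .saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3_coalesce")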

// For a directory that contains many small files
def handlerPath_directroy_smallFiles(sc: SparkContext, locationFlag: Int, minid: Int, maxid: Int, numPartitions: Int, filep: String) = {

  // key = file path, value = file content; very large contents can easily cause OOM
  val dirRDD = sc.wholeTextFiles("hdfs://hdfscluster/tmp1/*", numPartitions)
  dirRDD.flatMap(v => {
    v._2.split(System.lineSeparator()).map((_, 1))
  }).repartition(1).saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3")

}
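
If the originating file of each line matters downstream, the path key returned by wholeTextFiles can be kept rather than dropped; a small variant of the flatMap above (the output path is a placeholder):

dirRDD.flatMap { case (path, content) =>
  // keep (file path, line) pairs instead of discarding the path
  content.split(System.lineSeparator()).map(line => (path, line))
}.repartition(1).saveAsTextFile("hdfs://hdfscluster/tmp/logs/ssoHot3_withPath")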

// Java <-> Scala collection conversion

def java_scala_collection_convert = {
  val lstT = new ListBuffer[Int]()
  // mind the Java <-> Scala conversion
  import scala.collection.JavaConverters._
  val lstBack = SensitiveDevice.batchDecrypt(lstT.toList.asJava).asScala
}