Spark SQL和CSl
阅读原文时间:2023年07月09日阅读:3

目录

1

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Demo1Sess {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("Demo1Sess")
    // 设置spark sql产生shuffle后默认的分区数 => 并行度
    // 默认是 200
      .config("spark.sql.shuffle.partitions",3)
      .getOrCreate()

    // 从SparkSession获取SparkContext
    //    val sc: SparkContext = spark.sparkContext

    // json中每条数据都自带结构 可以直接转换成DF
    val stuDF: DataFrame = spark
      .read
      .format("json")
      .load("spark/data/students.json")

    stuDF.show()  //默认显示20条

    // 文本类的数据 默认是没有列名的 直接读进来是 _c0 _c1 _c2 ......
    // 可以通过schema手动指定列名,空格隔开字段和字段类型
    val stucsDF: DataFrame = spark
      .read
      .format("csv")
      .schema("id String,name String,age Int,gender String,clazz String")
      .load("scala/data/students.txt")

    stucsDF.show()

    // 直接将DataFrame注册成临时视图view
    stucsDF.createOrReplaceTempView("stu")

    // sql的方式
    val ageDF: DataFrame = spark.sql("select * from stu where age=22")
    ageDF.show()

    // 同rdd一样,操作算子可以触发job

    // DSL 类SQL的方式 介于SQL和代码中间的API

    val dslDF: DataFrame = stucsDF.where("age=23")
      .select("name", "age", "clazz")
    dslDF.show()

    // 统计班级人数
    stucsDF.groupBy("clazz")
      .count()
      .write
      .mode(SaveMode.Overwrite)
      .save("spark/data/clazz_cnt")

    // 保存的时候可以指定SaveMode
    // Overwrite 覆盖
    // Append 追加
    // 默认以parquet形式保存

  }

}

2

import Practice.Student
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

object Demo2CreateDF {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("Demo2CreateDF")
      .config("spark.sql.shuffle.partitions", 3)
      .getOrCreate()

    /**
      * 1、读json数据
      */

    val jsonDF: DataFrame = spark.read
      .format("json")
      .load("spark/data/students.json")

    //    jsonDF.show() // 默认显示20条
    //    jsonDF.show(100) // 显示100条
    //    jsonDF.show(false) // 完全显示
//    jsonDF.show()

    /**
      * 2、读文本文件
      */

    val csvDF: DataFrame = spark.read
      .format("csv")
      //csv 格式读取默认是以逗号分隔
      .option("sep", ",")
      .schema("id String,name String,age Int,gender String,clazz String")
      .load("scala/data/students.txt")

//    csvDF.show()

    /**
      * 3、JDBC 读取MySQL的一张表转换成 Spark SQL中的DF
      */

    val jdbcDF: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://master:3306/student")
      .option("dbtable", "student")
      .option("user", "root")
      .option("password", "123456")
      .load()

//    jdbcDF.show()

    // 将数据以parquet格式保存
//    jdbcDF
//      .write
//      .mode(SaveMode.Overwrite)
//      .parquet("spark/data/stu_parquet")

    /**
      * 4、读取parquet文件
      * 无法直接查看,默认会进行压缩,而且自带表结构,读取时不需要指定schema
      * 默认使用snappy压缩方式进行压缩
      */

    spark.read
      .format("parquet")
      .load("spark/data/stu_parquet")
//      .show()

    // 将数据以orc格式保存
    //    jdbcDF.write.orc("spark/data/stu_orc")

    /**
      * 5、读取ORC格式的文件
      * 也会默认进行压缩,空间占用率最小,默认带有表结构,可以直接读取
      */

//    spark
//      .read
//      .format("orc")
//      .load("spark/data/stu_orc")

    /**
      * 6、从RDD构建DF
      */

    val stuRDD: RDD[String] = spark.sparkContext.textFile("scala/data/students.txt")
    val stuRDD2: RDD[Student] = stuRDD.map(line => {
      val splits: Array[String] = line.split(",")
      val id: String = splits(0)
      val name: String = splits(1)
      val age: String = splits(2)
      val gender: String = splits(3)
      val clazz: String = splits(4)
      Student(id, name, age, gender, clazz)
    })

    // 导入隐式转换
    import spark.implicits._
    val sDF: DataFrame = stuRDD2.toDF()
    sDF.show()

    // DataFrame to RDD
    val rdd: RDD[Row] = sDF.rdd
    rdd.foreach(row=>{
      val id: String = row.getAs[String]("id")
      val name: String = row.getAs[String]("name")
      println(s"$id,$name")
    })

  }

  case class Student(id:String,name:String,age: String, gender: String, clazz: String)

}

3

import org.apache.spark.sql.{DataFrame, SparkSession}

object DFapi {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("DFapi")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()

    import spark.implicits._

    val stuDF: DataFrame = spark.read
      .format("csv")
      .option("sep", ",")
      .schema("id String,name String,age String,gender String,clazz String")
      .load("scala/data/students.txt")

    // 对多次使用的DF也可进行cache
    stuDF.cache()

    // 过滤 where
    // 过滤出 年龄 大于 23的学生
    // DSL

    // 字符串表达式
    stuDF.where("age>23")

    // 列表达式 (推荐),需要先导入隐式转换
    stuDF.where($"age" > 23)

    // 使用filter加函数的方式进行过滤
    stuDF.filter(row => {
      val age: String = row.getAs[String]("age")
      if (age.toInt > 23) {
        true
      }
      else {
        false
      }
    })

    // select
    stuDF.select($"id", $"name", $"age" + 100 as "newage")

    // 分组 groupBy
    // 聚合
    // 统计班级人数
    stuDF.groupBy($"clazz")
      .count().show()

    // 导入所有的sql函数
    import org.apache.spark.sql.functions._
    // 统计每个班的性别人数
    stuDF.groupBy($"clazz", $"gender")
      .agg(count($"gender"))
      .show()

    // 统计班级人数(数据可能有重复)
    stuDF.groupBy($"clazz")
      .agg(countDistinct($"id") as "去重人数")
      .show()

    // SQL 的方式
    stuDF.createOrReplaceTempView("stu")
    spark.sql(
      """
        |select clazz,count(distinct id)
        |from stu
        |group by clazz
      """.stripMargin
    ).show()

    // join
    val scoreDF: DataFrame = spark.read
      .format("csv")
      .schema("sid String,sub_id String,score Int")
      .load("scala/data/score.txt")

    // 当两张表的关联字段名字一样时
    // 在这里直接指定 "id"  默认是inner join
    //      .join(scoreDF, "id")
    // 可以将 "id" 放入 List 传入 再指定关联类型
    //      .join(scoreDF, List("id"), "left")
    // 如果 关联字段不一样
    stuDF.join(scoreDF,$"id"===$"sid","left").show()

    stuDF.unpersist()

  }

}

4

import org.apache.spark.sql.{DataFrame, SparkSession}

object DianXin {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("DianXin")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()

    val dxDF: DataFrame = spark.read
      .format("csv")
      .option("sep", ",")
      .schema("mdn String,grid_id String,city_id String,county_id String,t String,start_time String,end_time String,date String")
      .load("spark/data/dianxin_data")

    // 导入隐式转换
    import spark.implicits._
    // 导入Spark SQL中所有的函数
    import org.apache.spark.sql.functions._

    // 按城市统计每个区县的游客人数top3
    dxDF.createOrReplaceTempView("dx")

    spark.sql(
      """
        |select tt1.city_id,tt1.county_id,tt1.sum,tt1.rk
        |from
        |(select t1. city_id,t1.county_id,t1.sum,row_number() over (partition by county_id order by t1.sum desc) as rk
        |from
        |(select  city_id,county_id,count(distinct mdn) as sum
        |from
        |dx
        |group by city_id,county_id) t1) tt1
        |where tt1.rk<3
        |
      """.stripMargin
    ).show()

  }
}