SparkSQL学习笔记
阅读原文时间:2023年07月08日阅读:1

概述

冠状病毒来临,宅在家中给国家做贡献之际,写一篇随笔记录SparkSQL的学习笔记,目的有二,一是记录整理之前的知识作为备忘录,二是分享技术,大家共同进步,有问题也希望大家不吝赐教。总体而言,大数据主要包含三种操作:长时间运行的批量数据处理;交互式运行的数据查询;实时数据流处理。sparkSQL特点:数据兼容,不仅兼容hive,还可以从rdd,parquet文件,json文件获取数据,支持从rdbms获取数据。性能优化,采用内存列式存储、自定义序列化器等方式提升性能。组件扩展,sql的语法解析器、分析器、优化器都可以重新定义和扩展。Spark SQL 是spark中用于处理结构化数据的模块。Spark SQL相对于RDD的API来说,提供更多结构化数据信息和计算方法。Spark SQL 提供更多额外的信息进行优化。可以通过SQL或DataSet API方式同Spark SQL进行交互。无论采用哪种方法,哪种语言进行计算操作,实际上都用相同的执行引擎,因此使用者可以在不同的API中进行切换,选择一种最自然的方式去 执行一个转换,一种使用Spark SQL 的方法是进行SQL查询。Spark SQL 可以从存在的Hive中读取数据。当在编程语言中使用SQL的时候,结果将返回一个DataSet或DataFrame类型封装的对象。你也可以通过命令行或JDBC/ODBC方式使用SQL接口。

SparkSession

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。

  在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建
和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;
对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的
API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame
API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼
容,SQLContext和HiveContext也被保存下来。

  SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以
在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了
sparkContext,所以计算实际上是由sparkContext完成的。

特点:

---- 为用户提供一个统一的切入点使用Spark 各项功能

---- 允许用户通过它调用 DataFrame 和 Dataset 相关 API 来编写程序

---- 减少了用户需要了解的一些概念,可以很容易的与 Spark 进行交互

---- 与 Spark 交互之时不需要显示的创建 SparkConf, SparkContext 以及 SQlContext,这些对象已经封闭在 SparkSession 中

DataFrames

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前 者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升 运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

DataFrame是个分布式集合,其中数据别组织成命名的列,可以看做关系数据库中的表,底层做了很多优化,可以通过很多数据源进行构建,比如RDD、结构化文件、外部数据库、Hive表。DataFrame的前身是SchemaRDD。Spark1.3开始SchemaRDD更改为DataFrame。区别,不继承RDD,自己实现了RDD的大部分功能。可以在DataFrame上调用RDD的方法转化成另外一个RDD。ataFrame可以看做分布式Row对象的集合,其提供了由列组成的详细模式信息,使其可以得到优化。DataFrame 不仅有比RDD更多的算子,还可以进行执行计划的优化。DataSet包含了DataFrame的功能,Spark2.0中两者统一,DataFrame表示为DataSet[Row],即DataSet的子集。使用API尽量使用DataSet ,不行再选用DataFrame,其次选择RDD。

要使用DataFrame,在2.0中需要SparkSession这个类,创建这个类的方法如下:

1 // $example on:init_session$
2 val spark = SparkSession
3 .builder()
4 .appName("Spark SQL basic example").master("local[1]")
5 .config("spark.some.config.option", "some-value")
6 .getOrCreate()

通过SparkSession,应用程序可以通过存在的RDD、或者Hive表中或者Spark数据源中中创建DataFrame,下面是从JSON文件中创建DataFrame:

1 // $example on:create_df$
2 /* people.json
3 {"name":"Michael"}
4 {"name":"Andy", "age":30}
5 {"name":"Justin", "age":19}*/
6 val df = spark.read.json("F:\\spark\\people.json")
7
8 // Displays the content of the DataFrame to stdout
9 df.show()
10 // +----+-------+
11 // | age| name|
12 // +----+-------+
13 // |null|Michael|
14 // | 30| Andy|
15 // | 19| Justin|
16 // +----+-------+

DataFrame 为DataSet[Row],可以执行一些非强制类型的转换,例子如下:

1 // $example on:untyped_ops$
2 // This import is needed to use the $-notation
3 import spark.implicits._
4 // Print the schema in a tree format
5 df.printSchema()
6 // root
7 // |-- age: long (nullable = true)
8 // |-- name: string (nullable = true)
9
10 // Select only the "name" column
11 df.select("name").show()
12 // +-------+
13 // | name|
14 // +-------+
15 // |Michael|
16 // | Andy|
17 // | Justin|
18 // +-------+
19
20 // Select everybody, but increment the age by 1
21 df.select($"name", $"age" + 1).show()
22 // +-------+---------+
23 // | name|(age + 1)|
24 // +-------+---------+
25 // |Michael| null|
26 // | Andy| 31|
27 // | Justin| 20|
28 // +-------+---------+
29
30 // Select people older than 21
31 df.filter($"age" > 21).show()
32 // +---+----+
33 // |age|name|
34 // +---+----+
35 // | 30|Andy|
36 // +---+----+
37
38 // Count people by age
39 df.groupBy("age").count().show()
40 // +----+-----+
41 // | age|count|
42 // +----+-----+
43 // | 19| 1|
44 // |null| 1|
45 // | 30| 1|
46 // +----+-----+
47 // $example off:untyped_ops$
48
49 // $example on:run_sql$
50 // Register the DataFrame as a SQL temporary view

我自己理解是,DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对 一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。在程序中使用SQL查询。在SparkSession中可以用程序的方式运行SQL查询,结果作为一个DataFrame返回。

1 // $example on:run_sql$
2 // Register the DataFrame as a SQL temporary view
3 df.createOrReplaceTempView("people")
4
5 val sqlDF = spark.sql("SELECT * FROM people")
6 sqlDF.show()
7 // +----+-------+
8 // | age| name|
9 // +----+-------+
10 // |null|Michael|
11 // | 30| Andy|
12 // | 19| Justin|
13 // +----+-------+
14 // $example off:run_sql$
15
16 // $example on:global_temp_view$
17 // Register the DataFrame as a global temporary view
18 df.createGlobalTempView("people")
19
20 // Global temporary view is tied to a system preserved database `global_temp`
21 spark.sql("SELECT * FROM global_temp.people").show()
22 // +----+-------+
23 // | age| name|
24 // +----+-------+
25 // |null|Michael|
26 // | 30| Andy|
27 // | 19| Justin|
28 // +----+-------+
29
30 // Global temporary view is cross-session
31 spark.newSession().sql("SELECT * FROM global_temp.people").show()
32 // +----+-------+
33 // | age| name|
34 // +----+-------+
35 // |null|Michael|
36 // | 30| Andy|
37 // | 19| Justin|
38 // +----+-------+
39 // $example off:global_temp_view$

DataSet

DataSet是分布式的数据集合。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型 和可以用强大lambda函数)以及Spark SQL优化的执行引擎。DataSet可以通过JVM的对象进行构建,可以用函数式的转换(map/flatmap/filter)进行多种操作。DataSet API 在Scala和Java中都是可以用的。DataSet 通过Encoder实现了自定义的序列化格式,使得某些操作可以在无需序列化情况下进行。另外Dataset还进行了包括Tungsten优化在内的很多性能方面的优化。

DataSet不同于RDD,没有使用Java序列化器或者Kryo进行序列化,而是使用一个特定的编码器进行序列化,这些序列化器可以自动生成,而且在spark执行很多操作(过滤、排序、hash)的时候不用进行反序列化。

DataSet的创建:

1 import spark.implicits._
2 // $example on:create_ds$
3 // Encoders are created for case classes
4 val caseClassDS = Seq(Person("Andy", 32)).toDS()
5 caseClassDS.show()
6 // +----+---+
7 // |name|age|
8 // +----+---+
9 // |Andy| 32|
10 // +----+---+
11
12 // Encoders for most common types are automatically provided by importing spark.implicits._
13 val primitiveDS = Seq(1, 2, 3).toDS()
14 primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
15
16 // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
17 val path = "F:\\spark\\people.json"
18 val peopleDS = spark.read.json(path).as[Person]
19 peopleDS.show()
20 // +----+-------+
21 // | age| name|
22 // +----+-------+
23 // |null|Michael|
24 // | 30| Andy|
25 // | 19| Justin|
26 // +----+-------+
27 // $example off:create_ds$

正如上述,DataSet不光有各个字段名,而且有其详细的类型,使其在编译的时候就可以进行错误的检查。

DataSet的数据库基本操作:

1 val df1 = spark.createDataset(Seq(("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5),
2 ("bbb", 4, 6)) ).toDF("key1","key2","key3")
3 val df2 = spark.createDataset(Seq(("aaa", 2, 2), ("bbb", 3, 5), ("ddd", 3, 5),
4 ("bbb", 4, 6), ("eee", 1, 2), ("aaa", 1, 5), ("fff",5,6)))
5 .toDF("key1","key2","key4")
6
7 df1.printSchema
8 df2.printSchema
9 /*root
10 |-- key1: string (nullable = true)
11 |-- key2: integer (nullable = false)
12 |-- key3: integer (nullable = false)
13
14 root
15 |-- key1: string (nullable = true)
16 |-- key2: integer (nullable = false)
17 |-- key4: integer (nullable = false)*/
18 df1.show()
19 df2.show()
20 /*+----+----+----+
21 |key1|key2|key3|
22 +----+----+----+
23 | aaa| 1| 2|
24 | bbb| 3| 4|
25 | ccc| 3| 5|
26 | bbb| 4| 6|
27 +----+----+----+
28
29 +----+----+----+
30 |key1|key2|key4|
31 +----+----+----+
32 | aaa| 2| 2|
33 | bbb| 3| 5|
34 | ddd| 3| 5|
35 | bbb| 4| 6|
36 | eee| 1| 2|
37 | aaa| 1| 5|
38 | fff| 5| 6|
39 +----+----+----+*/
40 //内连接 select * from df1 join df2 on df1.key1=df2.key1
41 val df3 = df1.join(df2,"key1")
42 df3.printSchema()
43 df3.show()
44 /* root
45 |-- key1: string (nullable = true)
46 |-- key2: integer (nullable = false)
47 |-- key3: integer (nullable = false)
48 |-- key2: integer (nullable = false)
49 |-- key4: integer (nullable = false)*/
50 /*+----+----+----+----+----+
51 |key1|key2|key3|key2|key4|
52 +----+----+----+----+----+
53 | aaa| 1| 2| 1| 5|
54 | aaa| 1| 2| 2| 2|
55 | bbb| 3| 4| 4| 6|
56 | bbb| 3| 4| 3| 5|
57 | bbb| 4| 6| 4| 6|
58 | bbb| 4| 6| 3| 5|
59 +----+----+----+----+----+*/
60
61
62 //还是内连接,这次用joinWith。和join的区别是连接后的新Dataset的schema会不一样,注意和上面的对比一下。
63 val df4=df1.joinWith(df2,df1("key1")===df2("key1"))
64 df4.printSchema
65 df4.show
66 /*root
67 |-- _1: struct (nullable = false)
68 | |-- key1: string (nullable = true)
69 | |-- key2: integer (nullable = false)
70 | |-- key3: integer (nullable = false)
71 |-- _2: struct (nullable = false)
72 | |-- key1: string (nullable = true)
73 | |-- key2: integer (nullable = false)
74 | |-- key4: integer (nullable = false)*/
75 /*+---------+---------+
76 | _1| _2|
77 +---------+---------+
78 |[aaa,1,2]|[aaa,1,5]|
79 |[aaa,1,2]|[aaa,2,2]|
80 |[bbb,3,4]|[bbb,4,6]|
81 |[bbb,3,4]|[bbb,3,5]|
82 |[bbb,4,6]|[bbb,4,6]|
83 |[bbb,4,6]|[bbb,3,5]|
84 +---------+---------+*/
85 //外连接
86 val df5 = df1.join(df2,df1("key1")===df2("key1"), "outer")
87 df5.printSchema
88 df5.show
89 /*root
90 |-- key1: string (nullable = true)
91 |-- key2: integer (nullable = true)
92 |-- key3: integer (nullable = true)
93 |-- key1: string (nullable = true)
94 |-- key2: integer (nullable = true)
95 |-- key4: integer (nullable = true)*/
96 /* +----+----+----+----+----+----+
97 |key1|key2|key3|key1|key2|key4|
98 +----+----+----+----+----+----+
99 |null|null|null| ddd| 3| 5|
100 | ccc| 3| 5|null|null|null|
101 | aaa| 1| 2| aaa| 2| 2|
102 | aaa| 1| 2| aaa| 1| 5|
103 | bbb| 3| 4| bbb| 3| 5|
104 | bbb| 3| 4| bbb| 4| 6|
105 | bbb| 4| 6| bbb| 3| 5|
106 | bbb| 4| 6| bbb| 4| 6|
107 |null|null|null| fff| 5| 6|
108 |null|null|null| eee| 1| 2|
109 +----+----+----+----+----+----+*/
110 //左外连接
111 val df6 = df1.join(df2,df1("key1")===df2("key1"), "left_outer")
112 df6.show
113 /*+----+----+----+----+----+----+
114 |key1|key2|key3|key1|key2|key4|
115 +----+----+----+----+----+----+
116 | aaa| 1| 2| aaa| 1| 5|
117 | aaa| 1| 2| aaa| 2| 2|
118 | bbb| 3| 4| bbb| 4| 6|
119 | bbb| 3| 4| bbb| 3| 5|
120 | ccc| 3| 5|null|null|null|
121 | bbb| 4| 6| bbb| 4| 6|
122 | bbb| 4| 6| bbb| 3| 5|
123 +----+----+----+----+----+----+*/
124
125
126 //右外链接
127 val df7 = df1.join(df2,df1("key1")===df2("key1"), "right_outer")
128 df7.show()
129 /* +----+----+----+----+----+----+
130 |key1|key2|key3|key1|key2|key4|
131 +----+----+----+----+----+----+
132 | aaa| 1| 2| aaa| 2| 2|
133 | bbb| 4| 6| bbb| 3| 5|
134 | bbb| 3| 4| bbb| 3| 5|
135 |null|null|null| ddd| 3| 5|
136 | bbb| 4| 6| bbb| 4| 6|
137 | bbb| 3| 4| bbb| 4| 6|
138 |null|null|null| eee| 1| 2|
139 | aaa| 1| 2| aaa| 1| 5|
140 |null|null|null| fff| 5| 6|
141 +----+----+----+----+----+----+*/
142 //左半连接
143 val df8 = df1.join(df2,df1("key1")===df2("key1"), "leftsemi")
144 df8.show()
145 /* +----+----+----+
146 |key1|key2|key3|
147 +----+----+----+
148 | aaa| 1| 2|
149 | bbb| 3| 4|
150 | bbb| 4| 6|
151 +----+----+----+*/
152 //笛卡尔连接
153 val df9 = df1.crossJoin(df2)
154 df9.show()
155 /*+----+----+----+----+----+----+
156 |key1|key2|key3|key1|key2|key4|
157 +----+----+----+----+----+----+
158 | aaa| 1| 2| aaa| 2| 2|
159 | aaa| 1| 2| bbb| 3| 5|
160 | aaa| 1| 2| ddd| 3| 5|
161 | aaa| 1| 2| bbb| 4| 6|
162 | aaa| 1| 2| eee| 1| 2|
163 | aaa| 1| 2| aaa| 1| 5|
164 | aaa| 1| 2| fff| 5| 6|
165 | bbb| 3| 4| aaa| 2| 2|
166 | bbb| 3| 4| bbb| 3| 5|
167 | bbb| 3| 4| ddd| 3| 5|
168 | bbb| 3| 4| bbb| 4| 6|
169 | bbb| 3| 4| eee| 1| 2|
170 | bbb| 3| 4| aaa| 1| 5|
171 | bbb| 3| 4| fff| 5| 6|
172 | ccc| 3| 5| aaa| 2| 2|
173 | ccc| 3| 5| bbb| 3| 5|
174 | ccc| 3| 5| ddd| 3| 5|
175 | ccc| 3| 5| bbb| 4| 6|
176 | ccc| 3| 5| eee| 1| 2|
177 | ccc| 3| 5| aaa| 1| 5|
178 +----+----+----+----+----+----+*/
179 //基于两个公共字段key1和key的等值连接
180 val df10 = df1.join(df2, Seq("key1","key2"))
181 df10.show()
182 ////+----+----+----+----+
183 ////|key1|key2|key3|key4|
184 ////+----+----+----+----+
185 ////| aaa| 1| 2| 5|
186 // | bbb| 3| 4| 5|
187 // | bbb| 4| 6| 6|
188 // +----+----+----+----+
189
190 //select df1.*,df2.* from df1 join df2 on df1.key1=df2.key1 and df1.key2>df2.key2
191 val df11 = df1.join(df2, df1("key1")===df2("key1") && df1("key2")>df2("key2"))
192 df11.show()
193 //+----+----+----+----+----+----+
194 //|key1|key2|key3|key1|key2|key4|
195 // +----+----+----+----+----+----+
196 //| bbb| 4| 6| bbb| 3| 5|
197 //+----+----+----+----+----+----+

DataSet与RDD互操作

Spark SQL 支持两种不同的方法用于将存在的RDD转成Datasets。

第一种方法使用反射去推断包含特定类型对象的RDD模式(schema),该模式使你的代码更加简练,不过你必须在写Spark的程序的时候已经知道模式信息(比如RDD中的对象是自己定义的case class类型)。

第二种方法是通过一个编程接口,此时你需要构造一个模式,将其应用到一个已经存在的RDD上将其转化为DataFrame,该方法适用于运行之前还不知道列以及列的类型。

用反射推断模式

Spark SQL的Scala接口支持将包含case class的RDD自动转换为DataFrame。case
class定义了表的模式,case class的参数名被反射读取并成为表的列名。case
class也可以嵌套或者包含复杂类型(如序列或者数组)。示例如下:

1 import spark.implicits._
2 // Create an RDD of Person objects from a text file, convert it to a Dataframe
3 val peopleDF = spark.sparkContext
4 .textFile("F:\\spark\\people.json")
5 .map(_.split(","))
6 .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
7 .toDF()
8 // Register the DataFrame as a temporary view
9 peopleDF.createOrReplaceTempView("people")
10
11 // SQL statements can be run by using the sql methods provided by Spark
12 val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
13
14 // The columns of a row in the result can be accessed by field index
15 teenagersDF.map(teenager => "Name: " + teenager(0)).show()
16 // +------------+
17 // | value|
18 // +------------+
19 // |Name: Justin|
20 // +------------+
21
22 // or by field name
23 teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
24 // +------------+
25 // | value|
26 // +------------+
27 // |Name: Justin|
28 // +------------+
29
30 // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
31 implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
32 // Primitive types and case classes can be also defined as
33 implicit val stringIntMapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder()
34
35 // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
36 teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
37 // Array(Map("name" -> "Justin", "age" -> 19))

编程指定模式

当case class不能提前定义时(比如记录的结构被编码为字符串,或者当文本数据集被解析时不同用户需要映射不同的字段),可以通过下面三步来将RDD转换为DataFrame:

1、从原始RDD创建得到一个包含Row对象的RDD。

2、创建一个与第1步中Row的结构相匹配的StructType,以表示模式信息。

3、通过createDataFrame()将模式信息应用到第1步创建的RDD上。

举例说明:

1 // 1、Create an RDD
2 val peopleRDD = spark.sparkContext.textFile("F:\\spark\\people.json")
3
4 // The schema is encoded in a string
5 val schemaString = "name age"
6
7 ///2、Generate the schema based on the string of schema
8 val fields = schemaString.split(" ")
9 .map(fieldName => StructField(fieldName, StringType, nullable = true))
10 val schema = StructType(fields)
11
12 // Convert records of the RDD (people) to Rows
13 val rowRDD = peopleRDD
14 .map(_.split(","))
15 .map(attributes => Row(attributes(0), attributes(1).trim))
16
17 //3、 Apply the schema to the RDD
18 val peopleDF = spark.createDataFrame(rowRDD, schema)
19
20 // Creates a temporary view using the DataFrame
21 peopleDF.createOrReplaceTempView("people")
22
23 // SQL can be run over a temporary view created using DataFrames
24 val results = spark.sql("SELECT name FROM people")
25
26 // The results of SQL queries are DataFrames and support all the normal RDD operations
27 // The columns of a row in the result can be accessed by field index or by field name
28 results.map(attributes => "Name: " + attributes(0)).show()
29 // +-------------+
30 // | value|
31 // +-------------+
32 // |Name: Michael|
33 // | Name: Andy|
34 // | Name: Justin|
35 // +-------------+

数据保存格式

使用mode

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")

数据源

Parquet 格式是被许多其他的数据处理系统支持的列数据格式类型。Spark Sql支持在读写Parquet文件的时候自动保存原始数据的模式信息。在写Parquet文件时候,所有的列将会因为兼容原因转成nullable。

编程方式加载Parquet数据:

1 // Encoders for most common types are automatically provided by importing spark.implicits._
2 import spark.implicits._
3
4 val peopleDF = spark.read.json("examples/src/main/resources/people.json")
5
6 // DataFrames can be saved as Parquet files, maintaining the schema information
7 peopleDF.write.parquet("people.parquet")
8
9 // Read in the parquet file created above
10 // Parquet files are self-describing so the schema is preserved
11 // The result of loading a Parquet file is also a DataFrame
12 val parquetFileDF = spark.read.parquet("people.parquet")
13
14 // Parquet files can also be used to create a temporary view and then used in SQL statements
15 parquetFileDF.createOrReplaceTempView("parquetFile")
16 val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
17 namesDF.map(attributes => "Name: " + attributes(0)).show()
18 // +------------+
19 // | value|
20 // +------------+
21 // |Name: Justin|
22 // +------------+

Parquet分区自动发现

在很多系统中(如Hive),表分区是一个通用的优化方法。在一个分区的表中,数据通常存储在不同的目录中,列名和列值通常被编码在分区目录名中以区分不 同的分区。Parquet数据源能够自动地发现和推断分区信息。 如下是人口分区表目录结构,其中gender和country是分区列:

path

└── to

└── table

├── gender=male

│ ├── …

│ │

│ ├── country=US

│ │ └── data.parquet

│ ├── country=CN

│ │ └── data.parquet

│ └── …

└── gender=female

├── …

├── country=US

│ └── data.parquet

├── country=CN

│ └── data.parquet

└── …

传递/path/to/table 给SparkSession.read.parquet或者SparkSession.read.load,Spark SQL

将会自动从路径中提取分区信息,返回的DataFrame的分区信息如下:

root

|-- name: string (nullable = true)

|-- age: long (nullable = true)

|-- gender: string (nullable = true)

|-- country: string (nullable = true)

注意上述分区的数据类型是自动推断的。目前支持数值类型和string类型。

如果你不想自动推断分区列的数据类型。自动推断分区列是通过spark.sql.sources.partitionColumnTypeInference.enabled,选项,默认为ture。当类型推断不可用时候,自动指定分区的列为string类型。

从spark1.6开始、分区默认只发现给定路径下的分区。比如用户传递/path/to/table/gender=male
作为读取数据路径,gender将不被作为一个分区列。你可以在数据源选项中设置basePath来指定分区发现应该开始的基路径,那样像上述设
置,gender将会被作为分区列.

Parquet模式合并

就像ProtocolBuffer、Avro和Thrift,Parquet也支持模式演化(schema
evolution)。这就意味着你可以向一个简单的模式逐步添加列从而构建一个复杂的模式。这种方式可能导致模式信息分散在不同的Parquet文件
中,Parquet数据源能够自动检测到这种情况并且合并所有这些文件中的模式信息。

但是由于模式合并是相对昂贵的操作,并且绝大多数情况下不是必须的,因此从Spark 1.5.0开始缺省关闭模式合并。开启方式:在读取Parquet文件时,

1、 设置数据源选项mergeSchema为true,

2、 或者设置全局的SQL选项spark.sql.parquet.mergeSchema为true。

示例如下:

1 // This is used to implicitly convert an RDD to a DataFrame.
2 import spark.implicits._
3
4 // Create a simple DataFrame, store into a partition directory
5 val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
6 squaresDF.write.parquet("data/test_table/key=1")
7
8 // Create another DataFrame in a new partition directory,
9 // adding a new column and dropping an existing column
10 val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
11 cubesDF.write.parquet("data/test_table/key=2")
12
13 // Read the partitioned table
14 val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
15 mergedDF.printSchema()
16
17 // The final schema consists of all 3 columns in the Parquet files together
18 // with the partitioning column appeared in the partition directory paths
19 // root
20 // |-- value: int (nullable = true)
21 // |-- square: int (nullable = true)
22 // |-- cube: int (nullable = true)
23 // |-- key : int (nullable = true)

Spark SQL 可以自动推断出JSON 数据集的模式,把它加载为一个DataSet[Row].在通过SparkSession.read.json()读取一个String类型的RDD或者一个JSON文件。

注意: 这里面的Json每一行必须是一个有效的JSOn对象,如果一个对象跨越多行将导致失败。

举例:

1 *// A JSON dataset is pointed to by path.*
2 *// The path can be either a single text file or a directory storing text files*
3 **val** path **=** "examples/src/main/resources/people.json"
4 **val** peopleDF **=** spark.read.json(path)
5
6 *// The inferred schema can be visualized using the printSchema() method*
7 peopleDF.printSchema()
8 *// root*
9 *// |-- age: long (nullable = true)*
10 *// |-- name: string (nullable = true)*
11
12 *// Creates a temporary view using the DataFrame*
13 peopleDF.createOrReplaceTempView("people")
14
15 *// SQL statements can be run by using the sql methods provided by spark*
16 **val** teenagerNamesDF **=** spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
17 teenagerNamesDF.show()
18 *// +------+*
19 *// | name|*
20 *// +------+*
21 *// |Justin|*
22 *// +------+*
23
24 *// Alternatively, a DataFrame can be created for a JSON dataset represented by*
25 *// an RDD[String] storing one JSON object per string*
26 **val** otherPeopleRDD **=** spark.sparkContext.makeRDD(
27 """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: **Nil**)
28 **val** otherPeople **=** spark.read.json(otherPeopleRDD)
29 otherPeople.show()
30 *// +---------------+----+*
31 *// | address|name|*
32 *// +---------------+----+*
33 *// |[Columbus,Ohio]| Yin|*
34 *// +---------------+----+*
35 otherPeople.printSchema()
36 root
37 |-- address: struct (nullable = true)
38 | |-- city: string (nullable = true)
39 | |-- state: string (nullable = true)
40 |-- name: string (nullable = true)

SparkSQL在有赞大数据的实践

在有赞大数据使用过程中,列举了当时集群采坑的经验,一些简单的参数设置,现在摘抄如下:

spark.sql.autoBroadcastJoinThreshold

这个配置在大家使用 SparkSQL 的时候会比较熟悉,在 join 的场景判断相关的表是否可以使用 BroadcastJoin ,默认阀值是 10 MB。目前阀值判断的比较逻辑会参考几个因素:文件的大小字段选择的裁剪。比如某张 Hive 表的数据大小为 13 MB , 表 schema 为 struct<id:long,name:string>,而假设当前 SQL 只使用到 name 字段,那根据字段选择情况并对文件大小进行裁剪估算所需总字节的公式为: 20 / (8 + 20) * 13 约等于 9.3 MB(各个字段类型有不同的估算字节,比如long 是 8 个字节 ,string 是 20 个字节等),从而满足 BroadcastJoin 的条件。但是这里有几种情况需要额外考虑:1、表存储格式带来的差异,比如 使用 ZLIB 压缩的 ORC 格式跟 TEXT 格式就在数据存储上的文件大小可能会差很多,即使两张表都是 ORC 格式,压缩率的差异也是存在; 2、字段字节估算本身就有一定的误差,比如 string 字段认为是 20 个字节,对于一些极端情况的 string 大字段,那估算误差就会比较大; 3、读取 Hive 表的 "raw" 数据到内存然后展开成 Java 对象,内存的消耗也有一定放大系数。所以 10M 的阀值,最终实际上需要的内存可能达到 1G,这个我们也是在生产环境上碰到过。

spark.blacklist.enabled

Spark 针对 Task 失败有重试机制,但是当一个 Task 在某一台 host上的 一个 Spark Executor 实例执行失败,下一次重试调度因为考虑 Data Locality 还是会大概率的选择那个 host 上的 Executor。如果失败是因为机器坏盘引起的,那重试还是会失败,重试次数达到最大后那最终整个 Job 失败。而开启 blacklist 功能可以解决此类问题,将发生失败的 Executor 实例或者 host 添加到黑名单,那么重试可以选择其他实例或者 host ,从而提高任务的容错能力

spark.scheduler.pool

当我们的调度离线计算 SQL 任务,大部分都使用 SparkSQL 带来的问题是有些低优先级的任务可能会消耗很多 Executor 资源,从而让高优先级的任务一直得不到充分的资源去完成任务。我们希望资源调度的策略是让优先级高的任务优先得到资源,所以使用 Fair Scheduler 策略,并配置不同资源权重的 Pool 给不同优先级的任务

spark.sql.adaptive.enabled

adaptive 功能支持 shuffle 下游 stage 可以根据上游 stage 产生的 shuffle 数据量自动调节下游 stage 的 task 数,这个功能我们主要是为了解决 Hive 表数据表很多小文件的问题(Map Only 的 SQL 场景不起作用)。adaptive 功能在 Spark 1.6 版本就已经支持,但是我们目前 yz-spark 版本合入是Intel 实现的增强版本(该版本还实现了另两个功能:动态调整执行计划动态处理数据倾斜),目前官方版本还没有合入(https://github.com/Intel-bigdata/spark-adaptive

SPARK-24809

这是一个 correctness 的 bug, 在 broadcast join 的情况下可能会发生数据结果不正确的情况。当用于 broadcast 的 LongToUnsafeRowMap 对象如果被多次的序列化反序列化就会触发,导致野指针的产生,从而可能产生不正确的结果。当时这个问题导致我们一张核心仓库表产生错误数据。由于这个 bug 几周才偶现一次,复现的困难导致花费了一个月时间才定位原因。这次教训也让我们意识到需要经常的去关注社区版本的迭代,及早发现特别是那些比较严重的 bug fix,避免生产上的故障。

这是 Spark External Shuffle 的一个内存泄漏 bug ,所以在开启该功能的情况才会触发。它在某些场景下会导致 NodeManager 的 ExternalShuffleService 开始内存泄漏,这个内存泄漏比较大的危害是导致一个 HashMap 对象变的越来越大,最终导致 shuffle fetch 请求越来越慢(每次 fetch 请求需要对这个 HashMap 的 values 进行 sum 统计,这个逻辑变慢),从而最终导致了我们生产环境的离线任务耗时时间在某天突然多了 30% 以上

举个例子, select * from t1 where f1 not in (select f1 from t2),对于 “ not in subquery ”的场景最终都会选择 BroadcastNestedLoopJoinExec 物理执行计划,而 BroadcastNestedLoopJoinExec 是一个非常低效的物理执行计划,内部实现将 subquery broadcast 成一个 list,然后 t1 每一条记录通过 loop 遍历 list 去匹配是否存在。由于它的低效可能会长时间占用 executor 资源,同时 subquery 结果数据量比较大的情况下,broadcast 可能带来 driver 的 OOM 风险。

SparkSQL在字节跳动的优化和实践

在 Spark 里,实际并没有 Bucket Join 算子。这里说的 Bucket Join 泛指不需要 Shuffle 的 SortMergeJoin。

下 图展示了 SortMergeJoin 的基本原理。用虚线框代表的 Table 1 和 Table 2 是两张需要按某字段进行 Join 的表。虚线框内的 partition 0 到 partition m 是该表转换成 RDD 后的 Partition,而非表的分区。假设 Table 1 与 Table 2 转换为 RDD 后分别包含 m 和 k 个 Partition。为了进行 Join,需要通过 Shuffle 保证相同 Join Key 的数据在同一个 Partition 内且 Partition 内按 Key 排序,同时保证 Table 1 与 Table 2 经过 Shuffle 后的 RDD 的 Partition 数相同。

如下图所示,经过 Shuffle 后只需要启动 n 个 Task,每个 Task 处理 Table 1 与 Table 2 中对应 Partition 的数据进行 Join 即可。如 Task 0 只需要顺序扫描 Shuffle 后的左右两边的 partition 0 即可完成 Join。

该方法的优势是适用场景广,几乎可用于任意大小的数据集。劣势是每次 Join 都需要对全量数据进行 Shuffle,而 Shuffle 是最影响 Spark SQL 性能的环节。如果能避免 Shuffle 往往能大幅提升 Spark SQL 性能。

对 于大数据的场景来讲,数据一般是一次写入多次查询。如果经常对两张表按相同或类似的方式进行 Join,每次都需要付出 Shuffle 的代价。与其这样,不如让数据在写的时候,就让数据按照利于 Join 的方式分布,从而使得 Join 时无需进行 Shuffle。如下图所示,Table 1 与 Table 2 内的数据按照相同的 Key 进行分桶且桶数都为 n,同时桶内按该 Key 排序。对这两张表进行 Join 时,可以避免 Shuffle,直接启动 n 个 Task 进行 Join。

字节跳动对 Spark SQL 的 BucketJoin 做了四项比较大的改进。

改进一:支持与 Hive 兼容

在过去一段时间,字节跳动把大量的 Hive 作业迁移到了 SparkSQL。而 Hive 与 Spark SQL 的 Bucket 表不兼容。对于使用 Bucket 表的场景,如果直接更新计算引擎,会造成 Spark SQL 写入 Hive Bucket 表的数据无法被下游的 Hive 作业当成 Bucket 表进行 Bucket Join,从而造成作业执行时间变长,可能影响 SLA。

为了解决这个问题,我们让 Spark SQL 支持 Hive 兼容模式,从而保证 Spark SQL 写入的 Bucket 表与 Hive 写入的 Bucket 表效果一致,并且这种表可以被 Hive 和 Spark SQL 当成 Bucket 表进行 Bucket Join 而不需要 Shuffle。通过这种方式保证 Hive 向 Spark SQL 的透明迁移。

第一个需要解决的问题是,Hive 的一个 Bucket 一般只包含一个文件,而 Spark SQL 的一个 Bucket 可能包含多个文件。解决办法是动态增加一次以 Bucket Key 为 Key 并且并行度与 Bucket 个数相同的 Shuffle。

第二个需要解决的问题是,Hive 1.x 的哈希方式与 Spark SQL 2.x 的哈希方式(Murmur3Hash)不同,使得相同的数据在 Hive 中的 Bucket ID 与 Spark SQL 中的 Bucket ID 不同而无法直接 Join。在 Hive 兼容模式下,我们让上述动态增加的 Shuffle 使用 Hive 相同的哈希方式,从而解决该问题。

改进二:支持倍数关系 Bucket Join

Spark SQL 要求只有 Bucket 相同的表才能(必要非充分条件)进行 Bucket Join。对于两张大小相差很大的表,比如几百 GB 的维度表与几十 TB (单分区)的事实表,它们的 Bucket 个数往往不同,并且个数相差很多,默认无法进行 Bucket Join。因此我们通过两种方式支持了倍数关系的 Bucket Join,即当两张 Bucket 表的 Bucket 数是倍数关系时支持 Bucket Join。

第一种方式,Task 个数与小表 Bucket 个数相同。如下图所示,Table A 包含 3 个 Bucket,Table B 包含 6 个 Bucket。此时 Table B 的 bucket 0 与 bucket 3 的数据合集应该与 Table A 的 bucket 0 进行 Join。这种情况下,可以启动 3 个 Task。其中 Task 0 对 Table A 的 bucket 0 与 Table B 的 bucket 0 + bucket 3 进行 Join。在这里,需要对 Table B 的 bucket 0 与 bucket 3 的数据再做一次 merge sort 从而保证合集有序。

如果 Table A 与 Table B 的 Bucket 个数相差不大,可以使用上述方式。如果 Table B 的 Bucket 个数是 Bucket A Bucket 个数的 10 倍,那上述方式虽然避免了 Shuffle,但可能因为并行度不够反而比包含 Shuffle 的 SortMergeJoin 速度慢。此时可以使用另外一种方式,即 Task 个数与大表 Bucket 个数相等,如下图所示。

在该方案下,可将 Table A 的 3 个 Bucket 读多次。在上图中,直接将 Table A 与 Table A 进行 Bucket Union (新的算子,与 Union 类似,但保留了 Bucket 特性),结果相当于 6 个 Bucket,与 Table B 的 Bucket 个数相同,从而可以进行 Bucket Join。

改进三:支持 BucketJoin 降级

公 司内部过去使用 Bucket 的表较少,在我们对 Bucket 做了一系列改进后,大量用户希望将表转换为 Bucket 表。转换后,表的元信息显示该表为 Bucket 表,而历史分区内的数据并未按 Bucket 表要求分布,在查询历史数据时会出现无法识别 Bucket 的问题。

同时,由于数据量上涨快,平均 Bucket 大小也快速增长。这会造成单 Task 需要处理的数据量过大进而引起使用 Bucket 后的效果可能不如直接使用基于 Shuffle 的 Join。

为 了解决上述问题,我们实现了支持降级的 Bucket 表。基本原理是,每次修改 Bucket 信息(包含上述两种情况——将非 Bucket 表转为 Bucket 表,以及修改 Bucket 个数)时,记录修改日期。并且在决定使用哪种 Join 方式时,对于 Bucket 表先检查所查询的数据是否只包含该日期之后的分区。如果是,则当成 Bucket 表处理,支持 Bucket Join;否则当成普通无 Bucket 的表。

改进四:支持超集

对于一张常用表,可能会与另外一张表按 User 字段做 Join,也可能会与另外一张表按 User 和 App 字段做 Join,与其它表按 User 与 Item 字段进行 Join。而 Spark SQL 原生的 Bucket Join 要求 Join Key Set 与表的 Bucket Key Set 完全相同才能进行 Bucket Join。在该场景中,不同 Join 的 Key Set 不同,因此无法同时使用 Bucket Join。这极大的限制了 Bucket Join 的适用场景。

针对此问题,我们支持了超集场景下的 Bucket Join。只要 Join Key Set 包含了 Bucket Key Set,即可进行 Bucket Join。

如 下图所示,Table X 与 Table Y,都按字段 A 分 Bucket。而查询需要对 Table X 与 Table Y 进行 Join,且 Join Key Set 为 A 与 B。此时,由于 A 相等的数据,在两表中的 Bucket ID 相同,那 A 与 B 各自相等的数据在两表中的 Bucket ID 肯定也相同,所以数据分布是满足 Join 要求的,不需要 Shuffle。同时,Bucket Join 还需要保证两表按 Join Key Set 即 A 和 B 排序,此时只需要对 Table X 与 Table Y 进行分区内排序即可。由于两边已经按字段 A 排序了,此时再按 A 与 B 排序,代价相对较低。

物化列

Spark SQL 处理嵌套类型数据时,存在以下问题:

读取大量不必要的数据:对于 Parquet / ORC 等列式存储格式,可只读取需要的字段,而直接跳过其它字段,从而极大节省 IO。而对于嵌套数据类型的字段,如下图中的 Map 类型的 people 字段,往往只需要读取其中的子字段,如 people.age。却需要将整个 Map 类型的 people 字段全部读取出来然后抽取出 people.age 字段。这会引入大量的无意义的 IO 开销。在我们的场景中,存在不少 Map 类型的字段,而且很多包含几十至几百个 Key,这也就意味着 IO 被放大了几十至几百倍。

无法进行向量化读取:而向量化读能极大的提升性能。但截止到目前(2019 年 10 月 26 日),Spark 不支持包含嵌套数据类型的向量化读取。这极大地影响了包含嵌套数据类型的查询性能。

不支持 Filter 下推:目前(2019 年 10 月 26 日)的 Spark 不支持嵌套类型字段上的 Filter 的下推。

重复计算:JSON 字段,在 Spark SQL 中以 String 类型存在,严格来说不算嵌套数据类型。不过实践中也常用于保存不固定的多个字段,在查询时通过 JSON Path 抽取目标子字段,而大型 JSON 字符串的字段抽取非常消耗 CPU。对于热点表,频繁重复抽取相同子字段非常浪费资源。

对于这个问题,做数仓的同学也想了一些解决方案。如下图所示,在名为 base_table 的表之外创建了一张名为 sub_table 的表,并且将高频使用的子字段 people.age 设置为一个额外的 Integer 类型的字段。下游不再通过 base_table 查询 people.age,而是使用 sub_table 上的 age 字段代替。通过这种方式,将嵌套类型字段上的查询转为了 Primitive 类型字段的查询,同时解决了上述问题。

这种方案存在明显缺陷:

额外维护了一张表,引入了大量的额外存储/计算开销。

无法在新表上查询新增字段的历史数据(如要支持对历史数据的查询,需要重跑历史作业,开销过大,无法接受)。

表的维护方需要在修改表结构后修改插入数据的作业。

需要下游查询方修改查询语句,推广成本较大。

运营成本高:如果高频子字段变化,需要删除不再需要的独立子字段,并添加新子字段为独立字段。删除前,需要确保下游无业务使用该字段。而新增字段需要通知并推进下游业务方使用新字段。

为解决上述所有问题,我们设计并实现了物化列。它的原理是:

新增一个 Primitive 类型字段,比如 Integer 类型的 age 字段,并且指定它是 people.age 的物化字段。

插入数据时,为物化字段自动生成数据,并在 Partition Parameter 内保存物化关系。因此对插入数据的作业完全透明,表的维护方不需要修改已有作业。

查询时,检查所需查询的所有 Partition,如果都包含物化信息(people.age 到 age 的映射),直接将 select people.age 自动重写为 select age,从而实现对下游查询方的完全透明优化。同时兼容历史数据。

下图展示了在某张核心表上使用物化列的收益:

物化视图

在 OLAP 领域,经常会对相同表的某些固定字段进行 Group By 和 Aggregate / Join 等耗时操作,造成大量重复性计算,浪费资源,且影响查询性能,不利于提升用户体验。

我们实现了基于物化视图的优化功能:

如上图所示,查询历史显示大量查询根据 user 进行 group by,然后对 num 进行 sum 或 count 计算。此时可创建一张物化视图,且对 user 进行 gorup by,对 num 进行 avg(avg 会自动转换为 count 和 sum)。用户对原始表进行 select user, sum(num) 查询时,Spark SQL 自动将查询重写为对物化视图的 select user, sum_num 查询。

Spark SQL 引擎上的其它优化

下图展示了我们在 Spark SQL 上进行的其它部分优化工作:

Shuffle 的原理,很多同学应该已经很熟悉了。鉴于时间关系,这里不介绍过多细节,只简单介绍下基本模型。

如上图所示,我们将 Shuffle 上游 Stage 称为 Mapper Stage,其中的 Task 称为 Mapper。Shuffle 下游 Stage 称为 Reducer Stage,其中的 Task 称为 Reducer。

每个 Mapper 会将自己的数据分为最多 N 个部分,N 为 Reducer 个数。每个 Reducer 需要去最多 M (Mapper 个数)个 Mapper 获取属于自己的那部分数据。

这个架构存在两个问题:

稳定性问题

Mapper 的 Shuffle Write 数据存于 Mapper 本地磁盘,只有一个副本。当该机器出现磁盘故障,或者 IO 满载,CPU 满载时,Reducer 无法读取该数据,从而引起 FetchFailedException,进而导致 Stage Retry。Stage Retry 会造成作业执行时间增长,直接影响 SLA。同时,执行时间越长,出现 Shuffle 数据无法读取的可能性越大,反过来又会造成更多 Stage Retry。如此循环,可能导致大型作业无法成功执行。

性能问题

每个 Mapper 的数据会被大量 Reducer 读取,并且是随机读取不同部分。假设 Mapper 的 Shuffle 输出为 512MB,Reducer 有 10 万个,那平均每个 Reducer 读取数据 512MB / 100000 = 5.24KB。并且,不同 Reducer 并行读取数据。对于 Mapper 输出文件而言,存在大量的随机读取。而 HDD 的随机 IO 性能远低于顺序 IO。最终的现象是,Reducer 读取 Shuffle 数据非常慢,反映到 Metrics 上就是 Reducer Shuffle Read Blocked Time 较长,甚至占整个 Reducer 执行时间的一大半,如下图所示。

基于 HDFS 的 Shuffle 稳定性提升

经观察,引起 Shuffle 失败的最大因素不是磁盘故障等硬件问题,而是 CPU 满载和磁盘 IO 满载。

如上图所示,机器的 CPU 使用率接近 100%,使得 Mapper 侧的 Node Manager 内的 Spark External Shuffle Service 无法及时提供 Shuffle 服务。

下图中 Data Node 占用了整台机器 IO 资源的 84%,部分磁盘 IO 完全打满,这使得读取 Shuffle 数据非常慢,进而使得 Reducer 侧无法在超时时间内读取数据,造成 FetchFailedException。

无论是何种原因,问题的症结都是 Mapper 侧的 Shuffle Write 数据只保存在本地,一旦该节点出现问题,会造成该节点上所有 Shuffle Write 数据无法被 Reducer 读取。解决这个问题的一个通用方法是,通过多副本保证可用性。

最 初始的一个简单方案是,Mapper 侧最终数据文件与索引文件不写在本地磁盘,而是直接写到 HDFS。Reducer 不再通过 Mapper 侧的 External Shuffle Service 读取 Shuffle 数据,而是直接从 HDFS 上获取数据,如下图所示。

快速实现这个方案后,我们做了几组简单的测试。结果表明:

Mapper 与 Reducer 不多时,Shuffle 读写性能与原始方案相比无差异。

Mapper 与 Reducer 较多时,Shuffle 读变得非常慢。

在上面的实验过程中,HDFS 发出了报警信息。如下图所示,HDFS Name Node Proxy 的 QPS 峰值达到 60 万。(注:字节跳动自研了 Node Name Proxy,并在 Proxy 层实现了缓存,因此读 QPS 可以支撑到这个量级)。

原因在于,总共 10000 Reducer,需要从 10000 个 Mapper 处读取数据文件和索引文件,总共需要读取 HDFS 10000 * 1000 * 2 = 2 亿次。

如 果只是 Name Node 的单点性能问题,还可以通过一些简单的方法解决。例如在 Spark Driver 侧保存所有 Mapper 的 Block Location,然后 Driver 将该信息广播至所有 Executor,每个 Reducer 可以直接从 Executor 处获取 Block Location,然后无须连接 Name Node,而是直接从 Data Node 读取数据。但鉴于 Data Node 的线程模型,这种方案会对 Data Node 造成较大冲击。

最后我们选择了一种比较简单可行的方案,如下图所示。

Mapper 的 Shuffle 输出数据仍然按原方案写本地磁盘,写完后上传到 HDFS。Reducer 仍然按原始方案通过 Mapper 侧的 External Shuffle Service 读取 Shuffle 数据。如果失败了,则从 HDFS 读取。这种方案极大减少了对 HDFS 的访问频率。

该方案上线近一年:

覆盖 57% 以上的 Spark Shuffle 数据。

使得 Spark 作业整体性能提升 14%。

天级大作业性能提升 18%。

小时级作业性能提升 12%。

该方案旨在提升 Spark Shuffle 稳定性从而提升作业稳定性,但最终没有使用方差等指标来衡量稳定性的提升。原因在于每天集群负载不一样,整体方差较大。Shuffle 稳定性提升后,Stage Retry 大幅减少,整体作业执行时间减少,也即性能提升。最终通过对比使用该方案前后的总的作业执行时间来对比性能的提升,用于衡量该方案的效果。

Shuffle 性能优化实践与探索

如上文所分析,Shuffle 性能问题的原因在于,Shuffle Write 由 Mapper 完成,然后 Reducer 需要从所有 Mapper 处读取数据。这种模型,我们称之为以 Mapper 为中心的 Shuffle。它的问题在于:

  • Mapper 侧会有 M 次顺序写 IO。
  • Mapper 侧会有 M * N * 2 次随机读 IO(这是最大的性能瓶颈)。
  • Mapper 侧的 External Shuffle Service 必须与 Mapper 位于同一台机器,无法做到有效的存储计算分离,Shuffle 服务无法独立扩展。

针对上述问题,我们提出了以 Reducer 为中心的,存储计算分离的 Shuffle 方案,如下图所示:

该方案的原理是,Mapper 直接将属于不同 Reducer 的数据写到不同的 Shuffle Service。在上图中,总共 2 个 Mapper,5 个 Reducer,5 个 Shuffle Service。所有 Mapper 都将属于 Reducer 0 的数据远程流式发送给 Shuffle Service 0,并由它顺序写入磁盘。Reducer 0 只需要从 Shuffle Service 0 顺序读取所有数据即可,无需再从 M 个 Mapper 取数据。该方案的优势在于:

  • 将 M * N * 2 次随机 IO 变为 N 次顺序 IO。
  • Shuffle Service 可以独立于 Mapper 或者 Reducer 部署,从而做到独立扩展,做到存储计算分离。
  • Shuffle Service 可将数据直接存于 HDFS 等高可用存储,因此可同时解决 Shuffle 稳定性问题。

我的分享就到这里,谢谢大家。

提问:物化列新增一列,是否需要修改历史数据?
回答:历史数据太多,不适合修改历史数据。

提问:如果用户的请求同时包含新数据和历史数据,如何处理?
回答:
般而言,用户修改数据都是以 Partition 为单位。所以我们在 Partition Parameter
上保存了物化列相关信息。如果用户的查询同时包含了新 Partition 与历史 Partition,我们会在新 Partition
上针对物化列进行 SQL Rewrite,历史 Partition 不 Rewrite,然后将新老 Partition 进行
Union,从而在保证数据正确性的前提下尽可能充分利用物化列的优势。

提问:你们针对用户的场景,做了很多挺有价值的优化。像物化列、物化视图,都需要根据用户的查询 Pattern 进行设置。目前你们是人工分析这些查询,还是有某种机制自动去分析并优化?
回答:目前我们主要是通过一些审计信息辅助人工分析。同时我们也正在做物化列与物化视图的推荐服务,最终做到智能建设物化列与物化视图。

提问:刚刚介绍的基于 HDFS 的 Spark Shuffle 稳定性提升方案,是否可以异步上传 Shuffle 数据至 HDFS?
回答:
个想法挺好,我们之前也考虑过,但基于几点考虑,最终没有这样做。第一,单 Mapper 的 Shuffle 输出数据量一般很小,上传到 HDFS
耗时在 2 秒以内,这个时间开销可以忽略;第二,我们广泛使用 External Shuffle Service 和 Dynamic
Allocation,Mapper 执行完成后可能 Executor 就回收了,如果要异步上传,就必须依赖其它组件,这会提升复杂度,ROI
较低。

总结

感谢网络大神的分享:

https://tech.youzan.com/sparksql-in-youzan-2/

https://www.cnblogs.com/gaopeng527/p/4315808.html

https://zhuanlan.zhihu.com/p/43374434

https://www.iteblog.com/archives/9734.html#Spark_Shuffle

https://spark.apache.org/docs/latest/monitoring.html

https://www.jianshu.com/p/a19486f5a0ea

https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples