import groovy.sql.DataSet
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object sparkSession {
case class Person(name:String,age:BigInt)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[4]")
.appName("Spark SQL Example")
.getOrCreate()
//读取json文件,返回一个dataframe;
val df = spark.read.json("D:\\people.json")
import spark.implicits._
//查看DataFrame中的内容,默认显示20行;
df.show()
//----------------------dataframe和dataSet相互转换-----------------------------------
// val df:DataFrame = DataSet[Row] //dataframe和dataSet的关系;
val ds = df.as[Person]
val ds_1 = df.toJSON
val df_new = ds.toDF()
//----------------------------------------------------------------------------------
//打印DataFrame的Schema信息;可以理解为模型;
df.printSchema()
//查看DataFrame部分列中的内容,age
df.select("age").show()
//查看DataFrame部分列中的内容,age+1
df.select($"name",$"age"+1).show
//另外一种写法;查看age和name;
df.select(df("age"),df("name")).show()
//过滤age大于20的,显示10行;
df.filter($"age" > 20).show(10)
//统计年龄大于20的人数;
df.filter(df("age") > 20).count()
//按年龄进行分组,统计人数;
df.groupBy("name").count().show()
//collect方法会将df中的所有数据都获取到,并返回一个Row类型的Array对象
df.collect().foreach(println)
//和上面的类似,返回一个Row类型的List集合;
val list = df.collectAsList()
//获取指定字段的统计信息;
df.describe("name","age").show()
//跟sql语句的where条件一样;
df.where("age = 18 and name = 'jason'").show()
//根据某个字段筛选;
df.filter("name = 'jason'").show()
//获取指定的字段可以对字段做一些特殊的操作,可以写别名;
df.selectExpr("age","name as n").show()
//获取指定的字段,注意这个一次只能获取一个字段;
val age = df.col("age")
println(age)
//和上面的一样,也是获取某个字段;
val name = df.apply("name")
println(name)
//删除指定的字段;
df.drop("age").show()
//跟sql的limit一样,显示前几行;
df.limit(5).show()
//根据某个字段排序;
df.orderBy(df("age").desc).show()
//按照partition进行排序;
df.sortWithinPartitions("age").show()
//跟sql里面的一样,根据某个字段分组;
val groupby_name = df.groupBy("name").count().show()
//结合groupby做一些聚合操作;
df.groupBy("age").max().show()
//去重;
df.distinct().show()
//指定字段去重;
df.dropDuplicates("name").show()
//聚合操作;
df.agg("age"-> "max","age"-> "min").show()
//对结果叠加;
df.union(df).show()
//跟sql里面的join一样支持,left join,right join,inner join,这个操作非常的丰富,这里就不在一一列举了;
df.join(df,Seq("age"),"left").show()
//获取两个df中相同的数据,相当于inner join;
df.intersect(df).show()
//对指定字段重命名;
df.withColumnRenamed("name","name1").show()
//增加新的字段,默认显示为null
df.withColumn("name1",df("age")).show()
//前几天有人问我增加新的字段显示为0,怎么写?选择一列数值类型的乘以0就可以了;
df.withColumn("name2",df("age")*0).show()
}
}
val data1 = data.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")
data1.limit(10).show
+-------+------+---+------------+--------+-------------+---------+----------+------+
|affairs|gender|age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+---+------------+--------+-------------+---------+----------+------+
| 0| male| 37| 10| no| 3| 18| 7| 4|
| 0| null| 27| null| no| 4| 14| 6| null|
| 0| null| 32| null| yes| 1| 12| 1| null|
| 0| null| 57| null| yes| 5| 18| 6| null|
| 0| null| 22| null| no| 2| 17| 6| null|
| 0| null| 32| null| no| 2| 17| 5| null|
| 0|female| 22| null| no| 2| 12| 1| null|
| 0| male| 57| 15| yes| 2| 14| 4| 4|
| 0|female| 32| 15| yes| 4| 16| 1| 2|
| 0| male| 22| 1.5| no| 4| 14| 4| 5|
+-------+------+---+------------+--------+-------------+---------+----------+------+
val resNull=data1.na.drop()
resNull.limit(10).show()
+-------+------+---+------------+--------+-------------+---------+----------+------+
|affairs|gender|age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+---+------------+--------+-------------+---------+----------+------+
| 0| male| 37| 10| no| 3| 18| 7| 4|
| 0| male| 57| 15| yes| 2| 14| 4| 4|
| 0|female| 32| 15| yes| 4| 16| 1| 2|
| 0| male| 22| 1.5| no| 4| 14| 4| 5|
| 0| male| 37| 15| yes| 2| 20| 7| 2|
| 0| male| 27| 4| yes| 4| 18| 6| 4|
| 0| male| 47| 15| yes| 5| 17| 6| 4|
| 0|female| 22| 1.5| no| 2| 17| 5| 4|
| 0|female| 27| 4| no| 4| 14| 5| 4|
| 0|female| 37| 15| yes| 1| 17| 5| 5|
+-------+------+---+------------+--------+-------------+---------+----------+------+
//删除某列的空值和NaN
val res=data1.na.drop(Array("gender","yearsmarried"))
// 删除某列的非空且非NaN的低于10的
data1.na.drop(10,Array("gender","yearsmarried"))
//填充所有空值的列
val res123=data1.na.fill("wangxiao123")
res123.limit(10).show()
+-------+-----------+---+------------+--------+-------------+---------+----------+-----------+
|affairs| gender|age|yearsmarried|children|religiousness|education|occupation| rating|
+-------+-----------+---+------------+--------+-------------+---------+----------+-----------+
| 0| male| 37| 10| no| 3| 18| 7| 4|
| 0|wangxiao123| 27| wangxiao123| no| 4| 14| 6|wangxiao123|
| 0|wangxiao123| 32| wangxiao123| yes| 1| 12| 1|wangxiao123|
| 0|wangxiao123| 57| wangxiao123| yes| 5| 18| 6|wangxiao123|
| 0|wangxiao123| 22| wangxiao123| no| 2| 17| 6|wangxiao123|
| 0|wangxiao123| 32| wangxiao123| no| 2| 17| 5|wangxiao123|
| 0| female| 22| wangxiao123| no| 2| 12| 1|wangxiao123|
| 0| male| 57| 15| yes| 2| 14| 4| 4|
| 0| female| 32| 15| yes| 4| 16| 1| 2|
| 0| male| 22| 1.5| no| 4| 14| 4| 5|
+-------+-----------+---+------------+--------+-------------+---------+----------+-----------+
//对指定的列空值填充
val res2=data1.na.fill(value="wangxiao111",cols=Array("gender","yearsmarried") )
res2.limit(10).show()
+-------+-----------+---+------------+--------+-------------+---------+----------+------+
|affairs| gender|age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+-----------+---+------------+--------+-------------+---------+----------+------+
| 0| male| 37| 10| no| 3| 18| 7| 4|
| 0|wangxiao111| 27| wangxiao111| no| 4| 14| 6| null|
| 0|wangxiao111| 32| wangxiao111| yes| 1| 12| 1| null|
| 0|wangxiao111| 57| wangxiao111| yes| 5| 18| 6| null|
| 0|wangxiao111| 22| wangxiao111| no| 2| 17| 6| null|
| 0|wangxiao111| 32| wangxiao111| no| 2| 17| 5| null|
| 0| female| 22| wangxiao111| no| 2| 12| 1| null|
| 0| male| 57| 15| yes| 2| 14| 4| 4|
| 0| female| 32| 15| yes| 4| 16| 1| 2|
| 0| male| 22| 1.5| no| 4| 14| 4| 5|
+-------+-----------+---+------------+--------+-------------+---------+----------+------+
//查询空值列
data1.filter("gender is null").select("gender").limit(10).show
+------+
|gender|
+------+
| null|
| null|
| null|
| null|
| null|
+------+
data1.filter( data1("gender").isNull ).select("gender").limit(10).show
+------+
|gender|
+------+
| null|
| null|
| null|
| null|
| null|
+------+
data1.filter("gender is not null").select("gender").limit(10).show
+------+
|gender|
+------+
| male|
|female|
| male|
|female|
| male|
| male|
| male|
| male|
|female|
|female|
+------+
data1.filter("gender<>''").select("gender").limit(10).show
+------+
|gender|
+------+
| male|
|female|
| male|
|female|
| male|
| male|
| male|
| male|
|female|
|female|
+------+
Dataset是一个强类型的特定领域的对象,这种对象可以函数式或者关系操作并行地转换。每个Dataset也有一个被称为一个DataFrame的类型化视图,这种DataFrame是Row类型的Dataset,即Dataset[Row]
Dataset是“懒惰”的,只在执行行动操作时触发计算。本质上,数据集表示一个逻辑计划,该计划描述了产生数据所需的计算。当执行行动操作时,Spark的查询优化程序优化逻辑计划,并生成一个高效的并行和分布式物理计划。
import scala.math._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrameReader
import org.apache.spark.sql.functions._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.DataFrameStatFunctions
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
(0, "male", 37, 10, "no", 3, 18, 7, 4),
(0, "female", 27, 4, "no", 4, 14, 6, 4),
(0, "female", 32, 15, "yes", 1, 12, 1, 4),
(0, "male", 57, 15, "yes", 5, 18, 6, 5),
(0, "male", 22, 0.75, "no", 2, 17, 6, 3),
(0, "female", 32, 1.5, "no", 2, 17, 5, 5),
(0, "female", 22, 0.75, "no", 2, 12, 1, 3),
(0, "male", 57, 15, "yes", 2, 14, 4, 4),
(0, "female", 32, 15, "yes", 4, 16, 1, 2),
(0, "male", 22, 1.5, "no", 4, 14, 4, 5))
val data = dataList.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")
data.printSchema()
root
|-- affairs: double (nullable = false)
|-- gender: string (nullable = true)
|-- age: double (nullable = false)
|-- yearsmarried: double (nullable = false)
|-- children: string (nullable = true)
|-- religiousness: double (nullable = false)
|-- education: double (nullable = false)
|-- occupation: double (nullable = false)
|-- rating: double (nullable = false)
// 在Spark-shell中展示,前n条记录
data.show(7)
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
| 0.0| male|37.0| 10.0| no| 3.0| 18.0| 7.0| 4.0|
| 0.0|female|27.0| 4.0| no| 4.0| 14.0| 6.0| 4.0|
| 0.0|female|32.0| 15.0| yes| 1.0| 12.0| 1.0| 4.0|
| 0.0| male|57.0| 15.0| yes| 5.0| 18.0| 6.0| 5.0|
| 0.0| male|22.0| 0.75| no| 2.0| 17.0| 6.0| 3.0|
| 0.0|female|32.0| 1.5| no| 2.0| 17.0| 5.0| 5.0|
| 0.0|female|22.0| 0.75| no| 2.0| 12.0| 1.0| 3.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+
only showing top 7 rows
// 取前n条记录
val data3=data.limit(5)
// 过滤
data.filter("age>50 and gender=='male' ").show
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
| 0.0| male|57.0| 15.0| yes| 5.0| 18.0| 6.0| 5.0|
| 0.0| male|57.0| 15.0| yes| 2.0| 14.0| 4.0| 4.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+
// 数据框的所有列
val columnArray=data.columns
columnArray: Array[String] = Array(affairs, gender, age, yearsmarried, children, religiousness, education, occupation, rating)
// 查询某些列的数据
data.select("gender", "age", "yearsmarried", "children").show(3)
+------+----+------------+--------+
|gender| age|yearsmarried|children|
+------+----+------------+--------+
| male|37.0| 10.0| no|
|female|27.0| 4.0| no|
|female|32.0| 15.0| yes|
+------+----+------------+--------+
only showing top 3 rows
val colArray=Array("gender", "age", "yearsmarried", "children")
colArray: Array[String] = Array(gender, age, yearsmarried, children)
data.selectExpr(colArray:_*).show(3)
+------+----+------------+--------+
|gender| age|yearsmarried|children|
+------+----+------------+--------+
| male|37.0| 10.0| no|
|female|27.0| 4.0| no|
|female|32.0| 15.0| yes|
+------+----+------------+--------+
only showing top 3 rows
// 操作指定的列,并排序
// data.selectExpr("gender", "age+1","cast(age as bigint)").orderBy($"gender".desc, $"age".asc).show
data.selectExpr("gender", "age+1 as age1","cast(age as bigint) as age2").sort($"gender".desc, $"age".asc).show
+------+----+----+
|gender|age1|age2|
+------+----+----+
| male|23.0| 22|
| male|23.0| 22|
| male|38.0| 37|
| male|58.0| 57|
| male|58.0| 57|
|female|23.0| 22|
|female|28.0| 27|
|female|33.0| 32|
|female|33.0| 32|
|female|33.0| 32|
+------+----+----+
val data4=data.selectExpr("gender", "age+1 as age1","cast(age as bigint) as age2").sort($"gender".desc, $"age".asc)
data4: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [gender: string, age1: double ... 1 more field]
// 查看物理执行计划
data4.explain()
== Physical Plan ==
*Project [gender#20, age1#135, age2#136L]
+- *Sort [gender#20 DESC, age#21 ASC], true, 0
+- Exchange rangepartitioning(gender#20 DESC, age#21 ASC, 200)
+- LocalTableScan [gender#20, age1#135, age2#136L, age#21]
// 查看逻辑和物理执行计划
data4.explain(extended=true)
== Parsed Logical Plan ==
'Sort ['gender DESC, 'age ASC], true
+- Project [gender#20, (age#21 + cast(1 as double)) AS age1#135, cast(age#21 as bigint) AS age2#136L]
+- Project [_1#9 AS affairs#19, _2#10 AS gender#20, _3#11 AS age#21, _4#12 AS yearsmarried#22, _5#13 AS children#23, _6#14 AS religiousness#24, _7#15 AS education#25, _8#16 AS occupation#2
6, _9#17 AS rating#27] +- LocalRelation [_1#9, _2#10, _3#11, _4#12, _5#13, _6#14, _7#15, _8#16, _9#17]
== Analyzed Logical Plan ==
gender: string, age1: double, age2: bigint
Project [gender#20, age1#135, age2#136L]
+- Sort [gender#20 DESC, age#21 ASC], true
+- Project [gender#20, (age#21 + cast(1 as double)) AS age1#135, cast(age#21 as bigint) AS age2#136L, age#21]
+- Project [_1#9 AS affairs#19, _2#10 AS gender#20, _3#11 AS age#21, _4#12 AS yearsmarried#22, _5#13 AS children#23, _6#14 AS religiousness#24, _7#15 AS education#25, _8#16 AS occupatio
n#26, _9#17 AS rating#27] +- LocalRelation [_1#9, _2#10, _3#11, _4#12, _5#13, _6#14, _7#15, _8#16, _9#17]
== Optimized Logical Plan ==
Project [gender#20, age1#135, age2#136L]
+- Sort [gender#20 DESC, age#21 ASC], true
+- LocalRelation [gender#20, age1#135, age2#136L, age#21]
== Physical Plan ==
*Project [gender#20, age1#135, age2#136L]
+- *Sort [gender#20 DESC, age#21 ASC], true, 0
+- Exchange rangepartitioning(gender#20 DESC, age#21 ASC, 200)
+- LocalTableScan [gender#20, age1#135, age2#136L, age#21]
import org.apache.spark.sql.functions._
// 对整个DataFrame的数据去重
data.distinct()
data.dropDuplicates()
// 对指定列的去重
val colArray=Array("affairs", "gender")
data.dropDuplicates(colArray)
//data.dropDuplicates("affairs", "gender")
val df=data.filter("gender=='male' ")
// data与df的差集
data.except(df).show
+-------+------+----+------------+--------+-------------+---------+----------+------+
|affairs|gender| age|yearsmarried|children|religiousness|education|occupation|rating|
+-------+------+----+------------+--------+-------------+---------+----------+------+
| 0.0|female|32.0| 15.0| yes| 1.0| 12.0| 1.0| 4.0|
| 0.0|female|32.0| 1.5| no| 2.0| 17.0| 5.0| 5.0|
| 0.0|female|32.0| 15.0| yes| 4.0| 16.0| 1.0| 2.0|
| 0.0|female|22.0| 0.75| no| 2.0| 12.0| 1.0| 3.0|
| 0.0|female|27.0| 4.0| no| 4.0| 14.0| 6.0| 4.0|
+-------+------+----+------------+--------+-------------+---------+----------+------+
// data与df的交集
data.intersect(df)
data.groupBy("gender").agg(count($"age"),max($"age").as("maxAge"), avg($"age").as("avgAge")).show
+------+----------+------+------+
|gender|count(age)|maxAge|avgAge|
+------+----------+------+------+
|female| 5| 32.0| 29.0|
| male| 5| 57.0| 39.0|
+------+----------+------+------+
data.groupBy("gender").agg("age"->"count","age" -> "max", "age" -> "avg").show
+------+----------+--------+--------+
|gender|count(age)|max(age)|avg(age)|
+------+----------+--------+--------+
|female| 5| 32.0| 29.0|
| male| 5| 57.0| 39.0|
+------+----------+--------+--------+
亲测
groupBy聚合算子
people.unionAll(newPeople).groupBy(col("name")).count.show
或者
people.unionAll(newPeople).groupBy($"name").count.show
或者
people.unionAll(newPeople).groupBy("name").count.show
调用DataFrame的toDF方法,重新命名全部列名,增加列名的可读性
people.groupBy("depId").agg(Map("age" -> "max", "gender" -> "count"))
.toDF("depId","maxAge","countGender").show
使用函数式编程方式对前后两个合并进行分组统计并显示结果
people.unionAll(newPeople).groupBy($"name").count.filter($"count" < 2).show
使用groupBy方法将合并后的DataFrame按照"name" 列进行分组,
得到GroupData类的实例,实例会自动带上分组的列,以及"count" 列。
GroupData类在spark2.0.x 版本改为RelationalGroupedDataset
// 创建视图
data.createOrReplaceTempView("Affairs")
val df1 = spark.sql("SELECT * FROM Affairs WHERE age BETWEEN 20 AND 25")
df1: org.apache.spark.sql.DataFrame = [affairs: double, gender: string ... 7 more fields]
// 子查询
val df2 = spark.sql("select gender, age,rating from ( SELECT * FROM Affairs WHERE age BETWEEN 20 AND 25 ) t ")
df2: org.apache.spark.sql.DataFrame = [gender: string, age: double ... 1 more field]
df2.show
+------+----+------+
|gender| age|rating|
+------+----+------+
| male|22.0| 3.0|
|female|22.0| 3.0|
| male|22.0| 5.0|
+------+----+------+
collect_set去除重复元素;collect_list不去除重复元素
select gender,
concat_ws(’,’, collect_set(children)),
concat_ws(’,’, collect_list(children))
from Affairs
group by gender
// 创建视图
data.createOrReplaceTempView("Affairs")
val df3= spark.sql("select gender,concat_ws(',',collect_set(children)),concat_ws(',',collect_list(children)) from Affairs group by gender")
df3: org.apache.spark.sql.DataFrame = [gender: string, concat_ws(,, collect_set(children)): string ... 1 more field]
df3.show // collect_set去除重复元素;collect_list不去除重复元素
+------+-----------------------------------+------------------------------------+
|gender|concat_ws(,, collect_set(children))|concat_ws(,, collect_list(children))|
+------+-----------------------------------+------------------------------------+
|female| no,yes| no,yes,no,no,yes|
| male| no,yes| no,yes,no,yes,no|
+------+-----------------------------------+------------------------------------+
val df6 = spark.sql("select gender,children,max(age),avg(age),count(age) from Affairs group by Cube(gender,children) order by 1,2")
df6.show
+------+--------+--------+--------+----------+
|gender|children|max(age)|avg(age)|count(age)|
+------+--------+--------+--------+----------+
| null| null| 57.0| 34.0| 10|
| null| no| 37.0| 27.0| 6|
| null| yes| 57.0| 44.5| 4|
|female| null| 32.0| 29.0| 5|
|female| no| 32.0| 27.0| 3|
|female| yes| 32.0| 32.0| 2|
| male| null| 57.0| 39.0| 5|
| male| no| 37.0| 27.0| 3|
| male| yes| 57.0| 57.0| 2|
+------+--------+--------+--------+----------+
val df7 = spark.sql("select gender,children,max(age),avg(age),count(age) from Affairs group by rollup(gender,children) order by 1,2")
df7.show
+------+--------+--------+--------+----------+
|gender|children|max(age)|avg(age)|count(age)|
+------+--------+--------+--------+----------+
| null| null| 57.0| 34.0| 10|
|female| null| 32.0| 29.0| 5|
|female| no| 32.0| 27.0| 3|
|female| yes| 32.0| 32.0| 2|
| male| null| 57.0| 39.0| 5|
| male| no| 37.0| 27.0| 3|
| male| yes| 57.0| 57.0| 2|
+------+--------+--------+--------+----------+
select gender,
age,
row_number() over(partition by gender order by age) as rowNumber,
rank() over(partition by gender order by age) as ranks,
dense_rank() over(partition by gender order by age) as denseRank,
percent_rank() over(partition by gender order by age) as percentRank
from Affairs
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
(0, "male", 37, 10, "no", 3, 18, 7, 4),
(0, "female", 27, 4, "no", 4, 14, 6, 4),
(0, "female", 32, 15, "yes", 1, 12, 1, 4),
(0, "male", 57, 15, "yes", 5, 18, 6, 5),
(0, "male", 22, 0.75, "no", 2, 17, 6, 3),
(0, "female", 32, 1.5, "no", 2, 17, 5, 5),
(0, "female", 22, 0.75, "no", 2, 12, 1, 3),
(0, "male", 57, 15, "yes", 2, 14, 4, 4),
(0, "female", 32, 15, "yes", 4, 16, 1, 2),
(0, "male", 22, 1.5, "no", 4, 14, 4, 5),
(0, "male", 37, 15, "yes", 2, 20, 7, 2),
(0, "male", 27, 4, "yes", 4, 18, 6, 4),
(0, "male", 47, 15, "yes", 5, 17, 6, 4),
(0, "female", 22, 1.5, "no", 2, 17, 5, 4),
(0, "female", 27, 4, "no", 4, 14, 5, 4),
(0, "female", 37, 15, "yes", 1, 17, 5, 5),
(0, "female", 37, 15, "yes", 2, 18, 4, 3),
(0, "female", 22, 0.75, "no", 3, 16, 5, 4),
(0, "female", 22, 1.5, "no", 2, 16, 5, 5),
(0, "female", 27, 10, "yes", 2, 14, 1, 5),
(0, "female", 22, 1.5, "no", 2, 16, 5, 5),
(0, "female", 22, 1.5, "no", 2, 16, 5, 5),
(0, "female", 27, 10, "yes", 4, 16, 5, 4),
(0, "female", 32, 10, "yes", 3, 14, 1, 5),
(0, "male", 37, 4, "yes", 2, 20, 6, 4))
val data = dataList.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")
data.printSchema()
// 创建视图
data.createOrReplaceTempView("Affairs")
val s1="row_number() over(partition by gender order by age) as rowNumber,"
val s2="rank() over(partition by gender order by age) as ranks,"
val s3="dense_rank() over(partition by gender order by age) as denseRank,"
val s4="percent_rank() over(partition by gender order by age) as percentRank"
val df8=spark.sql("select gender,age,"+s1+s2+s3+s4+" from Affairs")
df8.show(50)
+------+----+---------+-----+---------+------------------+
|gender| age|rowNumber|ranks|denseRank| percentRank|
+------+----+---------+-----+---------+------------------+
|female|22.0| 1| 1| 1| 0.0|
|female|22.0| 2| 1| 1| 0.0|
|female|22.0| 3| 1| 1| 0.0|
|female|22.0| 4| 1| 1| 0.0|
|female|22.0| 5| 1| 1| 0.0|
|female|22.0| 6| 1| 1| 0.0|
|female|27.0| 7| 7| 2| 0.4|
|female|27.0| 8| 7| 2| 0.4|
|female|27.0| 9| 7| 2| 0.4|
|female|27.0| 10| 7| 2| 0.4|
|female|32.0| 11| 11| 3|0.6666666666666666|
|female|32.0| 12| 11| 3|0.6666666666666666|
|female|32.0| 13| 11| 3|0.6666666666666666|
|female|32.0| 14| 11| 3|0.6666666666666666|
|female|37.0| 15| 15| 4|0.9333333333333333|
|female|37.0| 16| 15| 4|0.9333333333333333|
| male|22.0| 1| 1| 1| 0.0|
| male|22.0| 2| 1| 1| 0.0|
| male|27.0| 3| 3| 2| 0.25|
| male|37.0| 4| 4| 3| 0.375|
| male|37.0| 5| 4| 3| 0.375|
| male|37.0| 6| 4| 3| 0.375|
| male|47.0| 7| 7| 4| 0.75|
| male|57.0| 8| 8| 5| 0.875|
| male|57.0| 9| 8| 5| 0.875|
+------+----+---------+-----+---------+------------------+
val dfList = List(("Hadoop", "Java,SQL,Hive,HBase,MySQL"), ("Spark", "Scala,SQL,DataSet,MLlib,GraphX"))
dfList: List[(String, String)] = List((Hadoop,Java,SQL,Hive,HBase,MySQL), (Spark,Scala,SQL,DataSet,MLlib,GraphX))
case class Book(title: String, words: String)
val df=dfList.map{p=>Book(p._1,p._2)}.toDS()
df: org.apache.spark.sql.Dataset[Book] = [title: string, words: string]
df.show
+------+--------------------+
| title| words|
+------+--------------------+
|Hadoop|Java,SQL,Hive,HBa...|
| Spark|Scala,SQL,DataSet...|
+------+--------------------+
df.flatMap(_.words.split(",")).show
+-------+
| value|
+-------+
| Java|
| SQL|
| Hive|
| HBase|
| MySQL|
| Scala|
| SQL|
|DataSet|
| MLlib|
| GraphX|
+-------+
手机扫一扫
移动阅读更方便
你可能感兴趣的文章