MLlib学习——基本统计
阅读原文时间:2023年07月10日阅读:1

  给定一个数据集,数据分析师一般会先观察一下数据集的基本情况,称之为汇总统计或者概要性统计。一般的概要性统计用于概括一系列观测值,包括位置或集中趋势(比如算术平均值、中位数、众数和四分位均值),展型(比如四分位间距、绝对偏差和绝对距离偏差、各阶矩等),统计离差,分布的形状,依赖性等。除此之外,spark.mllib库也提供了一些其他的基本的统计分析工具,包括相关性、分层抽样、假设检验,随机数生成等。在本章,我们将从以下几个方面进行介绍:

  • 概括统计数据 Summary Statistics
  • 相关性 Correlations
  • 分层取样 Stratified Sampling

假设检验 Hypothesis Testing

  • 流式显著性测试

  • 随机数据生成 Random Data Generation

  • 核密度估计 Kernel density estimation

一、数据集

  Iris数据集也称鸢尾花卉数据集,是一类多重变量分析的数据集,是常用的分类实验数据集,由Fisher于1936收集整理。数据集包含150个数据集,分为3类,每类50个数据,每个数据包含4个属性。可通过花萼长度,花萼宽度,花瓣长度,花瓣宽度4个属性预测鸢尾花卉属于(Setosa,Versicolour,Virginica)三个种类中的哪一类。大家可以到这个链接下载该数据集:https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data。  其基本数据的样子是:

//将文件上传到hadoop的根目录下
#hdfs dfs -put data.txt /
var data = sc.textFile("/iris.data")
data.collect

输出:
res9: Array[String] = Array(5.1,3.5,1.4,0.2,Iris-setosa, 4.9,3.0,1.4,0.2,Iris-setosa, 4.7,3.2,1.3,0.2,Iris-setosa, 4.6,3.1,1.5,0.2,Iris-setosa, 5.0,3.6,1.4,0.2,Iris-setosa, 5.4,3.9,1.7,0.4,Iris-setosa, 4.6,3.4,1.4,0.3,Iris-setosa, 5.0,3.4,1.5,0.2,Iris-setosa, 4.4,2.9,1.4,0.2,Iris-setosa, 4.9,3.1,1.5,0.1,Iris-setosa, 5.4,3.7,1.5,0.2,Iris-setosa, 4.8,3.4,1.6,0.2,Iris-setosa, 4.8,3.0,1.4,0.1,Iris-setosa, 4.3,3.0,1.1,0.1,Iris-setosa, 5.8,4.0,1.2,0.2,Iris-setosa, 5.7,4.4,1.5,0.4,Iris-setosa, 5.4,3.9,1.3,0.4,Iris-setosa, 5.1,3.5,1.4,0.3,Iris-setosa, 5.7,3.8,1.7,0.3,Iris-setosa, 5.1,3.8,1.5,0.3,Iris-setosa, 5.4,3.4,1.7,0.2,Iris-setosa, 5.1,3.7…

二、概括统计数据

  对于RDD[Vector]类型的变量,Spark MLlib提供了一种叫colStats()的统计方法,调用该方法会返回一个类型为MultivariateStatisticalSummary的实例。根据这个实例可以获得每一列的最大值,最小值,均值、方差、总数等。具体操作如下所示:

%spark
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
---------------------------------------------------------------------------------------------------------------------------
// 读取要分析的数据,把数据转变成RDD[Vector]类型
// val observations=data.map(_.split(",")).map(p => Vectors.dense(p(0).toDouble, p(1).toDouble, p(2).toDouble, p(3).toDouble))

// 数据对应莺尾花的四个属性,即萼片长度,萼片宽度,花瓣长度和花瓣宽度存储在observations
// 调用colStats()方法,得到一个MultivariateStatisticalSummary类型的变量
注意:这个数据有问题,但使用其它数据可借鉴该这种方式---------------------------------------------------------------------------------------------------------------------------

val observations = sc.parallelize(
Seq(
Vectors.dense(1.0, 10.0, 100.0),
Vectors.dense(2.0, 20.0, 200.0),
Vectors.dense(3.0, 30.0, 300.0)
)
)
val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
// 统计列的大小 返回值类型:long
println(summary.count)
// 打印每列的平均值--------------------------------以下其它方法返回值类型都是:vector
println(summary.mean)
// 每列的方差
println(summary.variance)
// 每列的最大/最小值
println(summary.max)
println(summary.min)
// 每列L1/L2范数
println(summary.normL1)
println(summary.normL2)
// 每列非0向量个数
println(summary.numNonzeros)

三、相关性

  计算两个数据序列之间的相关性是统计中的常见操作。目前Spark支持两种相关性系数:皮尔逊相关系数(pearson)和斯皮尔曼等级相关系数(spearman)。相关系数是用以反映变量之间相关关系密切程度的统计指标。简单的来说就是相关系数绝对值越大(值越接近1或者-1),当取值为0表示不相关,取值为(0~-1]表示负相关,取值为(0, 1]表示正相关。

根据输入类型的不同,输出的结果也产生相应的变化。如果输入的是两个RDD[Double],则输出的是一个double类型的结果;如果输入的是一个RDD[Vector],则对应的输出的是一个相关系数矩阵:

%spark
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

//输入RDD[Double]类型数据
val seriesX: RDD[Double] = sc.parallelize(Array(1, 2, 3, 3, 5))
// 这两列数据要求必须数量相同
val seriesY: RDD[Double] = sc.parallelize(Array(11, 22, 33, 33, 555))

// 计算相关性
// 调用Statistics包中的corr()函数来获取相关性,这里用的是"pearson"(是默认方法),当然根据不同需要也可以选择"spearman"
val correlation: Double = Statistics.corr(seriesX, seriesY, "pearson")
println(s"Correlation is: $correlation")

%spark
val data: RDD[Vector] = sc.parallelize(
Seq(
Vectors.dense(1.0, 10.0, 100.0),
Vectors.dense(2.0, 20.0, 200.0),
Vectors.dense(5.0, 33.0, 366.0))
) // 注意,每个向量都是一行而不是一列

// 计算矩阵的相关性
val correlMatrix: Matrix = Statistics.corr(data, "pearson")
println(correlMatrix.toString)

  相关矩阵也叫相关系数矩阵,是由矩阵各列间的相关系数构成的。相关矩阵第i行第j列的元素是原矩阵第i列和第j列的相关系数。可以看到,输入两个RDD[Double]或一个RDD[Vector],求相关性得到结果是一致的。

四、分层抽样

分层取样(Stratified sampling)顾名思义,就是将数据根据不同的特征分成不同的组,然后按特定条件从不同的组中获取样本,并重新组成新的数组。分层取样算法是直接集成到键值对类型 RDD[(K, V)] 的 sampleByKey 和 sampleByKeyExact 方法,无需通过额外的 spark.mllib 库来支持。

  对于分层采样,可以将键视为标签,将值视为特定属性。例如,键可以是男性或女性,也可以是文档 ID,并且相应的值可以是人口中人员的年龄列表或文档中的单词列表。该方法将掷硬币以决定是否对观测值进行采样,因此需要一次传递数据,并提供预期的样本大小。

官网案例:

// an RDD[(K, V)] of any key value pairs
val data = sc.parallelize(
Seq((1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')))

// specify the exact fraction desired from each key
val fractions = Map(1 -> 0.1, 2 -> 0.6, 3 -> 0.3)

// Get an approximate sample from each stratum
val approxSample = data.sampleByKey(withReplacement = false, fractions = fractions)
// Get an exact sample from each stratum
val exactSample = data.sampleByKeyExact(withReplacement = false, fractions = fractions)

  1、SampleByKey方法

  sampleByKey 方法需要作用于一个键值对数组,其中 key 用于分类,value可以是任意数。然后通过 fractions 参数来定义分类条件和采样几率。fractions 参数被定义成一个 Map[K, Double] 类型,Key是键值的分层条件,Double 是该满足条件的 Key 条件的采样比例,1.0 代表 100%。

%spark
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.PairRDDFunctions

val data = sc.makeRDD(Array(("female","Lily"),("female","Lucy"),("female","Emily"),("female","Kate"),("female","Alice"),("male","Tom"),("male","Roy"),("male","David"),("male","Frank"),("male","Jack")))
data.collect
// 通过 fractions 参数来定义分类条件和采样几率
val fractions : Map[String, Double]= Map("female"->0.6,"male"->0.4)
// 设置采取60%的female和40%的male,因为数据中female和male各有5个样本,所以理想中的抽样结果应该是有3个female和2个male
// 接下来用sampleByKey进行抽样:
val approxSample = data.sampleByKey(withReplacement = false, fractions, 1)
approxSample.collect().foreach(println)

输出:
(female,Lily)
(female,Lucy)
(female,Emily)
(female,Kate)
(male,Tom)
(male,Roy)
(male,David)
(male,Frank)
(male,Jack)

//从上面可以看到,本应该抽取3个female和2个male,但结果抽取了4个female和5个male,结果不够准确,不过在样本数量足够大且要求一定效率的情况下,用sampleByKey进行抽样还是可行的。

  2、sampleByKeyExact

  sampleByKey 和 sampleByKeyExact 的区别在于 sampleByKey 每次都通过给定的概率以一种类似于掷硬币的方式来决定这个观察值是否被放入样本,因此一遍就可以过滤完所有数据,最后得到一个近似大小的样本,但往往不够准确。而 sampleByKeyExtra 会对全量数据做采样计算。对于每个类别,其都会产生 (fk⋅nk)个样本,其中fk是键为k的样本类别采样的比例;nk是键k所拥有的样本数。 sampleByKeyExtra 采样的结果会更准确,有99.99%的置信度,但耗费的计算资源也更多。

%spark
// 接上面sampleByKey代码
val exactSample = data.sampleByKeyExact(withReplacement = false, fractions, 1)
exactSample.collect.foreach(println)

输出;
(female,Lily)
(female,Emily)
(female,Kate)
(male,Roy)
(male,David)

// 从实验结果可以看出,所得结果和预想一致,但当样本数量比较大时,可能会耗时较久。
// 其中,sampleByKeyExact抽样方法中所涉及到的参数解释如下:
//   withReplacement:每次抽样是否有放回
//   fractions:控制不同key的抽样率
//   seed:随机数种子

五、假设检验Hypothesis testing

​   假设检验是统计学中一个强大的工具,用于确定结果是否具有统计显著性,无论该结果是否偶然发生。 Spark目前支持皮尔森卡方检测(Pearson’s chi-squared tests),包括“拟合度检验”(Goodness of fit)以及“独立性检验”(independence)。不同的输入类型决定了是做拟合度检验还是独立性检验。拟合度检验要求输入为Vector, 独立性检验要求输入是Matrix。

官方测试案例:

%spark
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.stat.test.ChiSqTestResult
import org.apache.spark.rdd.RDD

// 数据一:稠密向量
val vec: Vector = Vectors.dense(0.1, 0.15, 0.2, 0.3, 0.25)

// 计算拟合度系数
// 如果没有提供第二个要测试的向量作为参数,则测试将以均匀分布运行。
val goodnessOfFitTestResult = Statistics.chiSqTest(vec)
// 测试总结,包括p值、自由度、测试统计、使用的方法和无效假设。
println(s"$goodnessOfFitTestResult\n")

输出:
vec: org.apache.spark.mllib.linalg.Vector = [0.1,0.15,0.2,0.3,0.25]
goodnessOfFitTestResult: org.apache.spark.mllib.stat.test.ChiSqTestResult =
Chi squared test summary:
method: pearson
degrees of freedom = 4
statistic = 0.12499999999999999
pValue = 0.998126379239318
No presumption against null hypothesis: observed follows the same distribution as expected..

// 接上面代码
// 数据二:稠密矩阵
val mat: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

// 对输入的稠密矩阵进行独立性测试
val independenceTestResult = Statistics.chiSqTest(mat)
// 试验总结,包括p值、自由度
println(s"$independenceTestResult\n")

输出:
mat: org.apache.spark.mllib.linalg.Matrix =
1.0 2.0
3.0 4.0
5.0 6.0
independenceTestResult: org.apache.spark.mllib.stat.test.ChiSqTestResult =
Chi squared test summary:
method: pearson
degrees of freedom = 2
statistic = 0.14141414141414144
pValue = 0.931734784568187
No presumption against null hypothesis: the occurrence of the outcomes is statistically independent..

// 接上面代码
// spark.mllib还支持输入类型,通过卡方独立性测试实现特征选择。RDD[LabeledPoint]
val obs: RDD[LabeledPoint] =
sc.parallelize(
Seq(
LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 0.0)),
LabeledPoint(-1.0, Vectors.dense(-1.0, 0.0, -0.5)
)
)
) // (标签、特征)成对。

// 列联表由原始(标签、特征)对构成,用于进行独立性测试。
// 返回一个数组,其中包含针对标签的每个功能的 卡方测试结果。
val featureTestResults: Array[ChiSqTestResult] = Statistics.chiSqTest(obs)
featureTestResults.zipWithIndex.foreach { case (k, v) =>
println(s"Column ${(v + 1)} :")
println(k)
} // 测试总结

输出:

Column 1 :
Chi squared test summary:
method: pearson
degrees of freedom = 1
statistic = 3.0000000000000004
pValue = 0.08326451666354884
Low presumption against null hypothesis: the occurrence of the outcomes is statistically independent..
Column 2 :
Chi squared test summary:
method: pearson
degrees of freedom = 1
statistic = 0.75
pValue = 0.3864762307712326
No presumption against null hypothesis: the occurrence of the outcomes is statistically independent..
Column 3 :
Chi squared test summary:
method: pearson
degrees of freedom = 2
statistic = 3.0
pValue = 0.22313016014843035
No presumption against null hypothesis: the occurrence of the outcomes is statistically independent..

//也支持把v1作为样本,把v2作为期望值,进行卡方检验:
val c1 = Statistics.chiSqTest(v1, v2)

柯尔莫哥洛夫-斯米尔诺夫检验(KS检验)基于累计分布函数,用以检验两个经验分布是否不同或一个经验分布与另一个理想分布是否不同。

%spark
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

// RDD样本数据
val data: RDD[Double] = sc.parallelize(Seq(0.1, 0.15, 0.2, 0.3, 0.25))

// 对样本与标准正态分布进行KS测试
val testResult = Statistics.kolmogorovSmirnovTest(data, "norm", 0, 1)
// 检验总结,包括p值、检验统计量和无效假设如果我们的p值表明显著性,我们可以拒绝无效假设。
println(testResult)
println()

// 使用我们制造的累积分布函数进行KS测试
val myCDF = Map(0.1 -> 0.2, 0.15 -> 0.6, 0.2 -> 0.05, 0.3 -> 0.05, 0.25 -> 0.1)
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
println(testResult2)

输出:
Kolmogorov-Smirnov test summary:
degrees of freedom = 0
statistic = 0.539827837277029
pValue = 0.06821463111921133
Low presumption against null hypothesis: Sample follows theoretical distribution.

Kolmogorov-Smirnov test summary:
degrees of freedom = 0
statistic = 0.95
pValue = 6.249999999763389E-7
Very strong presumption against null hypothesis: Sample follows theoretical distribution.

参数说明:

  method: 方法。

  statistic: 检验统计量是用于假设检验计算的统计量。简单来说就是用来决定是否可以拒绝原假设的证据。检验统计量的值是利用样本数据计算得到的,它代表了样本中的信息。检验统计量的绝对值越大,拒绝原假设的理由越充分,反之,不拒绝原假设的理由越充分。

  degrees of freedom:自由度。表示可自由变动的样本观测值的数目,即独立变量数减1;

  pValue:统计学根据显著性检验方法所得到的P 值。

  在统计学上,做这个检验时,通常会设定一个虚无假设(也叫零假设,Null Hypothesis),通常记作H0。以及一个对立假设(Alternative Hypothesis),如果证明虚无假设错误,则可以推出对立假设成立。pValue就是用来判断H0假设是否成立的依据。因为期望值是基于H0假设得出的,如果观测值与期望值越一致,则说明检验现象与零假设越接近,则越没有理由拒绝零假设。如果观测值与期望值越偏离,说明零假设越站不住脚,则越有理由拒绝零假设,从而推出对立假设的成立。如果P值很小,说明原假设情况的发生的概率很小,而如果出现了,根据小概率原理,我们就有理由拒绝原假设,P值越小,我们拒绝原假设的理由越充分。总之,P值越小,表明结果越显著。

六、随机数生成Random Data Generation

org.apache.spark.mllib.random.RandomRDDs是一个工具集,用来生成含有随机数的RDD,可以按各种给定的分布模式生成数据集,Random RDDs包下现支持正态分布、泊松分布和均匀分布三种分布方式。RandomRDDs提供随机double RDDS或vector RDDS。

​ 下面的例子中生成一个随机double RDD,其值是标准正态分布N(0,1),然后将其映射到N(1,4)。

%spark
import org.apache.spark.SparkContext
import org.apache.spark.mllib.random.RandomRDDs._

// 生成1000000个服从正态分配N(0,1)的RDD[Double],并且分布在 10 个分区中
val u = normalRDD(sc, 10000000L, 10)
// 把生成的随机数转化成N(1,4) 正态分布
val v = u.map(x => 1.0 + 3.0 * x)

七、核密度估计 Kernel Density Estimation

org.apache.spark.mllib.stat.KernelDensity 用于核密度估算,核密度估算的意思是根据已知的样本估计未知的密度,属於非参数检验方法之一。核密度估计的原理是。观察某一事物的已知分布,如果某一个数在观察中出现了,可认为这个数的概率密度很大,和这个数比较近的数的概率密度也会比较大,而那些离这个数远的数的概率密度会比较小。

%spark
import org.apache.spark.mllib.stat.KernelDensity
import org.apache.spark.rdd.RDD

val data: RDD[Double] = sc.parallelize(Seq(1, 1, 1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 9))
// 利用样本数据构建核函数
val kd = new KernelDensity()
.setSample(data)
.setBandwidth(3.0)

// 构造了核密度估计kd,就可以对给定数据数据进行核估计
val densities = kd.estimate(Array(-1.0, 2.0, 5.0))

// 估计的概率密度函数值:
densities: Array[Double] = Array(0.04145944023341912, 0.07902016933085627, 0.08962920127312338)