在spark中使用聚类算法

笔者是个hadoop和spark的小白,之前在机器学习方面主要是用python的scikit-learn和pandas库,最近因为工作需要,需要在一个存储在hadoop上的约1000万用户数的数据集做一个聚类模型。而公司服务器上已知的机器学习工具只有spark的mllib,而且还是1.5.1版本。
嗯。。。总不能因为自己一个人的缘故强行要求运维添加各种各样的机器学习框架,只要硬着头皮尝试使用spark。

为了防止在建模过程中走弯路,先抽取了大概50万样本量的数据集在本地机上使用scikit-learn做了算法的测试和调参,确定没有问题以后在按照同样的流程和参数设置在spark中对全量用户做同样处理。

基本流程如下:

  • 提取数据
  • 数据预处理(标准化)
  • 主成分分析
  • 聚类
  • 导出数据到hadoop中

导入函数库

1
2
3
4
5
6
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.clustering.GaussianMixture

提取数据

在提取数据过程中,笔者本来打算使用spark.sql.hive.HiveContext,但是读取出来的数据是sql.DataFrame,并且在数据转换过程中不知道是因为版本过老还是其他什么原因,经常性的在spark-shell中出现Task not serializable的bug,虽然笔者猜测可能的原因是因为在分布式系统中,有部分变量包括model并没有serializable,导致运行出现bug,但是并没能顺利找到解决办法,在stackoverflow里面求助,得到的解答也是偏原理性的,为了不耽误项目进度,只能放弃这个方案。

在之后的建模过程中,笔者便完全遵照教程的指导,使用sc.textFile来从hadoop文件系统中直接读取数据集,因为使用sc.textFile读取的文件本身就是RDD格式,可以在在MLlib中直接运行,因此,之前的坑便算是绕过去了。

代码如下:

1
2
val raw_data = sc.textFile("path_to_table_on_hadoop/database_name.db/table_name")
val data = raw_data.map(s=>Vectors.dense(s.trim.split(',').slice(1,19).map(_.toDouble))).cache()

上方代码中,第一行是在指定的数据库中读取相应的数据表,第二行是因为第一行读取的结果是一个由string构成的RDD数据,因此需要将其转化为double变量以方便进一步的数据处理。其中,使用split(',')是因为笔者在创建数据表时使用的分隔符就是,slice(1, 19)是指截取从第二个到第十九个变量构成一个描述样本的向量,即数据集的维度为18。

数据预处理

因为异常值和缺失值已经在Hive建表的过程中清洗掉了,剩下的基本上都是比较正常的数据,因此异常值检测和缺失值填补在这一步就直接略过。直接进行标准化,进行标准化的目的是为了数据集各个维度的数量级一致,加快模型的收敛过程。

此处采用的是standard scaling,使用标准差和平均值进行标准化,在spark中的API接口是StandardScaler。代码如下:

1
2
val scaler= new StandardScaler(withMean=true, withStd=true).fit(data)
val data_scaled = data.map(x=>scaler.transform(Vectors.dense(x.toArray)))

里面的参数和相关定义,请参考API文档 (请注意这是1.5.1版本的api文档,请确认文档版本是否与自己的spark版本一致)。

主成分分析(PCA)

主成分分析的目的通常来说是为了降维,但是,此处特征维度本身并不高,因此,采用PCA的目的只要是通过坐标系映射,避免部分噪音特征的影响。
代码如下:

1
2
val pca = new PCA(1).fit(data_scaled)
val projected = data_scaled.map(row => pca.transform(row))

此处由于第一主成分的贡献度接近60%,其余主成分的贡献度基本上都在10%左右甚至一下,因此只选择了第一主成分来进行聚类。

聚类

此处主要尝试两种不同的聚类方法(mllib中的聚类方法真的是屈指可数。。。)

  • KMeans
  • 高斯混合模型(GMM)

KMeans

代码如下:

1
2
3
4
5
6
7
8
9
10
// RUN KMEANS
val projected_cache = projected.cache()
val clusters = new KMeans().setMaxIterations(300).run(projected_cache)

// EVALUATE KMEANS
val WSSSE = clusters.computeCost(projected_cache)
println("Within Set Sum of Squared Errors = " + WSSSE)

// PREDICT LABELS
val predicted_labels = clusters.predict(projected_cache)

KMeans中提供了一个评价聚类效果的函数computeCost,这个函数可以计算每个样本点与离自己最近的聚类中心点之间的距离平方和(sum of squared distance of points to their nearest center),也就是说,样本集的大小也会影响评价结果。

GMM

代码如下:

1
2
3
4
5
6
7
8
9
// RUN GMM
val projected_cache = projected.cache()
val gmm = new GaussianMixture().setK(2).run(projected_cache)

// PREDICT LABELS
val predicted_labels = gmm.predict(projected_cache)
val predicted_soft_labels = gmm.predictSoft(projected_cache)

val predicted_final_labels = predicted_soft_labels.map(x=>if (x(0)/x(1)>=121) 0 else 1)

相比较于Kmeans,GMM提供了两种预测函数preictpredictSoft。这两者的区别在于,前者是根据样本在不同类别中的概率大小直接给出样本的聚类标签,而后者则提供了样本在每一个类别中的具体概率值,使用者可以更具需要进一步调整。这里因为笔者所进行的二分类聚类是一个理论上样本分布极不均衡的数据集,因此,通过调整两类标签的概率比值适当提高了0类标签的判别阈值,减少0类标签可能出现的数量,以尽可能符合真实的样本分布。

导出数据

在完成聚类以后,不管是使用Kmeans还是GMM,我们都需要将自己的预测标签结果与样本编号例如id等等组合到一起,然后存入到hadoop中。代码如下:

1
2
val finalResult = raw_data zip predicted_labels map{case(a, b) => a.trim.split(',')(0) + "," + b.toString}
finalResult.saveAsTextFile("hdfs://path_to_table_on_hadoop/database_name.db/export_table_name.txt")

第一行是利用zip函数和map函数将id和标签组合到一起构成一个string的RDD数据,第二行是讲结果导入到hadoop文件系统中。

如果有需要,还可以进一步将结果导入到Hive表中,此处不再赘述。