
2.1 数据准备 2.1.1 数据集文件准备
- 这是一个 二元分类 问题, 也就是预测出来的结果只有两种, 是恶性肿瘤, 或是良性肿瘤。
- 有关 Spark ML 的介绍与知识点请参考: Spark ML学习笔记—Spark MLlib 与 Spark ML。
(1) 该项目并为使用数据库当做数据源,而是直接将数据文件放在项目目录中, 这是一个结构化的简化数据集。
(2) 本项目使用的数据集是: 来自 UCI 机器学习库 的威斯康星乳腺癌数据集。数据集地址: 威斯康星乳腺癌数据集。
| 字段 | 含义 |
|---|---|
| id | 样本代码变安好 |
| thickness | 肿块厚度 |
| size | 细胞大小的均匀性 |
| shape | 细胞形状的均匀性 |
| madh | 边缘粘附力 |
| epsize | 单上皮细胞大小 |
| bnuc | 裸核 |
| bchrom | 染色质的颜色 |
| nNuc | 核仁正常情况 |
| mit | 有丝分裂情况 |
| cancer_class | 分类情况: 2为良性, 4为恶性 |
使用的依赖包多数来自于 Spark ML, 而非 Spark MLlib。
import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}
import org.apache.spark.ml.evaluation.BinaryClassificationevaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
// 加载并解析数据 (转换后: 0是恶性肿瘤, 1是良性肿瘤)
// 把最后一列的分类情况转换的 2 和 4 转换为 0 和 1, 并放在第一列。
// 转换之后, 第一列当做标签(label), 后面的数据则作为特征存在(feature)
val rdd = spark.sparkContext.textFile("datas3/wbcd.csv")
val cancerRDD = parseRDD(rdd).map{
line => Cancer(if (line(9) == 4.0) 1 else 0, line(0), line(1), line(2), line(3), line(4), line(5), line(6), line(7), line(8))
}
// (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) => (10, 1, 2, 3, 4, 5, 6, 7, 8, 9)
parseRDD() 函数定义如下:
def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
rdd.map(_.split(",")).filter(_(6) != "?").map(_.drop(1)).map(_.map(_.toDouble))
}
// 将字符串转换为 Double 数组, 并去掉第 1 列的 id
// (0,1,2,3,4,5,6,7,8,9,10) => (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
样例类 Cancer 的定义如下:
case class Cancer(cancer_class: Double, thickness: Double, size: Double, shape: Double, madh: Double,
epsize: Double, bnuc: Double, bchrom: Double, nNuc: Double, mit: Double)
import spark.implicits._ val cancerDF = cancerRDD.toDF().cache() cancerDF.show()
转换后的数据帧如下图所示:
val featureCols = Array("thickness", "size", "shape", "madh", "epsize", "bnuc", "bchrom", "nNuc", "mit")
// 定义一个转换器 (VectorAssembler 是一个转换器)
val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")
// 使用 转换器 将 特征列 为 数据帧 (特征向量 features 在数据帧后面单独添加一列)
val df2 = assembler.transform(cancerDF)
df2.show()
添加了特征向量的数据帧如下图所示 (最右侧一列为特征向量):
// 使用 StringIndexer 为训练集创建 标签 (标签 label 在数据帧后面单独添加一列) (StringIndexer 不是转换器吗? 怎么是评估器)
val labelIndexer = new StringIndexer()
.setInputCol("cancer_class")
.setOutputCol("label")
val df3 = labelIndexer.fit(df2).transform(df2)
df3.show()
添加标签后的 数据帧 如下图所示:
// 创建测试及训练集 (splitSeed 的用处: ???) val splitSeed = 1234567 val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)
// 为测试集获取原始的 预测结果 及 可能性 (这里输出的都是测试集中的数据吗) val predictions = model.transform(testData) predictions.show()
附带 预测结果 的 数据帧 如下图所示:
val trainingSummary = model.summary val objectiveHistory = trainingSummary.objectiveHistory objectiveHistory.foreach( loss => println(loss) )
注: 损失的输出结果随着迭代次数的增加而逐步降低。
// 我们使用的分类器来自二元逻辑回归算法
val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]
// 以数据帧和 areaUnderROC 的方式获取 ROC, 其值越接近1.0, 效果越好
val roc = binarySummary.roc
// roc.show()
println("Area Under ROC: " + binarySummary.areaUnderROC)
然后再计算一些其他的指标,如 真正类比例、假正类比例、假负类比例 以及 总数量 等等:
// 最后, 判断模型的精度。但首先将模型的阈值设置为最大的 fMeasure (F值)
val fMeasure = binarySummary.fMeasureByThreshold
val fm = fMeasure.col("F-Measure")
val maxFMeasure = fMeasure.select(("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure).select("threshold").head().getDouble(0)
model.setThreshold(bestThreshold)
// 然后计算该模型的精度
val evaluator = new BinaryClassificationevaluator().setLabelCol("label")
val accuracy = evaluator.evaluate(predictions)
println("Accuracy: " + accuracy)
...
import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}
import org.apache.spark.ml.evaluation.BinaryClassificationevaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object SparkML_0104_test4 {
def main(args: Array[String]): Unit = {
// TODO 创建 Spark SQL 的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkML")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 首先, 定义两个后面会用到的函数
def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
rdd.map(_.split(",")).filter(_(6) != "?").map(_.drop(1)).map(_.map(_.toDouble))
} // (0,1,2,3,4,5,6,7,8,9,10) => (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// step 1: 加载并解析数据 (转换后: 0是恶性肿瘤, 1是良性肿瘤)
val rdd = spark.sparkContext.textFile("datas3/wbcd.csv")
val cancerRDD = parseRDD(rdd).map{
line => Cancer(if (line(9) == 4.0) 1 else 0, line(0), line(1), line(2), line(3), line(4), line(5), line(6), line(7), line(8))
} // (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) => (10, 1, 2, 3, 4, 5, 6, 7, 8, 9)
// step 2: 为 ML pipeline 将 RDD 转换为 数据帧
import spark.implicits._
val cancerDF = cancerRDD.toDF().cache()
cancerDF.show()
// step 3: 特征抽取与转换
val featureCols = Array("thickness", "size", "shape", "madh", "epsize", "bnuc", "bchrom", "nNuc", "mit")
// 将它们合并成为一个特征向量 (VectorAssembler 是一个转换器)
val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")
// 接着将其转换为数据帧 (特征向量 features 在数据帧后面单独添加一列)
val df2 = assembler.transform(cancerDF)
df2.show()
// 使用 StringIndexer 为训练集创建 标签 (标签 label 在数据帧后面单独添加一列) (StringIndexer 不是转换器吗? 怎么是评估器)
val labelIndexer = new StringIndexer()
.setInputCol("cancer_class")
.setOutputCol("label")
val df3 = labelIndexer.fit(df2).transform(df2)
df3.show()
// step 4: 创建测试及训练集 (splitSeed 的用处: ???)
val splitSeed = 1234567
val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)
// step 5: 使用训练集创建评估器 (使用逻辑回归算法为该 pipeline 创建一个评估器)
val lr = new LogisticRegression()
.setMaxIter(50)
.setRegParam(0.01)
.setElasticNetParam(0.01)
val model = lr.fit(trainingData)
// step 6: 为测试集获取原始的 预测结果 及 可能性 (这里输出的都是测试集中的数据吗)
val predictions = model.transform(testData)
predictions.show()
// step 7: 生成对象的历史训练结果
val trainingSummary = model.summary
val objectiveHistory = trainingSummary.objectiveHistory
objectiveHistory.foreach(
loss => println(loss)
)
// step 8: 评估模型
val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]
// 以数据帧和 areaUnderROC 的方式获取 ROC, 其值越接近1.0, 效果越好
val roc = binarySummary.roc
// roc.show()
println("Area Under ROC: " + binarySummary.areaUnderROC)
// 最后, 判断模型的精度。但首先将模型的阈值设置为最大的 fMeasure (F值)
val fMeasure = binarySummary.fMeasureByThreshold
val fm = fMeasure.col("F-Measure")
val maxFMeasure = fMeasure.select(("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure).select("threshold").head().getDouble(0)
model.setThreshold(bestThreshold)
// 然后计算该模型的精度
val evaluator = new BinaryClassificationevaluator().setLabelCol("label")
val accuracy = evaluator.evaluate(predictions)
println("Accuracy: " + accuracy)
spark.close()
}
case class Cancer(cancer_class: Double, thickness: Double, size: Double, shape: Double, madh: Double,
epsize: Double, bnuc: Double, bchrom: Double, nNuc: Double, mit: Double)
}