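Models in Spark MLlib That Support Secondary Training
This post summarizes which models in the Spark 3.0.1 MLlib library support secondary training. First, what I mean by secondary training: I coined the term myself because I did not know the established name (it is close to what is often called warm-start or incremental training). It means saving a model's parameters, or the whole model, then creating a new trainer instance with new and using that trainer together with the previously learned parameters for a second or third round of training. Below I test every RDD-based MLlib algorithm listed on the official Spark site; you can follow along section by section.
In Classification and Regression - RDD-based API and Clustering - RDD-based API I found all the classification, regression, and clustering models that mllib supports. There are not many, so I tried them one by one. The results are summarized in the table below: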
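Type | Model | Supported |
Binary classification | SVMWithSGD | Yes |
Binary classification | LogisticRegressionWithLBFGS | Yes |
Binary classification | DecisionTree | No |
Binary classification | RandomForest | No |
Binary classification | GradientBoostedTrees | No |
Binary classification | NaiveBayes | No |
Multiclass classification | LogisticRegressionWithLBFGS | Yes |
Multiclass classification | DecisionTree | No |
Multiclass classification | RandomForest | No |
Multiclass classification | NaiveBayes | No |
Regression | DecisionTree | No |
Regression | RandomForest | No |
Regression | GradientBoostedTrees | No |
Clustering | KMeans | Yes |
Clustering | GaussianMixture | Yes |
Clustering | PowerIterationClustering | Unknown |
Clustering | LDA | No |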
2.1 Binary classification
The MLlib models that can do binary classification are: linear SVMs, logistic regression, decision trees, random forests, gradient-boosted trees, naive Bayes.
2.1.1 SVMWithSGD - binary classification
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("SVMWithSGDExample").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Load the data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\sample_libsvm_data.txt"
  val data: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, filePath)
  // Split into training and test sets
  val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
  // Extract the test features and labels
  val testFeatures: RDD[linalg.Vector] = test.map(lp => lp.features)
  val testLabel: RDD[Double] = test.map(lp => lp.label)
  val numIteration = 1
  var svmModel: SVMModel = null
  val epochs = 1 to 4
  for (epoch <- epochs) {
    if (epoch == 1) {
      // First round: train from scratch
      svmModel = SVMWithSGD.train(train, numIteration)
    } else {
      // Later rounds: start from the previous weights
      // (stepSize = 0.05, regParam = 0.0, miniBatchFraction = 0.5)
      svmModel = SVMWithSGD.train(train, numIteration, 0.05, 0.0, 0.5, svmModel.weights)
    }
    // Predict
    val testPredict: RDD[Double] = svmModel.predict(testFeatures)
    // Pair up; BinaryClassificationMetrics expects (score, label)
    val predictAndLabel: RDD[(Double, Double)] = testPredict.zip(testLabel)
    // Metrics
    val metrics: BinaryClassificationMetrics = new BinaryClassificationMetrics(predictAndLabel)
    val roc: Array[(Double, Double)] = metrics.roc().collect()
    val pr: Array[(Double, Double)] = metrics.pr().collect()
    println(s"epoch : $epoch , roc: false positive rate , true positive rate ")
    println(s"epoch : $epoch , pr: precision , recall ")
    println(s"epoch : $epoch , areaUnderPR:${metrics.areaUnderPR()} , areaUnderROC:${metrics.areaUnderROC()} ")
  }
}
epoch : 1 , roc: false positive rate , true positive rate
epoch : 1 , pr: precision , recall
epoch : 1 , areaUnderPR:1.0 , areaUnderROC:1.0
epoch : 2 , roc: false positive rate , true positive rate
epoch : 2 , pr: precision , recall
epoch : 2 , areaUnderPR:1.0 , areaUnderROC:1.0
epoch : 3 , roc: false positive rate , true positive rate
epoch : 3 , pr: precision , recall
epoch : 3 , areaUnderPR:1.0 , areaUnderROC:1.0
epoch : 4 , roc: false positive rate , true positive rate
epoch : 4 , pr: precision , recall
epoch : 4 , areaUnderPR:1.0 , areaUnderROC:1.0
2.1.2 LogisticRegressionWithLBFGS - binary classification
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("LogisticRegressionWithLBFGSExample").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Load the data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\sample_libsvm_data.txt"
  val data = MLUtils.loadLibSVMFile(sc, filePath)
  val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
  // Extract the test features and labels
  val testFeatures: RDD[linalg.Vector] = test.map(lp => lp.features)
  val testLabel: RDD[Double] = test.map(lp => lp.label)
  val epochs = 1 to 4
  var model: LogisticRegressionModel = null
  for (epoch <- epochs) {
    if (epoch == 1) {
      // First round: train from scratch
      model = new LogisticRegressionWithLBFGS().run(train)
    } else {
      // Later rounds: pass the previous weights as initialWeights
      model = new LogisticRegressionWithLBFGS().run(train, model.weights)
    }
    // Predict
    val predictLabel: RDD[Double] = model.predict(testFeatures)
    // BinaryClassificationMetrics expects (score, label) pairs
    val predictAndLabel: RDD[(Double, Double)] = predictLabel.zip(testLabel)
    val metrics: BinaryClassificationMetrics = new BinaryClassificationMetrics(predictAndLabel)
    val roc: Array[(Double, Double)] = metrics.roc().collect()
    val pr: Array[(Double, Double)] = metrics.pr().collect()
    println(s"epoch : $epoch , roc: (false positive rate , true positive rate)")
    println(s"epoch : $epoch , pr: (precision , recall)")
    println(s"epoch : $epoch , areaUnderROC : ${metrics.areaUnderROC()}")
    println(s"epoch : $epoch , areaUnderPR : ${metrics.areaUnderPR()}")
  }
}
epoch : 1 , roc: (false positive rate , true positive rate)
epoch : 1 , pr: (precision , recall)
epoch : 1 , areaUnderROC : 0.9160839160839163
epoch : 1 , areaUnderPR : 0.9376623376623375
epoch : 2 , roc: (false positive rate , true positive rate)
epoch : 2 , pr: (precision , recall)
epoch : 2 , areaUnderROC : 0.9160839160839163
epoch : 2 , areaUnderPR : 0.9376623376623375
epoch : 3 , roc: (false positive rate , true positive rate)
epoch : 3 , pr: (precision , recall)
epoch : 3 , areaUnderROC : 0.9160839160839163
epoch : 3 , areaUnderPR : 0.9376623376623375
epoch : 4 , roc: (false positive rate , true positive rate)
epoch : 4 , pr: (precision , recall)
epoch : 4 , areaUnderROC : 0.9160839160839163
epoch : 4 , areaUnderPR : 0.9376623376623375
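2.1.3 DecisionTree - binary classification
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("DecisionTreeClassificationExample").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\sample_libsvm_data.txt"
  val data = MLUtils.loadLibSVMFile(sc, filePath)
  val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
  // Parameters
  val numClasses = 2 // number of classes
  val categoricalFeaturesInfo = Map[Int, Int]() // categorical features, e.g. 0 -> 2 means feature 0 has 2 categories
  val impurity = "gini" // impurity measure
  val maxDepth = 5 // maximum tree depth
  val maxBins = 32 // maximum number of bins used to discretize features
  // Train
  val model = DecisionTree.trainClassifier(train, numClasses, categoricalFeaturesInfo,
    impurity, maxDepth, maxBins)
  // Predict; BinaryClassificationMetrics expects (score, label) pairs
  val predictAndLabel: RDD[(Double, Double)] = test.map { point =>
    val prediction = model.predict(point.features)
    (prediction, point.label)
  }
  // Evaluate
  val metrics = new BinaryClassificationMetrics(predictAndLabel)
}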
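Secondary training takes three steps:
- 1. Get the model parameters
- 2. Build the training class from those parameters
- 3. Continue training with that class
In Spark 3.0.1 the mllib algorithms are layered: for example, the KMeans class does the training and KMeansModel is the trained model. Decision trees follow the same pattern: DecisionTree does the training and DecisionTreeModel is the result. To rebuild a trainer you would either construct a Model from the parameters and then construct DecisionTree from that Model, or construct DecisionTree directly from the parameters. Judging by the signatures below, neither route is available: every constructor and train method takes only a Strategy or raw hyperparameters, and none accepts an existing model.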
class DecisionTree private[spark] (private val strategy: Strategy, private val seed: Int)
extends Serializable with Logging {
def this(strategy: Strategy) = this(strategy, seed = 0)
def run(input: RDD[LabeledPoint]): DecisionTreeModel =
object DecisionTree extends Serializable with Logging {
def train(input: RDD[LabeledPoint], strategy: Strategy): DecisionTreeModel =
def train(
input: RDD[LabeledPoint],
algo: Algo,
impurity: Impurity,
maxDepth: Int): DecisionTreeModel =
def train(
input: RDD[LabeledPoint],
algo: Algo,
impurity: Impurity,
maxDepth: Int,
numClasses: Int): DecisionTreeModel =
def train(
input: RDD[LabeledPoint],
algo: Algo,
impurity: Impurity,
maxDepth: Int,
numClasses: Int,
maxBins: Int,
quantileCalculationStrategy: QuantileStrategy,
categoricalFeaturesInfo: Map[Int, Int]): DecisionTreeModel =
def trainClassifier(
input: RDD[LabeledPoint],
numClasses: Int,
categoricalFeaturesInfo: Map[Int, Int],
impurity: String,
maxDepth: Int,
maxBins: Int): DecisionTreeModel =
def trainClassifier(
input: JavaRDD[LabeledPoint],
numClasses: Int,
categoricalFeaturesInfo: java.util.Map[java.lang.Integer, java.lang.Integer],
impurity: String,
maxDepth: Int,
maxBins: Int): DecisionTreeModel =
def trainRegressor(
input: RDD[LabeledPoint],
categoricalFeaturesInfo: Map[Int, Int],
impurity: String,
maxDepth: Int,
maxBins: Int): DecisionTreeModel =
def trainRegressor(
input: JavaRDD[LabeledPoint],
categoricalFeaturesInfo: java.util.Map[java.lang.Integer, java.lang.Integer],
impurity: String,
maxDepth: Int,
maxBins: Int): DecisionTreeModel =
2.1.4 RandomForest - binary classification
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("RandomForestClassificationExample").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\sample_libsvm_data.txt"
  val data = MLUtils.loadLibSVMFile(sc, filePath)
  val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
  val testFeatures: RDD[linalg.Vector] = test.map(lp => lp.features)
  val testLabel: RDD[Double] = test.map(lp => lp.label)
  // Parameters
  val numClasses = 2 // number of classes
  val categoricalFeaturesInfo = Map[Int, Int]() // categorical features (none here)
  val numTrees = 3 // number of trees
  val featureSubsetStrategy = "auto" // how many features to consider per split
  val impurity = "gini" // impurity measure
  val maxDepth = 4 // maximum tree depth
  val maxBins = 32 // maximum number of bins
  // Model
  val randomForestModel: RandomForestModel = RandomForest.trainClassifier(train, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
  // Predict; BinaryClassificationMetrics expects (score, label) pairs
  val predict: RDD[Double] = randomForestModel.predict(testFeatures)
  val predictAndLabel: RDD[(Double, Double)] = predict.zip(testLabel)
  // Evaluate
  val metrics = new BinaryClassificationMetrics(predictAndLabel)
}
private class RandomForest (
private val strategy: Strategy,
private val numTrees: Int,
featureSubsetStrategy: String,
private val seed: Int)
extends Serializable with Logging {
def run(input: RDD[LabeledPoint]): RandomForestModel =
object RandomForest extends Serializable with Logging {
def trainClassifier(
input: RDD[LabeledPoint],
strategy: Strategy,
numTrees: Int,
featureSubsetStrategy: String,
seed: Int): RandomForestModel =
def trainClassifier(
input: RDD[LabeledPoint],
numClasses: Int,
categoricalFeaturesInfo: Map[Int, Int],
numTrees: Int,
featureSubsetStrategy: String,
impurity: String,
maxDepth: Int,
maxBins: Int,
seed: Int = Utils.random.nextInt()): RandomForestModel =
def trainClassifier(
input: JavaRDD[LabeledPoint],
numClasses: Int,
categoricalFeaturesInfo: java.util.Map[java.lang.Integer, java.lang.Integer],
numTrees: Int,
featureSubsetStrategy: String,
impurity: String,
maxDepth: Int,
maxBins: Int,
seed: Int): RandomForestModel =
def trainRegressor(
input: RDD[LabeledPoint],
strategy: Strategy,
numTrees: Int,
featureSubsetStrategy: String,
seed: Int): RandomForestModel =
def trainRegressor(
input: RDD[LabeledPoint],
categoricalFeaturesInfo: Map[Int, Int],
numTrees: Int,
featureSubsetStrategy: String,
impurity: String,
maxDepth: Int,
maxBins: Int,
seed: Int = Utils.random.nextInt()): RandomForestModel =
def trainRegressor(
input: JavaRDD[LabeledPoint],
categoricalFeaturesInfo: java.util.Map[java.lang.Integer, java.lang.Integer],
numTrees: Int,
featureSubsetStrategy: String,
impurity: String,
maxDepth: Int,
maxBins: Int,
seed: Int): RandomForestModel =
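2.1.5 GradientBoostedTrees - binary classification
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("GradientBoostedTreesClassificationExample").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\sample_libsvm_data.txt"
  val data: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, filePath)
  val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
  val testFeatures: RDD[linalg.Vector] = test.map(lp => lp.features)
  val testLabel: RDD[Double] = test.map(lp => lp.label)
  // Parameters
  val boostingStrategy = BoostingStrategy.defaultParams("Classification")
  boostingStrategy.numIterations = 3 // number of boosting iterations
  boostingStrategy.treeStrategy.numClasses = 2 // number of classes
  boostingStrategy.treeStrategy.maxDepth = 5 // maximum tree depth
  boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]() // categorical features (none here)
  // Model
  val gbtModel: GradientBoostedTreesModel = GradientBoostedTrees.train(train, boostingStrategy)
  // Predict; BinaryClassificationMetrics expects (score, label) pairs
  val predict: RDD[Double] = gbtModel.predict(testFeatures)
  val predictAndLabel: RDD[(Double, Double)] = predict.zip(testLabel)
  // Evaluate
  val metrics = new BinaryClassificationMetrics(predictAndLabel)
  println(metrics.areaUnderPR()) // 0.9857397504456328
  println(metrics.areaUnderROC()) // 0.9705882352941176
}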
object GradientBoostedTrees extends Logging {
def train(
input: RDD[LabeledPoint],
boostingStrategy: BoostingStrategy): GradientBoostedTreesModel =
def train(
input: JavaRDD[LabeledPoint],
boostingStrategy: BoostingStrategy): GradientBoostedTreesModel =
class GradientBoostedTrees private[spark] (
private val boostingStrategy: BoostingStrategy,
private val seed: Int)
extends Serializable with Logging {
def this(boostingStrategy: BoostingStrategy) = this(boostingStrategy, seed = 0)
def run(input: RDD[LabeledPoint]): GradientBoostedTreesModel =
def run(input: JavaRDD[LabeledPoint]): GradientBoostedTreesModel =
def runWithValidation(
input: RDD[LabeledPoint],
validationInput: RDD[LabeledPoint]): GradientBoostedTreesModel =
def runWithValidation(
input: JavaRDD[LabeledPoint],
validationInput: JavaRDD[LabeledPoint]): GradientBoostedTreesModel =
2.1.6 NaiveBayes - binary classification
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("NaiveBayesExample").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\sample_libsvm_data.txt"
  val data = MLUtils.loadLibSVMFile(sc, filePath)
  val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
  // Extract the test features and labels
  val testFeatures: RDD[linalg.Vector] = test.map(lp => lp.features)
  val testLabel: RDD[Double] = test.map(lp => lp.label)
  // Model
  val nbModel = NaiveBayes.train(train, lambda = 1.0, modelType = "multinomial")
  // Predict; BinaryClassificationMetrics expects (score, label) pairs
  val predict: RDD[Double] = nbModel.predict(testFeatures)
  val predictAndLabel: RDD[(Double, Double)] = predict.zip(testLabel)
  // Evaluate
  val metrics = new BinaryClassificationMetrics(predictAndLabel)
  println(metrics.areaUnderROC()) // 1.0
  println(metrics.areaUnderPR()) // 1.0
}
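2.2 Multiclass classification
Type | Model | Supported |
Multiclass classification | LogisticRegressionWithLBFGS | Yes |
Multiclass classification | DecisionTree | No |
Multiclass classification | RandomForest | No |
Multiclass classification | NaiveBayes | No |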
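2.3 Regression
The regression algorithms in Spark mllib are: linear least squares, Lasso, ridge regression, decision trees, random forests, gradient-boosted trees, isotonic regression. Below I check them one by one for secondary training support.
The first three are described under Linear least squares, Lasso, and ridge regression, but the example given there is a streaming one.
In Spark 3.0.x, the linear least squares, Lasso, and ridge regression models cannot be used directly (blog posts online that train with LinearRegressionWithSGD were presumably written against older versions). Their internal design changed so that they are only available to the Streaming API; LinearRegressionWithSGD is one example.
The official API documentation for these three models does not match the Spark 3.0.1 source I am using: in the documentation the models have a run method that accepts initialWeights, which would clearly allow secondary training. When I switched to Spark 3.1.1 (the latest release at the time of writing, and the version the documentation describes), the source for these three models was unchanged, so the official documentation has probably not been updated.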
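The streaming route mentioned above can be sketched roughly as follows. This is a minimal sketch rather than code from the tests in this post: it assumes the spark-streaming module is on the classpath, and the toy data, the queue-backed stream, the object and method names, and the hyperparameter values are all made up for illustration. StreamingLinearRegressionWithSGD takes its starting point from setInitialWeights, so weights obtained from an earlier model could be passed there to continue training.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object StreamingWarmStartSketch {

  /** Trains on two toy micro-batches, starting from `initial`, and returns the latest weights. */
  def trainFromWeights(initial: Vector): Vector = {
    val conf = new SparkConf().setAppName("StreamingWarmStartSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Toy data following y = 2 * x1 + 1 * x2 (illustrative only)
    val batch: RDD[LabeledPoint] = ssc.sparkContext.parallelize(Seq(
      LabeledPoint(2.0, Vectors.dense(1.0, 0.0)),
      LabeledPoint(1.0, Vectors.dense(0.0, 1.0)),
      LabeledPoint(3.0, Vectors.dense(1.0, 1.0))))
    val queue = mutable.Queue[RDD[LabeledPoint]](batch, batch)
    val stream = ssc.queueStream(queue)

    // The initial weights are where previously learned parameters would go
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(initial)
      .setNumIterations(10)
      .setStepSize(0.1)
    model.trainOn(stream)

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000)
    val weights = model.latestModel().weights
    ssc.stop(stopSparkContext = true, stopGracefully = false)
    weights
  }

  def main(args: Array[String]): Unit = {
    println(trainFromWeights(Vectors.dense(0.0, 0.0)))
  }
}
```

Each non-empty micro-batch updates the model starting from the weights left by the previous batch, which is the same warm-start idea applied batch by batch.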
2.3.1 DecisionTree - regression
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("DecisionTreeRegressionExample").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\sample_libsvm_data.txt"
  val data = MLUtils.loadLibSVMFile(sc, filePath)
  val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
  val testFeatures: RDD[linalg.Vector] = test.map(lp => lp.features)
  val testLabel: RDD[Double] = test.map(lp => lp.label)
  // Parameters
  val categoricalFeaturesInfo = Map[Int, Int]()
  val impurity = "variance"
  val maxDepth = 5
  val maxBins = 32
  // Model
  val model = DecisionTree.trainRegressor(train, categoricalFeaturesInfo, impurity,
    maxDepth, maxBins)
  // Predict; RegressionMetrics expects (prediction, observation) pairs
  val predict: RDD[Double] = model.predict(testFeatures)
  val predictAndLabel: RDD[(Double, Double)] = predict.zip(testLabel)
  // Evaluate
  val metrics: RegressionMetrics = new RegressionMetrics(predictAndLabel)
  println(metrics.meanAbsoluteError) // MAE 0.09375
  println(metrics.meanSquaredError) // MSE 0.09374999999999999
  println(metrics.r2) // R2 0.623529411764706
  println(metrics.rootMeanSquaredError) // RMSE 0.30618621784789724
}
2.3.2 RandomForest - regression
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("RandomForestRegressionExample").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\sample_libsvm_data.txt"
  val data = MLUtils.loadLibSVMFile(sc, filePath)
  val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
  val testFeatures: RDD[linalg.Vector] = test.map(lp => lp.features)
  val testLabel: RDD[Double] = test.map(lp => lp.label)
  // Parameters
  val categoricalFeaturesInfo = Map[Int, Int]()
  val numTrees = 3
  val featureSubsetStrategy = "auto"
  val impurity = "variance"
  val maxDepth = 4
  val maxBins = 32
  // Model
  val model = RandomForest.trainRegressor(train, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
  // Predict; RegressionMetrics expects (prediction, observation) pairs
  val predict: RDD[Double] = model.predict(testFeatures)
  val predictAndLabel: RDD[(Double, Double)] = predict.zip(testLabel)
  // Evaluate
  val metrics = new RegressionMetrics(predictAndLabel)
  println(metrics.rootMeanSquaredError) // RMSE 0.12782749814122843
  println(metrics.meanSquaredError) // MSE 0.016339869281045756
  println(metrics.meanAbsoluteError) // MAE 0.029411764705882353
  println(metrics.r2) // R2 0.9241071428571428
}
2.3.3 GradientBoostedTrees - regression
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("GradientBoostedTreesRegressionExample").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\sample_libsvm_data.txt"
  val data = MLUtils.loadLibSVMFile(sc, filePath)
  val Array(train, test) = data.randomSplit(Array(0.7, 0.3))
  val testFeatures: RDD[linalg.Vector] = test.map(lp => lp.features)
  val testLabel: RDD[Double] = test.map(lp => lp.label)
  // Parameters
  val boostingStrategy = BoostingStrategy.defaultParams("Regression")
  boostingStrategy.numIterations = 3
  boostingStrategy.treeStrategy.maxDepth = 5
  boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
  // Model
  val model: GradientBoostedTreesModel = GradientBoostedTrees.train(train, boostingStrategy)
  // Predict; RegressionMetrics expects (prediction, observation) pairs
  val predict: RDD[Double] = model.predict(testFeatures)
  val predictAndLabel: RDD[(Double, Double)] = predict.zip(testLabel)
  // Evaluate
  val metrics = new RegressionMetrics(predictAndLabel)
  println(metrics.rootMeanSquaredError) // RMSE 0.1889822365046136
  println(metrics.meanSquaredError) // MSE 0.03571428571428571
  println(metrics.meanAbsoluteError) // MAE 0.03571428571428571
  println(metrics.r2) // R2 0.8362573099415205
}
2.4 Clustering
The Spark site describes the following clustering algorithms: K-means, Gaussian mixture, Power iteration clustering (PIC), Latent Dirichlet allocation (LDA), Bisecting k-means.
2.4.1 KMeans - clustering
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("repeat train kmeans")
  val sc = new SparkContext(conf)
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\7_Spark_mllib_params_get\\src\\main\\dataSet\\kmeans_data.txt"
  val data: RDD[String] = sc.textFile(filePath)
  val parsedData: RDD[linalg.Vector] = data.map(line => Vectors.dense(line.split(" ").map(_.toDouble))).cache()
  val numClusters = 2
  val numIterations = 20
  // Train repeatedly
  val epochs = 1 to 4
  var model: KMeansModel = null
  var centers: Array[linalg.Vector] = null
  for (epoch <- epochs) {
    println(s"Training round $epoch")
    if (epoch == 1) {
      model = KMeans.train(parsedData, numClusters, numIterations)
      centers = model.clusterCenters
    } else {
      // Rebuild the model from the saved centers
      model = new KMeansModel(centers)
      // k must match the number of centers in the initial model
      val kmeans: KMeans = new KMeans()
        .setK(numClusters)
        .setMaxIterations(numIterations)
        .setInitialModel(model)
      // Continue training
      model = kmeans.run(parsedData)
      centers = model.clusterCenters
    }
    println(s"epoch ${epoch} ,cost = ${model.computeCost(parsedData)}")
  }
}
Training round 1
epoch 1 ,cost = 0.11999999999999958
Training round 2
epoch 2 ,cost = 0.11999999999999958
Training round 3
epoch 3 ,cost = 0.11999999999999958
Training round 4
epoch 4 ,cost = 0.11999999999999958
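The loop above keeps the centers in memory. When the next round runs in a separate job, one option is to persist the model with save and reload it with KMeansModel.load before calling setInitialModel. The following is a minimal sketch under assumptions: the data is a toy in-memory set, the model path is a temporary directory, and the names KMeansResumeSketch and resume are hypothetical.

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

object KMeansResumeSketch {

  /** Loads a previously saved model and continues training from its centers. */
  def resume(sc: SparkContext, data: RDD[Vector], modelDir: String, maxIterations: Int): KMeansModel = {
    val previous = KMeansModel.load(sc, modelDir)
    new KMeans()
      .setK(previous.k) // k must equal the number of centers in the initial model
      .setMaxIterations(maxIterations)
      .setInitialModel(previous)
      .run(data)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KMeansResumeSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // Toy data with two obvious groups (illustrative only)
    val data: RDD[Vector] = sc.parallelize(Seq(
      Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
      Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1))).cache()
    // save() expects a path that does not exist yet
    val modelDir = Files.createTempDirectory("kmeans-sketch").resolve("model").toString

    // First job: train and persist
    val first = KMeans.train(data, 2, 5)
    first.save(sc, modelDir)

    // Later job: reload and keep training
    val second = resume(sc, data, modelDir, 5)
    println(s"cost after resume = ${second.computeCost(data)}")
    sc.stop()
  }
}
```

Saving only the cluster centers yourself (for example as plain text) and rebuilding with new KMeansModel(centers) would work the same way; save and load just handle the serialization.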
2.4.2 GaussianMixture - clustering
GaussianMixture supports secondary training. The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("GaussianMixtureExample").setMaster("local[*]")
  val sc = new SparkContext(conf)
  // Data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\gmm_data.txt"
  val data = sc.textFile(filePath)
  val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
  // Parameters
  val numClasses = 2
  // Model
  val epochs = 1 to 4
  var model: GaussianMixtureModel = null
  for (epoch <- epochs) {
    if (epoch == 1) {
      model = new GaussianMixture().setK(numClasses).run(parsedData)
    } else {
      // Continue from the previous model; k must match the initial model
      val mixture: GaussianMixture = new GaussianMixture().setK(numClasses).setInitialModel(model)
      model = mixture.run(parsedData)
    }
    // Predict
    val predict: RDD[Int] = model.predict(parsedData)
    // Spark provides no cost method for this model, so you have to compute it yourself
  }
}
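Since the model exposes no cost method, one possible stand-in is the log-likelihood of the data under the fitted mixture, built from model.weights and model.gaussians. The following is a minimal sketch, assuming log-likelihood is an acceptable substitute for a cost; the object and function names are hypothetical.

```scala
import org.apache.spark.mllib.clustering.GaussianMixtureModel
import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors}
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.rdd.RDD

object GmmLogLikelihoodSketch {

  /** log p(x) = log( sum_k w_k * N(x | mu_k, Sigma_k) ) for a single point. */
  def pointLogLikelihood(
      weights: Array[Double],
      gaussians: Array[MultivariateGaussian],
      x: Vector): Double = {
    val density = weights.zip(gaussians).map { case (w, g) => w * g.pdf(x) }.sum
    math.log(density)
  }

  /** Sum of per-point log-likelihoods over the whole data set. */
  def logLikelihood(model: GaussianMixtureModel, data: RDD[Vector]): Double = {
    // Copy to local vals so the closure does not capture the model itself
    val weights = model.weights
    val gaussians = model.gaussians
    data.map(x => pointLogLikelihood(weights, gaussians, x)).sum()
  }

  def main(args: Array[String]): Unit = {
    // Local check with a single standard normal component (no SparkContext needed)
    val standardNormal = new MultivariateGaussian(Vectors.dense(0.0), Matrices.dense(1, 1, Array(1.0)))
    println(pointLogLikelihood(Array(1.0), Array(standardNormal), Vectors.dense(0.0)))
  }
}
```

Calling logLikelihood(model, parsedData) after each round gives a number to compare across epochs: a higher value indicates a better fit. A point whose density underflows to zero makes the sum negative infinity, so a log-sum-exp formulation would be safer for high-dimensional data.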
2.4.3 PowerIterationClustering - clustering
2.4.4 LDA - clustering
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.{LDA, LDAModel}
import org.apache.spark.mllib.linalg
import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("LDAExample")
  val sc = new SparkContext(conf)
  // Data
  val filePath = "D:\\My_IDE\\IDEA2019\\WorkSpace\\Spark-Kafka\\8_Spark_MLlib_Continue_Train\\src\\main\\dataSet\\kmeans_data.txt"
  val data: RDD[String] = sc.textFile(filePath)
  val parsedData: RDD[linalg.Vector] = data.map(line => Vectors.dense(line.trim.split(" ").map(_.toDouble)))
  // Give every document a unique ID
  val corps: RDD[(Long, linalg.Vector)] = parsedData.zipWithIndex().map(_.swap).cache()
  // Model
  val numClasses = 3
  val model: LDAModel = new LDA().setK(numClasses).run(corps)
  // topicsMatrix is vocabSize x k: one column of term weights per topic
  val matrix: Matrix = model.topicsMatrix
  for (topic <- Range(0, numClasses)) {
    print(s"Topic $topic :")
    for (word <- Range(0, model.vocabSize)) {
      print(s" ${matrix(word, topic)}")
    }
    println()
  }
}
