Overview
Reference: O'Reilly, Advanced Analytics with Spark, Chapter 5, "Anomaly Detection in Network Traffic with K-means Clustering".
The accompanying code comes mainly from: https://github.com/sryza/aas/blob/1st-edition/ch05-kmeans/src/main/scala/com/cloudera/datascience/kmeans/RunKMeans.scala
Execution notes: verified by running interactively in the Spark 2.0.0 spark-shell.
//1. log in as the jupiter user and download the test data
wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz
gunzip kddcup.data.gz
hdfs dfs -mkdir ./kmeans/
hdfs dfs -put kddcup.data ./kmeans/
//2. launch spark-shell with the right parameters
cd /opt/bfd/spark-2.0.0-bin-bfd
//prevent conflicts with the existing Hive configuration and with other Spark versions
export SPARK_HOME=/opt/bfd/spark-2.0.0-bin-bfd
unset HIVE_HOME
bin/spark-shell \
  --master spark://ebg150-apps-01:7077 \
  --driver-memory 5G \
  --total-executor-cores 10 \
  --executor-memory 6G \
  --executor-cores 2 2>/dev/null
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
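//optional sanity check (a quick sketch; the relative path assumes the kmeans/ directory created in step 1 under your own HDFS user directory)
val check = sc.textFile("kmeans/kddcup.data")
check.count()  //the full kddcup.data set has roughly 4.9 million connection records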
//3. define the following functions in spark-shell and use them to pick increasingly accurate k-means parameters
// Clustering, Take 0
def clusteringTake0(rawData: RDD[String]): Unit = {
rawData.map(_.split(',').last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)
val labelsAndData = rawData.map { line =>
val buffer = line.split(',').toBuffer
buffer.remove(1, 3)
val label = buffer.remove(buffer.length - 1)
val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
(label, vector)
}
val data = labelsAndData.values.cache()
val kmeans = new KMeans()
val model = kmeans.run(data)
model.clusterCenters.foreach(println)
val clusterLabelCount = labelsAndData.map { case (label, datum) =>
val cluster = model.predict(datum)
(cluster, label)
}.countByValue()
clusterLabelCount.toSeq.sorted.foreach { case ((cluster, label), count) =>
println(f"$cluster%1s$label%18s$count%8s")
}
data.unpersist()
}
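//note: clusteringTake0 never calls setK, so MLlib's default of k = 2 clusters is used; the later takes tune k explicitly, for example:
//  new KMeans().setK(10).run(data)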
// Clustering, Take 1
def distance(a: Vector, b: Vector) =
math.sqrt(a.toArray.zip(b.toArray).map(p => p._1 - p._2).map(d => d * d).sum)
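//e.g. distance(Vectors.dense(1.0, 2.0), Vectors.dense(4.0, 6.0)) == 5.0 (plain Euclidean distance)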
def distToCentroid(datum: Vector, model: KMeansModel) = {
val cluster = model.predict(datum)
val centroid = model.clusterCenters(cluster)
distance(centroid, datum)
}
def clusteringScore(data: RDD[Vector], k: Int): Double = {
val kmeans = new KMeans()
kmeans.setK(k)
val model = kmeans.run(data)
data.map(datum => distToCentroid(datum, model)).mean()
}
def clusteringScore2(data: RDD[Vector], k: Int): Double = {
val kmeans = new KMeans()
kmeans.setK(k)
kmeans.setRuns(10)
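//note: setRuns is deprecated and has no effect from Spark 2.0 onward; it is kept here to match the book's Spark 1.x code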
kmeans.setEpsilon(1.0e-6)
val model = kmeans.run(data)
data.map(datum => distToCentroid(datum, model)).mean()
}
def clusteringTake1(rawData: RDD[String]): Unit = {
val data = rawData.map { line =>
val buffer = line.split(',').toBuffer
buffer.remove(1, 3)
buffer.remove(buffer.length - 1)
Vectors.dense(buffer.map(_.toDouble).toArray)
}.cache()
(5 to 30 by 5).map(k => (k, clusteringScore(data, k))).
foreach(println)
(30 to 100 by 10).par.map(k => (k, clusteringScore2(data, k))).
toList.foreach(println)
data.unpersist()
}
def visualizationInR(rawData: RDD[String]): Unit = {
val data = rawData.map { line =>
val buffer = line.split(',').toBuffer
buffer.remove(1, 3)
buffer.remove(buffer.length - 1)
Vectors.dense(buffer.map(_.toDouble).toArray)
}.cache()
val kmeans = new KMeans()
kmeans.setK(100)
kmeans.setRuns(10)
kmeans.setEpsilon(1.0e-6)
val model = kmeans.run(data)
val sample = data.map(datum =>
model.predict(datum) + "," + datum.toArray.mkString(",")
).sample(false, 0.05)
sample.saveAsTextFile("hdfs:///user/jupiter/kmeans/sample")
data.unpersist()
}
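//the sample written above is intended to be loaded into R for plotting; the book visualizes it in 3-D (using the rgl package) after a random projection, and that R code is not reproduced here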
// Clustering, Take 2
def buildNormalizationFunction(data: RDD[Vector]): (Vector => Vector) = {
val dataAsArray = data.map(_.toArray)
val numCols = dataAsArray.first().length
val n = dataAsArray.count()
val sums = dataAsArray.reduce(
(a, b) => a.zip(b).map(t => t._1 + t._2))
val sumSquares = dataAsArray.aggregate(
new Array[Double](numCols)
)(
(a, b) => a.zip(b).map(t => t._1 + t._2 * t._2),
(a, b) => a.zip(b).map(t => t._1 + t._2)
)
val stdevs = sumSquares.zip(sums).map {
case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n
}
val means = sums.map(_ / n)
(datum: Vector) => {
val normalizedArray = (datum.toArray, means, stdevs).zipped.map(
(value, mean, stdev) =>
if (stdev <= 0) (value - mean) else (value - mean) / stdev
)
Vectors.dense(normalizedArray)
}
}
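//quick check of the normalization on a toy one-column data set (illustrative values only):
//for values 1, 2, 3 the mean is 2 and the stdev computed above is sqrt(3*14 - 6*6)/3 = sqrt(6)/3 ≈ 0.8165
val toy = sc.parallelize(Seq(Vectors.dense(1.0), Vectors.dense(2.0), Vectors.dense(3.0)))
val normalizeToy = buildNormalizationFunction(toy)
normalizeToy(Vectors.dense(3.0))  //≈ [1.2247], i.e. (3 - 2) / 0.8165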
def clusteringTake2(rawData: RDD[String]): Unit = {
val data = rawData.map { line =>
val buffer = line.split(',').toBuffer
buffer.remove(1, 3)
buffer.remove(buffer.length - 1)
Vectors.dense(buffer.map(_.toDouble).toArray)
}
val normalizedData = data.map(buildNormalizationFunction(data)).cache()
(60 to 120 by 10).par.map(k =>
(k, clusteringScore2(normalizedData, k))).toList.foreach(println)
normalizedData.unpersist()
}
// Clustering, Take 3
def buildCategoricalAndLabelFunction(rawData: RDD[String]): (String => (String,Vector)) = {
val splitData = rawData.map(_.split(','))
//map each non-numeric value to an index (one-hot encoded below); note that this approach does not scale to categorical features with very many distinct values
val protocols = splitData.map(_(1)).distinct().collect().zipWithIndex.toMap
val services = splitData.map(_(2)).distinct().collect().zipWithIndex.toMap
val tcpStates = splitData.map(_(3)).distinct().collect().zipWithIndex.toMap
(line: String) => {
val buffer = line.split(',').toBuffer
val protocol = buffer.remove(1)
val service = buffer.remove(1)
val tcpState = buffer.remove(1)
val label = buffer.remove(buffer.length - 1)
val vector = buffer.map(_.toDouble)
val newProtocolFeatures = new Array[Double](protocols.size)
newProtocolFeatures(protocols(protocol)) = 1.0
val newServiceFeatures = new Array[Double](services.size)
newServiceFeatures(services(service)) = 1.0
val newTcpStateFeatures = new Array[Double](tcpStates.size)
newTcpStateFeatures(tcpStates(tcpState)) = 1.0
vector.insertAll(1, newTcpStateFeatures)
vector.insertAll(1, newServiceFeatures)
vector.insertAll(1, newProtocolFeatures)
(label, Vectors.dense(vector.toArray))
}
}
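//one-hot encoding illustration (hypothetical index assignment; the real indices depend on distinct() ordering):
val demoProtocols = Map("tcp" -> 0, "udp" -> 1, "icmp" -> 2)
val oneHot = new Array[Double](demoProtocols.size)
oneHot(demoProtocols("udp")) = 1.0
oneHot.mkString(",")  //"0.0,1.0,0.0": one categorical value becomes a single 1.0 among zeros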
def clusteringTake3(rawData: RDD[String]): Unit = {
val parseFunction = buildCategoricalAndLabelFunction(rawData)
val data = rawData.map(parseFunction).values
val normalizedData = data.map(buildNormalizationFunction(data)).cache()
(80 to 160 by 10).map(k =>
(k, clusteringScore2(normalizedData, k))).toList.foreach(println)
normalizedData.unpersist()
}
// Clustering, Take 4
def entropy(counts: Iterable[Int]) = {
val values = counts.filter(_ > 0)
val n: Double = values.sum
values.map { v =>
val p = v / n
-p * math.log(p)
}.sum
}
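//e.g. entropy(Seq(5, 5)) = ln(2) ≈ 0.693 (evenly mixed labels), entropy(Seq(9, 1)) ≈ 0.325, entropy(Seq(10)) = 0.0 (a pure cluster)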
def clusteringScore3(normalizedLabelsAndData: RDD[(String,Vector)], k: Int) = {
val kmeans = new KMeans()
kmeans.setK(k)
kmeans.setRuns(10)
kmeans.setEpsilon(1.0e-6)
val model = kmeans.run(normalizedLabelsAndData.values)
// Predict cluster for each datum
val labelsAndClusters = normalizedLabelsAndData.mapValues(model.predict)
// Swap keys / values
val clustersAndLabels = labelsAndClusters.map(_.swap)
// Extract collections of labels, per cluster
val labelsInCluster = clustersAndLabels.groupByKey().values
// Count labels in collections
val labelCounts = labelsInCluster.map(_.groupBy(l => l).map(_._2.size))
// Average entropy weighted by cluster size
val n = normalizedLabelsAndData.count()
labelCounts.map(m => m.sum * entropy(m)).sum / n
}
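//unlike the mean-distance score, this metric uses the given labels as an external check: lower weighted entropy means each cluster is dominated by a single label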
def clusteringTake4(rawData: RDD[String]): Unit = {
val parseFunction = buildCategoricalAndLabelFunction(rawData)
val labelsAndData = rawData.map(parseFunction)
val normalizedLabelsAndData =
labelsAndData.mapValues(buildNormalizationFunction(labelsAndData.values)).cache()
(80 to 160 by 10).map(k =>
(k, clusteringScore3(normalizedLabelsAndData, k))).toList.foreach(println)
normalizedLabelsAndData.unpersist()
}
// Detect anomalies
def buildAnomalyDetector(
data: RDD[Vector],
normalizeFunction: (Vector => Vector)): (Vector => Boolean) = {
val normalizedData = data.map(normalizeFunction)
normalizedData.cache()
val kmeans = new KMeans()
kmeans.setK(150)
kmeans.setRuns(10)
kmeans.setEpsilon(1.0e-6)
val model = kmeans.run(normalizedData)
val distances = normalizedData.map(datum => distToCentroid(datum, model))
//the distance of the 100th-farthest point from its centroid becomes the anomaly threshold
val threshold = distances.top(100).last
normalizedData.unpersist()
(datum: Vector) => distToCentroid(normalizeFunction(datum), model) > threshold
}
def anomalies(rawData: RDD[String]) = {
val parseFunction = buildCategoricalAndLabelFunction(rawData)
val originalAndData = rawData.map(line => (line, parseFunction(line)._2))
val data = originalAndData.values
val normalizeFunction = buildNormalizationFunction(data)
val anomalyDetector = buildAnomalyDetector(data, normalizeFunction)
val anomalies = originalAndData.filter {
case (original, datum) => anomalyDetector(datum)
}.keys
anomalies.take(10).foreach(println)
}
//4. run the functions on the data set
val rawData = sc.textFile("hdfs:///user/xxx/kmeans/kddcup.data")
clusteringTake0(rawData)
clusteringTake1(rawData)
clusteringTake2(rawData)
clusteringTake3(rawData)
clusteringTake4(rawData)
//the previous four calls are about choosing a good k; anomalies() applies the chosen k to produce the final results
anomalies(rawData)
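//optional extension (not from the book; a sketch reusing the functions above): return a reusable detector for scoring new raw CSV records instead of only printing the top anomalies
def buildDetector(rawData: RDD[String]): (String => Boolean) = {
  val parseFunction = buildCategoricalAndLabelFunction(rawData)
  val data = rawData.map(line => parseFunction(line)._2)
  val normalizeFunction = buildNormalizationFunction(data)
  val anomalyDetector = buildAnomalyDetector(data, normalizeFunction)
  (line: String) => anomalyDetector(parseFunction(line)._2)
}
val isAnomalous = buildDetector(rawData)
//isAnomalous(line) returns true when line (a record in the same CSV format) lies farther from its cluster centroid than the threshold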