我是靠谱客的博主 明亮彩虹,最近开发中收集的这篇文章主要介绍利用spark做文本聚类分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

聚类分析

什么是聚类分析?《数据挖掘导论》是给出了这样的定义:聚类分析仅根据在数据中发现的描述对象及其关系的信息,将数据对象分组。其目标是,组内的对象相互之间是相似的(相关的),而不同组中的对象是不同(不相关的)。组内的相似性(同质性)越大,组间差别越大,聚类就越好。

想像有这样的一个情景:用户每天都会通过搜索引擎去查询他/她所感兴趣的信息,而我们希望能够根据用户的搜索词去细分目标用户群体,从而分析不同用户群体对哪些信息比较感兴趣。这时,聚类分析就是我们常常采用的手段。

高斯混合分布聚类模型

除了常见的基于距离的聚类模型,如k-means聚类,聚类中也有基于概率模型,例如高斯混合分布聚类模型(GMM)。基于概率模型的好处在于,它并没有像k-means那样让每一个数据点只能归属于一个簇当中,而是通过概率来反映每个数据点可能分布到每一个簇的概率值,即属于软聚类。在某些场景中,软聚类能够解释数据点的多元性,好比如人的兴趣点不唯一,用户行为的多样性等等。
高斯混合分布模型主要是利用EM算法做参数估计,关于高斯混合分布聚类模型的详细讲述,我将其放到另一份博客当中:
http://blog.csdn.net/qq_30843221/article/details/54894640

聚类模型的详细过程

1.样本数据
我们模拟用户搜索行为,一组是搜索关于电影内容,而另一组是关于机器学习,具体数据如下:

好看 电影 惊悚 悬疑 不错 推荐
机器学习 自然语言处理 信息 检索
机器学习 数据挖掘 人工智能 检索
电影 动画 精彩 好看 不错 加油 推荐

2.数据的加载,基本的分词(我使用的java版的spark,分词工具为hanlp)

//加载数据
String filename = "/home/quincy1994/test.txt";
JavaRDD<String> sentences = sc.textFile(filename);
JavaRDD<String> segRDD = sentences.map(new Seg());
JavaRDD<Row> jrdd = segRDD.map(new StringtoRow());
segRDD.cache();

//数据转换为矩阵
StructType schema = new StructType(new StructField[]{
    new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
DataFrame sentenceData = sqlContext.createDataFrame(jrdd, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");  //tokenizer以简单的空白分割词语
DataFrame wordsData = tokenizer.transform(sentenceData); // 将句子分割词语

//分词类
    static class Seg implements Function<String, String>{

        public String call(String sentence) throws Exception{
            String segStr = "";
            List<Term> termList = segment.seg(sentence); //分词
            StringBuilder sb = new StringBuilder();
            for(Term term: termList){
                String word = term.word;
                sb.append(word+ " ");
            }
            segStr = sb.toString().trim();
            return segStr;
        }
    }

    //将String的sentence转变为mllib中row数据类型
    static class StringtoRow implements Function<String, Row>{

        public Row call(String sentence) throws Exception {
            return RowFactory.create(sentence);
        }
    }

3.特征选择(我主要采用的tfidf模型,刚开始使用word2vec不太理想,可能数据太稀疏)

//tfidf模型
int numFeatures = 20;  //选定抽取前k个特征
HashingTF hashingTF  = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numFeatures);
DataFrame featurizedData = hashingTF.transform(wordsData);
IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
DataFrame result = idfModel.transform(featurizedData);

4.数据的归一化处理(之前忘了做这步,也导致数据聚类效果不理想)

//归一化处理
Normalizer normalizer = new Normalizer().setInputCol("features").setOutputCol("normFeatures").setP(1.0);
DataFrame l1NormData = normalizer.transform(result.select("features"));
JavaRDD<Vector> normRDD = l1NormData.rdd().toJavaRDD().map(new RowToVector()); //将row转变成为vector
normRDD.cache();

//将row转变为Vector,机器学习模型基本采用vector类型
    static class RowToVector implements Function<Row, Vector>{

        public Vector call(Row r) throws Exception {
            // TODO Auto-generated method stub
            Vector features = r.getAs(0);    //将row转变成为vector 
            return features;
        }
    }

5.使用高斯混合模型聚类(代码超简单)

static int k = 2; //设定有多少个高斯混合模型
GaussianMixtureModel gmm = new GaussianMixture().setK(k).run(normRDD.rdd());
normRDD.cache();

6.为每个节点标记它归属的簇

//为每个节点标记归属的簇
        RDD<Vector> points = normRDD.rdd();
        JavaRDD<double[]> predictRDD = new JavaRDD(gmm.predictSoft(points), null);
        JavaRDD<Integer> resultRDD = predictRDD.map(new Group());
        resultRDD.cache();
static class Group implements Function<double[], Integer>{

        //我设定归属概率大于0.5的簇,否则当其为噪声
        public Integer call(double[] probabilities) throws Exception {
            double max = 0.5;
            int index = -1;
            for(int i = 0; i < probabilities.length; i++){
                if(max <= probabilities[i]){
                    index = i;
                    break;
                }
            }
            return index;
        }
    }

7.从每个簇中提取主要标签词

//在每个簇中提取主标签
        Object[] output= resultRDD.collect().toArray();  //得到每个数据点属于的簇
        Object[]  seg = segRDD.collect().toArray();    //得到每个数据点原来的标签词
        //集合不同簇各自的标签词
        List<Tuple2<Integer, String>> list = new ArrayList<Tuple2<Integer, String>>();
        for(int i = 0; i<output.length; i++){
            int group = (Integer) output[i];
            String tags = (String) seg[i];
            Tuple2<Integer, String> one = new Tuple2<Integer, String>(group, tags);
            list.add(one);
        }
        JavaPairRDD<Integer, String>  rddValue = sc.parallelizePairs(list);
        JavaPairRDD<Integer, Iterable<String>> groupRDD = rddValue.groupByKey();  //按簇归类
        JavaRDD<Tuple2<Integer, String>> tagsRDD = groupRDD.map(new ReduceString()); //将不同的标签混合在一块
        JavaRDD<Tuple2<Integer,String>> topKRDD = tagsRDD.map(new TopTag()); //找出前k个具有代表性的标签

static class ReduceString implements Function<Tuple2<Integer, Iterable<String>>, Tuple2<Integer, String>>{
        //合并标签词
        public Tuple2<Integer, String> call(Tuple2<Integer, Iterable<String>> clusterString){
            int key = clusterString._1();
            StringBuffer sb = new StringBuffer();
            Iterable<String> iter = clusterString._2();
            for( String string: iter){
                sb.append(string + " ");
            }
            return new Tuple2(key, sb.toString().trim());
        }
    }

static class TopTag implements Function<Tuple2<Integer, String>, Tuple2<Integer, String>>{
        //将所有的标签收集,排序,找出频率最高的前k个标签词 
        int topK = 3; 

        public Tuple2<Integer, String> call(Tuple2<Integer, String> cluster){
            int key = cluster._1();
            String[] taglist = cluster._2().split(" ");
            Map<String, Integer> map = new HashMap<String, Integer>();
            for(String tag: taglist){
                if(!map.containsKey(tag)){
                    map.put(tag, 1);
                }
                else{
                    int count = map.get(tag);
                    map.put(tag, count + 1);
                }
            }

            List<Map.Entry<String, Integer>> infolds = new ArrayList<Map.Entry<String, Integer>>(map.entrySet());
            Collections.sort(infolds, new Comparator<Map.Entry<String, Integer>>(){
                public int compare(Map.Entry<String, Integer>o1, Map.Entry<String, Integer>o2){
                    return (o2.getValue() - o1.getValue());
                }
            });
            String str = "";
            int num = 0;
            for(Map.Entry<String, Integer> one: infolds){
                str += one.getKey() + " ";
                if(num == topK){
                    break;
                }
                num += 1;
            }
            return new Tuple2<Integer, String>(key, str.trim());
        }
    }

8.输出结果

//输出结果
        List<Tuple2<Integer, String>> reducelist = topKRDD.collect();
        for(Tuple2<Integer, String> tags: reducelist){
            System.out.println(tags._1() + ":" + tags._2());
        }

结果如下:

0:机器学习 检索 信息
1:电影 推荐 好看

我将具体的代码放置我的github中:
https://www.github.com/Quincy1994/SparkStudy/tree/master/cluster

最后

以上就是明亮彩虹为你收集整理的利用spark做文本聚类分析的全部内容,希望文章能够帮你解决利用spark做文本聚类分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部