Storm: Real-Time Data Analysis with Storm + Kafka + Flume + Zookeeper + MySQL (Program Case Study)

Overview

This article is a repost; the original is at http://blog.csdn.net/l1028386804/article/details/79441007 and is reproduced here in accordance with the author's requirements.

Please credit the source when reposting: http://blog.csdn.net/l1028386804/article/details/79441007

I. Preface

This post builds on "Storm: Real-Time Data Analysis with Storm + Kafka + Flume + Zookeeper + MySQL (Environment Setup)"; please read that post first.

First, start Storm, Kafka, Flume, Zookeeper, and MySQL on the server; the environment-setup post above covers the details.

II. Brief Introduction

To keep things simple, we just append words to /home/flume/log.log, one word per line. Storm receives each word and updates its count in the database: if the word is not there yet, a new row is inserted; if it is, its count is incremented (a single INSERT ... ON DUPLICATE KEY UPDATE statement handles both cases). For the SQL details, see the post "MySQL: Insert When Absent, Update When Present".

III. Implementation

1. Create the project

Create a Maven project named storm-kafka-mysql; the code below lives in the packages com.lyz.storm, com.lyz.storm.bolt, and com.lyz.storm.db.


2. Configure pom.xml


    
    
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.lyz</groupId>
    <artifactId>storm-kafka-mysql</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.28</version>
        </dependency>
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- The main class to run; adjust the package name below to match your own project -->
                            <mainClass>com.lyz.storm.StormTopologyDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
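
With the assembly plugin configured this way, running mvn clean package should produce target/storm-kafka-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar (the name follows Maven's default artifactId-version convention), with com.lyz.storm.StormTopologyDriver recorded as the main class.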

3. Implement the MySplitBolt class for word splitting


    
    
package com.lyz.storm.bolt;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * This bolt receives data from the KafkaSpout and sends the words on to MyWordCountAndPrintBolt.
 * @author liuyazhuang
 */
public class MySplitBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 4482101012916443908L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // 1. Fetch the data.
        // If the spout configured in StormTopologyDriver were a MyLocalFileSpout, we would read
        // the "juzi" field it declares in declareOutputFields:
        // byte[] juzi = (byte[]) input.getValueByField("juzi");
        // Here the topology uses a KafkaSpout, whose declareOutputFields declares a field named
        // "bytes", so we read that field instead; this simulates pulling data from Kafka.
        byte[] juzi = (byte[]) input.getValueByField("bytes");
        // 2. Split the line into words.
        String[] strings = new String(juzi).split(" ");
        // 3. Emit each word with a count of 1 (Values builds the output list for us).
        for (String word : strings) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}
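
The commented-out branch above mentions a MyLocalFileSpout that the original post does not include. For testing the topology locally without Kafka, a minimal sketch of such a spout might look like the following (a hypothetical class of mine, not the author's code; it tails /home/flume/log.log and declares the "juzi" field the comment refers to):

package com.lyz.storm.spout;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Hypothetical local-file spout for testing without Kafka: reads
 * /home/flume/log.log line by line and emits each line as a byte[]
 * under the "juzi" field that MySplitBolt's commented-out branch expects.
 */
public class MyLocalFileSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;
    private BufferedReader reader;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            this.reader = new BufferedReader(new FileReader("/home/flume/log.log"));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void nextTuple() {
        // Emit one line per call; pause briefly at EOF so we don't busy-spin.
        try {
            String line = reader.readLine();
            if (line != null) {
                collector.emit(new Values(line.getBytes()));
            } else {
                Thread.sleep(100);
            }
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("juzi"));
    }
}

To try it, point StormTopologyDriver at this spout instead of the KafkaSpout (topologyBuilder.setSpout("KafkaSpout", new MyLocalFileSpout(), 1)) and switch MySplitBolt back to reading the "juzi" field.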

4. Implement the MyWordCountAndPrintBolt class for persisting counts


    
    
package com.lyz.storm.bolt;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import com.lyz.storm.db.DBProvider;

/**
 * Aggregates the word counts and stores the results in MySQL.
 * @author liuyazhuang
 */
public class MyWordCountAndPrintBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 5564341843792874197L;

    private DBProvider provider;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Set up external resources here; this bolt creates the MySQL connection pool.
        provider = new DBProvider();
        super.prepare(stormConf, context);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValueByField("word");
        Integer num = (Integer) input.getValueByField("num");
        Connection conn = null;
        Statement stmt = null;
        try {
            conn = provider.getConnection();
            stmt = conn.createStatement();
            // Insert the word if it is absent, otherwise increment its count.
            stmt.executeUpdate("INSERT INTO word_count (word, count) VALUES ('" + word + "', " + num
                    + ") ON DUPLICATE KEY UPDATE count = count + " + num);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (stmt != null) {
                try {
                    stmt.close();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Terminal bolt: no output fields to declare.
    }
}
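
One caveat about the upsert above: building the SQL by string concatenation works for this demo, but it breaks (and is open to SQL injection) as soon as a word contains a quote character. A hedged sketch of the same upsert with a PreparedStatement (my variant, not the original author's code; it assumes the same word, num, and provider variables from execute() plus a java.sql.PreparedStatement import):

try (Connection conn = provider.getConnection();
     PreparedStatement ps = conn.prepareStatement(
             "INSERT INTO word_count (word, count) VALUES (?, ?) "
             + "ON DUPLICATE KEY UPDATE count = count + ?")) {
    // Bind the word and its increment as parameters instead of concatenating them.
    ps.setString(1, word);
    ps.setInt(2, num);
    ps.setInt(3, num);
    ps.executeUpdate();
} catch (SQLException e) {
    e.printStackTrace();
}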

5. Implement the DBProvider class for database access


    
    
package com.lyz.storm.db;

import java.beans.PropertyVetoException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import com.mchange.v2.c3p0.ComboPooledDataSource;

/**
 * JDBC access to MySQL, backed by a c3p0 connection pool.
 * @author liuyazhuang
 */
public class DBProvider {

    private static ComboPooledDataSource source;

    private static final String DB_DRIVER = "com.mysql.jdbc.Driver";
    private static final String DB_URL = "jdbc:mysql://127.0.0.1:3306/sharding_0?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true";
    private static final String USER = "root";
    private static final String PASSWORD = "root";

    static {
        try {
            source = new ComboPooledDataSource();
            source.setDriverClass(DB_DRIVER);
            source.setJdbcUrl(DB_URL);
            source.setUser(USER);
            source.setPassword(PASSWORD);
            source.setInitialPoolSize(10);
            source.setMaxPoolSize(20);
            source.setMinPoolSize(5);
            source.setAcquireIncrement(1);
            source.setMaxIdleTime(3);
            source.setMaxStatements(3000);
            source.setCheckoutTimeout(2000);
        } catch (PropertyVetoException e) {
            e.printStackTrace();
        }
    }

    /**
     * Get a connection from the pool.
     * @return a pooled database connection
     */
    public Connection getConnection() throws SQLException {
        return source.getConnection();
    }

    public static void closeConnection(Connection con) {
        if (con != null) {
            try {
                con.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public static void closeResultSet(ResultSet rs) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public static void closePreparedStatement(PreparedStatement ps) {
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
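
Before wiring up the full topology, it can be handy to sanity-check the pool and the word_count table on their own. A small hypothetical smoke test (the DBProviderTest class and its query are mine, not part of the original post) might look like this:

package com.lyz.storm.db;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Hypothetical smoke test: fetches a pooled connection and prints the
 * current contents of word_count, highest counts first.
 */
public class DBProviderTest {

    public static void main(String[] args) throws Exception {
        DBProvider provider = new DBProvider();
        Connection conn = provider.getConnection();
        PreparedStatement ps = conn.prepareStatement(
                "SELECT word, count FROM word_count ORDER BY count DESC");
        ResultSet rs = ps.executeQuery();
        while (rs.next()) {
            System.out.println(rs.getString("word") + " -> " + rs.getInt("count"));
        }
        // Release resources via the provider's own helper methods.
        DBProvider.closeResultSet(rs);
        DBProvider.closePreparedStatement(ps);
        DBProvider.closeConnection(conn);
    }
}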

6. Implement the program entry class StormTopologyDriver


    
    
package com.lyz.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;

import com.lyz.storm.bolt.MySplitBolt;
import com.lyz.storm.bolt.MyWordCountAndPrintBolt;

/*
 * This driver wires Kafka, Storm, and MySQL together.
 * Before running it, create the Kafka topic:
 * [root@liuyazhuang kafka]# bin/kafka-topics.sh --create --zookeeper liuyazhuang1:2181 --replication-factor 1 --partitions 3 --topic wordCount
 * Then open a console producer in another shell to feed it data:
 * [root@liuyazhuang kafka]# bin/kafka-console-producer.sh --broker-list liuyazhuang:9092 --topic wordCount
 * and start typing words.
 * @author liuyazhuang
 */
public class StormTopologyDriver {

    public static void main(String[] args) throws Exception {
        // 1. Describe the topology.
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts("192.168.209.121:2181"), "wordCount", "/wordCount", "wordCount");
        topologyBuilder.setSpout("KafkaSpout", new KafkaSpout(spoutConfig), 2);
        topologyBuilder.setBolt("bolt1", new MySplitBolt(), 4).shuffleGrouping("KafkaSpout");
        topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(), 2).shuffleGrouping("bolt1");

        // 2. Submit the topology.
        Config config = new Config();
        config.setNumWorkers(2);
        StormTopology stormTopology = topologyBuilder.createTopology();
        if (args != null && args.length > 0) {
            // Cluster mode: the first argument is the topology name.
            StormSubmitter.submitTopology(args[0], config, stormTopology);
        } else {
            // Local mode.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", config, stormTopology);
        }
    }
}
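
After building the fat jar (see the pom.xml note above), the topology can be submitted to a cluster with Storm's standard CLI, e.g. storm jar storm-kafka-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar com.lyz.storm.StormTopologyDriver wordCountTopology, where the last argument becomes the topology name; running the class with no arguments starts the embedded LocalCluster instead.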

7. Create the database

Run the following script to create the database and table:


    
    
create database sharding_0;
use sharding_0;
CREATE TABLE word_count (
  id int(11) NOT NULL AUTO_INCREMENT,
  word varchar(255) DEFAULT '',
  count int(11) DEFAULT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY word (word) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

At this point, our program case study is complete.

IV. Friendly Reminder

You can download the complete source code for this case study from http://download.csdn.net/download/l1028386804/10269075.




