flume day03 (failover, one <=> many)

Overview

  • Contents

    1. Load balancing

    2. Case (failover): read data from port 1111, send it to ports 2222 and 3333, and finally print it to the console

    1. 3 agents

    2. Startup (start from back to front)

    3. Failover

    3. Flume core components

    4. Case (one2many: one receiver, two outgoing ports)

    1. Three agents

    2. Startup (start from back to front)

    5. Case: many2one

    1. Four agents

    2. Startup

    6. Channel

    7. Monitoring         ①source         ②channel         ③sink

    1. Monitoring methods

    2. Metric definitions

    SOURCE

    CHANNEL

    SINK

    3. How to monitor Flume over HTTP + JSON

    1. Data

    2. Agent

    3. Start the agent

    4. Web monitoring


  • 1. Load balancing

    • High availability (HA)
    • Definition: load balancing is a key component of highly available network infrastructure. It distributes the workload across multiple servers to improve the performance and reliability of websites, applications, databases, and other services.
    • HDFS
      • nn (NameNode): maintains the HDFS namespace,
                  stores the metadata of every file block in HDFS + serves client read/write requests
      • dn (DataNode): stores data blocks and block checksums

      • snn (SecondaryNameNode): merges the image files on the nn => stores the metadata of all HDFS file blocks,
                   by default every 1 h (3600 s)
    • sink
      • To guard against sink failures, Flume groups sinks behind a sink processor (see the sketch below)
      • failover: switch to a backup sink when the active one fails
      • load balancing: load_balance
        • 1. send data randomly: random
        • 2. send data round-robin: round_robin
    • Failure handling:
      • when a sink fails, either mode helps:
         failover
         load_balance
      • load_balance:
        • 1. splits the data across sinks, adding parallelism and relieving the pressure on each sink
        • 2. if the second or the third agent dies, all data is sent to the agent behind the sink that is still up
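      A minimal sketch of the two sink-processor variants (the same property names appear in the cases below):

        # load balancing: spread events across k1 and k2
        agent1.sinkgroups.g1.processor.type = load_balance
        agent1.sinkgroups.g1.processor.selector = round_robin    # or: random

        # failover: always use the highest-priority healthy sink
        agent1.sinkgroups.g1.processor.type = failover
        agent1.sinkgroups.g1.processor.priority.k1 = 5
        agent1.sinkgroups.g1.processor.priority.k2 = 10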
    • 2. Case (failover): read data from port 1111, send it to ports 2222 and 3333, and finally print it to the console

      • Approach
        		3 agents:
        			agent1:
        				source: nc
        				channel: mem
        				sink: avro, two sinks
        
        			agent2: port 2222
        				source: avro
        				channel: mem
        				sink: logger
        			agent3: port 3333
        				source: avro
        				channel: mem
        				sink: logger
      • 1. 3 agents

        •  Configure agent1.conf
           Path: /home/hadoop/project/flume/sink
           [hadoop@bigdata13 sink]$ vim agent1.conf
          agent1.sources = r1
          agent1.sinks = k1 k2
          agent1.channels = c1
          
          agent1.sources.r1.type = netcat
          agent1.sources.r1.bind = bigdata13
          agent1.sources.r1.port = 1111
          
          agent1.channels.c1.type = memory
          
          # define sink for port 2222
          agent1.sinks.k1.type = avro
          agent1.sinks.k1.hostname = bigdata13
          agent1.sinks.k1.port = 2222
          
          # define sink for port 3333
          agent1.sinks.k2.type = avro
          agent1.sinks.k2.hostname = bigdata13
          agent1.sinks.k2.port = 3333
          
          # define the sink processor
          agent1.sinkgroups = g1
          agent1.sinkgroups.g1.sinks = k1 k2
          agent1.sinkgroups.g1.processor.type = load_balance
          agent1.sinkgroups.g1.processor.backoff = true
          agent1.sinkgroups.g1.processor.selector = round_robin
          agent1.sinkgroups.g1.processor.selector.maxTimeOut=2000
          
          agent1.sources.r1.channels = c1
          agent1.sinks.k1.channel = c1
          agent1.sinks.k2.channel = c1

          a1.sinkgroups.g1.processor.backoff = true            # if enabled, a failed sink is put on a blacklist
          a1.sinkgroups.g1.processor.selector = round_robin    # round-robin; random is also supported
          a1.sinkgroups.g1.processor.selector.maxTimeOut=10000 # blacklist timeout in ms; if the sink still fails when it expires, the timeout grows exponentially

           

        •  Configure agent2.conf
           Path: /home/hadoop/project/flume/sink
           [hadoop@bigdata13 sink]$ vim agent2.conf
          # agent2: port 2222
          agent2.sources = r1
          agent2.sinks = k1
          agent2.channels = c1
          
          agent2.sources.r1.type = avro
          agent2.sources.r1.bind = bigdata13
          agent2.sources.r1.port = 2222
          
          agent2.channels.c1.type = memory
          agent2.sinks.k1.type = logger
          
          agent2.sources.r1.channels = c1
          agent2.sinks.k1.channel = c1
          
        •  Configure agent3.conf
           Path: /home/hadoop/project/flume/sink
           [hadoop@bigdata13 sink]$ vim agent3.conf
          # agent3: port 3333
          agent3.sources = r1
          agent3.sinks = k1
          agent3.channels = c1
          
          agent3.sources.r1.type = avro
          agent3.sources.r1.bind = bigdata13
          agent3.sources.r1.port = 3333
          
          agent3.channels.c1.type = memory
          agent3.sinks.k1.type = logger
          
          agent3.sources.r1.channels = c1
          agent3.sinks.k1.channel = c1
      • 2. Startup (start from back to front)

        • Start agent3.conf
          flume-ng agent \
          --name agent3 \
          --conf ${FLUME_HOME}/conf \
          --conf-file /home/hadoop/project/flume/sink/agent3.conf \
          -Dflume.root.logger=info,console
          
        • Start agent2.conf
          flume-ng agent \
          --name agent2 \
          --conf ${FLUME_HOME}/conf \
          --conf-file /home/hadoop/project/flume/sink/agent2.conf \
          -Dflume.root.logger=info,console
          
        • Start agent1.conf
          flume-ng agent \
          --name agent1 \
          --conf ${FLUME_HOME}/conf \
          --conf-file /home/hadoop/project/flume/sink/agent1.conf \
          -Dflume.root.logger=info,console
          
        • Test: [hadoop@bigdata13 sink]$ telnet bigdata13 1111
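          What a test session might look like (assuming all three agents are running; telnet output abridged):

            [hadoop@bigdata13 sink]$ telnet bigdata13 1111
            Trying ...
            Connected to bigdata13.
            hello1
            OK
            hello2
            OK

          With round_robin, the lines should show up on the consoles of agent2 and agent3 in turn; note that the load_balance processor picks a sink per batch, so consecutive lines can still land on the same console.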
      • 3. Failover

        •  1. agent
           Configure agent1_failover.conf
           Path: /home/hadoop/project/flume/sink
           [hadoop@bigdata13 sink]$ vim agent1_failover.conf
          agent1.sources = r1
          agent1.sinks = k1 k2
          agent1.channels = c1
          
          agent1.sources.r1.type = netcat
          agent1.sources.r1.bind = bigdata13
          agent1.sources.r1.port = 1111
          
          agent1.channels.c1.type = memory
          
          # define sink for port 2222
          agent1.sinks.k1.type = avro
          agent1.sinks.k1.hostname = bigdata13
          agent1.sinks.k1.port = 2222
          
          # define sink for port 3333
          agent1.sinks.k2.type = avro
          agent1.sinks.k2.hostname = bigdata13
          agent1.sinks.k2.port = 3333
          
          # define the sink processor
          agent1.sinkgroups = g1
          agent1.sinkgroups.g1.sinks = k1 k2
          agent1.sinkgroups.g1.processor.type = failover
          agent1.sinkgroups.g1.processor.priority.k1 = 5
          agent1.sinkgroups.g1.processor.priority.k2 = 10
          agent1.sinkgroups.g1.processor.maxpenalty = 2000
          
          agent1.sources.r1.channels = c1
          agent1.sinks.k1.channel = c1
          agent1.sinks.k2.channel = c1

          # define the sink processor
          agent1.sinkgroups = g1                                 # declare a sink group
          agent1.sinkgroups.g1.sinks = k1 k2                     # put both sinks in the group
          agent1.sinkgroups.g1.processor.type = failover
          agent1.sinkgroups.g1.processor.priority.k1 = 5
          agent1.sinkgroups.g1.processor.priority.k2 = 10
          agent1.sinkgroups.g1.processor.maxpenalty = 2000
          k1 and k2 have priorities 5 and 10, so k2 is preferred; maxpenalty is set here to 2000 ms (the default is 30 s).

          • agent2 and agent3 are the same as above

        • Startup
          • Start agent3.conf
          • Start agent2.conf
          • Start agent1_failover.conf
            flume-ng agent \
            --name agent1 \
            --conf ${FLUME_HOME}/conf \
            --conf-file /home/hadoop/project/flume/sink/agent1_failover.conf \
            -Dflume.root.logger=info,console
            

             While agent3 is up (k2, priority 10), the test string 33333 arrives there.
             Once agent3 is killed, the test string 22222 fails over to agent2.
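             One way to simulate the failure (assuming a single agent3 process on this host; the pkill pattern is illustrative):

               # kill the agent3 process to trigger failover
               [hadoop@bigdata13 sink]$ pkill -f 'agent3.conf'
               # keep typing in the telnet session; new events should now
               # appear on agent2's console (port 2222, priority 5)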

  • 3. Flume core components

    • sources
      • interceptors: process the collected data (transformation, cleansing)
      • channel selectors: decide which channel the collected data goes to
    • channels
    • sinks
  • 4. Case (one2many: one receiver, two outgoing ports)

  • Three agents do the work above:
        agent1: receives on port 1111, sends to ports 2222 and 3333
        agent2: receives on port 2222, sends to logger
        agent3: receives on port 3333, sends to logger
    • 1. Three agents

      •  Configure agent1.conf
         Path: /home/hadoop/project/flume/one2many
         [hadoop@bigdata13 one2many]$ vim agent1.conf
        agent1.sources = r1
        agent1.sinks = k1 k2
        agent1.channels = c1 c2
        
        agent1.sources.r1.type = netcat
        agent1.sources.r1.bind = bigdata13
        agent1.sources.r1.port = 1111
        
        # 0. configure the source's channel selector (replicating: copy each event to every channel)
        agent1.sources.r1.selector.type = replicating
        agent1.sources.r1.channels = c1 c2
        
        # 1. configure the two channels
        agent1.channels.c1.type = memory
        agent1.channels.c2.type = memory
        
        # define sink for port 2222
        agent1.sinks.k1.type = avro
        agent1.sinks.k1.hostname = bigdata13
        agent1.sinks.k1.port = 2222
        
        # define sink for port 3333
        agent1.sinks.k2.type = avro
        agent1.sinks.k2.hostname = bigdata13
        agent1.sinks.k2.port = 3333
        
        # wire up the connections
        agent1.sources.r1.channels = c1 c2
        agent1.sinks.k1.channel = c1
        agent1.sinks.k2.channel = c2

      •  Configure agent2.conf
         Path: /home/hadoop/project/flume/sink
         [hadoop@bigdata13 sink]$ cp agent2.conf /home/hadoop/project/flume/one2many
         (same as the agent2.conf above, so just copy it over)
        # agent2: port 2222
        agent2.sources = r1
        agent2.sinks = k1
        agent2.channels = c1
        
        agent2.sources.r1.type = avro
        agent2.sources.r1.bind = bigdata13
        agent2.sources.r1.port = 2222
        
        agent2.channels.c1.type = memory
        agent2.sinks.k1.type = logger
        
        agent2.sources.r1.channels = c1
        agent2.sinks.k1.channel = c1
        
      •  Configure agent3.conf
         Path: /home/hadoop/project/flume/sink
         [hadoop@bigdata13 sink]$ cp agent3.conf /home/hadoop/project/flume/one2many
         (same as the agent3.conf above, so just copy it over)
        # agent3: port 3333
        agent3.sources = r1
        agent3.sinks = k1
        agent3.channels = c1
        
        agent3.sources.r1.type = avro
        agent3.sources.r1.bind = bigdata13
        agent3.sources.r1.port = 3333
        
        agent3.channels.c1.type = memory
        agent3.sinks.k1.type = logger
        
        agent3.sources.r1.channels = c1
        agent3.sinks.k1.channel = c1
    • 2. Startup (start from back to front)

      • Start agent3.conf
        flume-ng agent \
        --name agent3 \
        --conf ${FLUME_HOME}/conf \
        --conf-file /home/hadoop/project/flume/one2many/agent3.conf \
        -Dflume.root.logger=info,console
        
      • Start agent2.conf
        flume-ng agent \
        --name agent2 \
        --conf ${FLUME_HOME}/conf \
        --conf-file /home/hadoop/project/flume/one2many/agent2.conf \
        -Dflume.root.logger=info,console
        
      • Start agent1.conf
        flume-ng agent \
        --name agent1 \
        --conf ${FLUME_HOME}/conf \
        --conf-file /home/hadoop/project/flume/one2many/agent1.conf \
        -Dflume.root.logger=info,console
        
      • Test: [hadoop@bigdata13 one2many]$ telnet bigdata13 1111
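        Because agent1 uses a replicating selector, every line typed into port 1111 should appear on both logger consoles, e.g.:

          [hadoop@bigdata13 one2many]$ telnet bigdata13 1111
          hello
          OK
          # agent2 console: Event: { headers:{} body: ... hello }
          # agent3 console: Event: { headers:{} body: ... hello }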
  • 5. Case: many2one

  • Requirement: collect several kinds of logs into one agent, then have that agent route the data as specified.
               Class DL2262 is split into boy, girl, and teacher, each sending its own data.
    • 1. Four agents

      •  Configure agent1.conf
         Path: /home/hadoop/project/flume/many2one
         [hadoop@bigdata13 many2one]$ vim agent1.conf
        agent1.sources = r1
        agent1.sinks = k1
        agent1.channels = c1
        
        agent1.sources.r1.type = netcat
        agent1.sources.r1.bind = bigdata13
        agent1.sources.r1.port = 1111
        
        # add an interceptor => data cleansing + tag the event header
        agent1.sources.r1.interceptors = i1
        agent1.sources.r1.interceptors.i1.type = static
        agent1.sources.r1.interceptors.i1.key = dl2262
        agent1.sources.r1.interceptors.i1.value = boy
        # 0. wire the source to the channel
        agent1.sources.r1.channels = c1
        # 1. configure the channel
        agent1.channels.c1.type = memory
        # define sink for port 2222
        agent1.sinks.k1.type = avro
        agent1.sinks.k1.hostname = bigdata13
        agent1.sinks.k1.port = 2222
        # wire up the connections
        agent1.sources.r1.channels = c1
        agent1.sinks.k1.channel = c1

      •  Configure agent2.conf
         Path: /home/hadoop/project/flume/many2one
         [hadoop@bigdata13 many2one]$ vim agent2.conf
        agent2.sources = r1
        agent2.sinks = k1
        agent2.channels = c1
        
        agent2.sources.r1.type = netcat
        agent2.sources.r1.bind = bigdata13
        agent2.sources.r1.port = 1112
        
        # add an interceptor => data cleansing + tag the event header
        agent2.sources.r1.interceptors = i1
        agent2.sources.r1.interceptors.i1.type = static
        agent2.sources.r1.interceptors.i1.key = dl2262
        agent2.sources.r1.interceptors.i1.value = girl
        # 0. wire the source to the channel
        agent2.sources.r1.channels = c1
        # 1. configure the channel
        agent2.channels.c1.type = memory
        # define sink for port 2222
        agent2.sinks.k1.type = avro
        agent2.sinks.k1.hostname = bigdata13
        agent2.sinks.k1.port = 2222
        # wire up the connections
        agent2.sources.r1.channels = c1
        agent2.sinks.k1.channel = c1

      •  Configure agent3.conf
         Path: /home/hadoop/project/flume/many2one
         [hadoop@bigdata13 many2one]$ vim agent3.conf
        agent3.sources = r1
        agent3.sinks = k1
        agent3.channels = c1
        
        agent3.sources.r1.type = netcat
        agent3.sources.r1.bind = bigdata13
        agent3.sources.r1.port = 1113
        
        # add an interceptor => data cleansing + tag the event header
        agent3.sources.r1.interceptors = i1
        agent3.sources.r1.interceptors.i1.type = static
        agent3.sources.r1.interceptors.i1.key = dl2262
        agent3.sources.r1.interceptors.i1.value = tea
        # 0. wire the source to the channel
        agent3.sources.r1.channels = c1
        # 1. configure the channel
        agent3.channels.c1.type = memory
        # define sink for port 2222
        agent3.sinks.k1.type = avro
        agent3.sinks.k1.hostname = bigdata13
        agent3.sinks.k1.port = 2222
        # wire up the connections
        agent3.sources.r1.channels = c1
        agent3.sinks.k1.channel = c1
        

      •  Configure agent4.conf
         Path: /home/hadoop/project/flume/many2one
         [hadoop@bigdata13 many2one]$ vim agent4.conf
        agent4.sources = r1
        agent4.sinks = k1 k2 k3
        agent4.channels = c1 c2 c3
        
        agent4.sources.r1.type = avro
        agent4.sources.r1.bind = bigdata13
        agent4.sources.r1.port = 2222
        
        
        # 0. configure the channel selector: route by the dl2262 header
        #    (boy -> c1, girl -> c2, anything else, e.g. tea, falls through to c3 via default)
        agent4.sources.r1.selector.type = multiplexing
        agent4.sources.r1.selector.header = dl2262
        agent4.sources.r1.selector.mapping.boy = c1
        agent4.sources.r1.selector.mapping.girl = c2
        agent4.sources.r1.selector.default = c3
        agent4.sources.r1.channels = c1 c2 c3
        
        # 1. configure the three channels
        agent4.channels.c1.type = memory
        agent4.channels.c2.type = memory
        agent4.channels.c3.type = memory
        # define logger sinks
        agent4.sinks.k1.type =logger
        agent4.sinks.k2.type =logger
        agent4.sinks.k3.type =logger
        # wire up the connections
        agent4.sources.r1.channels = c1 c2 c3
        agent4.sinks.k1.channel = c1
        agent4.sinks.k2.channel = c2
        agent4.sinks.k3.channel = c3

    • 2. Startup

      • Start agent4.conf
        flume-ng agent \
        --name agent4 \
        --conf ${FLUME_HOME}/conf \
        --conf-file /home/hadoop/project/flume/many2one/agent4.conf \
        -Dflume.root.logger=info,console
        
      • Start agent3.conf
        flume-ng agent \
        --name agent3 \
        --conf ${FLUME_HOME}/conf \
        --conf-file /home/hadoop/project/flume/many2one/agent3.conf \
        -Dflume.root.logger=info,console
        
      • Start agent2.conf
        flume-ng agent \
        --name agent2 \
        --conf ${FLUME_HOME}/conf \
        --conf-file /home/hadoop/project/flume/many2one/agent2.conf \
        -Dflume.root.logger=info,console
        
      • Start agent1.conf
        flume-ng agent \
        --name agent1 \
        --conf ${FLUME_HOME}/conf \
        --conf-file /home/hadoop/project/flume/many2one/agent1.conf \
        -Dflume.root.logger=info,console
        
      • Send test data from three terminals
        • telnet bigdata13 1111

        • telnet bigdata13 1112

        • telnet bigdata13 1113

      • Result: agent4's k1 logger prints the boy events, k2 the girl events, and k3 the teacher events (the value tea matches no mapping, so those events fall through to the default channel c3).

    • 6. Channel

      • 1. Default capacity (capacity = 100)

      • 2. Transaction capacity (transactionCapacity, default 100; must not exceed capacity; see the sketch below)
                 Put transaction: source => channel    (on failure, events are rolled back to the source)
                 Take transaction: channel => sink     (on failure, events are rolled back to the channel)
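        A minimal sketch of tuning both settings on a memory channel (values are illustrative):

          agent1.channels.c1.type = memory
          agent1.channels.c1.capacity = 10000             # max events held in the channel
          agent1.channels.c1.transactionCapacity = 1000   # max events per put/take transaction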

    • 7. Monitoring
               ①source
               ②channel
               ③sink

      • 1. Monitoring methods

        • 1. The Ganglia integration that Flume ships with   [requires installing Ganglia]

        • 2. Configure a few parameters when starting the agent and fetch metrics over HTTP   [recommended]
                 JSON data => HTTP interface data =>

          • 1. front-end developers render it in a visual dashboard

          • 2. collect the HTTP interface data => MySQL => visualization (see the polling sketch below)
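          A rough polling sketch for that collection path (assuming the agent exposes metrics on port 9527 as configured in step 3 below; the MySQL load step is left out):

            # poll the metrics endpoint every 10 s and append the raw JSON to a file
            while true; do
              curl -s http://bigdata13:9527/metrics >> /home/hadoop/tmp/flume_metrics.jsonl
              echo "" >> /home/hadoop/tmp/flume_metrics.jsonl
              sleep 10
            done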

      • 2. Metric definitions

        • SOURCE

          	OpenConnectionCount (number of open connections)
          	Type (component type)
          	AppendBatchAcceptedCount (number of batches appended to the channel)
          	AppendBatchReceivedCount (number of batches just received on the source side)
          	EventAcceptedCount (number of events successfully put into the channel)
          	AppendReceivedCount (number of append calls received so far by the source)
          	StartTime (component start time)
          	StopTime (component stop time)
          	EventReceivedCount (number of events successfully received by the source)
          	AppendAcceptedCount (number of appended events put into the channel)

        • CHANNEL

          	EventPutSuccessCount (number of events successfully put into the channel)
          	ChannelFillPercentage (channel fill percentage)
          	Type (component type)
          	EventPutAttemptCount (number of attempts to put events into the channel)
          	ChannelSize (number of events currently in the channel)
          	StartTime (component start time)
          	StopTime (component stop time)
          	EventTakeSuccessCount (number of events successfully taken from the channel)
          	ChannelCapacity (channel capacity)
          

        • SINK

          
          	BatchCompleteCount (number of completed batches)
          	ConnectionFailedCount (number of failed connections)
          	EventDrainAttemptCount (number of events the sink attempted to commit)
          	ConnectionCreatedCount (number of connections created)
          	Type (component type)
          	BatchEmptyCount (number of empty batches taken)
          	ConnectionClosedCount (number of connections closed)
          	EventDrainSuccessCount (number of events successfully sent)
          	StartTime (component start time)
          	StopTime (component stop time)
          	BatchUnderflowCount (number of underfull batches processed)
          

      • 3. How to monitor Flume over HTTP + JSON

        • 1. Data

          • Path: /home/hadoop/tmp

            for x in {1..2000}
            do 
              echo "dl2262,${x}" >> /home/hadoop/tmp/dt01.log
              sleep 0.1s
            done
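            One way to run the generator (hypothetical file name gen_data.sh) while watching the file it feeds:

              [hadoop@bigdata13 tmp]$ vim gen_data.sh      # paste the loop above
              [hadoop@bigdata13 tmp]$ sh gen_data.sh &
              [hadoop@bigdata13 tmp]$ tail -f /home/hadoop/tmp/dt01.log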
        • 2. Agent

        •  Configure agent.conf
           Path: /home/hadoop/project/flume/monitor
           [hadoop@bigdata13 monitor]$ vim agent.conf

          a1.sources = r1
          a1.sinks = k1
          a1.channels = c1
          
          a1.sources.r1.type = TAILDIR
          a1.sources.r1.filegroups = f1
          a1.sources.r1.filegroups.f1=/home/hadoop/tmp/dt01.log
          
          a1.channels.c1.type = memory
          a1.sinks.k1.type = logger
          
          a1.sources.r1.channels = c1
          a1.sinks.k1.channel = c1
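          Note: the TAILDIR source records file offsets in a position file (by default ~/.flume/taildir_position.json), so tailing resumes after an agent restart; to pin it explicitly (optional):

            a1.sources.r1.positionFile = /home/hadoop/project/flume/monitor/taildir_position.json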
          
        • 3. Start the agent

          flume-ng agent \
          --name a1 \
          --conf ${FLUME_HOME}/conf \
          --conf-file /home/hadoop/project/flume/monitor/agent.conf \
          -Dflume.root.logger=info,console \
          -Dflume.monitoring.type=http \
          -Dflume.monitoring.port=9527
          
        • 4. Web monitoring

          • http://bigdata13:9527/metrics

             Sample response:

            {
                "CHANNEL.c1": {
                    "ChannelCapacity": "100",
                    "ChannelFillPercentage": "0.0",
                    "Type": "CHANNEL",
                    "ChannelSize": "0",
                    "EventTakeSuccessCount": "2553",
                    "EventTakeAttemptCount": "2584",
                    "StartTime": "1671030728197",
                    "EventPutAttemptCount": "2553",
                    "EventPutSuccessCount": "2553",
                    "StopTime": "0"
                },
                "SOURCE.r1": {
                    "AppendBatchAcceptedCount": "32",
                    "GenericProcessingFail": "0",
                    "AppendReceivedCount": "0",
                    "EventAcceptedCount": "2553",
                    "StartTime": "1671030728579",
                    "AppendBatchReceivedCount": "32",
                    "ChannelWriteFail": "0",
                    "EventReceivedCount": "2553",
                    "EventReadFail": "0",
                    "Type": "SOURCE",
                    "AppendAcceptedCount": "0",
                    "OpenConnectionCount": "0",
                    "StopTime": "0"
                }
            }
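              To pull a single metric out of that response (assuming jq is installed):

                [hadoop@bigdata13 ~]$ curl -s http://bigdata13:9527/metrics | jq -r '."CHANNEL.c1".ChannelFillPercentage'
                0.0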
             
        • Additional notes: if the Flume agent errors out on startup, try restarting it;
                     monitor the PID;
                     monitor the JSON metrics;
                     build a Java back-end monitoring page (a watchdog sketch follows below).
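          A minimal watchdog sketch along those lines (hypothetical script, e.g. run from cron; assumes a single monitored agent whose command line matches the pattern):

            #!/usr/bin/env bash
            # restart the monitoring agent if its process has disappeared
            if ! pgrep -f 'monitor/agent.conf' > /dev/null; then
              echo "$(date) flume agent down, restarting" >> /home/hadoop/tmp/flume_watchdog.log
              nohup flume-ng agent \
                --name a1 \
                --conf ${FLUME_HOME}/conf \
                --conf-file /home/hadoop/project/flume/monitor/agent.conf \
                -Dflume.monitoring.type=http \
                -Dflume.monitoring.port=9527 \
                >> /home/hadoop/tmp/flume_agent.log 2>&1 &
            fi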
