Writing data from a servlet to Flume via log4j, and delivering it from Flume to HDFS

Overview

1. Set up a Hadoop cluster.
2. Write the servlet endpoint and configure log4j to send the data to Flume.
3. Install and configure Flume to deliver the data to HDFS.

Installing Hadoop and Flume is not covered here; this post focuses on the endpoint code and the Flume configuration.
1. Servlet endpoint, with log4j sending the data to Flume
(1) Endpoint code that writes the data to the log

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class Pvstat_web extends HttpServlet {
    private static final long serialVersionUID = 1L;
    private static final Log logger = LogFactory.getLog("weblogger");

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {

        List<String> logList = new ArrayList<String>();
        // ... (read the request parameter values and add them to the list)
        StringBuilder line = new StringBuilder();
        boolean isFirst = true;
        Iterator<String> it = logList.iterator();
        while (it.hasNext()) {
            // join the list entries into one record, separated by \u0001 (SOH)
            if (isFirst)
                isFirst = false;
            else
                line.append("\u0001");
            line.append(it.next());
        }
        // strip any embedded line breaks, then write the record to the log
        logger.info(line.toString().replace("\n", "").replace("\r", ""));
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        doGet(req, resp);
    }
}
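
For completeness, the servlet needs a mapping in the web application's web.xml; a minimal sketch (the servlet name and the /pvstat URL pattern are assumptions, not from the original post):

<servlet>
    <servlet-name>pvstat</servlet-name>
    <!-- use the fully qualified class name if Pvstat_web lives in a package -->
    <servlet-class>Pvstat_web</servlet-class>
</servlet>
<servlet-mapping>
    <servlet-name>pvstat</servlet-name>
    <url-pattern>/pvstat</url-pattern>
</servlet-mapping>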

(2) Configure log4j.xml to send the log records over the network to the Flume agents' ports, load-balanced across hosts

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
    <appender name="logger2flume"
              class="org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender">
        <param name="Hosts"
               value="192.168.8.129:45682 192.168.8.101:45672 192.168.8.128:45672" />
        <param name="Selector" value="ROUND_ROBIN" />
        <param name="UnsafeMode" value="true" />
        <param name="MaxBackoff" value="20000" />
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="levelMin" value="INFO" />
            <param name="levelMax" value="INFO" />
            <param name="AcceptOnMatch" value="true" />
        </filter>
    </appender>
    <appender name="logger2flumeasync" class="org.apache.log4j.AsyncAppender">
        <param name="BufferSize" value="8192" />
        <appender-ref ref="logger2flume" />
    </appender>
    <logger name="weblogger" additivity="false">
        <level value="INFO" />
        <appender-ref ref="logger2flumeasync"/>
    </logger>
    <logger name="org.apache">
        <level value="OFF" />
    </logger>
    <logger name="org.springframework">
        <level value="OFF" />
    </logger>
</log4j:configuration>
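
The LoadBalancingLog4jAppender is not part of log4j itself; it comes from Flume's log4j appender client artifact, which therefore has to be on the web application's classpath. A minimal Maven sketch, assuming the same 1.7.0 version as the Flume SDK used later in this post:

        <dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>1.7.0</version>
        </dependency>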

2. Flume agent configuration file

event.sources = log4jSource
event.sinks = hdfsSink
event.channels = mchannel

#source
event.sources.log4jSource.type = avro
event.sources.log4jSource.bind = 10.0.1.60
event.sources.log4jSource.port = 45682

event.sources.log4jSource.interceptors = i1
event.sources.log4jSource.interceptors.i1.type = com.sid.flume.Builder
event.sources.log4jSource.interceptors.i1.siid = siid

# hdfsSink
event.sinks.hdfsSink.type=hdfs
event.sinks.hdfsSink.hdfs.path=hdfs://192.168.8.129:9000/stage/event/%{siid}/%Y%m%d/%H
event.sinks.hdfsSink.hdfs.fileType=DataStream
event.sinks.hdfsSink.hdfs.writeFormat=Text
event.sinks.hdfsSink.hdfs.batchSize=100
event.sinks.hdfsSink.hdfs.minBlockReplicas=1
event.sinks.hdfsSink.hdfs.rollSize=66584576
event.sinks.hdfsSink.hdfs.rollCount=0
event.sinks.hdfsSink.hdfs.idleTimeout=180
event.sinks.hdfsSink.hdfs.threadsPoolSize=20
event.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
event.sinks.hdfsSink.hdfs.closeTries=3
event.sinks.hdfsSink.hdfs.retryInterval=120

# channel
event.channels.mchannel.type = memory
event.channels.mchannel.capacity = 1000
event.channels.mchannel.transactionCapacity = 100

# bind the source and sink to the channel
event.sources.log4jSource.channels = mchannel
event.sinks.hdfsSink.channel = mchannel
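
Once the custom interceptor jar (built below) is in the Flume agent's lib directory, the agent can be started along these lines; the conf/event.conf file name is an assumption, and the agent name passed to -n must match the event prefix used throughout the configuration:

flume-ng agent -c conf -f conf/event.conf -n event -Dflume.root.logger=INFO,console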

Because of a business requirement, a simple custom interceptor was written: it takes the third field of the event body and uses it in the HDFS path. For example (values invented for illustration), for an event body of 20190101\u000110\u0001site42 the interceptor puts siid=site42 into the event headers, and the HDFS sink resolves %{siid} so the file lands under /stage/event/site42/.
The interceptor code follows.
(1) pom dependencies

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>

(2) Interceptor code
The Builder class (these classes run inside the Flume agent, which already ships flume-ng-core, so the dependencies above are needed at compile time only):

package com.sid.flume;

import org.apache.flume.Context;
import org.apache.flume.interceptor.Interceptor;

public class Builder implements Interceptor.Builder {
    private String siid = null;

    @Override
    public Interceptor build() {
        return new SidTestInterceptor(siid);
    }

    @Override
    public void configure(Context context) {
        // read the "siid" key set on the interceptor in the agent configuration
        siid = context.getString("siid");
    }
}

The interceptor class

package com.sid.flume;

import com.google.common.base.Charsets;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

public class SidTestInterceptor implements Interceptor {
    private String siid = null;

    public SidTestInterceptor(String siid) {
        this.siid = siid;
    }

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // read the event body as UTF-8 text
        String body = new String(event.getBody(), Charsets.UTF_8);
        // split on the \u0001 separator written by the servlet
        String[] split = body.split("\u0001");
        // copy the third field into the "siid" header, which the HDFS sink
        // substitutes into its path via %{siid}
        Map<String, String> headers = event.getHeaders();
        if (split.length > 2) {
            headers.put("siid", split[2]);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }
}
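
A minimal sketch of what the interceptor does to a single event, handy as a local sanity check; the body value is invented, and org.apache.flume.event.EventBuilder is a helper from flume-ng-core, not something from the original post:

package com.sid.flume;

import com.google.common.base.Charsets;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class SidTestInterceptorDemo {
    public static void main(String[] args) {
        // a fabricated record whose third SOH-separated field is the site id
        Event event = EventBuilder.withBody("20190101\u000110\u0001site42", Charsets.UTF_8);
        new SidTestInterceptor("siid").intercept(event);
        // prints {siid=site42}; the HDFS sink would place this event
        // under .../stage/event/site42/<date>/<hour>
        System.out.println(event.getHeaders());
    }
}

Package SidTestInterceptor and Builder into a jar and copy it into the Flume agent's lib directory before starting the agent.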
