概述
一.环境准备:linux + java的Jdk1.6以上版本
二.安装包下载地址:https://flume.apache.org/download.html
(下文我会用下载的apache-flume-1.6.0-bin.tar.gz包为例进行安装,浏览器下载地址:http://apache.fayea.com/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz)
三.进行安装很简单,解压安装包,配置flume环境变量。
1.解压安装包到/app目录下,解压命令
tar -zxvf apache-flume-1.6.0-bin.tar.gz
2.解压后的文件名我改成简单好记的flume-1.6(此步骤可省略。。。)
3. 配置机器环境变量
将如下代码加到
/etc/profile
文件末尾
# Flume installation directory. Step 2 renames the extracted folder to
# flume-1.6; if you skipped the rename, point this at the directory name the
# archive actually extracted to.
export FLUME_HOME=/app/flume-1.6
# Flume configuration directory.
export FLUME_CONF_DIR=$FLUME_HOME/conf
# Append the Flume launcher scripts to the search path. The current directory
# ("." or an empty "::" element) is deliberately not added to PATH, since that
# lets any file in the working directory shadow a system command.
export PATH=$PATH:$FLUME_HOME/bin
4.配置flume配置文件的环境变量
把flume-env.sh.template重命名为flume-env.sh
往里添加自己安装的javaJDK路径
export JAVA_HOME=/usr/java/jdk1.8.0_72
5.到这里flume-ng 安装完毕。
Flume-ng使用
简单原理介绍
这是一个关于池子的故事:有一个池子,它一头进水,另一头出水,进水口可以配置各种管子,出水口也可以配置各种管子,可以有多个进水口、多个出水口。水术语称为Event,进水口术语称为Source、出水口术语称为Sink、池子术语称为Channel,Source+Channel+Sink,术语称为Agent。如果有需要,还可以把多个Agent连起来。
目标:单节点 Flume收集日志到mysql数据库
先进行搭建单节点 Flume收集日志配置,配置如下
# Components that make up the agent named "agentTotal"
agentTotal.sources = sourceTotal
agentTotal.channels = channelTotal
agentTotal.sinks = sinkTotal

# Channel: in-memory buffer between the source and the sink
agentTotal.channels.channelTotal.type = memory
agentTotal.channels.channelTotal.capacity = 1000
agentTotal.channels.channelTotal.transactionCapacity = 100

# Source: runs a command and turns each line of its output into an event.
# "tail -F" keeps following the log file; a plain "cat" exits after one pass,
# and the exec source stops producing data once its command exits.
agentTotal.sources.sourceTotal.type = exec
agentTotal.sources.sourceTotal.command = tail -F /app/logs/tomcat1/info.log
agentTotal.sources.sourceTotal.channels = channelTotal

# Sink: custom MysqlSink plugin (see the plugin development section)
agentTotal.sinks.sinkTotal.type = flume.log.MysqlSink
agentTotal.sinks.sinkTotal.hostname=172.16.164.136
agentTotal.sinks.sinkTotal.port=3306
agentTotal.sinks.sinkTotal.databaseName=flume
agentTotal.sinks.sinkTotal.tableName=flume_log
agentTotal.sinks.sinkTotal.user=root
# NOTE: the password is stored in plain text; restrict read access to this file.
agentTotal.sinks.sinkTotal.password=shixiaolei
# NOTE: "localport" is not read by the MysqlSink.configure() shown in this
# article; it is kept only so existing copies of this file stay unchanged.
agentTotal.sinks.sinkTotal.localport=8081
# Events written per database batch (MysqlSink defaults to 100). Keep this at
# or below the channel's transactionCapacity.
agentTotal.sinks.sinkTotal.batchSize = 100
agentTotal.sinks.sinkTotal.channel = channelTotal
配置解释:
这个配置文件定义了一个单独的叫做agentTotal的Agent。agentTotal有一个执行指定命令监听数据的Source,一个把Event暂存到Memory的Channel以及一个把Event数据持久化到mysql的Sink。这个配置文件命名了各种各样的组件,然后描述了他们的类型和配置参数。一个给定的配置文件可能定义了多个命名的Agent;当一个给定的Flume进程要被启动时,一个标志会传进去来告诉它到底哪个命名的Agent要被启动。
基于这个配置文件,我们可以通过以下方式启动Flume: (flume-ng未提供mysqlsink的实现,此处自己开发sink插件即可实现,具体开发见下文mysql sink插件开发)
拷贝上面的配置到/app/flume-1.6/conf/flume_total.conf 配置文件中,然后启动flume-ng即可进行日志收集
启动命令
nohup bin/flume-ng agent -c conf -f conf/flume_total.conf -n agentTotal -Dflume.root.logger=ERROR,console &
nohup bin/flume-ng agent -c conf -f conf/flume_source.conf -n agentService -Dflume.root.logger=ERROR,console &
命令解释:
-c/--conf 后跟配置目录,-f/--conf-file 后跟具体的配置文件,-n/--name 指定agent的名称
PS:-Dflume.root.logger=INFO,console 仅为 debug 使用,请勿生产环境生搬硬套,否则大量的日志会返回到终端。。。
mysql sink插件开发
新建maven项目添加依赖配置到pom.xml中
<!-- Unit testing (test scope only) -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
</dependency>
<!-- Flume APIs; keep these versions identical to the installed Flume version -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-configuration</artifactId>
    <version>1.6.0</version>
</dependency>
<!-- MySQL JDBC driver used by the sink -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.6</version>
</dependency>
<!-- Logging facade and its log4j binding -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.12</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.12</version>
</dependency>
自定义sink需要继承AbstractSink并实现Configurable接口,代码如下:
/**
 * Flume sink that takes events from its channel in batches and inserts each
 * event body as one row into the {@code content} column of a MySQL table.
 *
 * <p>Settings read by {@link #configure(Context)}: {@code hostname},
 * {@code port}, {@code databaseName}, {@code tableName}, {@code user},
 * {@code password} (all required) and {@code batchSize} (optional, default 100).
 *
 * <p>NOTE(review): the sample agent configuration refers to this class as
 * {@code flume.log.MysqlSink}, so it is expected to be declared in package
 * {@code flume.log} — confirm the package declaration in your project.
 */
public class MysqlSink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MysqlSink.class);

    /** Number of events per database batch when {@code batchSize} is not configured. */
    private static final int DEFAULT_BATCH_SIZE = 100;

    /**
     * Charset used to decode event bodies. Fixed explicitly so the stored text
     * does not depend on the platform default charset of the agent's JVM.
     */
    private static final java.nio.charset.Charset BODY_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;

    public MysqlSink() {
        LOG.info("MysqlSink start...");
    }

    /**
     * Reads the sink settings from the agent configuration.
     *
     * @param context configuration of this sink
     * @throws NullPointerException if a required setting is missing
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    @Override
    public void configure(Context context) {
        hostname = requireSetting(context, "hostname");
        port = requireSetting(context, "port");
        databaseName = requireSetting(context, "databaseName");
        tableName = requireSetting(context, "tableName");
        user = requireSetting(context, "user");
        password = requireSetting(context, "password");
        batchSize = context.getInteger("batchSize", DEFAULT_BATCH_SIZE);
        // checkArgument, not checkNotNull: a boxed boolean is never null, so
        // checkNotNull would accept zero and negative batch sizes.
        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
    }

    /** Returns the value of a mandatory setting, failing with "&lt;key&gt; must be set!!" if absent. */
    private static String requireSetting(Context context, String key) {
        return Preconditions.checkNotNull(context.getString(key), key + " must be set!!");
    }

    /**
     * Opens the MySQL connection and prepares the insert statement.
     *
     * @throws IllegalStateException if the connection or statement cannot be
     *         created; the failure is reported to the caller instead of
     *         terminating the whole agent JVM
     */
    @Override
    public void start() {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            // Keep going: DriverManager may still locate a registered driver,
            // and getConnection below fails with a clear error if it cannot.
            LOG.warn("MySQL JDBC driver class com.mysql.jdbc.Driver could not be loaded", e);
        }
        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;
        try {
            conn = DriverManager.getConnection(url, user, password);
            // Commits are issued explicitly once per batch in process().
            conn.setAutoCommit(false);
            preparedStatement = conn.prepareStatement("insert into " + tableName
                    + " (content) values (?)");
        } catch (SQLException e) {
            closeJdbcResources();
            throw new IllegalStateException("Unable to open MySQL connection to " + url, e);
        }
        // Mark the sink as started only after the connection is usable.
        super.start();
    }

    /** Stops the sink and releases the statement and the connection. */
    @Override
    public void stop() {
        super.stop();
        closeJdbcResources();
    }

    /** Closes the statement and the connection, logging (not propagating) close failures. */
    private void closeJdbcResources() {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close prepared statement", e);
            }
            preparedStatement = null;
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close MySQL connection", e);
            }
            conn = null;
        }
    }

    /** Discards uncommitted rows on the JDBC connection after a failed batch. */
    private void rollbackJdbc() {
        if (conn == null) {
            return;
        }
        try {
            conn.rollback();
        } catch (SQLException e) {
            LOG.error("Failed to roll back MySQL transaction", e);
        }
    }

    /**
     * Takes up to {@code batchSize} events from the channel and writes them to
     * MySQL as a single batch inside one channel transaction.
     *
     * @return {@code BACKOFF} if the channel ran empty before the batch was
     *         full, otherwise {@code READY}
     * @throws EventDeliveryException declared by the sink contract
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<String> rows = Lists.newArrayList();
        transaction.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    // Channel is empty: write what we have and ask the runner to back off.
                    result = Status.BACKOFF;
                    break;
                }
                rows.add(new String(event.getBody(), BODY_CHARSET));
            }
            if (!rows.isEmpty()) {
                preparedStatement.clearBatch();
                for (String row : rows) {
                    preparedStatement.setString(1, row);
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                conn.commit();
            }
            // Commit the channel transaction only after the rows are stored, so a
            // database failure leaves the events in the channel for a retry.
            transaction.commit();
        } catch (Throwable e) {
            // Drop any rows of the failed batch that were executed but not
            // committed, so they are not committed together with a later batch.
            rollbackJdbc();
            try {
                transaction.rollback();
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been "
                        + "successful.", e2);
            }
            LOG.error("Failed to commit transaction. "
                    + "Transaction rolled back.", e);
            throw Throwables.propagate(e);
        } finally {
            transaction.close();
        }
        return result;
    }
}
自定义的sink插件部署
对项目编译打成jar包放入flume-ng安装目录下的lib文件夹
上述过程不再进行讲解,此处讲一下使用自定义的mysqlsink时遇到的问题:
问题1:找不到mysql-connector的jar包
解决方法:拷贝mysql-connector驱动jar包放入flume-ng安装目录下的lib文件夹内
问题2:pom.xml依赖flume的版本需要和自己使用的flume版本一致,否则会出现莫名错误~
一.环境准备:linux + java的Jdk1.6以上版本
二.安装包下载地址:https://flume.apache.org/download.html
(下文我会用下载的apache-flume-1.6.0-bin.tar.gz包为例进行安装,浏览器下载地址:http://apache.fayea.com/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz)
三.进行安装很简单,解压安装包,配置flume环境变量。
1.解压安装包到/app目录下,解压命令
tar -zxvf apache-flume-1.6.0-bin.tar.gz
2.解压后的文件名我改成简单好记的flume-1.6(此步骤可省略。。。)
3. 配置机器环境变量
将如下代码加到
/etc/profile
文件末尾
# Flume installation directory. Step 2 renames the extracted folder to
# flume-1.6; if you skipped the rename, point this at the directory name the
# archive actually extracted to.
export FLUME_HOME=/app/flume-1.6
# Flume configuration directory.
export FLUME_CONF_DIR=$FLUME_HOME/conf
# Append the Flume launcher scripts to the search path. The current directory
# ("." or an empty "::" element) is deliberately not added to PATH, since that
# lets any file in the working directory shadow a system command.
export PATH=$PATH:$FLUME_HOME/bin
4.配置flume配置文件的环境变量
把flume-env.sh.template重命名为flume-env.sh
往里添加自己安装的javaJDK路径
export JAVA_HOME=/usr/java/jdk1.8.0_72
5.到这里flume-ng 安装完毕。
Flume-ng使用
简单原理介绍
这是一个关于池子的故事:有一个池子,它一头进水,另一头出水,进水口可以配置各种管子,出水口也可以配置各种管子,可以有多个进水口、多个出水口。水术语称为Event,进水口术语称为Source、出水口术语称为Sink、池子术语称为Channel,Source+Channel+Sink,术语称为Agent。如果有需要,还可以把多个Agent连起来。
目标:单节点 Flume收集日志到mysql数据库
先进行搭建单节点 Flume收集日志配置,配置如下
# Components that make up the agent named "agentTotal"
agentTotal.sources = sourceTotal
agentTotal.channels = channelTotal
agentTotal.sinks = sinkTotal

# Channel: in-memory buffer between the source and the sink
agentTotal.channels.channelTotal.type = memory
agentTotal.channels.channelTotal.capacity = 1000
agentTotal.channels.channelTotal.transactionCapacity = 100

# Source: runs a command and turns each line of its output into an event.
# "tail -F" keeps following the log file; a plain "cat" exits after one pass,
# and the exec source stops producing data once its command exits.
agentTotal.sources.sourceTotal.type = exec
agentTotal.sources.sourceTotal.command = tail -F /app/logs/tomcat1/info.log
agentTotal.sources.sourceTotal.channels = channelTotal

# Sink: custom MysqlSink plugin (see the plugin development section)
agentTotal.sinks.sinkTotal.type = flume.log.MysqlSink
agentTotal.sinks.sinkTotal.hostname=172.16.164.136
agentTotal.sinks.sinkTotal.port=3306
agentTotal.sinks.sinkTotal.databaseName=flume
agentTotal.sinks.sinkTotal.tableName=flume_log
agentTotal.sinks.sinkTotal.user=root
# NOTE: the password is stored in plain text; restrict read access to this file.
agentTotal.sinks.sinkTotal.password=shixiaolei
# NOTE: "localport" is not read by the MysqlSink.configure() shown in this
# article; it is kept only so existing copies of this file stay unchanged.
agentTotal.sinks.sinkTotal.localport=8081
# Events written per database batch (MysqlSink defaults to 100). Keep this at
# or below the channel's transactionCapacity.
agentTotal.sinks.sinkTotal.batchSize = 100
agentTotal.sinks.sinkTotal.channel = channelTotal
配置解释:
这个配置文件定义了一个单独的叫做agentTotal的Agent。agentTotal有一个执行指定命令监听数据的Source,一个把Event暂存到Memory的Channel以及一个把Event数据持久化到mysql的Sink。这个配置文件命名了各种各样的组件,然后描述了他们的类型和配置参数。一个给定的配置文件可能定义了多个命名的Agent;当一个给定的Flume进程要被启动时,一个标志会传进去来告诉它到底哪个命名的Agent要被启动。
基于这个配置文件,我们可以通过以下方式启动Flume: (flume-ng未提供mysqlsink的实现,此处自己开发sink插件即可实现,具体开发见下文mysql sink插件开发)
拷贝上面的配置到/app/flume-1.6/conf/flume_total.conf 配置文件中,然后启动flume-ng即可进行日志收集
启动命令
nohup bin/flume-ng agent -c conf -f conf/flume_total.conf -n agentTotal -Dflume.root.logger=ERROR,console &
nohup bin/flume-ng agent -c conf -f conf/flume_source.conf -n agentService -Dflume.root.logger=ERROR,console &
命令解释:
-c/--conf 后跟配置目录,-f/--conf-file 后跟具体的配置文件,-n/--name 指定agent的名称
PS:-Dflume.root.logger=INFO,console 仅为 debug 使用,请勿生产环境生搬硬套,否则大量的日志会返回到终端。。。
mysql sink插件开发
新建maven项目添加依赖配置到pom.xml中
<!-- Unit testing (test scope only) -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
</dependency>
<!-- Flume APIs; keep these versions identical to the installed Flume version -->
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-configuration</artifactId>
    <version>1.6.0</version>
</dependency>
<!-- MySQL JDBC driver used by the sink -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.6</version>
</dependency>
<!-- Logging facade and its log4j binding -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.12</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.12</version>
</dependency>
自定义sink需要继承AbstractSink并实现Configurable接口,代码如下:
/**
 * Flume sink that takes events from its channel in batches and inserts each
 * event body as one row into the {@code content} column of a MySQL table.
 *
 * <p>Settings read by {@link #configure(Context)}: {@code hostname},
 * {@code port}, {@code databaseName}, {@code tableName}, {@code user},
 * {@code password} (all required) and {@code batchSize} (optional, default 100).
 *
 * <p>NOTE(review): the sample agent configuration refers to this class as
 * {@code flume.log.MysqlSink}, so it is expected to be declared in package
 * {@code flume.log} — confirm the package declaration in your project.
 */
public class MysqlSink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MysqlSink.class);

    /** Number of events per database batch when {@code batchSize} is not configured. */
    private static final int DEFAULT_BATCH_SIZE = 100;

    /**
     * Charset used to decode event bodies. Fixed explicitly so the stored text
     * does not depend on the platform default charset of the agent's JVM.
     */
    private static final java.nio.charset.Charset BODY_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;

    public MysqlSink() {
        LOG.info("MysqlSink start...");
    }

    /**
     * Reads the sink settings from the agent configuration.
     *
     * @param context configuration of this sink
     * @throws NullPointerException if a required setting is missing
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    @Override
    public void configure(Context context) {
        hostname = requireSetting(context, "hostname");
        port = requireSetting(context, "port");
        databaseName = requireSetting(context, "databaseName");
        tableName = requireSetting(context, "tableName");
        user = requireSetting(context, "user");
        password = requireSetting(context, "password");
        batchSize = context.getInteger("batchSize", DEFAULT_BATCH_SIZE);
        // checkArgument, not checkNotNull: a boxed boolean is never null, so
        // checkNotNull would accept zero and negative batch sizes.
        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
    }

    /** Returns the value of a mandatory setting, failing with "&lt;key&gt; must be set!!" if absent. */
    private static String requireSetting(Context context, String key) {
        return Preconditions.checkNotNull(context.getString(key), key + " must be set!!");
    }

    /**
     * Opens the MySQL connection and prepares the insert statement.
     *
     * @throws IllegalStateException if the connection or statement cannot be
     *         created; the failure is reported to the caller instead of
     *         terminating the whole agent JVM
     */
    @Override
    public void start() {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            // Keep going: DriverManager may still locate a registered driver,
            // and getConnection below fails with a clear error if it cannot.
            LOG.warn("MySQL JDBC driver class com.mysql.jdbc.Driver could not be loaded", e);
        }
        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;
        try {
            conn = DriverManager.getConnection(url, user, password);
            // Commits are issued explicitly once per batch in process().
            conn.setAutoCommit(false);
            preparedStatement = conn.prepareStatement("insert into " + tableName
                    + " (content) values (?)");
        } catch (SQLException e) {
            closeJdbcResources();
            throw new IllegalStateException("Unable to open MySQL connection to " + url, e);
        }
        // Mark the sink as started only after the connection is usable.
        super.start();
    }

    /** Stops the sink and releases the statement and the connection. */
    @Override
    public void stop() {
        super.stop();
        closeJdbcResources();
    }

    /** Closes the statement and the connection, logging (not propagating) close failures. */
    private void closeJdbcResources() {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close prepared statement", e);
            }
            preparedStatement = null;
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close MySQL connection", e);
            }
            conn = null;
        }
    }

    /** Discards uncommitted rows on the JDBC connection after a failed batch. */
    private void rollbackJdbc() {
        if (conn == null) {
            return;
        }
        try {
            conn.rollback();
        } catch (SQLException e) {
            LOG.error("Failed to roll back MySQL transaction", e);
        }
    }

    /**
     * Takes up to {@code batchSize} events from the channel and writes them to
     * MySQL as a single batch inside one channel transaction.
     *
     * @return {@code BACKOFF} if the channel ran empty before the batch was
     *         full, otherwise {@code READY}
     * @throws EventDeliveryException declared by the sink contract
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        List<String> rows = Lists.newArrayList();
        transaction.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    // Channel is empty: write what we have and ask the runner to back off.
                    result = Status.BACKOFF;
                    break;
                }
                rows.add(new String(event.getBody(), BODY_CHARSET));
            }
            if (!rows.isEmpty()) {
                preparedStatement.clearBatch();
                for (String row : rows) {
                    preparedStatement.setString(1, row);
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                conn.commit();
            }
            // Commit the channel transaction only after the rows are stored, so a
            // database failure leaves the events in the channel for a retry.
            transaction.commit();
        } catch (Throwable e) {
            // Drop any rows of the failed batch that were executed but not
            // committed, so they are not committed together with a later batch.
            rollbackJdbc();
            try {
                transaction.rollback();
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been "
                        + "successful.", e2);
            }
            LOG.error("Failed to commit transaction. "
                    + "Transaction rolled back.", e);
            throw Throwables.propagate(e);
        } finally {
            transaction.close();
        }
        return result;
    }
}
自定义的sink插件部署
对项目编译打成jar包放入flume-ng安装目录下的lib文件夹
上述过程不再进行讲解,此处讲一下使用自定义的mysqlsink时遇到的问题:
问题1:找不到mysql-connector的jar包
解决方法:拷贝mysql-connector驱动jar包放入flume-ng安装目录下的lib文件夹内
问题2:pom.xml依赖flume的版本需要和自己使用的flume版本一致,否则会出现莫名错误~
最后
以上就是健壮悟空为你收集整理的Hadoop之--flume安装配置的全部内容,希望文章能够帮你解决Hadoop之--flume安装配置所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复