当前位置:   article > 正文

flume使用(二):采集远程日志数据到MySql数据库_日志 flume mysql

日志 flume mysql

本文内容可查看目录

本文内容包含单节点(单agent)和多节点(多agent,采集远程日志)说明

一、环境

linux系统:Centos7
Jdk:1.7

Flume:1.7.0

二、安装

linux中jdk、mysql的安装不多赘述

flume1.7的安装:进入官网:http://flume.apache.org/

然后在下载页面找到1.7.0版本的二进制包 apache-flume-1.7.0-bin.tar.gz(历史版本可在 http://archive.apache.org/dist/flume/ 获取),上传到CentOS系统并解压即可。


三、准备数据库表

注:本文中flume的event来自exec source,即把执行linux命令(如 tail -F)得到的输出作为flume的数据源;sink则使用自定义的MysqlSink。

创建sql语句:

-- Target table written by the custom MysqlSink: an auto-increment id plus the name/age columns it inserts.
CREATE TABLE `flume_test` (
  `id`   INT(11)      NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) DEFAULT NULL,
  `age`  INT(11)      DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;

四、MysqlSink编写

4.1.maven创建项目(打包方式为jar)

pom.xml文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>yichao.mym</groupId>
  <artifactId>flumeDemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <!-- The sink sources contain non-ASCII (Chinese) comments: pin the encoding so the build
         does not depend on the platform default charset. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Match the JDK used in this article (1.7); some maven-compiler-plugin versions default
         to 1.5/1.6 otherwise. -->
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <flume.version>1.7.0</flume.version>
  </properties>

  <dependencies>
    <!-- Flume is already on the agent's classpath (flume/lib); compile against it only. -->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>${flume.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>${flume.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <!-- Copy this jar into flume/lib next to the sink jar as well. -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.15</version>
    </dependency>
  </dependencies>
</project>

4.2 准备java Bean

与数据库表对应的javabean,方便处理event的body(event的body就是execSource的命令读取的内容)

package yichao.mym.base.bean;

/**
 * Value holder for one row inserted into the flume_test table: a name and an optional age.
 * Both properties start out as {@code null} until set.
 */
public class Person {

	/** Value for the "name" column. */
	private String name;

	/** Value for the "age" column; {@code null} when no age is known. */
	private Integer age;

	/**
	 * Stores the name.
	 *
	 * @param name the name to store (may be null)
	 */
	public void setName(String name) {
		this.name = name;
	}

	/**
	 * @return the stored name, or {@code null} if none was set
	 */
	public String getName() {
		return this.name;
	}

	/**
	 * Stores the age.
	 *
	 * @param age the age to store (may be null)
	 */
	public void setAge(Integer age) {
		this.age = age;
	}

	/**
	 * @return the stored age, or {@code null} if none was set
	 */
	public Integer getAge() {
		return this.age;
	}
}

4.3 自定义的sink编写

说明都在代码中

  package yichao.mym.base.bean;

  import java.nio.charset.StandardCharsets;
  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.PreparedStatement;
  import java.sql.SQLException;
  import java.sql.Types;
  import java.util.List;

  import org.apache.flume.Channel;
  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.Transaction;
  import org.apache.flume.conf.Configurable;
  import org.apache.flume.sink.AbstractSink;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  import com.google.common.base.Preconditions;
  import com.google.common.base.Throwables;
  import com.google.common.collect.Lists;
  19. public class MysqlSink extends AbstractSink implements Configurable {
  20. private Logger LOG = LoggerFactory.getLogger(MysqlSink.class);
  21. private String hostname;
  22. private String port;
  23. private String databaseName;
  24. private String tableName;
  25. private String user;
  26. private String password;
  27. private PreparedStatement preparedStatement;
  28. private Connection conn;
  29. private int batchSize; //每次提交的批次大小
  30. public MysqlSink() {
  31. LOG.info("MySqlSink start...");
  32. }
  33. /**实现Configurable接口中的方法:可获取配置文件中的属性*/
  34. public void configure(Context context) {
  35. hostname = context.getString("hostname");
  36. Preconditions.checkNotNull(hostname, "hostname must be set!!");
  37. port = context.getString("port");
  38. Preconditions.checkNotNull(port, "port must be set!!");
  39. databaseName = context.getString("databaseName");
  40. Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
  41. tableName = context.getString("tableName");
  42. Preconditions.checkNotNull(tableName, "tableName must be set!!");
  43. user = context.getString("user");
  44. Preconditions.checkNotNull(user, "user must be set!!");
  45. password = context.getString("password");
  46. Preconditions.checkNotNull(password, "password must be set!!");
  47. batchSize = context.getInteger("batchSize", 100); //设置了batchSize的默认值
  48. Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");
  49. }
  50. /**
  51. * 服务启动时执行的代码,这里做准备工作
  52. */
  53. @Override
  54. public void start() {
  55. super.start();
  56. try {
  57. //调用Class.forName()方法加载驱动程序
  58. Class.forName("com.mysql.jdbc.Driver");
  59. } catch (ClassNotFoundException e) {
  60. e.printStackTrace();
  61. }
  62. String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;
  63. //调用DriverManager对象的getConnection()方法,获得一个Connection对象
  64. try {
  65. conn = DriverManager.getConnection(url, user, password);
  66. conn.setAutoCommit(false);
  67. //创建一个Statement对象
  68. preparedStatement = conn.prepareStatement("insert into " + tableName +
  69. " (name,age) values (?,?)");
  70. } catch (SQLException e) {
  71. e.printStackTrace();
  72. System.exit(1);
  73. }
  74. }
  75. /**
  76. * 服务关闭时执行
  77. */
  78. @Override
  79. public void stop() {
  80. super.stop();
  81. if (preparedStatement != null) {
  82. try {
  83. preparedStatement.close();
  84. } catch (SQLException e) {
  85. e.printStackTrace();
  86. }
  87. }
  88. if (conn != null) {
  89. try {
  90. conn.close();
  91. } catch (SQLException e) {
  92. e.printStackTrace();
  93. }
  94. }
  95. }
  96. /**
  97. * 执行的事情:<br/>
  98. (1)持续不断的从channel中获取event放到batchSize大小的数组中<br/>
  99. (2)event可以获取到则进行event处理,否则返回Status.BACKOFF标识没有数据提交<br/>
  100. (3)batchSize中有内容则进行jdbc提交<br/>
  101. */
  102. public Status process() throws EventDeliveryException {
  103. Status result = Status.READY;
  104. Channel channel = getChannel();
  105. Transaction transaction = channel.getTransaction();
  106. Event event;
  107. String content;
  108. List<Person> persons = Lists.newArrayList();
  109. transaction.begin();
  110. try {
  111. /*event处理*/
  112. for (int i = 0; i < batchSize; i++) {
  113. event = channel.take();
  114. if (event != null) {//对事件进行处理
  115. //event 的 body 为 "exec tail-event-$i , $i"
  116. content = new String(event.getBody());
  117. Person person=new Person();
  118. if (content.contains(",")) {
  119. //存储 event 的 content
  120. person.setName(content.substring(0, content.indexOf(",")));
  121. //存储 event 的 create +1 是要减去那个 ","
  122. person.setAge(Integer.parseInt(content.substring(content.indexOf(",")+1).trim()));
  123. }else{
  124. person.setName(content);
  125. }
  126. persons.add(person);
  127. } else {
  128. result = Status.BACKOFF;
  129. break;
  130. }
  131. }
  132. /*jdbc提交*/
  133. if (persons.size() > 0) {
  134. preparedStatement.clearBatch();
  135. for (Person temp : persons) {
  136. preparedStatement.setString(1, temp.getName());
  137. preparedStatement.setInt(2, temp.getAge());
  138. preparedStatement.addBatch();
  139. }
  140. preparedStatement.executeBatch();
  141. conn.commit();
  142. }
  143. transaction.commit();
  144. } catch (Exception e) {
  145. try {
  146. transaction.rollback();
  147. } catch (Exception e2) {
  148. LOG.error("Exception in rollback. Rollback might not have been.successful.", e2);
  149. }
  150. LOG.error("Failed to commit transaction.Transaction rolled back.", e);
  151. Throwables.propagate(e);
  152. } finally {
  153. transaction.close();
  154. }
  155. return result;
  156. }
  157. }
编写好后用 mvn package 打包成jar,放到flume安装目录下的lib文件夹中;同时把MySQL驱动包mysql-connector-java也一起放进去(自定义sink运行时需要它)。


4.4 conf配置:编写mysqlSink.conf(单agent的测试)

在flume的conf 文件夹下新建配置文件 mysqlSink.conf 内容如下:

agent1.sources=source1
agent1.channels=channel1
agent1.sinks=mysqlSink

# describe/configure source1
# type=exec runs a Linux command (here 'tail -F') and turns each line of its output into an event
agent1.sources.source1.type=exec
agent1.sources.source1.command=tail -F /usr/local/tomcat/logs/ac.log
agent1.sources.source1.channels=channel1

# use a channel which buffers events in memory
# type=memory (or file) decides where events wait between the source and the sink;
# keep capacity >= transactionCapacity >= the sink's batchSize
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=5000
agent1.channels.channel1.transactionCapacity=1000

# describe the sink: the custom MysqlSink class, shipped as a jar in flume/lib
# hostname/port/databaseName/tableName/user/password are all required by MysqlSink
agent1.sinks.mysqlSink.type=yichao.mym.base.bean.MysqlSink
agent1.sinks.mysqlSink.hostname=localhost
agent1.sinks.mysqlSink.port=3306
agent1.sinks.mysqlSink.databaseName=firstflume
agent1.sinks.mysqlSink.tableName=flume_test
agent1.sinks.mysqlSink.user=root
agent1.sinks.mysqlSink.password=123456
agent1.sinks.mysqlSink.channel=channel1
# batchSize: events taken per channel transaction and inserted per JDBC batch (MysqlSink default: 100)
agent1.sinks.mysqlSink.batchSize=5

说明:

(1)localhost 为mysql 数据库所在的服务器IP;

(2)/usr/local/tomcat/logs/ac.log 为要采集的日志文件路径,exec source 通过 tail -F 持续读取其新增内容;

(3)yichao.mym.base.bean.MysqlSink是自定义sink的mysqlsink的全称

重点:三个参数需满足 capacity(channel最多缓存的event数)≥ transactionCapacity(每个channel事务最多放入/取出的event数)≥ batchSize(sink每次从channel中取出的event数)。

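这些数值应根据实时性要求、并发量、系统资源占用等因素权衡设定,但必须满足上述关系。官方文档对这三者之间的关系说明得并不直观;一旦不满足,启动或运行时就会报错。例如 transactionCapacity 大于 capacity 时,MemoryChannel 的配置校验会失败;sink 在一个事务中取出的event超过 transactionCapacity 时,会抛出 ChannelException。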

五、准备测试

启动flume:在flume安装目录下的bin目录中:

./flume-ng agent --conf ../conf --conf-file ../conf/mysqlSink.conf --name agent1 -Dflume.root.logger=INFO,console

启动服务后,可以模拟log文件的动态增长,新开终端,通过shell命令:

for i in $(seq 1 100); do echo "exec tail-name-$i,$i" >> /usr/local/tomcat/logs/ac.log; sleep 1; done

此时可以快速刷新数据库的数据表,可以看到数据正在动态增长:



-----------------------------------------------------------------------------------------------------

六、多节点多agent

1.说明架构方式

两台可互相通信的linux机器:

201机器:安装好jdk1.7,mysql,flume1.7

202机器:安装好jdk1.7,flume1.7

结构:

不过本案例中,agent1、agent2、agent3都是execSource源,即直接读取磁盘上的log文件,而不是log4j直接作为agent的source。

在本案例中,202机器充当其中一个采集agent(即架构中的agent1、agent2、agent3之一),把在本机采集到的log内容发送到远程的201机器,两者之间使用avro协议传输。

所以本案例202机器的:

source:exec (tail -F /usr/local/tomcat/logs/ac.log)

channel:memory

sink:avro

本案例201机器的:

source:avro

channel:memory

sink:自定义的mysqlSink


注:表、自定义的sink的jar、javaBean都和之前的一致

2.两个agent的配置文件conf

202机器的flume配置文件:tail-avro.conf

# agent on machine 202: exec source (tail -F) -> memory channel -> avro sink to machine 201
agent1.sources=source1
agent1.channels=channel1
agent1.sinks=avroSink

# exec source: each new line appended to ac.log becomes one event
agent1.sources.source1.type=exec
agent1.sources.source1.command=tail -F /usr/local/tomcat/logs/ac.log
agent1.sources.source1.channels=channel1

# memory channel: buffers events until the avro sink sends them
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=5000
agent1.channels.channel1.transactionCapacity=1000

# avro sink: forwards events to the avro source on machine 201, so hostname/port must match
# that source's bind/port. Named avroSink (not mysqlSink): this agent never talks to MySQL.
agent1.sinks.avroSink.type=avro
agent1.sinks.avroSink.channel=channel1
agent1.sinks.avroSink.hostname=192.168.216.201
agent1.sinks.avroSink.port=4545
# batch-size: number of events sent per batch; keep it <= the channel's transactionCapacity
agent1.sinks.avroSink.batch-size=5


201机器的flume配置文件:avro-mysql.conf

# agent on machine 201: avro source -> memory channel -> custom MysqlSink
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = mysqlSink

# avro source: listens on 192.168.216.201:4545 for events sent by the avro sink on machine 202
agent1.sources.source1.type = avro
agent1.sources.source1.channels = channel1
agent1.sources.source1.bind = 192.168.216.201
agent1.sources.source1.port = 4545

# memory channel buffering events between the avro source and the sink
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 5000
agent1.channels.channel1.transactionCapacity = 1000

# custom MysqlSink (jar in flume/lib) inserting into firstflume.flume_test on the local MySQL
agent1.sinks.mysqlSink.type = yichao.mym.base.bean.MysqlSink
agent1.sinks.mysqlSink.hostname = localhost
agent1.sinks.mysqlSink.port = 3306
agent1.sinks.mysqlSink.databaseName = firstflume
agent1.sinks.mysqlSink.tableName = flume_test
agent1.sinks.mysqlSink.user = root
agent1.sinks.mysqlSink.password = 123456
agent1.sinks.mysqlSink.channel = channel1
agent1.sinks.mysqlSink.batchSize = 5


分别配置好后启动服务。应先启动机器201,因为机器202上的avro sink需要连接机器201上的avro source。

3.启动测试

机器 201 的flume启动命令:在flume目录下的bin目录中执行

./flume-ng agent --conf ../conf --conf-file ../conf/avro-mysql.conf --name agent1 -Dflume.root.logger=INFO,console

机器 202 的flume启动命令:在flume目录下的bin目录中执行

./flume-ng agent --conf ../conf --conf-file ../conf/tail-avro.conf --name agent1 -Dflume.root.logger=INFO,console


启动完之后在机器202上进行模拟log文件数据动态生成:

for i in $(seq 1 150); do echo "exec tail-name-$i,$i" >> /usr/local/tomcat/logs/ac.log; sleep 1; done


此时可以查看机器201上的数据库表的数据是否有动态添加:


至此多节点agent的测试完成!






声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/640274
推荐阅读
相关标签
  

闽ICP备14008679号