当前位置 博文首页 > Shockang的博客:Flume 如何自定义 Mysql Source?
本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏的目录结构和参考文献请见《1000个问题搞定大数据技术体系》。
官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些source。
例如:需要实时监控 MySQL,从 MySQL 中获取数据并传输到 HDFS 或者其他存储框架,此时就需要我们自己实现 MySQLSource。
-- Setup script for the custom Flume MySQL source demo.
-- NOTE: MySQL only recognises "--" as a comment when it is followed by
-- whitespace, so every comment below uses "-- " (dash, dash, space).

-- Create the database.
CREATE DATABASE IF NOT EXISTS mysqlsource DEFAULT CHARACTER SET utf8;

-- Bookkeeping table: one row per source table, recording the position
-- (currentIndex) up to which that table has already been pulled.
CREATE TABLE mysqlsource.flume_meta (
    source_tab varchar(255) NOT NULL,
    currentIndex varchar(255) NOT NULL,
    PRIMARY KEY (source_tab)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Seed the bookkeeping row for the `student` table at position 4.
INSERT INTO mysqlsource.flume_meta (source_tab, currentIndex)
VALUES ('student', '4');

-- Table whose rows are to be pulled.
-- AUTO_INCREMENT=5 makes the next generated id follow the four seed rows below.
CREATE TABLE mysqlsource.student (
    id int(11) NOT NULL AUTO_INCREMENT,
    name varchar(255) NOT NULL,
    PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;

-- Test data for the student table.
INSERT INTO mysqlsource.student (id, name)
VALUES
    (1, 'zhangsan'),
    (2, 'lisi'),
    (3, 'wangwu'),
    (4, 'zhaoliu');
<!-- Maven POM fragment: version properties referenced by the dependencies below. -->
<properties>
<flume.version>1.9.0</flume.version>
<mysql.version>8.0.24</mysql.version>
</properties>
<dependencies>
<!-- Flume core library; its version comes from ${flume.version}. -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>${flume.version}</version>
</dependency>
<!-- MySQL JDBC driver (Connector/J); its version comes from ${mysql.version}. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- Apache Commons Lang 3, pinned to a literal version. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
</dependencies>
# JDBC settings. The keys match those read by the QueryMysql static initializer,
# which loads flume/jdbc.properties from the classpath.
# Connector/J 8.x driver class; the legacy com.mysql.jdbc.Driver name is
# deprecated in 8.x and only kept as a forwarding alias.
dbDriver=com.mysql.cj.jdbc.Driver
dbUrl=jdbc:mysql://node1:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
dbUser=root
dbPassword=123456
package com.shockang.study.bigdata.flume;
import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class QueryMysql {
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(QueryMysql.class);
private int runQueryDelay, // delay between two consecutive queries
startFrom, // starting id
currentIndex, // current id
recordSixe = 0, // number of rows returned by each query (identifier spelling kept as-is)
maxRow; // maximum number of rows per query
private String table, // table to operate on
columnsToSelect, // columns to select, as supplied by the user
customQuery, // query statement supplied by the user
query, // the query statement that is built
defaultCharsetResultSet;// character set
// Context used to read the configuration.
private Context context;
// Default values for the settings above; they can be overridden in the Flume job configuration file.
private static final int DEFAULT_QUERY_DELAY = 10000;
private static final int DEFAULT_START_VALUE = 0;
private static final int DEFAULT_MAX_ROWS = 2000;
private static final String DEFAULT_COLUMNS_SELECT = "*";
private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
// JDBC connection and prepared statement; static, so shared by every instance of this class.
private static Connection conn = null;
private static PreparedStatement ps = null;
// JDBC connection settings; also static and therefore shared by every instance.
private static String connectionURL, connectionUserName, connectionPassword;
//加载静态资源
static {
Properties p = new Properties();
try {
p.load(QueryMysql.class.getClassLoader().getResourceAsStream("flume/jdbc.properties"));
connectionURL = p.getProperty("dbUrl");
connectionUserName = p.getProperty("dbUser");
connectionPassword = p.getProperty("dbPassword");
Class.forName(p.getProperty("dbDriver"));
} catch (Exception e) {
LOG.error(e.toString());
}
}
// Opens a JDBC connection with the given URL, user name and password.
// Returns the connection, or null when it could not be opened; the failure is
// reported through the class logger.
private static Connection InitConnection(String url, String user, String pw) {
    try {
        // Named "connection" so it does not shadow the static field "conn".
        Connection connection = DriverManager.getConnection(url, user, pw);
        if (connection == null)
            throw new SQLException("DriverManager returned no connection for " + url);
        return connection;
    } catch (SQLException e) {
        LOG.error("Failed to open JDBC connection to " + url, e);
    }
    return null;
}
// Constructor: reads the settings from the Flume context, validates them, opens
// the JDBC connection, resolves the current index and builds the query.
QueryMysql(Context context) throws ParseException {
    // Keep the context for later configuration lookups.
    this.context = context;

    // Settings with defaults: read from the Flume job configuration, falling back to the default.
    this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
    this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
    this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
    this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);

    // Settings without defaults: read from the Flume job configuration.
    this.table = context.getString("table");
    this.customQuery = context.getString("custom.query");

    // Connection settings: the Flume configuration wins, but when a key is absent
    // the value already held in the static field is kept instead of being
    // overwritten with null.
    connectionURL = context.getString("connection.url", connectionURL);
    connectionUserName = context.getString("connection.user", connectionUserName);
    connectionPassword = context.getString("connection.password", connectionPassword);

    // Validate before connecting, so an invalid configuration does not leave an
    // open connection behind.
    // NOTE(review): assumes checkMandatoryProperties() only inspects the
    // configuration fields and not conn - confirm against its body.
    checkMandatoryProperties();

    conn = InitConnection(connectionURL, connectionUserName, connectionPassword);

    // Resolve the current id.
    currentIndex = getStatusDBIndex(startFrom);
    // Build the query statement.
    query = buildQuery();
    // NOTE(review): maxRow is not assigned here although DEFAULT_MAX_ROWS exists - confirm intent.
}
//校验相应的配置信息(表,查询语句以及数据库连接的参数)
private