当前位置 博文首页 > Shockang的博客:Flume 如何自定义 Mysql Source?

    Shockang的博客:Flume 如何自定义 Mysql Source?

    作者:[db:作者] 时间:2021-08-24 13:27

    前言

    本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

    本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

    正文

    场景描述

    官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些source。
    
    如:实时监控MySQL,从MySQL中获取数据传输到HDFS或者其他存储框架,所以此时需要我们自己实现MySQLSource。
    

    自定义MysqlSource步骤

    • 1、根据官方说明自定义 mysqlsource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
    • 2、实现对应的方法
      • configure(Context context)
        • 初始化context
      • process()
        • 从mysql表中获取数据,然后把数据封装成event对象写入到channel,该方法被一直调用
      • stop()
        • 关闭相关资源

    实践

    1. 创建 mysql 数据库以及 mysql 数据库表
    --创建一个数据库
    CREATE DATABASE IF NOT EXISTS mysqlsource DEFAULT CHARACTER SET utf8 ;
    
    --创建一个表,用户保存拉取目标表位置的信息
    CREATE TABLE mysqlsource.flume_meta (
                                            source_tab varchar(255) NOT NULL,
                                            currentIndex varchar(255) NOT NULL,
                                            PRIMARY KEY (source_tab)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    --插入数据
    insert  into mysqlsource.flume_meta(source_tab,currentIndex) values ('student','4');
    
    
    --创建要拉取数据的表
    CREATE TABLE mysqlsource.student(
                                        id int(11) NOT NULL AUTO_INCREMENT,
                                        name varchar(255) NOT NULL,
                                        PRIMARY KEY (id)
    ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;
    
    --向student表中添加测试数据
    insert  into mysqlsource.student(id,name) values (1,'zhangsan'),(2,'lisi'),(3,'wangwu'),(4,'zhaoliu');
    
    1. 构建maven工程,添加依赖
        <properties>
            <flume.version>1.9.0</flume.version>
            <mysql.version>8.0.24</mysql.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>${flume.version}</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.12.0</version>
            </dependency>
        </dependencies>
    
    1. 在 resources 资源文件夹下添加 flume/jdbc.properties
    dbDriver=com.mysql.jdbc.Driver
    dbUrl=jdbc:mysql://node1:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
    dbUser=root
    dbPassword=123456
    
    1. 定义 QueryMysql 工具类
    package com.shockang.study.bigdata.flume;
    
    import org.apache.flume.Context;
    import org.apache.flume.conf.ConfigurationException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.sql.*;
    import java.text.ParseException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    public class QueryMysql {
        private static final Logger LOG = LoggerFactory.getLogger(QueryMysql.class);
    
        private int runQueryDelay,   //两次查询的时间间隔
                startFrom,            //开始id
                currentIndex,          //当前id
                recordSixe = 0,        //每次查询返回结果的条数
                maxRow;                //每次查询的最大条数
    
        private String table,          //要操作的表
                columnsToSelect,       //用户传入的查询的列
                customQuery,          //用户传入的查询语句
                query,                 //构建的查询语句
                defaultCharsetResultSet;//编码集
    
        //上下文,用来获取配置文件
        private Context context;
    
        //为定义的变量赋值(默认值),可在flume任务的配置文件中修改
        private static final int DEFAULT_QUERY_DELAY = 10000;
        private static final int DEFAULT_START_VALUE = 0;
        private static final int DEFAULT_MAX_ROWS = 2000;
        private static final String DEFAULT_COLUMNS_SELECT = "*";
        private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
    
        private static Connection conn = null;
        private static PreparedStatement ps = null;
        private static String connectionURL, connectionUserName, connectionPassword;
    
        //加载静态资源
        static {
            Properties p = new Properties();
            try {
                p.load(QueryMysql.class.getClassLoader().getResourceAsStream("flume/jdbc.properties"));
                connectionURL = p.getProperty("dbUrl");
                connectionUserName = p.getProperty("dbUser");
                connectionPassword = p.getProperty("dbPassword");
                Class.forName(p.getProperty("dbDriver"));
            } catch (Exception e) {
                LOG.error(e.toString());
            }
        }
    
        //获取JDBC连接
        private static Connection InitConnection(String url, String user, String pw) {
            try {
                Connection conn = DriverManager.getConnection(url, user, pw);
                if (conn == null)
                    throw new SQLException();
                return conn;
            } catch (SQLException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        //构造方法
        QueryMysql(Context context) throws ParseException {
            //初始化上下文
            this.context = context;
    
            //有默认值参数:获取flume任务配置文件中的参数,读不到的采用默认值
            this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
            this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
            this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
            this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
    
            //无默认值参数:获取flume任务配置文件中的参数
            this.table = context.getString("table");
            this.customQuery = context.getString("custom.query");
    
            connectionURL = context.getString("connection.url");
            connectionUserName = context.getString("connection.user");
            connectionPassword = context.getString("connection.password");
            conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
    
            //校验相应的配置信息,如果没有默认值的参数也没赋值,抛出异常
            checkMandatoryProperties();
            //获取当前的id
            currentIndex = getStatusDBIndex(startFrom);
            //构建查询语句
            query = buildQuery();
        }
    
        //校验相应的配置信息(表,查询语句以及数据库连接的参数)
        private