
    Shockang's Blog: How Do You Write a Custom MySQL Sink for Flume?

    Date: 2021-08-24 13:27

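    Preface

    This article belongs to the column "1000个问题搞定大数据技术体系" (1000 Questions to Master the Big Data Technology Stack). The column is the author's original work; please cite the source when quoting it, and point out any shortcomings or errors in the comments. Thank you!

    For the column's table of contents and references, see 1000个问题搞定大数据技术体系.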

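    Main Text

    Scenario

    Flume ships with many sink types, but they do not always cover the requirements of a real project. In that case we have to write a custom sink for the job.
    
    For example, suppose the received data must be filtered by some rules and then written to a MySQL table. For that we need to implement our own MysqlSink.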
    

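    Steps to write a custom MySQL sink

    • 1. As the official Flume documentation describes, a custom MysqlSink extends the AbstractSink class and implements the Configurable interface.

    • 2. Implement the corresponding methods:

      • configure(Context context)
        • read the sink's parameters from the context
      • start()
        • startup preparation, such as opening the database connection
      • process()
        • take data from the channel, parse it, and save it to the MySQL table
      • stop()
        • close the resources that were opened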
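    The order in which Flume drives these methods can be illustrated without Flume itself. This is a minimal sketch built on simplified stand-ins: `SketchStatus`, `LifecycleSink`, `RecordingSink` and `SimpleSinkRunner` are hypothetical types written for this example, not Flume's API. It mimics the lifecycle listed above: configure() receives the settings, start() runs once, process() is called repeatedly, and stop() runs at shutdown. Flume's real sink runner keeps polling and sleeps after a BACKOFF, whereas this sketch stops at the first BACKOFF so that it terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Flume's Status / Sink / SinkRunner, written only to show the call order.
enum SketchStatus { READY, BACKOFF }

interface LifecycleSink {
    void configure(Map<String, String> context); // read settings
    void start();                                // acquire resources, e.g. a JDBC connection
    SketchStatus process();                      // handle one unit of work from the channel
    void stop();                                 // release resources
}

// Records every lifecycle call so the order can be inspected.
class RecordingSink implements LifecycleSink {
    final List<String> calls = new ArrayList<>();
    private final Deque<String> channel;

    RecordingSink(Deque<String> channel) {
        this.channel = channel;
    }

    @Override
    public void configure(Map<String, String> context) {
        calls.add("configure(" + context.get("tablename") + ")");
    }

    @Override
    public void start() {
        calls.add("start");
    }

    @Override
    public SketchStatus process() {
        String event = channel.poll(); // null when the "channel" is empty
        if (event == null) {
            calls.add("process -> BACKOFF");
            return SketchStatus.BACKOFF;
        }
        calls.add("process(" + event + ") -> READY");
        return SketchStatus.READY;
    }

    @Override
    public void stop() {
        calls.add("stop");
    }
}

class SimpleSinkRunner {
    // Unlike Flume's runner (which sleeps after BACKOFF and polls again), this sketch stops at the first BACKOFF.
    static void runUntilIdle(LifecycleSink sink, Map<String, String> context) {
        sink.configure(context);
        sink.start();
        while (sink.process() == SketchStatus.READY) {
            // keep draining while there is work
        }
        sink.stop();
    }

    public static void main(String[] args) {
        RecordingSink sink = new RecordingSink(new ArrayDeque<>(List.of("line 1", "line 2")));
        SimpleSinkRunner.runUntilIdle(sink, Map.of("tablename", "flume2mysql"));
        sink.calls.forEach(System.out::println);
    }
}
```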

    Hands-on

    1. Create the MySQL database and table
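    -- Create the database
    CREATE DATABASE IF NOT EXISTS mysqlsource DEFAULT CHARACTER SET utf8;
    
    -- Create the table that stores the event bodies written by the sink
    -- (in strict SQL mode, the MySQL 8 default, a body longer than 255 characters is rejected)
    CREATE TABLE mysqlsource.flume2mysql
    (
        id         int(11) NOT NULL AUTO_INCREMENT,
        createTime varchar(64)  NOT NULL,
        content    varchar(255) NOT NULL,
        PRIMARY KEY (id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;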
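    The INSERT issued by the sink has to match this table's columns. Because the MysqlSink class below builds that statement by concatenating the configured table name (only the values are bound as `?` parameters), a cheap safeguard is to validate the name before using it. This is a minimal sketch, assuming a hypothetical helper `InsertSql` and a deliberately conservative rule: only unquoted names made of letters, digits and underscores, up to 64 characters. A qualified name such as `mysqlsource.flume2mysql` would be rejected, which is acceptable here because the JDBC URL already selects the database.

```java
import java.util.regex.Pattern;

// Hypothetical helper: builds the parameterized INSERT for the flume2mysql table layout.
final class InsertSql {
    // Conservative assumption: letters, digits and '_' only, starting with a letter or '_', max 64 chars.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]{0,63}");

    private InsertSql() {}

    static String forTable(String tableName) {
        if (tableName == null || !IDENTIFIER.matcher(tableName).matches()) {
            throw new IllegalArgumentException("invalid table name: " + tableName);
        }
        // Only the table name is spliced in; the values stay '?' placeholders for PreparedStatement.
        return "INSERT INTO " + tableName + " (createTime, content) VALUES (?, ?)";
    }

    public static void main(String[] args) {
        System.out.println(forTable("flume2mysql"));
    }
}
```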
    
    2. Create a Maven project and add the dependencies
        <properties>
            <flume.version>1.9.0</flume.version>
            <mysql.version>8.0.24</mysql.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>${flume.version}</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.12.0</version>
            </dependency>
        </dependencies>
    
    3. Define the MysqlSink class
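    package com.shockang.study.bigdata.flume;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.FlumeException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.nio.charset.StandardCharsets;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * Custom MysqlSink: takes events from the channel, filters them and writes the rest to a MySQL table.
     */
    public class MysqlSink extends AbstractSink implements Configurable {
        private static final Logger LOG = LoggerFactory.getLogger(MysqlSink.class);
    
        private String mysqlurl = "";
        private String username = "";
        private String password = "";
        private String tableName = "";
    
        private Connection con = null;
    
        @Override
        public Status process() {
            Status status;
            // Start a channel transaction; the event only leaves the channel when we commit
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            try {
                Event event = ch.take();
    
                if (event != null) {
                    // Read the event body
                    String body = new String(event.getBody(), StandardCharsets.UTF_8);
    
                    // Events containing any of these keywords are not saved: filter them out
                    if (body.contains("delete") || body.contains("drop") || body.contains("alter")) {
                        // The event is consumed and dropped; READY lets the runner continue without sleeping
                        status = Status.READY;
                    } else {
                        // Write to MySQL
                        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String createTime = df.format(new Date());
    
                        // try-with-resources closes the statement even if execute() fails
                        try (PreparedStatement stmt = con.prepareStatement(
                                "insert into " + tableName + " (createTime, content) values (?, ?)")) {
                            stmt.setString(1, createTime);
                            stmt.setString(2, body);
                            stmt.execute();
                        }
                        status = Status.READY;
                    }
                } else {
                    // Channel is empty: ask the sink runner to back off
                    status = Status.BACKOFF;
                }
    
                txn.commit();
            } catch (Throwable t) {
                txn.rollback();
                // Log the exception itself; t.getCause() can be null and would throw a NullPointerException
                LOG.error("Failed to write event to MySQL, transaction rolled back", t);
                status = Status.BACKOFF;
                // Re-throw Errors, as the Flume developer guide recommends
                if (t instanceof Error) {
                    throw (Error) t;
                }
            } finally {
                txn.close();
            }
    
            return status;
        }
    
        /**
         * Reads the parameters set for this sink in the configuration file
         *
         * @param context the sink's own settings (keys without the a1.sinks.k1. prefix)
         */
        @Override
        public void configure(Context context) {
            mysqlurl = context.getString("mysqlurl");
            username = context.getString("username");
            password = context.getString("password");
            tableName = context.getString("tablename");
        }
    
        @Override
        public synchronized void start() {
            try {
                // Open the database connection
                con = DriverManager.getConnection(mysqlurl, username, password);
            } catch (SQLException ex) {
                // Report the failure instead of running a sink that has no connection
                throw new FlumeException("Could not connect to " + mysqlurl, ex);
            }
            super.start();
            LOG.info("MysqlSink started, writing to table {}", tableName);
        }
    
        @Override
        public synchronized void stop() {
            // con stays null if start() failed
            if (con != null) {
                try {
                    con.close();
                } catch (SQLException e) {
                    LOG.warn("Error while closing the MySQL connection", e);
                }
            }
            super.stop();
        }
    
    }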
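    The per-event decision inside process() can be pulled out into a plain function, which makes the filter easy to unit-test without a channel or a database. This is a minimal sketch; `EventDecision`, its `Action` enum and the keyword list passed in are hypothetical names for illustration. It mirrors the sink's rules: no event means there is nothing to do, a body containing a blocked keyword is consumed but never written, and anything else goes to the INSERT. Matching is case-sensitive, like `String.contains` in the sink.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical, framework-free version of the per-event decision made in MysqlSink.process().
final class EventDecision {
    enum Action { NO_EVENT, DROP, INSERT }

    private EventDecision() {}

    /**
     * @param body     raw event body, or null when channel.take() returned no event
     * @param keywords substrings that cause the event to be discarded (case-sensitive)
     */
    static Action decide(byte[] body, List<String> keywords) {
        if (body == null) {
            return Action.NO_EVENT;   // channel empty: the sink should back off
        }
        String text = new String(body, StandardCharsets.UTF_8);
        for (String keyword : keywords) {
            if (text.contains(keyword)) {
                return Action.DROP;   // committed (removed from the channel) but not written to MySQL
            }
        }
        return Action.INSERT;         // handed to the PreparedStatement
    }

    public static void main(String[] args) {
        List<String> blocked = List.of("delete", "drop");
        byte[] ok = "user login".getBytes(StandardCharsets.UTF_8);
        byte[] bad = "drop table t".getBytes(StandardCharsets.UTF_8);
        System.out.println(decide(ok, blocked) + " " + decide(bad, blocked) + " " + decide(null, blocked));
    }
}
```

    Keeping the rule in a pure function also makes later changes, such as lower-casing the body before matching, a one-line edit that can be tested in isolation.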
    
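    4. Test

    ① Package the program as a jar and upload it to Flume's lib directory. Flume does not ship the MySQL JDBC driver, so also copy the mysql-connector-java jar into lib (or build a jar that bundles it); otherwise start() cannot open the connection.

    ② Prepare the configuration file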

    vim mysqlsink.conf
    
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/bigdata/flumeData/data.log
    a1.sources.r1.channels = c1
    
    # Configure the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Configure the sink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.type = com.shockang.study.bigdata.flume.MysqlSink
    a1.sinks.k1.mysqlurl=jdbc:mysql://node1:3306/mysqlsource?useSSL=false
    a1.sinks.k1.username=root
    a1.sinks.k1.password=123456
    a1.sinks.k1.tablename=flume2mysql
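    Flume passes each component only its own part of this file: for sink `k1` of agent `a1`, the keys under `a1.sinks.k1.` reach `configure(Context)` with that prefix removed, which is why the sink reads `mysqlurl`, `username`, `password` and `tablename`. The sketch below imitates that slicing with `java.util.Properties`; `SinkConfigSlice` and `sliceFor` are hypothetical names, and this is an illustration, not Flume's actual configuration parser.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical illustration of how per-sink keys reach configure(Context); not Flume's own parser.
final class SinkConfigSlice {
    private SinkConfigSlice() {}

    // Returns the settings of one sink with the "<agent>.sinks.<sink>." prefix stripped.
    static Map<String, String> sliceFor(String confText, String agent, String sink) {
        Properties props = new Properties();
        try {
            // Handles '#' comments and "key = value" / "key=value" lines as used in mysqlsink.conf
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String prefix = agent + ".sinks." + sink + ".";
        Map<String, String> context = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                context.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return context;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "a1.channels.c1.type = memory",
                "a1.sinks.k1.channel = c1",
                "a1.sinks.k1.tablename=flume2mysql",
                "a1.sinks.k1.username=root");
        // TreeMap keeps the stripped keys in sorted order
        System.out.println(sliceFor(conf, "a1", "k1"));
    }
}
```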
    

    ③ Start the Flume agent with this configuration

    flume-ng agent -n a1 -c /opt/bigdata/flume/myconf -f /opt/bigdata/flume/myconf/mysqlsink.conf -Dflume.root.logger=info,console
    

    ④ Finally, append data to the file and check the rows that appear in the MySQL table.
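    A shell `echo ... >> /opt/bigdata/flumeData/data.log` is enough for this step; to keep the examples in one language, here is a small Java sketch that appends a few sample lines to the file tailed by the exec source. The class name `AppendTestData` and the sample lines are hypothetical; one line contains `drop`, so the sink should filter it out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical test-data writer: appends lines to the file watched by `tail -F`.
final class AppendTestData {
    private AppendTestData() {}

    static void append(Path logFile, List<String> lines) {
        try {
            // CREATE + APPEND: create the file if it is missing, otherwise add to its end (like `>>`).
            Files.write(logFile, lines, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Defaults to the path used in mysqlsink.conf; pass another path as the first argument if needed.
        Path target = Paths.get(args.length > 0 ? args[0] : "/opt/bigdata/flumeData/data.log");
        append(target, List.of(
                "user1 login",
                "user2 logout",
                "drop table flume2mysql")); // contains a filter keyword: should not reach MySQL
    }
}
```

    After it runs, `SELECT * FROM mysqlsource.flume2mysql;` should show the first two lines, while the `drop` line is discarded by the sink's filter.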
