当前位置 博文首页 > Shockang的博客:Flume 怎样实现数据的断点续传?

    Shockang的博客:Flume 怎样实现数据的断点续传?

    作者:[db:作者] 时间:2021-08-24 13:27

    前言

    本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

    本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

    正文

    • 当一个flume挂掉之后重启的时候还是可以接着上一次的数据继续收集

    • flume在 1.7 版本之前使用的监控一个文件(source exec)、监控一个目录(source spool dir)都无法直接实现

    • flume在 1.7 版本之后已经集成了该功能

    • 其本质就是记录下每一次消费的位置,把消费信息的位置保存到文件中,后续程序挂掉了再重启的时候,可以接着上一次消费的数据位置继续拉取。

    配置文件

    # source 类型---->taildir
    vim taildir.conf
    
    a1.channels = ch1
    a1.sources = s1
    a1.sinks = hdfs-sink1
    
    #channel
    a1.channels.ch1.type = memory
    a1.channels.ch1.capacity=10000
    a1.channels.ch1.transactionCapacity=500
    
    #source
    a1.sources.s1.channels = ch1
    #监控一个目录下的多个文件新增的内容
    a1.sources.s1.type = taildir
    #通过 json 格式存下每个文件消费的偏移量,避免从头消费
    a1.sources.s1.positionFile = /opt/bigdata/flume/index/taildir_position.json
    a1.sources.s1.filegroups = f1 f2 f3 
    a1.sources.s1.filegroups.f1 = /home/hadoop/taillogs/access.log
    a1.sources.s1.filegroups.f2 = /home/hadoop/taillogs/nginx.log
    a1.sources.s1.filegroups.f3 = /home/hadoop/taillogs/web.log
    a1.sources.s1.headers.f1.headerKey = access
    a1.sources.s1.headers.f2.headerKey = nginx
    a1.sources.s1.headers.f3.headerKey = web
    a1.sources.s1.fileHeader  = true
    
    ##sink
    a1.sinks.hdfs-sink1.channel = ch1
    a1.sinks.hdfs-sink1.type = hdfs
    a1.sinks.hdfs-sink1.hdfs.path =hdfs://node1:9000/demo/data/%{headerKey}
    a1.sinks.hdfs-sink1.hdfs.filePrefix = event_data
    a1.sinks.hdfs-sink1.hdfs.fileSuffix = .log
    a1.sinks.hdfs-sink1.hdfs.rollSize = 1048576
    a1.sinks.hdfs-sink1.hdfs.rollInterval =20
    a1.sinks.hdfs-sink1.hdfs.rollCount = 10
    a1.sinks.hdfs-sink1.hdfs.batchSize = 1500
    a1.sinks.hdfs-sink1.hdfs.round = true
    a1.sinks.hdfs-sink1.hdfs.roundUnit = minute
    a1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25
    a1.sinks.hdfs-sink1.hdfs.fileType =DataStream
    a1.sinks.hdfs-sink1.hdfs.writeFormat = Text
    a1.sinks.hdfs-sink1.hdfs.callTimeout = 60000
    

    运行后生成的 taildir_position.json 文件信息如下:

    [
    {"inode":102626782,"pos":123,"file":"/home/hadoop/taillogs/access.log"},
    {"inode":102626785,"pos":123,"file":"/home/hadoop/taillogs/web.log"},
    {"inode":102626786,"pos":123,"file":"/home/hadoop/taillogs/nginx.log"}
    ]
    

    这里inode就是标记文件的,文件名称改变,这个inode不会变,pos记录偏移量,file就是绝对路径

    cs