Flume(三)【进阶】
阅读原文时间:2023年07月11日阅读:1

[toc]

一.Flume 数据传输流程

重要组件:

1)Channel选择器(ChannelSelector)

​ ChannelSelector的作用就是选出Event将要被发往哪个Channel。其共有两种类型,分别是Replicating(复制)和**Multiplexing**(多路复用)。

​ ReplicatingSelector会将同一个Event发往所有的Channel,Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。

2)SinkProcessor

​ SinkProcessor共有三种类型,分别是DefaultSinkProcessorLoadBalancingSinkProcessorFailoverSinkProcessor

​ DefaultSinkProcessor对应的是单个的Sink,LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,LoadBalancingSinkProcessor可以实现负载均衡的功能,FailoverSinkProcessor可以错误恢复的功能。

二.Flume 事务

1.Put 事务流程

​ 将批数据先写入临时缓冲区 putList,检查 channel 内存队列是否足够合并,channel 内存队列空间不足,回滚数据。

2.Take 事务流程

​ 将数据取到临时缓冲区 takeList,并将数据发送到 HDFS,如果数据全部发送成功,则清除临时缓冲区 takeList数据,发送过程中如果出现异常,rollback 将临时缓冲区 takeList 中的数据归还给 channel 内存队列。

三.Flume 拓扑结构和案例实操

1.简单串联

​ 这种模式是将多个flume顺序连接起来了,从最初的source开始到最终sink传送的目的存储系统。此模式不建议桥接过多的flume数量, flume数量过多不仅会影响传输速率,而且一旦传输过程中某个节点flume宕机,会影响整个传输系统。

2.复制和多路复用

​ Flume支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel中,或者将不同数据分发到不同的channel中,sink可以选择传送到不同的目的地。

案例

需求:使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem.

需求分析

步骤

1)准备工作

在/opt/module/flume/job目录下创建group1文件夹

[atguigu@hadoop102 job]$ cd group1/

在/opt/module/datas/目录下创建flume3文件夹

[atguigu@hadoop102 datas]$ mkdir flume3

2)编写Flume Agent配置文件

Flume1(hadoop102)

在hadoop102上/opt/module/flume/job/group1创建flume-exec-arvo.conf配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/testdata/test_file1.txt
a1.sources.r1.shell = /bin/bash -c

# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating

# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Flume2(hadoop103)

在hadoop103上/opt/module/flume/job/group1创建flume-arvo-hdfs.conf配置文件

添加以下内容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# source端的avro是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9820/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3(hadoop104)

在hadoop104上/opt/module/flume/job/group1创建flume-arvo-fileroll.conf配置文件

配置上级Flume输出的Source,输出是到本地目录的Sink。

注意:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。

添加以下内容:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
# source端的avro是一个数据接收服务
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

3)启动flume agent

依次启动flume,先启下游,再启上游

flume-arvo-filerool-->flume-arvo-hdfsk-->flume-exec-arvo

4)检查hdfs数据和本地文件的数据

3.负载均衡和故障转移

​ Flume支持使用将多个sink逻辑上分到一个sink组,sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能。

案例

需求:使用Flume1(hadoop102)监控一个端口,其sink组中的sink分别对接Flume2(hadoop103)和Flume3(hadoop104),分别采用Load balancing Sink Processor实现负载均衡,FailoverSinkProcessor实现故障转移的功能

需求分析

步骤

1)flume1(hadoop102)

在job下新建group2文件夹,新建flume-netstat-arvo.conf

# Name the components on this agent( 描述这个Agent,给各个组件取名字)
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/testdata/3.txt
a1.sources.r1.shell = /bin/bash -c
###########################################配置为负载均衡(failover)
#指定类型为故障转移, 启动k1的权重为5,k2为10,k2启动为active,k1备用
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
#发送的目的主机ip
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
#发送的目的主机ip
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4141

# Describe the channel
#channel的类型为memory或者file
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
#1个source,2个channel
a1.sources.r1.channels = c1
#sink组
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
  1. flume2(hadoop103)

在job下新建group2文件夹,新建flume-avro-logger.conf

# Name the components on this agent( 描述这个Agent,给各个组件取名字)
a2.sources = r1
a2.channels = c1
a2.sinks = k1

# Describe/configure the source
# source端的avro是一个数据接收服务
a2.sources.r1.type = avro
#接收的主机
a2.sources.r1.bind = hadoop103
#要和上级的avro的sink的端口一致
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
#channel的类型为memory或者file
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
#1个source,2个channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  1. flume3(hadoop104)

在job下新建group2文件夹,新建flume-avro-logger.conf

# Name the components on this agent( 描述这个Agent,给各个组件取名字)
a3.sources = r1
a3.channels = c1
a3.sinks = k1

# Describe/configure the source
# source端的avro是一个数据接收服务
a3.sources.r1.type = avro
#接收的主机
a3.sources.r1.bind = hadoop104
#要和上级的avro的sink的端口一致
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
#channel的类型为memory或者file
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
#1个source,2个channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

4)分别启动flume2,flume3,最后启动fulme1

#flume2
bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-avro-logger.conf -Dflume.root.logger=INFO,console
#flume3
bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-avro-logger.conf -Dflume.root.logger=INFO,console
#flume1
bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-avro-logger.conf

5)观察现象,向flume1发数据,数据只会发往flume3,flume2没有数据;

​ 然后kill掉flume3,继续往flume1发数据,flume2会收到数据

4.聚合

案例

需求:Flume1(hadoop102)与Flume2(hadoop103)将数据发送给Flume3(hadoop104),Flume3将最终数据打印到控制台。

需求分析

1.单source

1)flume1(hadoop102)

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/testdata/3.txt
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2)flume2(hadoop103)

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /opt/module/testdata/3.txt
a2.sources.r1.shell = /bin/bash -c

# Describe the sink
# sink端的avro是一个数据发送者
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

3)flume3(hadoop104)

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
2.多source

只需要更改flume3,增加一个source, 需要注意2个source的端口不能一样, flume1,flume2分别对接这个2个端口

# Name the components on this agent
a3.sources = r1 r2
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
a3.sources.r2.type = avro
a3.sources.r2.bind = hadoop104
a3.sources.r2.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sources.r2.channels = c1
a3.sinks.k1.channel = c1

四.自定义Interceptor

​ 在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到Flume拓扑结构中的Multiplexing(**多路复用)**结构,结合channel选择器根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的value赋予不同的值。

案例

需求:判断消息boby中是否含有“hello”,有发往hadoop103,并打印出来;其余发往hadoop104;

需求分析

1)创建模块

2)引入依赖

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

3)自定义interceptor,实现Interceptor接口

package com.bigdata.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {

    public List<Event> addEventList;

    @Override
    public void initialize() {
     addEventList=new ArrayList<>();
    }

    @Override
    // 单个处理event
    public Event intercept(Event event) {
        // 1. 获取event的头信息
        Map<String, String> headers = event.getHeaders();
        // 2. 获取event的里面的身体信息
        String body = new String(event.getBody());
        // 3. 判断 body里面是否包含hello 来决定加头信息
        if (body.contains("hello")){
            // 4. 如果包含,在头信息加上值为hello的键值对
            headers.put("type","hello");
        }else {
            // 5. 如果不包含,在头信息加上值为nohello的键值对
            headers.put("type","nohello");
        }
        return event;
    }

    @Override
    //批量处理event
    public List<Event> intercept(List<Event> events) {
        // 清空事件集合
        addEventList.clear();
        // for添加到集合中
        for (Event event : events) {
            addEventList.add(intercept(event));
        }
        return addEventList;
    }

    @Override
    public void close() {

    }
    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        // 获取或设定配置信息
        public void configure(Context context) {

        }
    }

4)打jar包,上传至/opt/module/kafka/lib目录下

5)flume1的配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#自定义过滤器名称
a1.sources.r1.interceptors = i1
#全类名+$Builder
a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
#header的key
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.hello = c1
a1.sources.r1.selector.mapping.nohello = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141

a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop104
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

6)flume2配置文件

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop103
a2.sources.r1.port = 4141

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1

7)flume3配置文件

a3.sources = r1
a3.sinks = k1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4242

a3.sinks.k1.type = logger

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1

8)分别在hadoop103,hadoop104,hadoop102上启动flume进程,注意先后顺序。

9)在hadoop102使用netcat向localhost:44444发送字母和数字。

10)观察hadoop103和hadoop104打印的日志