Apache Flume official documentation: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#memory-channel
Flume is a distributed, reliable, and highly available system for collecting, aggregating, and moving large volumes of log data.
A Flume data flow is carried end to end by Events. The Event is Flume's basic unit of data: it carries the log payload as a byte array along with header information. Events originate outside the Agent; when a Source captures an event it applies any configured formatting, then pushes the event into one or more Channels. You can think of a Channel as a buffer that holds each event until a Sink has finished processing it. The Sink is responsible for persisting the log or forwarding the event to another Source.
The core of a running Flume deployment is the Agent. The agent is Flume's smallest independent unit of execution, and each agent is a single JVM.
Flume's core job is to collect data from a source and deliver it to a destination. To make delivery reliable, data is buffered before it is sent on, and the buffered copy is deleted only once the data has actually reached its destination.
The basic unit Flume transfers is the Event; for a text file this is usually one line, and it is also the unit of a transaction. An Event flows from Source to Channel to Sink; it is itself a byte array and can carry headers. The Event is the smallest complete unit of the data flow: it comes in from an external data source and goes out to an external destination.
Notably, Flume ships with a large number of built-in Source, Channel, and Sink types, and the different types can be combined freely. The combinations are driven by a user-written configuration file, which makes the system very flexible.
For example, a Channel can stage events in memory or persist them to local disk, and a Sink can write logs to HDFS or HBase, or even hand them to another Source. Flume also supports multi-tier flows, meaning multiple agents can work together.
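All of that wiring lives in one properties file. A minimal sketch (the agent and component names a1, r1, c1, k1 are arbitrary; the built-in seq source just generates a sequence of numbers, so this runs with no external input):
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# pick a type for each component
a1.sources.r1.type = seq
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
# wire them up: a source may feed several channels, a sink drains exactly one
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1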
Flume uses transactions to make the entire Event delivery path reliable: a Sink may remove an Event from its Channel only after the Event has been handed off to the next agent, or stored in the external destination. With this guarantee at every hop, events stay safe whether they flow within a single agent or across several. Durability depends on the channel type: Flume can keep a channel's contents in local files so they survive a crash, while a memory channel holds events in an in-memory queue, which is fast, but anything lost from it cannot be recovered.
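For example, swapping the volatile memory channel for the disk-backed file channel is just a configuration change (a sketch; the two directories are assumptions):
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/local/data/flume/checkpoint
a1.channels.c1.dataDirs = /usr/local/data/flume/data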
A common scenario: to collect user-behavior logs from a website that runs as a load-balanced cluster for availability, every node produces its own logs, so each node is given an Agent to collect them, and the multiple agents then funnel their data into a single storage system such as HDFS.
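A sketch of that two-tier layout, using Flume's avro sink/source pair for the agent-to-agent hop (the aggregator host master and port 4141 are assumptions):
# on each web node: collected logs -> avro sink pointing at the aggregator
collector.sinks.k1.type = avro
collector.sinks.k1.hostname = master
collector.sinks.k1.port = 4141
# on the aggregator: avro source receiving from all collectors -> e.g. HDFS sink
aggregator.sources.r1.type = avro
aggregator.sources.r1.bind = 0.0.0.0
aggregator.sources.r1.port = 4141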
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /usr/local/soft/
Add the following alias to your environment so that the soft command jumps straight to /usr/local/soft:
alias soft='cd /usr/local/soft/'
mv apache-flume-1.9.0-bin/ flume-1.9.0
vim /etc/profile
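Add lines like the following (assuming the install path used above):
export FLUME_HOME=/usr/local/soft/flume-1.9.0
export PATH=$FLUME_HOME/bin:$PATH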
source /etc/profile
flume-ng version
[root@master soft]# flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
[root@master soft]#
Monitor a directory and print the data it receives
Create the directory
mkdir /usr/local/data/flume/
Configuration file
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# spooldir source: watch a directory for new, complete files
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/data/flume/
# rename ingested files with this suffix instead of the default .COMPLETED
a1.sources.r1.fileSuffix = .zyl
a1.sources.r1.fileHeader = true
# regex_filter interceptor: with excludeEvents = true, drop every event
# whose body matches a run of 3 to 6 digits
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = \d{3,6}
a1.sources.r1.interceptors.i1.excludeEvents = true
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the agent
flume-ng agent -n a1 -f ./flumetest -Dflume.root.logger=DEBUG,console
Create a file under /usr/local/data/flume/, enter some content, and watch the log output of the flume process.
vim 1
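Note that the spooldir source expects files to be complete and immutable once they land in the watched directory, so a safer test is to write the file elsewhere and move it in. A quick check of the regex_filter interceptor (file names here are arbitrary):
printf 'hello flume\nid is 12345\n' > /tmp/test.log
mv /tmp/test.log /usr/local/data/flume/
# expected: "hello flume" is printed by the logger sink;
# "id is 12345" matches \d{3,6} and is dropped (excludeEvents = true)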
spoolingToHDFS.conf
Configuration file
a.sources = r1
a.sinks = k1
a.channels = c1
# specify the spooldir source properties
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /usr/local/data/flume/
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp
# specify the sink type
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir1
a.sinks.k1.hdfs.filePrefix = student
# roll to a new file after 102400 bytes or 1000 events
a.sinks.k1.hdfs.rollSize = 102400
a.sinks.k1.hdfs.rollCount = 1000
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.writeFormat = text
a.sinks.k1.hdfs.fileSuffix = .txt
# specify the channel
a.channels.c1.type = memory
# maximum number of events stored in the channel
a.channels.c1.capacity = 1000
a.channels.c1.transactionCapacity = 100
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
Prepare some data in /usr/local/data/flume/
The Zen of Python, by Tim Peters
Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one-- and preferably only one --obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better than right now.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea -- let's do more of those!
Start the agent
flume-ng agent -n a -f ./spoolingToHDFS.conf -Dflume.root.logger=DEBUG,console
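Once files roll, you can inspect the result on HDFS (assuming a Hadoop client on the PATH; the path matches hdfs.path above):
hdfs dfs -ls /flume/data/dir1
hdfs dfs -cat /flume/data/dir1/student*.txt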
hbaseLogToHDFS
Configuration file
a.sources = r1
a.sinks = k1
a.channels = c1
# specify the exec source properties
a.sources.r1.type = exec
a.sources.r1.command = tail -f /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
# specify the sink type
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /flume/data/dir2
a.sinks.k1.hdfs.filePrefix = hbaselog
a.sinks.k1.hdfs.rollSize = 102400
a.sinks.k1.hdfs.rollCount = 1000
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.writeFormat = text
a.sinks.k1.hdfs.fileSuffix = .txt
# specify the channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
a.channels.c1.transactionCapacity = 100
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
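Start the agent the same way (assuming the config above is saved as hbaseLogToHDFS.conf):
flume-ng agent -n a -f ./hbaseLogToHDFS.conf -Dflume.root.logger=DEBUG,console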
hbaselogToHBase
Create a log table in HBase
create 'log','cf1'
Configuration file
a.sources = r1
a.sinks = k1
a.channels = c1
# specify the exec source properties
a.sources.r1.type = exec
a.sources.r1.command = cat /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log
# specify the sink type
a.sinks.k1.type = hbase
a.sinks.k1.table = log
a.sinks.k1.columnFamily = cf1
# specify the channel
a.channels.c1.type = memory
a.channels.c1.capacity = 100000
a.channels.c1.transactionCapacity = 100
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
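Start the agent (assuming the config is saved as hbaselogToHBase.conf), then check the table from the HBase shell:
flume-ng agent -n a -f ./hbaselogToHBase.conf -Dflume.root.logger=DEBUG,console
scan 'log', {LIMIT => 10}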
netcatToLogger (TCP)
Listen on a TCP port and send it data with telnet.
Install telnet
yum install telnet
Configuration file
a.sources = r1
a.sinks = k1
a.channels = c1
# specify the netcat source properties
a.sources.r1.type = netcat
# 0.0.0.0 means all addresses
a.sources.r1.bind = 0.0.0.0
a.sources.r1.port = 8888
# specify the sink type
a.sinks.k1.type = logger
# specify the channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
a.channels.c1.transactionCapacity = 100
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
Start it up
First start the agent
flume-ng agent -n a -f ./netcatToLogger.conf -Dflume.root.logger=DEBUG,console
Then start telnet to test connectivity to the host
telnet master 8888
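Type a line and press Enter to send it as one event; the netcat source acknowledges each event with OK by default, and the logger sink prints it on the agent side. A sketch of the session:
hello flume
OK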
httpToLogger (HTTP)
Configuration file
a.sources = r1
a.sinks = k1
a.channels = c1
# specify the http source properties
a.sources.r1.type = http
a.sources.r1.port = 6666
# specify the sink type
a.sinks.k1.type = logger
# specify the channel
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
a.channels.c1.transactionCapacity = 100
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
Start it up
First start the agent
flume-ng agent -n a -f ./httpToLogger.conf -Dflume.root.logger=DEBUG,console
Then use curl to send an HTTP request
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"},{"headers" :{"a2" : "a11","b2" : "b12"},"body" : "hello2~http2~flume2~"}]' http://master:6666
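The HTTP source's default JSONHandler expects exactly this shape: a JSON array of events, each with a headers map and a string body. The request above therefore delivers two events, which the logger sink prints.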