一、日志采集:从网络端口接收数据,下沉到logger
文件netcat-logger.conf:
# Name the components on this agent
#给那三个组件取个名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#类型, 从网络端口接收数据,在本机启动, 所以localhost, type=spoolDir采集目录源,目录里有就采
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
#下沉的时候是一批一批的, 下沉的时候是一个个event。Channel参数解释:
#capacity:默认该通道中最大的可以存储的event数量
#transactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动命令:
#告诉flume启动一个agent,指定配置参数, --name:agent的名字
flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
传入数据:
[root@mini03 ~]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello world!^H^H^H^H^H^H^H^H^H^H^H^H^H^H
OK
tianjun2012!
OK
控制台看到的数据
2017-05-08 13:41:35,766 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 08 08 08 08 hello world!.... }
2017-05-08 13:41:40,153 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 74 69 61 6E 6A 75 6E 32 30 31 32 21 0D tianjun2012!. }
二、监视文件夹
启动命令: bin/flume-ng agent -c ./conf -f ./conf/spooldir-logger.conf -n a1 -Dflume.root.logger=INFO,console
测试: 往/home/hadoop/flumespool放文件(mv ./xxxFile /home/hadoop/flumespool),但是不要直接在该目录里生成(写入)文件,因为spooldir源要求放入目录后的文件不再被修改
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#监听目录, spoolDir指定被监听的目录, fileHeader指定是否在event的header中记录文件的绝对路径
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/flumespool
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
三、用tail命令获取数据,下沉到hdfs
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/log/test.log
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://mini01:9000/flume/events/%y-%m-%d/%H%M/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是SequenceFile,可用DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动命令:
flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1
模拟写入日志:
[root@mini03 log]# i=1;
while(( $i<=500000 ));
do echo $i >> /home/hadoop/log/test.log;
sleep 0.5;
let 'i++';done
查看hdfs上的文件内容
[root@mini01 ~]# hdfs dfs -cat /flume/events/17-05-08/1530/*
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
注意:本例中为了快速看到效果,下面这些值都设置得比较小,真实环境中需要按实际情况调整:
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
下面给一个真实环境中的配置:
agent1.sources = spooldirSource
agent1.channels = fileChannel
agent1.sinks = hdfsSink
agent1.sources.spooldirSource.type=spooldir
agent1.sources.spooldirSource.spoolDir=/home/hadoop/log
agent1.sources.spooldirSource.channels=fileChannel
agent1.sinks.hdfsSink.type=hdfs
agent1.sinks.hdfsSink.hdfs.path=hdfs://mini01:9000/weblog/flume-input/%y-%m-%d
agent1.sinks.hdfsSink.hdfs.filePrefix=flume-
agent1.sinks.hdfsSink.hdfs.round = true
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.hdfsSink.hdfs.rollInterval = 3600
# File size to trigger roll, in bytes (0: never roll based on file size)
agent1.sinks.hdfsSink.hdfs.rollSize = 128000000
agent1.sinks.hdfsSink.hdfs.rollCount = 0
agent1.sinks.hdfsSink.hdfs.batchSize = 1000
#Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
agent1.sinks.hdfsSink.hdfs.roundValue = 1
agent1.sinks.hdfsSink.hdfs.roundUnit = minute
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfsSink.channel=fileChannel
agent1.sinks.hdfsSink.hdfs.fileType = DataStream
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir=/tmp/flume/flume-bineckpoint
agent1.channels.fileChannel.dataDirs=/tmp/flume/dataDir
bin/flume-ng agent --conf ./conf/ -f conf/spooldir-hdfs.conf -Dflume.root.logger=DEBUG,console -n agent1 > log.log 2>&1 &