- Flume overview
- Flume Architecture
- Source/Channel/Sink types
- Installing Flume on CentOS
- Prerequisites
- Installation
- Verifying the service
- Flume manual
- Flume conf overview
- Flume Source
- Flume Channel
- Flume Sink
- Connecting agents with Avro
- Monitor
- Flume agent examples
- exec -> memory -> avro
- avro -> memory -> file_roll
- exec -> memory -> null
- Flume development
- Log data structure
- Technical support
- Flume configuration
- References
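This page summarizes Flume, an open-source system for efficiently collecting large volumes of streaming data (such as logs) in distributed environments.
License: Apache 2.0
Platform: Java
Flume overview
Open-source software that efficiently collects large volumes of streaming data (such as logs) in a distributed environment.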
Flume Architecture
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center"
|-
|width="30%" align="center" valign="middle" style="background-color:#eee;"|Source
|width="70%" align="left" valign="middle"|
Collects data.
|-
|align="center" valign="middle" style="background-color:#eee;"|Interceptor
|align="left" valign="middle"|
Modifies or drops the collected data.
Types (insert): Timestamp, Host, Static, UUID
Types (modify/drop): Morphline, Regex Filtering, Regex Extractor
|-
|align="center" valign="middle" style="background-color:#eee;"|Channel Selector
|align="left" valign="middle"|
Chooses the channel(s) when a Source hands events to Channels.
Types: Replicating (default), Multiplexing, Custom
|-
|align="center" valign="middle" style="background-color:#eee;"|Channel
|align="left" valign="middle"|
Conduit that carries data from the Source to the Sink.
|-
|align="center" valign="middle" style="background-color:#eee;"|Sink
|align="left" valign="middle"|
Stores or forwards data.
|-
|align="center" valign="middle" style="background-color:#eee;"|Sink Processor
|align="left" valign="middle"|
Selects among multiple sink targets.
Types: Default, Failover, Load balancing, Custom
Sink Group: manages several Sinks as a single group.
|}
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center"
|-
|width="30%" align="center" valign="middle" style="background-color:#eee;"|Serializer / Deserializer
|width="70%" align="left" valign="middle"|
Serializer: Body Text, Avro Event
Deserializer: LINE, AVRO, BlobDeserializer
|-
|align="center" valign="middle" style="background-color:#eee;"|HTTP Source Handler
|align="left" valign="middle"|
JSONHandler, BlobHandler
|-
|align="center" valign="middle" style="background-color:#eee;"|Log4J Appender
|align="left" valign="middle"|
Log4J Appender, Load Balancing Log4J Appender
|-
|align="center" valign="middle" style="background-color:#eee;"|Monitoring
|align="left" valign="middle"|
Ganglia, JSON, Custom
|-
|align="center" valign="middle" style="background-color:#eee;"|Plugin
|align="left" valign="middle"|
$FLUME_HOME/plugins.d/plugin-name/
$FLUME_HOME/plugins.d/plugin-name/lib/~.jar : the plugin's own jar(s)
$FLUME_HOME/plugins.d/plugin-name/libext/~.jar : jars the plugin depends on
$FLUME_HOME/plugins.d/plugin-name/native/~.so : native libraries
|}
Source/Channel/Sink types
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center"
|-
|width="35%" align="center" valign="middle" style="background-color:#eee;"|Source
|width="30%" align="center" valign="middle" style="background-color:#eee;"|Channel
|width="35%" align="center" valign="middle" style="background-color:#eee;"|Sink
|-
|align="left" valign="middle"|Avro: collects data over the Avro protocol
|align="left" valign="middle"|Memory: buffers events in memory
|align="left" valign="middle"|Avro: sends data over the Avro protocol
|-
|align="left" valign="middle"|Thrift: collects data over the Thrift protocol
|align="left" valign="middle"|JDBC: buffers events in a database
|align="left" valign="middle"|Thrift: sends data over the Thrift protocol
|-
|align="left" valign="middle"|
Syslog: collects data over the Syslog protocol
Syslog TCP, Multiport Syslog TCP
Syslog UDP
|align="left" valign="middle"|File: buffers events in files
|align="left" valign="middle"|IRC: sends data over the IRC protocol
|-
|align="left" valign="middle"|HTTP: collects data over HTTP
|align="left" valign="middle"|
|align="left" valign="middle"|ElasticSearch: stores data in Elasticsearch
|-
|align="left" valign="middle"|
JMS: collects data over JMS
Supports a pluggable converter
|align="left" valign="middle"|
|align="left" valign="middle"|MorphlineSolr: stores data in Solr
|-
|align="left" valign="middle"|NetCat: collects text data from a TCP/IP port
|align="left" valign="middle"|
|align="left" valign="middle"|HDFS: stores data in HDFS
|-
|align="left" valign="middle"|Exec: collects the output of a Linux command
|align="left" valign="middle"|
|align="left" valign="middle"|
HBase: stores data in HBase
HBaseSink, AsyncHBaseSink
|-
|align="left" valign="middle"|
Legacy: collects data from earlier versions of Flume
Avro Legacy, Thrift Legacy
|align="left" valign="middle"|
|align="left" valign="middle"|Null: does nothing (events are discarded)
|-
|align="left" valign="middle"|Custom
|align="left" valign="middle"|Custom
|align="left" valign="middle"|Custom
|}
Installing Flume on CentOS
Prerequisites
Installation
Download Apache Flume.
wget http://apache.tt.co.kr/flume/1.4.0/apache-flume-1.4.0-bin.tar.gz
tar zxvf apache-flume-1.4.0-bin.tar.gz
chown -R root:root apache-flume-1.4.0-bin
mv apache-flume-1.4.0-bin /nas/appl/flume
cd /nas/appl/flume/conf
cp flume-conf.properties.template flume-conf.properties
cp flume-env.sh.template flume-env.sh
chmod 755 flume-env.sh
vi /nas/appl/flume/conf/log4j.properties
flume.log.dir=/nas/appl/flume/logs
vi ~/.bashrc
export FLUME_HOME=/nas/appl/flume
export PATH=$PATH:$FLUME_HOME/bin
Check the version
flume-ng version
After installation, the following Avro jar files are present:
lib/avro-1.7.2.jar
lib/avro-ipc-1.7.2.jar
Verifying the service
Create the directories for file storage
mkdir /nas/appl/flume/storage
mkdir /nas/appl/flume/storage/checkpoint
mkdir /nas/appl/flume/storage/data
mkdir /nas/appl/flume/storage/file
vi conf/flume-conf.properties
agent_001.sources=source-001
agent_001.sources.source-001.channels=channel-001
agent_001.sources.source-001.type = exec
agent_001.sources.source-001.command = /bin/cat /nas/appl/flume/conf/flume-conf.properties
agent_001.channels=channel-001
agent_001.channels.channel-001.type = file
agent_001.channels.channel-001.checkpointDir = /nas/appl/flume/storage/checkpoint
agent_001.channels.channel-001.dataDirs = /nas/appl/flume/storage/data
#agent_001.channels.channel-001.type = memory
#agent_001.channels.channel-001.capacity = 1000
#agent_001.channels.channel-001.transactionCapacity = 100
agent_001.sinks=sink-001
agent_001.sinks.sink-001.channel=channel-001
agent_001.sinks.sink-001.type = file_roll
agent_001.sinks.sink-001.sink.directory = /nas/appl/flume/storage/file
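Flume validates the configuration itself when the agent starts. As an optional pre-check, the following minimal Java sketch (the class name `FlumeConfCheck` is hypothetical and not part of Flume) loads a properties file with `java.util.Properties` and reports sources or sinks that refer to channels not declared in the agent's `channels` list. It checks channel wiring only, not component types or other properties.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical pre-check for channel wiring in a Flume properties file (not part of Flume).
class FlumeConfCheck {

    // Splits a space-separated component list such as "source-001 source-002".
    static Set<String> names(Properties p, String key) {
        String value = p.getProperty(key, "").trim();
        Set<String> result = new LinkedHashSet<>();
        if (!value.isEmpty()) result.addAll(Arrays.asList(value.split("\\s+")));
        return result;
    }

    // Returns one message per wiring problem; an empty list means none were found.
    static List<String> check(Properties p, String agent) {
        List<String> problems = new ArrayList<>();
        Set<String> channels = names(p, agent + ".channels");
        for (String source : names(p, agent + ".sources")) {
            Set<String> used = names(p, agent + ".sources." + source + ".channels");
            if (used.isEmpty()) problems.add("source " + source + " has no channels");
            for (String ch : used) {
                if (!channels.contains(ch)) problems.add("source " + source + " uses undeclared channel " + ch);
            }
        }
        for (String sink : names(p, agent + ".sinks")) {
            // A sink reads from exactly one channel.
            String ch = p.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
            if (!channels.contains(ch)) problems.add("sink " + sink + " uses undeclared channel '" + ch + "'");
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java FlumeConfCheck.java /nas/appl/flume/conf/flume-conf.properties agent_001
        String agent = args.length > 1 ? args[1] : "agent_001";
        Properties p = new Properties();
        try (Reader r = args.length > 0 ? Files.newBufferedReader(Path.of(args[0]))
                : new StringReader("agent_001.sources=source-001\n"
                        + "agent_001.sources.source-001.channels=channel-001\n"
                        + "agent_001.channels=channel-001\n"
                        + "agent_001.sinks=sink-001\n"
                        + "agent_001.sinks.sink-001.channel=channel-001\n")) {
            p.load(r);
        }
        List<String> problems = check(p, agent);
        System.out.println(problems.isEmpty() ? "OK" : String.join("\n", problems));
    }
}
```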
Run the agent
flume-ng agent -n agent_001 -c /nas/appl/flume/conf -f /nas/appl/flume/conf/flume-conf.properties \
-Dflume.monitoring.type=http -Dflume.monitoring.port=41414
Verify the agent
Open http://localhost:41414/ in a browser to check it (the JSON metrics are served at /metrics).
Check the log file /nas/appl/flume/logs/flume.log for errors.
Check that files have been created in /nas/appl/flume/storage/file/.
Flume manual
Flume conf overview
Flume help
flume-ng help
Run the agent
flume-ng agent -n $agent_name -c conf -f flume-conf.properties
Run the agent with HTTP (JSON) monitoring enabled
flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
-Dflume.monitoring.type=http -Dflume.monitoring.port=41414
avro-client
flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
vi /nas/appl/flume/conf/flume-conf.properties
agent_001.sources = source-001
agent_001.channels = channel-001
agent_001.sinks = sink-001
agent_001.sources.source-001.channels = channel-001
agent_001.sinks.sink-001.channel = channel-001
Flume Channel Selector
agent_001.sources.$source.selector.type = replicating, multiplexing, com.jopenbusiness.flume.selector001
Flume Interceptor
Interceptor : host, timestamp, static, regex_filter
agent_001.sources.$source.interceptors = i1 i2
agent_001.sources.$source.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent_001.sources.$source.interceptors.i1.preserveExisting = false
agent_001.sources.$source.interceptors.i1.hostHeader = hostname
agent_001.sources.$source.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
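To illustrate what these two interceptors add to each event, the following Java sketch simulates their header changes with a plain `Map`; it does not use Flume classes, and the method names are hypothetical. Per the Flume documentation, the host interceptor stores the host's IP address by default (`useIP` defaults to true), so `hostHeader = hostname` only names the header; `useIP = false` stores the host name instead.

```java
import java.net.InetAddress;
import java.util.LinkedHashMap;
import java.util.Map;

// Simulation only: mimics the headers that Flume's host and timestamp interceptors add.
class InterceptorSketch {

    // Like the host interceptor: store the host value under hostHeader,
    // unless preserveExisting is true and the header is already present.
    static void host(Map<String, String> headers, String hostHeader, String hostValue, boolean preserveExisting) {
        if (preserveExisting && headers.containsKey(hostHeader)) return;
        headers.put(hostHeader, hostValue);
    }

    // Like the timestamp interceptor: header "timestamp" = epoch milliseconds as a string.
    static void timestamp(Map<String, String> headers, long nowMillis, boolean preserveExisting) {
        if (preserveExisting && headers.containsKey("timestamp")) return;
        headers.put("timestamp", Long.toString(nowMillis));
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> headers = new LinkedHashMap<>();
        // i1: hostHeader = hostname, preserveExisting = false; the value is an IP because useIP defaults to true
        host(headers, "hostname", InetAddress.getLocalHost().getHostAddress(), false);
        // i2: timestamp interceptor, applied after i1 as listed in "interceptors = i1 i2"
        timestamp(headers, System.currentTimeMillis(), false);
        System.out.println(headers);
    }
}
```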
regex_extractor interceptor
agent_001.sources.$source.interceptors.i1.type = regex_extractor
agent_001.sources.$source.interceptors.i1.regex = (\\d):(\\d):(\\d)
agent_001.sources.$source.interceptors.i1.serializers = s1 s2 s3
agent_001.sources.$source.interceptors.i1.serializers.s1.name = one
agent_001.sources.$source.interceptors.i1.serializers.s2.name = two
agent_001.sources.$source.interceptors.i1.serializers.s3.name = three
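The `serializers` settings above belong to Flume's regex_extractor interceptor, which applies the regex to the event body and stores each capture group in a header named by the corresponding serializer (group 1 to `s1.name`, and so on). The Java sketch below simulates that mapping with `java.util.regex` only; it is not Flume code. Note that `\\d` in the properties file is the regex `\d`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simulation of regex_extractor: capture groups from the body become headers.
class RegexExtractorSketch {

    static Map<String, String> extract(String body, Pattern regex, List<String> headerNames) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = regex.matcher(body);
        if (m.find()) {
            // group 1 -> first serializer name, group 2 -> second, ...
            for (int g = 1; g <= m.groupCount() && g <= headerNames.size(); g++) {
                headers.put(headerNames.get(g - 1), m.group(g));
            }
        }
        return headers; // empty when the body does not match
    }

    public static void main(String[] args) {
        // "(\\d):(\\d):(\\d)" in the properties file is the regex (\d):(\d):(\d)
        Pattern regex = Pattern.compile("(\\d):(\\d):(\\d)");
        System.out.println(extract("1:2:3", regex, List.of("one", "two", "three"))); // {one=1, two=2, three=3}
    }
}
```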
Event Serializers
serializer : text, avro_event
agent_001.sinks.$sink.sink.serializer = text
agent_001.sinks.$sink.sink.serializer.appendNewline = false
Flume Sink Processor
agent_001.sinkgroups.$group.processor.type = default, failover, load_balance, com.jopenbusiness.flume.processor001
failover sink processor
agent_001.sinkgroups = group001
agent_001.sinkgroups.group001.sinks = sink001 sink002
agent_001.sinkgroups.group001.processor.type = failover
agent_001.sinkgroups.group001.processor.priority.sink001 = 5
agent_001.sinkgroups.group001.processor.priority.sink002 = 10
agent_001.sinkgroups.group001.processor.maxpenalty = 10000
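A failover processor sends events to the highest-priority sink that is currently available (a larger priority value means higher priority, so sink002 with 10 is tried before sink001 with 5) and temporarily penalizes a sink that fails, for at most `maxpenalty` milliseconds. The Java sketch below simulates that selection; the doubling backoff starting at 1 second is an assumption for illustration, and the real processor's schedule may differ.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Simulation of failover sink selection (not Flume's FailoverSinkProcessor).
class FailoverSketch {
    private final Map<String, Integer> priority = new LinkedHashMap<>();
    private final Map<String, Long> retryAt = new HashMap<>();     // sink -> earliest retry time (ms)
    private final Map<String, Integer> failures = new HashMap<>(); // sink -> consecutive failures
    private final long maxPenaltyMs;

    FailoverSketch(long maxPenaltyMs) { this.maxPenaltyMs = maxPenaltyMs; }

    void add(String sink, int prio) { priority.put(sink, prio); }

    // Returns the available sink with the largest priority value, or null if every sink is penalized.
    String select(long nowMs) {
        String best = null;
        for (Map.Entry<String, Integer> e : priority.entrySet()) {
            if (retryAt.getOrDefault(e.getKey(), 0L) > nowMs) continue; // still penalized
            if (best == null || e.getValue() > priority.get(best)) best = e.getKey();
        }
        return best;
    }

    // Assumed backoff: 1 s, 2 s, 4 s, ... capped at maxpenalty.
    void fail(String sink, long nowMs) {
        int n = failures.merge(sink, 1, Integer::sum);
        long penalty = Math.min(maxPenaltyMs, 1000L << Math.min(n - 1, 20));
        retryAt.put(sink, nowMs + penalty);
    }

    // A successful delivery clears the penalty.
    void succeed(String sink) {
        failures.remove(sink);
        retryAt.remove(sink);
    }

    public static void main(String[] args) {
        FailoverSketch group001 = new FailoverSketch(10_000); // maxpenalty = 10000
        group001.add("sink001", 5);
        group001.add("sink002", 10);
        System.out.println(group001.select(0));    // sink002 (higher priority)
        group001.fail("sink002", 0);
        System.out.println(group001.select(500));  // sink001 while sink002 is penalized
        System.out.println(group001.select(1000)); // sink002 again after the 1 s penalty
    }
}
```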
Flume Source
Source types
exec: e.g. cat for a file's full contents, tail for lines appended to a file
hdfs
avro, thrift, memory
jms, spooldir, netcat, seq, http,
syslogtcp, multiport_syslogtcp, syslogudp
org.apache.flume.source.avroLegacy.AvroLegacySource
org.apache.flume.source.thriftLegacy.ThriftLegacySource
org.apache.flume.source.scribe.ScribeSource
custom source
agent_001.sources.$source.type = com.jopenbusiness.flume.source001
hdfs source
agent_001.sources.$source.type = hdfs
agent_001.sources.$source.bind = hdfs://namenode/user/flume/weblog
exec source
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = ~
agent_001.sources.$source.shell = /bin/bash -c
agent_001.sources.$source.restartThrottle = 10000
agent_001.sources.$source.restart = false
agent_001.sources.$source.logStdErr = false
agent_001.sources.$source.batchSize = 1000
agent_001.sources.$source.selector.type = replicating or multiplexing
agent_001.sources.$source.selector.* = ~
agent_001.sources.$source.interceptors = ~
agent_001.sources.$source.interceptors.* = ~
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = tail -f ~
agent_001.sources.$source.logStdErr = true
agent_001.sources.$source.restart = true
agent_001.sources.$source.type = exec
agent_001.sources.$source.command = for i in /path/*.txt; do cat $i; done
agent_001.sources.$source.shell = /bin/bash -c
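The exec source runs the command (through `shell` when it is set, as in the `for` loop example), turns each line of its standard output into one event, and passes events to the channel in batches of up to `batchSize`. The following Java sketch simulates only that line-to-batch step with `ProcessBuilder`; the class and method names are hypothetical, and restart handling is omitted.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simulation: each stdout line becomes one event; events are grouped into batches of up to batchSize.
class ExecBatchSketch {

    static List<List<String>> batches(BufferedReader stdout, int batchSize) {
        List<List<String>> result = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        try {
            String line;
            while ((line = stdout.readLine()) != null) {
                batch.add(line);
                if (batch.size() >= batchSize) { // a full batch is handed to the channel
                    result.add(batch);
                    batch = new ArrayList<>();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (!batch.isEmpty()) result.add(batch); // remainder when the command exits
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Like "shell = /bin/bash -c": the whole command string is passed to bash as one argument.
        Process p = new ProcessBuilder("/bin/bash", "-c", "for i in 1 2 3 4 5; do echo line$i; done").start();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
            System.out.println(batches(r, 2)); // [[line1, line2], [line3, line4], [line5]]
        }
        p.waitFor();
    }
}
```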
avro source
agent_001.sources.$source.type = avro
agent_001.sources.$source.bind = 0.0.0.0
agent_001.sources.$source.port = 10000
thrift source
agent_001.sources.$source.type = thrift
agent_001.sources.$source.bind = 0.0.0.0
agent_001.sources.$source.port = 4141
memory source
agent_001.sources.$source.type = memory
agent_001.sources.$source.capacity = 1000
agent_001.sources.$source.transactionCapacity = 100
multiplexing selector
agent_001.sources.$source.selector.type = multiplexing
agent_001.sources.$source.selector.header =
agent_001.sources.$source.selector.mapping. =
agent_001.sources.$source.selector.mapping. =
agent_001.sources.$source.selector.mapping. =
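With a multiplexing selector, the value of the configured header decides which channels receive the event, and `selector.default` lists the channels used when the value has no mapping. The Java sketch below simulates only that routing decision; the header `state`, the values `CZ`/`US` and the channels `c1` to `c4` are hypothetical examples.

```java
import java.util.List;
import java.util.Map;

// Simulation of multiplexing channel selection; header, values and channel names are hypothetical.
class MultiplexingSketch {

    // Channels mapped to the header's value, or the default channels when there is no match.
    static List<String> select(Map<String, String> headers, String headerName,
                               Map<String, List<String>> mapping, List<String> defaults) {
        String value = headers.get(headerName);
        List<String> channels = (value == null) ? null : mapping.get(value);
        return (channels != null) ? channels : defaults;
    }

    public static void main(String[] args) {
        // Roughly: selector.header = state, selector.mapping.CZ = c1,
        //          selector.mapping.US = c2 c3, selector.default = c4
        Map<String, List<String>> mapping = Map.of("CZ", List.of("c1"), "US", List.of("c2", "c3"));
        List<String> defaults = List.of("c4");
        System.out.println(select(Map.of("state", "US"), "state", mapping, defaults)); // [c2, c3]
        System.out.println(select(Map.of("state", "KR"), "state", mapping, defaults)); // [c4]
    }
}
```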
Flume Channel
Channel types
memory, file
jdbc
org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
org.apache.flume.channel.PseudoTxnMemoryChannel
Custom channel
agent_001.channels.$channel.type = com.jopenbusiness.flume.channel001
memory channel
agent_001.channels.$channel.type = memory
agent_001.channels.$channel.capacity = 1000
file channel
mkdir /nas/appl/flume/storage
mkdir /nas/appl/flume/storage/checkpoint
mkdir /nas/appl/flume/storage/data
agent_001.channels.$channel.type = file
agent_001.channels.$channel.checkpointDir = /nas/appl/flume/storage/checkpoint
agent_001.channels.$channel.dataDirs = /nas/appl/flume/storage/data
Flume Sink
Sink types
hdfs
file_roll
logger, avro, thrift, irc, null
hbase, asynchbase
org.apache.flume.sink.elasticsearch.ElasticSearchSink
Custom sink
agent_001.sinks.$sink.type = com.jopenbusiness.flume.sink001
hdfs sink
Requires the Hadoop jar(s) on the classpath
agent_001.sinks.$sink.type = hdfs
agent_001.sinks.$sink.hdfs.path= hdfs://namenode:9000/user/flume/weblog/%y%m%d
agent_001.sinks.$sink.hdfs.filePrefix = http-
agent_001.sinks.$sink.hdfs.fileSuffix = .log
agent_001.sinks.$sink.hdfs.round = true
agent_001.sinks.$sink.hdfs.roundValue = 10
agent_001.sinks.$sink.hdfs.roundUnit = minute
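With `round = true`, `roundValue = 10` and `roundUnit = minute`, the event timestamp used for the escape sequences is rounded down to a multiple of 10 minutes. The timestamp comes from the event's `timestamp` header (for example, one added by the timestamp interceptor). The path above uses only `%y%m%d`, so rounding matters once finer escapes such as `%H` and `%M` appear; the `/%H%M` suffix below is a hypothetical addition. This Java sketch resolves only those five escapes with `java.time` and is not Flume's escaping code.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Sketch of resolving HDFS path escapes with round = true, roundValue = 10, roundUnit = minute.
// Handles only %y %m %d %H %M; this is not Flume's escaping implementation.
class HdfsPathSketch {

    // Round the event timestamp down to a multiple of roundValue minutes.
    static ZonedDateTime roundDown(long epochMillis, ZoneId zone, int roundValueMinutes) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(zone).truncatedTo(ChronoUnit.MINUTES);
        return t.minusMinutes(t.getMinute() % roundValueMinutes);
    }

    static String resolve(String path, long epochMillis, ZoneId zone, int roundValueMinutes) {
        ZonedDateTime t = roundDown(epochMillis, zone, roundValueMinutes);
        return path.replace("%y", t.format(DateTimeFormatter.ofPattern("yy")))  // 2-digit year
                   .replace("%m", t.format(DateTimeFormatter.ofPattern("MM")))  // month
                   .replace("%d", t.format(DateTimeFormatter.ofPattern("dd")))  // day
                   .replace("%H", t.format(DateTimeFormatter.ofPattern("HH")))  // hour (00-23)
                   .replace("%M", t.format(DateTimeFormatter.ofPattern("mm"))); // minute
    }

    public static void main(String[] args) {
        long ts = System.currentTimeMillis(); // in Flume this comes from the "timestamp" header
        String path = "hdfs://namenode:9000/user/flume/weblog/%y%m%d/%H%M"; // /%H%M is hypothetical
        System.out.println(resolve(path, ts, ZoneId.systemDefault(), 10));
    }
}
```

For example, an event stamped 2013-07-15 13:47:30 UTC resolves to `.../weblog/130715/1340` when the UTC zone is used.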
file_roll sink
mkdir /nas/appl/flume/storage
mkdir /nas/appl/flume/storage/file
agent_001.sinks.$sink.type = file_roll
agent_001.sinks.$sink.sink.directory = /nas/appl/flume/storage/file
avro sink
agent_001.sinks.$sink.type = avro
agent_001.sinks.$sink.hostname = localhost
agent_001.sinks.$sink.port = 4545
logger sink
agent_001.sinks.$sink.type = logger
Connecting agents with Avro
Connecting agent_001 -> agent_002
agent_001.sinks.$sink.type = avro
agent_001.sinks.$sink.hostname = hostForAgent002
agent_001.sinks.$sink.port = 41414
agent_001.sinks.$sink.batch-size = 100
agent_001.sinks.$sink.runner.type = polling
agent_001.sinks.$sink.runner.polling.interval = 10
# The collector (agent_002) binds the port.
agent_002.sources.$source.type = avro
agent_002.sources.$source.bind = hostForAgent002
agent_002.sources.$source.port = 41414
Monitor
ganglia reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
JSON Reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
-Dflume.monitoring.type=http -Dflume.monitoring.port=41414
Custom reporting
flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
-Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
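With JSON (HTTP) reporting enabled, the agent serves its metrics as a JSON document. The following minimal Java 11+ sketch fetches it with `java.net.http.HttpClient`; the `/metrics` path follows the Flume monitoring documentation, the class name is hypothetical, and `main` only succeeds while the agent is running.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Minimal sketch: read the JSON metrics of an agent started with
// -Dflume.monitoring.type=http -Dflume.monitoring.port=41414 (Java 11+).
class FlumeMetricsFetch {

    static URI metricsUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/metrics");
    }

    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 41414;
        HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
        HttpRequest request = HttpRequest.newBuilder(metricsUri(host, port))
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // The body is one JSON object keyed by component, e.g. "CHANNEL.channel-001": {...counters...}
        System.out.println("HTTP " + response.statusCode());
        System.out.println(response.body());
    }
}
```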
Flume agent examples
exec -> memory -> avro
flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center"
|-
|width="20%" align="center" valign="middle" style="background-color:#eee;"|Event 정보
|width="80%" align="left" valign="middle"|Size (52 bytes) * Count (2,400 / s) = 124,800 bytes / s
|-
|align="center" valign="middle" style="background-color:#eee;"|Source
(exec)
|align="left" valign="middle"|
agent_001.sources = source-001
agent_001.sources.source-001.channels = channel-001
agent_001.sources.source-001.type = exec
agent_001.sources.source-001.command = /cloudnas/develop/tail_00001.bash
agent_001.sources.source-001.restartThrottle = 1000
agent_001.sources.source-001.restart = true
agent_001.sources.source-001.logStdErr = false
agent_001.sources.source-001.batchSize = 100
restartThrottle: time to wait before restarting the command after it terminates abnormally (ms)
batchSize: number of events per transaction
|-
|align="center" valign="middle" style="background-color:#eee;"|Channel
(memory)
|align="left" valign="middle"|
agent_001.channels = channel-001
agent_001.channels.channel-001.type = memory
agent_001.channels.channel-001.capacity = 240000
agent_001.channels.channel-001.transactionCapacity = 100
agent_001.channels.channel-001.keep-alive = 3
agent_001.channels.channel-001.byteCapacityBufferPercentage = 20
agent_001.channels.channel-001.byteCapacity = 24960000
capacity: maximum number of events the channel can hold (Count * 100)
transactionCapacity: maximum number of events a single transaction can put or take
byteCapacity: maximum number of bytes the channel can hold (capacity * Size * 2)
|-
|align="center" valign="middle" style="background-color:#eee;"|Sink
(avro)
|align="left" valign="middle"|
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = avro
agent_001.sinks.sink-001.hostname = 192.168.56.201
agent_001.sinks.sink-001.port = 4141
agent_001.sinks.sink-001.batch-size = 100
agent_001.sinks.sink-001.connect-timeout = 20000
agent_001.sinks.sink-001.request-timeout = 20000
batch-size: number of events per transaction
|}
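The sizing rules in the table (capacity = Count * 100, byteCapacity = capacity * Size * 2) are this page's own rules of thumb rather than Flume defaults. The Java sketch below reproduces the arithmetic for 52-byte events at 2,400 events per second: Count * 100 corresponds to roughly 100 seconds of events, and the factor of 2 leaves headroom above the raw body size.

```java
// Reproduces this page's memory-channel sizing rules of thumb (not Flume defaults).
class MemoryChannelSizing {

    // Size (bytes per event) * Count (events per second)
    static long bytesPerSecond(long eventSizeBytes, long eventsPerSecond) {
        return eventSizeBytes * eventsPerSecond;
    }

    // capacity = Count * 100, i.e. about 100 seconds of events
    static long capacity(long eventsPerSecond) {
        return eventsPerSecond * 100;
    }

    // byteCapacity = capacity * Size * 2 (factor 2 as headroom)
    static long byteCapacity(long capacity, long eventSizeBytes) {
        return capacity * eventSizeBytes * 2;
    }

    public static void main(String[] args) {
        long size = 52, count = 2_400;
        long cap = capacity(count);
        System.out.println("bytes/s      = " + bytesPerSecond(size, count)); // 124800
        System.out.println("capacity     = " + cap);                         // 240000
        System.out.println("byteCapacity = " + byteCapacity(cap, size));     // 24960000
    }
}
```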
avro -> memory -> file_roll
flume-ng agent -n agent_002 -f /nas/appl/flume/conf/flume-conf.properties
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center"
|-
|width="20%" align="center" valign="middle" style="background-color:#eee;"|Event 정보
|width="80%" align="left" valign="middle"|Size (52 bytes) * Count (2,400 / s) = 124,800 bytes / s
|-
|align="center" valign="middle" style="background-color:#eee;"|Source
(avro)
|align="left" valign="middle"|
agent_002.sources = source-002
agent_002.sources.source-002.channels = channel-002
agent_002.sources.source-002.type = avro
agent_002.sources.source-002.bind = 0.0.0.0
agent_002.sources.source-002.port = 4141
|-
|align="center" valign="middle" style="background-color:#eee;"|Channel
(memory)
|align="left" valign="middle"|Omitted
|-
|align="center" valign="middle" style="background-color:#eee;"|Sink
(file_roll)
|align="left" valign="middle"|
agent_002.sinks = sink-002
agent_002.sinks.sink-002.channel = channel-002
agent_002.sinks.sink-002.type = file_roll
agent_002.sinks.sink-002.sink.directory = /cloudnas/develop/output/fileRoll
agent_002.sinks.sink-002.sink.rollInterval = 30
agent_002.sinks.sink-002.batchSize = 100
batchSize: number of events per transaction
|}
exec -> memory -> null
flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center"
|-
|width="20%" align="center" valign="middle" style="background-color:#eee;"|Event 정보
|width="80%" align="left" valign="middle"|Size (52 bytes) * Count (2,400 / s) = 124,800 bytes / s
|-
|align="center" valign="middle" style="background-color:#eee;"|Source
(exec)
|align="left" valign="middle"|-
|-
|align="center" valign="middle" style="background-color:#eee;"|Channel
(memory)
|align="left" valign="middle"|-
|-
|align="center" valign="middle" style="background-color:#eee;"|Sink
(null)
|align="left" valign="middle"|
agent_001.sinks = sink-001
agent_001.sinks.sink-001.channel = channel-001
agent_001.sinks.sink-001.type = null
agent_001.sinks.sink-001.batchSize = 100
batchSize: number of events per transaction
|}
Flume development
Flume Library
lib/flume-ng-configuration-1.4.0.jar
lib/flume-ng-sdk-1.4.0.jar
lib/flume-ng-core-1.4.0.jar
{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center"
|-
|width="30%" align="center" valign="middle" style="background-color:#eee;"|Source
|width="70%" align="left" valign="middle"|
org.apache.flume.Source
Implementation: org.apache.flume.source.ExecSource
|-
|align="center" valign="middle" style="background-color:#eee;"|Interceptor
|align="left" valign="middle"|
org.apache.flume.interceptor.Interceptor
Implementation: org.apache.flume.interceptor.TimestampInterceptor$Builder
agent_001.sources.source-001.interceptors = ConversionInterceptor
agent_001.sources.source-001.interceptors.ConversionInterceptor.type = com.jopenbusiness.hadoop.flume.interceptor.ConversionInterceptor$Builder
|-
|align="center" valign="middle" style="background-color:#eee;"|Channel Selector
|align="left" valign="middle"|
org.apache.flume.ChannelSelector
Implementation: org.apache.flume.channel.MultiplexingChannelSelector
|-
|align="center" valign="middle" style="background-color:#eee;"|Channel
|align="left" valign="middle"|
org.apache.flume.Channel
Implementation: org.apache.flume.channel.MemoryChannel
|-
|align="center" valign="middle" style="background-color:#eee;"|Sink
|align="left" valign="middle"|
org.apache.flume.Sink
Implementation: org.apache.flume.sink.LoggerSink
|-
|align="center" valign="middle" style="background-color:#eee;"|Sink Processor
|align="left" valign="middle"|
org.apache.flume.SinkProcessor
Implementation: org.apache.flume.sink.LoadBalancingSinkProcessor
|}
Component
Event
Source
Sink
Channel
Source and Sink Runners
Agent
Configuration Provider
Client
Physical node
Logical node
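A custom interceptor such as the `ConversionInterceptor` configured in the table implements `org.apache.flume.interceptor.Interceptor` (methods `initialize`, `intercept(Event)`, `intercept(List<Event>)`, `close`) and is created through a nested `Builder`, which is why the `type` property names `ConversionInterceptor$Builder`. So that this sketch compiles without flume-ng-core, it declares local stand-in `Event` and `Interceptor` interfaces with the same method names and omits the Builder; a real plugin would implement the Flume types instead. The page does not say what the conversion does, so upper-casing the body is a purely hypothetical example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical ConversionInterceptor: upper-casing the body is only an example conversion.
class ConversionInterceptor implements Interceptor {
    public void initialize() { }

    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        event.setBody(body.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8));
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>(events.size());
        for (Event e : events) {
            Event converted = intercept(e);
            if (converted != null) out.add(converted); // keep only events that were not dropped
        }
        return out;
    }

    public void close() { }

    public static void main(String[] args) {
        Interceptor i = new ConversionInterceptor();
        i.initialize();
        List<Event> out = i.intercept(List.<Event>of(new SimpleEvent("error: disk full")));
        System.out.println(new String(out.get(0).getBody(), StandardCharsets.UTF_8)); // ERROR: DISK FULL
        i.close();
    }
}

// Minimal event holding headers and a byte[] body.
class SimpleEvent implements Event {
    private final Map<String, String> headers = new HashMap<>();
    private byte[] body;

    SimpleEvent(String body) { this.body = body.getBytes(StandardCharsets.UTF_8); }

    public Map<String, String> getHeaders() { return headers; }
    public byte[] getBody() { return body; }
    public void setBody(byte[] body) { this.body = body; }
}

// Local stand-ins mirroring the method names of org.apache.flume.Event and
// org.apache.flume.interceptor.Interceptor, so this compiles without flume-ng-core.
interface Event {
    Map<String, String> getHeaders();
    byte[] getBody();
    void setBody(byte[] body);
}

interface Interceptor {
    void initialize();
    Event intercept(Event event);              // returning null drops the event
    List<Event> intercept(List<Event> events);
    void close();
}
```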
Log data structure
Log data fields
ymd string: year, month, day
hms string: hour, minute, second
severity string: severity (ERROR, WARNING, INFO, ...)
server string
process_id int
message string
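The page lists the fields but not the line format. Assuming, hypothetically, that each line holds the six fields in the listed order separated by whitespace, with everything after `process_id` taken as the message, the following Java sketch parses one line into a typed record; the split would need adjusting if the real logs use another delimiter.

```java
// Hypothetical line format: "ymd hms severity server process_id message", whitespace-separated;
// everything after process_id is kept as the message.
final class LogRecord {
    final String ymd;      // year, month, day
    final String hms;      // hour, minute, second
    final String severity; // ERROR, WARNING, INFO, ...
    final String server;
    final int processId;
    final String message;

    LogRecord(String ymd, String hms, String severity, String server, int processId, String message) {
        this.ymd = ymd;
        this.hms = hms;
        this.severity = severity;
        this.server = server;
        this.processId = processId;
        this.message = message;
    }

    static LogRecord parse(String line) {
        String[] f = line.trim().split("\\s+", 6); // at most 6 parts, so the message keeps its spaces
        if (f.length < 6) throw new IllegalArgumentException("expected 6 fields: " + line);
        return new LogRecord(f[0], f[1], f[2], f[3], Integer.parseInt(f[4]), f[5]);
    }

    public static void main(String[] args) {
        // The sample values are made up for illustration.
        LogRecord r = parse("20130715 134730 ERROR web01 1234 Connection refused");
        System.out.println(r.severity + " from " + r.server + " pid=" + r.processId + ": " + r.message);
        // ERROR from web01 pid=1234: Connection refused
    }
}
```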
Technical support
Error: org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel
Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
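In the memory channel, each transaction buffers its puts in a queue sized by the channel's `transactionCapacity` (100 by default), so this error typically means a source put more events in one transaction than that, for example with a `batchSize` larger than `transactionCapacity`. Keeping source and sink batch sizes at or below `transactionCapacity`, and `transactionCapacity` at or below `capacity`, avoids it. The Java sketch below is a simplified simulation of that check, not Flume's MemoryChannel code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified simulation of a memory-channel transaction's put buffer (not Flume's MemoryChannel).
class MemoryTxnSketch {

    static final class ChannelFullException extends RuntimeException {
        ChannelFullException(String message) { super(message); }
    }

    private final int transactionCapacity;
    private final Deque<String> putList = new ArrayDeque<>();

    MemoryTxnSketch(int transactionCapacity) { this.transactionCapacity = transactionCapacity; }

    // Each put is buffered until commit; the buffer holds at most transactionCapacity events.
    void put(String event) {
        if (putList.size() >= transactionCapacity) {
            throw new ChannelFullException("Put queue for MemoryTransaction of capacity "
                    + transactionCapacity + " full");
        }
        putList.add(event);
    }

    // Commit would move the buffered events into the channel (checked against capacity); here it just clears.
    int commit() {
        int n = putList.size();
        putList.clear();
        return n;
    }

    // Puts batchSize events in one transaction, as a source with that batchSize would.
    static boolean tryBatch(int batchSize, int transactionCapacity) {
        MemoryTxnSketch txn = new MemoryTxnSketch(transactionCapacity);
        try {
            for (int i = 0; i < batchSize; i++) txn.put("event-" + i);
            txn.commit();
            return true;
        } catch (ChannelFullException e) {
            System.out.println(e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryBatch(100, 100));  // true
        System.out.println(tryBatch(1000, 100)); // prints the error message, then false
    }
}
```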
Flume configuration
References
Streaming data into Apache HBase using Apache Flume, 2012.12
http://blog.sematext.com/2011/07/28/flume-and-hbase-integration/