
This article summarizes Flume, an open-source system for efficiently collecting large volumes of streaming data (such as logs) in distributed environments.

 
 
 

Flume Overview


An open-source system for efficiently collecting large volumes of streaming data (such as logs) in distributed environments.

 

Flume Architecture


{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" 
|- 
|width="30%" align="center" valign="middle" style="background-color:#eee;"|Source 
|width="70%" align="left" valign="middle"|
* Collects data.
|- 
|align="center" valign="middle" style="background-color:#eee;"|Interceptor 
|align="left" valign="middle"|
* Modifies or drops collected events.
* Insert types: Timestamp, Host, Static, UUID
* Transform/filter types: Morphline, Regex Filtering, Regex Extractor
|- 
|align="center" valign="middle" style="background-color:#eee;"|Channel Selector 
|align="left" valign="middle"|
* Chooses which Channel(s) receive events when a Source feeds its Channels.
* Types: Replicating (default), Multiplexing, Custom
|- 
|align="center" valign="middle" style="background-color:#eee;"|Channel 
|align="left" valign="middle"|
* The conduit that carries events from the Source to the Sink.
|- 
|align="center" valign="middle" style="background-color:#eee;"|Sink 
|align="left" valign="middle"|
* Stores or forwards events.
|- 
|align="center" valign="middle" style="background-color:#eee;"|Sink Processor 
|align="left" valign="middle"|
* Selects among multiple target Sinks.
* Types: Default, Failover, Loadbalancing, Custom
* Sink Group: manages several Sinks as one group
|}
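The component chain above (Source → Interceptor → Channel → Sink) can be sketched as a tiny in-process pipeline. This is illustrative Python only, not Flume's Java API; every class and function name here is hypothetical.

```python
from collections import deque

def timestamp_interceptor(event):
    # Insert-type interceptor: adds a header, like Flume's Timestamp interceptor.
    event["headers"]["timestamp"] = "1700000000"
    return event

class Channel:
    """Conduit that carries events from the Source to the Sink."""
    def __init__(self):
        self.queue = deque()
    def put(self, event):
        self.queue.append(event)
    def take(self):
        return self.queue.popleft() if self.queue else None

def source(lines, interceptors, channel):
    # Source collects data, runs interceptors, then puts events on the channel.
    for line in lines:
        event = {"headers": {}, "body": line}
        for icept in interceptors:
            event = icept(event)
        channel.put(event)

def sink(channel, out):
    # Sink takes events from the channel and stores/forwards them.
    while (event := channel.take()) is not None:
        out.append(event["body"])

ch = Channel()
source(["log line 1", "log line 2"], [timestamp_interceptor], ch)
collected = []
sink(ch, collected)
print(collected)  # ['log line 1', 'log line 2']
```

In real Flume each stage runs in its own threads with transactional puts and takes; this sketch only shows the direction of data flow.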

 
 

{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" 
|- 
|width="30%" align="center" valign="middle" style="background-color:#eee;"|Serializer / Deserializer 
|width="70%" align="left" valign="middle"|
* Serializer: Body Text, Avro Event
* Deserializer: LINE, AVRO, BlobDeserializer
|- 
|align="center" valign="middle" style="background-color:#eee;"|Handler 
|align="left" valign="middle"|
* JSONHandler, BlobHandler
|- 
|align="center" valign="middle" style="background-color:#eee;"|Client 
|align="left" valign="middle"|
* Log4J Appender, Load Balancing Log4J Appender
|- 
|align="center" valign="middle" style="background-color:#eee;"|Monitoring 
|align="left" valign="middle"|
* Ganglia, JSON, Custom
|- 
|align="center" valign="middle" style="background-color:#eee;"|Plugin Directory 
|align="left" valign="middle"|
* $FLUME_HOME/plugins.d/&lt;plugin-name&gt;/
* $FLUME_HOME/plugins.d/&lt;plugin-name&gt;/lib/~.jar
* $FLUME_HOME/plugins.d/&lt;plugin-name&gt;/libext/~.jar
* $FLUME_HOME/plugins.d/&lt;plugin-name&gt;/native/~.so
|}

 

Source/Channel/Sink Types

{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" 
|- 
|width="35%" align="center" valign="middle" style="background-color:#eee;"|Source 
|width="30%" align="center" valign="middle" style="background-color:#eee;"|Channel 
|width="35%" align="center" valign="middle" style="background-color:#eee;"|Sink 
|- 
|align="left" valign="middle"|Avro : collects over the Avro protocol 
|align="left" valign="middle"|Memory : backed by memory 
|align="left" valign="middle"|Avro : sends over the Avro protocol 
|- 
|align="left" valign="middle"|Thrift : collects over the Thrift protocol 
|align="left" valign="middle"|JDBC : backed by a database 
|align="left" valign="middle"|Thrift : sends over the Thrift protocol 
|- 
|align="left" valign="middle"|Syslog : collects over the Syslog protocol
* Syslog TCP, Multiport Syslog TCP
* Syslog UDP
|align="left" valign="middle"|File : backed by files 
|align="left" valign="middle"|IRC : sends over the IRC protocol 
|- 
|align="left" valign="middle"|HTTP : collects over the HTTP protocol 
|align="left" valign="middle"| 
|align="left" valign="middle"|ElasticSearch : stores into Elasticsearch 
|- 
|align="left" valign="middle"|JMS : collects over the JMS protocol
* supports pluggable converters
|align="left" valign="middle"| 
|align="left" valign="middle"|MorphlineSolr : stores into Solr 
|- 
|align="left" valign="middle"|NetCat : collects TCP/IP data 
|align="left" valign="middle"| 
|align="left" valign="middle"|HDFS : stores into HDFS 
|- 
|align="left" valign="middle"|Exec : collects via Linux commands 
|align="left" valign="middle"| 
|align="left" valign="middle"|HBase : stores into HBase
* HBaseSink, AsyncHBaseSink
|- 
|align="left" valign="middle"|Legacy : collects from older Flume versions
* Avro Legacy, Thrift Legacy
|align="left" valign="middle"| 
|align="left" valign="middle"|Null : discards all events 
|- 
|align="left" valign="middle"|Custom 
|align="left" valign="middle"|Custom 
|align="left" valign="middle"|Custom 
|}

 

Installing Flume on CentOS


Prerequisites

Installation

  • Download Apache Flume.

 
 wget http://apache.tt.co.kr/flume/1.4.0/apache-flume-1.4.0-bin.tar.gz
 tar zxvf apache-flume-1.4.0-bin.tar.gz
 chown -R root:root apache-flume-1.4.0-bin
 mv apache-flume-1.4.0-bin /nas/appl/flume
 
 cd /nas/appl/flume/conf
 cp flume-conf.properties.template flume-conf.properties
 cp flume-env.sh.template flume-env.sh
 chmod 755 flume-env.sh
 
  • vi /nas/appl/flume/conf/log4j.properties

 
 flume.log.dir=/nas/appl/flume/logs
 
  • vi ~/.bashrc

 
 export FLUME_HOME=/nas/appl/flume
 export PATH=$PATH:$FLUME_HOME/bin
 
  • Check the version

 
 flume-ng version
 
  • After installing Flume, the following Avro jar files are present:

 
 lib/avro-1.7.2.jar
 lib/avro-ipc-1.7.2.jar
 

Verifying the Service

  • Create the file storage folders

 
 mkdir /nas/appl/flume/storage
 mkdir /nas/appl/flume/storage/checkpoint
 mkdir /nas/appl/flume/storage/data
 mkdir /nas/appl/flume/storage/file
 
  • vi conf/flume-conf.properties

 
 agent_001.sources=source-001
 agent_001.sources.source-001.channels=channel-001
 agent_001.sources.source-001.type = exec
 agent_001.sources.source-001.command = /bin/cat /nas/appl/flume/conf/flume-conf.properties
 
 agent_001.channels=channel-001
 agent_001.channels.channel-001.type = file
 agent_001.channels.channel-001.checkpointDir = /nas/appl/flume/storage/checkpoint
 agent_001.channels.channel-001.dataDirs = /nas/appl/flume/storage/data
 
 #agent_001.channels.channel-001.type = memory
 #agent_001.channels.channel-001.capacity = 1000
 #agent_001.channels.channel-001.transactionCapacity = 100
 
 agent_001.sinks=sink-001
 agent_001.sinks.sink-001.channel=channel-001 
 agent_001.sinks.sink-001.type = file_roll
 agent_001.sinks.sink-001.sink.directory = /nas/appl/flume/storage/file
 
  • Run the agent

 
 flume-ng agent -n agent_001 -c /nas/appl/flume/conf -f /nas/appl/flume/conf/flume-conf.properties \
     -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
 
  • Verify the agent is working

    • Open http://localhost:41414/ in a browser and check.

    • Check the /nas/appl/flume/logs/flume.log log file for errors.

    • Check that files have been created in the /nas/appl/flume/storage/file/ folder.

 
 

Flume Manual


Flume conf Overview

  • Flume help

 
 flume-ng help
 
  • Run an agent

 
 flume-ng agent -n $agent_name -c conf -f flume-conf.properties
 flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
     -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
 
  • avro-client

 
 flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
 
  • vi /nas/appl/flume/conf/flume-conf.properties

 
 agent_001.sources = source-001
 agent_001.channels = channel-001
 agent_001.sinks = sink-001
 
 agent_001.sources.source-001.channels = channel-001
 agent_001.sinks.sink-001.channel = channel-001
 
  • Flume Channel Selector

 
 agent_001.sources.$source.selector.type = replicating, multiplexing, com.jopenbusiness.flume.selector001
 
  • Flume Interceptor

    • Interceptor : host, timestamp, static, regex_filter

 
 agent_001.sources.$source.interceptors = i1 i2
 agent_001.sources.$source.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
 agent_001.sources.$source.interceptors.i1.preserveExisting = false
 agent_001.sources.$source.interceptors.i1.hostHeader = hostname
 agent_001.sources.$source.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
  • regex_extractor interceptor (regex groups become headers via the named serializers)

 
 agent_001.sources.$source.interceptors.i1.regex = (\\d):(\\d):(\\d)
 agent_001.sources.$source.interceptors.i1.serializers = s1 s2 s3
 agent_001.sources.$source.interceptors.i1.serializers.s1.name = one
 agent_001.sources.$source.interceptors.i1.serializers.s2.name = two
 agent_001.sources.$source.interceptors.i1.serializers.s3.name = three
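The extractor config above maps the three regex groups to headers named one, two, and three. What it produces can be sketched with Python's re module (the sample body "1:2:3" is an assumption for illustration):

```python
import re

# Conceptual equivalent of the regex_extractor config above:
# groups of (\d):(\d):(\d) become headers named one, two, three.
pattern = re.compile(r"(\d):(\d):(\d)")
names = ["one", "two", "three"]  # serializers s1, s2, s3

def extract_headers(body):
    m = pattern.search(body)
    return dict(zip(names, m.groups())) if m else {}

print(extract_headers("1:2:3"))  # {'one': '1', 'two': '2', 'three': '3'}
```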
 
  • Event Serializers

    • serializer : text, avro_event

 
 agent_001.sinks.$sink.sink.serializer = text
 agent_001.sinks.$sink.sink.serializer.appendNewline = false
 
  • Flume Sink Processor

 
 agent_001.sinks.$sink.processor.type = default, failover, load_balance, com.jopenbusiness.flume.processor001
  • failover sink processor

 
 agent_001.sinkgroups = group001
 agent_001.sinkgroups.group001.sinks = sink001 sink002
 
 agent_001.sinkgroups.group001.type = failover
 agent_001.sinkgroups.group001.priority.sink001 = 5
 agent_001.sinkgroups.group001.priority.sink002 = 10
 agent_001.sinkgroups.group001.maxpenalty = 10000
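A failover sink processor always routes to the highest-priority live sink and temporarily sidelines a failed one. The sketch below is a simplified model: real Flume grows the penalty up to maxpenalty, while here a flat penalty of maxpenalty (10,000 ms, as configured above) is assumed.

```python
import time

# Simplified failover sink-processor selection (illustrative, not Flume's Java code).
class FailoverProcessor:
    def __init__(self, priorities, maxpenalty_ms=10000):
        self.priorities = priorities          # e.g. {"sink001": 5, "sink002": 10}
        self.maxpenalty = maxpenalty_ms / 1000.0
        self.penalized_until = {}             # sink name -> wall-clock time

    def pick(self, now=None):
        # Highest-priority sink that is not currently penalized.
        now = time.time() if now is None else now
        live = [s for s in self.priorities
                if self.penalized_until.get(s, 0) <= now]
        return max(live, key=lambda s: self.priorities[s]) if live else None

    def mark_failed(self, sink, now=None):
        # Sideline the sink for the penalty window.
        now = time.time() if now is None else now
        self.penalized_until[sink] = now + self.maxpenalty

p = FailoverProcessor({"sink001": 5, "sink002": 10})
print(p.pick(now=0.0))        # sink002 (higher priority wins)
p.mark_failed("sink002", now=0.0)
print(p.pick(now=1.0))        # sink001 (sink002 is penalized)
print(p.pick(now=11.0))       # sink002 (penalty expired)
```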
 

Flume Source

  • Source types

    • exec : cat (reads a file's contents), tail (reads lines as they are appended)

    • hdfs

    • avro, thrift

    • jms, spooldir, netcat, seq, http

    • syslogtcp, multiport_syslogtcp, syslogudp

    • org.apache.flume.source.avroLegacy.AvroLegacySource

    • org.apache.flume.source.thriftLegacy.ThriftLegacySource

    • org.apache.flume.source.scribe.ScribeSource

    • custom source

 
 agent_001.sources.$source.type = com.jopenbusiness.flume.source001
 
  • hdfs source

 
 agent_001.sources.$source.type = hdfs
 agent_001.sources.$source.bind = hdfs://namenode/user/flume/weblog
 
  • exec source

 
 agent_001.sources.$source.type = exec
 agent_001.sources.$source.command = /bin/bash -c
 agent_001.sources.$source.shell = /bin/bash -c
 agent_001.sources.$source.restartThrottle = 10000
 agent_001.sources.$source.restart = false
 agent_001.sources.$source.logStdErr = false
 agent_001.sources.$source.batchSize = 1000
 agent_001.sources.$source.selector.type = replicating or multiplexing
 agent_001.sources.$source.selector.* = ~
 agent_001.sources.$source.interceptors = i1 i2
 agent_001.sources.$source.interceptors.* = ~
 
 agent_001.sources.$source.type = exec
 agent_001.sources.$source.command = tail -f ~
 agent_001.sources.$source.logStdErr = true
 agent_001.sources.$source.restart = true
 
 agent_001.sources.$source.type = exec
 agent_001.sources.$source.command = for i in /path/*.txt; do cat $i; done
 agent_001.sources.$source.shell = /bin/bash -c
 
  • avro source

 
 agent_001.sources.$source.type = avro
 agent_001.sources.$source.bind = 0.0.0.0
 agent_001.sources.$source.port = 10000
 
  • thrift source

 
 agent_001.sources.$source.type = thrift
 agent_001.sources.$source.bind = 0.0.0.0
 agent_001.sources.$source.port = 4141
 
  • memory channel

 
 agent_001.channels.$channel.type = memory
 agent_001.channels.$channel.capacity = 1000
 agent_001.channels.$channel.transactionCapacity = 100
 
  • multiplexing selector

 
 agent_001.sources.$source.selector.type = multiplexing
 agent_001.sources.$source.selector.header = 
 agent_001.sources.$source.selector.mapping. = 
 agent_001.sources.$source.selector.mapping. =  
 agent_001.sources.$source.selector.mapping. = 
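The multiplexing selector routes each event by the value of one header. The config above leaves the header name and mappings blank; the sketch below fills them with hypothetical values ("state", channel-ca, channel-ny) purely for illustration.

```python
# Sketch of multiplexing channel selection: route an event to channels
# based on one header's value. Header name and mappings are example values.
def select_channels(event_headers, header="state", mapping=None, default=None):
    mapping = mapping or {"CA": ["channel-ca"], "NY": ["channel-ny"]}
    default = default or ["channel-default"]
    return mapping.get(event_headers.get(header), default)

print(select_channels({"state": "CA"}))   # ['channel-ca']
print(select_channels({"state": "TX"}))   # ['channel-default'] (unmapped value)
```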
 

Flume Channel

  • Channel types

    • memory, file

    • jdbc

    • org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel

    • org.apache.flume.channel.PseudoTxnMemoryChannel

    • Custom channel

 
 agent_001.channels.$channel.type = com.jopenbusiness.flume.channel001
 
  • memory channel

 
 agent_001.channels.$channel.type = memory
 agent_001.channels.$channel.capacity = 1000
 
  • file channel

    • mkdir /nas/appl/flume/storage

    • mkdir /nas/appl/flume/storage/checkpoint

    • mkdir /nas/appl/flume/storage/data

 
 agent_001.channels.$channel.type = file
 agent_001.channels.$channel.checkpointDir = /nas/appl/flume/storage/checkpoint
 agent_001.channels.$channel.dataDirs = /nas/appl/flume/storage/data
 

Flume Sink

  • Sink types

    • hdfs

    • file_roll

    • logger, avro, thrift, irc, null

    • hbase, asynchbase

    • org.apache.flume.sink.elasticsearch.ElasticSearchSink

    • Custom sink

 
 agent_001.sinks.$sink.type = com.jopenbusiness.flume.sink001
 
  • hdfs sink

    • requires the Hadoop jar files

 
 agent_001.sinks.$sink.type = hdfs
 agent_001.sinks.$sink.hdfs.path= hdfs://namenode:9000/user/flume/weblog/%y%m%d
 agent_001.sinks.$sink.hdfs.filePrefix = http-
 agent_001.sinks.$sink.hdfs.fileSuffix = .log
 agent_001.sinks.$sink.hdfs.round = true
 agent_001.sinks.$sink.hdfs.roundValue = 10
 agent_001.sinks.$sink.hdfs.roundUnit = minute
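With round = true, roundValue = 10, and roundUnit = minute as above, the event timestamp used for the hdfs.path escape sequences is rounded down to the nearest 10 minutes. A sketch of that bucketing (the sample timestamp is an arbitrary example):

```python
from datetime import datetime, timezone

# Round an epoch timestamp down to the nearest `round_value` minutes,
# mirroring hdfs.round/roundValue/roundUnit for path escapes like %y%m%d.
def round_down_minutes(ts, round_value=10):
    dt = datetime.fromtimestamp(ts, tz=timezone.utc)
    return dt.replace(minute=dt.minute - dt.minute % round_value,
                      second=0, microsecond=0)

dt = round_down_minutes(datetime(2024, 9, 30, 12, 26, 18,
                                 tzinfo=timezone.utc).timestamp())
print(dt.strftime("%y%m%d/%H%M"))  # 240930/1220
```

Rounding keeps all events from the same 10-minute window in the same HDFS directory/file bucket.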
 
  • file_roll sink

    • mkdir /nas/appl/flume/storage

    • mkdir /nas/appl/flume/storage/file

 
 agent_001.sinks.$sink.type = file_roll
 agent_001.sinks.$sink.sink.directory = /nas/appl/flume/storage/file
 
  • avro sink

 
 agent_001.sinks.$sink.type = avro
 agent_001.sinks.$sink.hostname = localhost
 agent_001.sinks.$sink.port = 4545
 
  • logger sink

 
 agent_001.sinks.$sink.type = logger
 

Connecting Agents via Avro

 
 agent_001.sinks.$sink.type = avro
 agent_001.sinks.$sink.hostname = hostForAgent002
 agent_001.sinks.$sink.port = 41414
 agent_001.sinks.$sink.batch-size = 100
 agent_001.sinks.$sink.runner.type = polling
 agent_001.sinks.$sink.runner.polling.interval = 10
 
 # --- The collector binds the port.
 agent_002.sources.$source.type = avro
 agent_002.sources.$source.bind = hostForAgent002
 agent_002.sources.$source.port = 41414
 

Monitor

  • ganglia reporting

 
 flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
     -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
 
  • JSON Reporting

 
 flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
     -Dflume.monitoring.type=http -Dflume.monitoring.port=41414
 
  • Custom reporting

 
 flume-ng agent -n $agent_name -c conf -f flume-conf.properties \
     -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
 

Flume Agent Examples


exec -> memory -> avro

  • flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties

{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" 
|- 
|width="20%" align="center" valign="middle" style="background-color:#eee;"|Event info 
|width="80%" align="left" valign="middle"|
* Size (52 bytes) * Count (2,400 / s) = 124,800 bytes / s
|- 
|align="center" valign="middle" style="background-color:#eee;"|Source
(exec) 
|align="left" valign="middle"|
 agent_001.sources = source-001
 agent_001.sources.source-001.channels = channel-001
 agent_001.sources.source-001.type = exec
 agent_001.sources.source-001.command = /cloudnas/develop/tail_00001.bash
 agent_001.sources.source-001.restartThrottle = 1000
 agent_001.sources.source-001.restart = true
 agent_001.sources.source-001.logStdErr = false
 agent_001.sources.source-001.batchSize = 100
* restartThrottle : how long to wait before restarting after an abnormal exit (ms)
* batchSize : number of events per transaction
|- 
|align="center" valign="middle" style="background-color:#eee;"|Channel
(memory) 
|align="left" valign="middle"|
 agent_001.channels = channel-001
 agent_001.channels.channel-001.type = memory
 agent_001.channels.channel-001.capacity = 240000
 agent_001.channels.channel-001.transactionCapacity = 100
 agent_001.channels.channel-001.keep-alive = 3
 agent_001.channels.channel-001.byteCapacityBufferPercentage = 20
 agent_001.channels.channel-001.byteCapacity = 24960000
* capacity : maximum number of events the channel can hold (Count * 100)
* transactionCapacity : maximum number of events received/sent per transaction
* byteCapacity : maximum number of bytes the channel can hold (capacity * Size * 2)
|- 
|align="center" valign="middle" style="background-color:#eee;"|Sink
(avro) 
|align="left" valign="middle"|
 agent_001.sinks = sink-001
 agent_001.sinks.sink-001.channel = channel-001
 agent_001.sinks.sink-001.type = avro
 agent_001.sinks.sink-001.hostname = 192.168.56.201
 agent_001.sinks.sink-001.port = 4141
 agent_001.sinks.sink-001.batch-size = 100
 agent_001.sinks.sink-001.connect-timeout = 20000
 agent_001.sinks.sink-001.request-timeout = 20000
* batch-size : number of events per transaction
|}
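The channel-sizing rules of thumb above (capacity = Count * 100, byteCapacity = capacity * Size * 2) can be checked in a few lines:

```python
# Channel sizing from the example: 52-byte events arriving at 2,400 events/s.
event_size = 52          # bytes per event
events_per_sec = 2_400

bytes_per_sec = event_size * events_per_sec
capacity = events_per_sec * 100              # rule of thumb: Count * 100
byte_capacity = capacity * event_size * 2    # rule of thumb: capacity * Size * 2

print(bytes_per_sec)   # 124800
print(capacity)        # 240000
print(byte_capacity)   # 24960000
```

These match the configured capacity = 240000 and byteCapacity = 24960000 values.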

 

avro -> memory -> file_roll

  • flume-ng agent -n agent_002 -f /nas/appl/flume/conf/flume-conf.properties

{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" 
|- 
|width="20%" align="center" valign="middle" style="background-color:#eee;"|Event info 
|width="80%" align="left" valign="middle"|
* Size (52 bytes) * Count (2,400 / s) = 124,800 bytes / s
|- 
|align="center" valign="middle" style="background-color:#eee;"|Source
(avro) 
|align="left" valign="middle"|
 agent_002.sources = source-002
 agent_002.sources.source-002.channels = channel-002
 agent_002.sources.source-002.type = avro
 agent_002.sources.source-002.bind = 0.0.0.0
 agent_002.sources.source-002.port = 4141
|- 
|align="center" valign="middle" style="background-color:#eee;"|Channel
(memory) 
|align="left" valign="middle"|Omitted 
|- 
|align="center" valign="middle" style="background-color:#eee;"|Sink
(file_roll) 
|align="left" valign="middle"|
 agent_002.sinks = sink-002
 agent_002.sinks.sink-002.channel = channel-002
 agent_002.sinks.sink-002.type = file_roll
 agent_002.sinks.sink-002.sink.directory = /cloudnas/develop/output/fileRoll
 agent_002.sinks.sink-002.sink.rollInterval = 30
 agent_002.sinks.sink-002.batchSize = 100
* batchSize : number of events per transaction
|}

 

exec -> memory -> null

  • flume-ng agent -n agent_001 -f /nas/appl/flume/conf/flume-conf.properties

{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" 
|- 
|width="20%" align="center" valign="middle" style="background-color:#eee;"|Event info 
|width="80%" align="left" valign="middle"|
* Size (52 bytes) * Count (2,400 / s) = 124,800 bytes / s
|- 
|align="center" valign="middle" style="background-color:#eee;"|Source
(exec) 
|align="left" valign="middle"|Omitted 
|- 
|align="center" valign="middle" style="background-color:#eee;"|Channel
(memory) 
|align="left" valign="middle"|Omitted 
|- 
|align="center" valign="middle" style="background-color:#eee;"|Sink
(null) 
|align="left" valign="middle"|
 agent_001.sinks = sink-001
 agent_001.sinks.sink-001.channel = channel-001
 agent_001.sinks.sink-001.type = null
 agent_001.sinks.sink-001.batchSize = 100
* batchSize : number of events per transaction
|}

 

Flume Development


  • Flume Library

    • lib/flume-ng-configuration-1.4.0.jar

    • lib/flume-ng-sdk-1.4.0.jar

    • lib/flume-ng-core-1.4.0.jar

 
 

{|cellspacing="0" cellpadding="2" border="1" width="100%" bgcolor="#FFFFFF" align="center" 
|- 
|width="30%" align="center" valign="middle" style="background-color:#eee;"|Source 
|width="70%" align="left" valign="middle"|
* org.apache.flume.Source
* Implementation: org.apache.flume.source.ExecSource
|- 
|align="center" valign="middle" style="background-color:#eee;"|Interceptor 
|align="left" valign="middle"|
* org.apache.flume.interceptor.Interceptor
* Implementation: org.apache.flume.interceptor.TimestampInterceptor$Builder

 agent_001.sources.source-001.interceptors = ConversionInterceptor
 agent_001.sources.source-001.interceptors.ConversionInterceptor.type =
     com.jopenbusiness.hadoop.flume.interceptor.ConversionInterceptor$Builder
|- 
|align="center" valign="middle" style="background-color:#eee;"|Channel Selector 
|align="left" valign="middle"|
* org.apache.flume.ChannelSelector
* Implementation: org.apache.flume.channel.MultiplexingChannelSelector
|- 
|align="center" valign="middle" style="background-color:#eee;"|Channel 
|align="left" valign="middle"|
* org.apache.flume.Channel
* Implementation: org.apache.flume.channel.MemoryChannel
|- 
|align="center" valign="middle" style="background-color:#eee;"|Sink 
|align="left" valign="middle"|
* org.apache.flume.Sink
* Implementation: org.apache.flume.sink.LoggerSink
|- 
|align="center" valign="middle" style="background-color:#eee;"|Sink Processor 
|align="left" valign="middle"|
* org.apache.flume.SinkProcessor
* Implementation: org.apache.flume.sink.LoadBalancingSinkProcessor
|}

  • Component

    • Event

    • Source

    • Sink

    • Channel

    • Source and Sink Runners

    • Agent

    • Configuration Provider

    • Client

     
  • Physical node

  • Logical node

 
 


 
 
 

Log Data Structure


  • Log data

    • ymd string : date (year-month-day)

    • hms string : time (hour-minute-second)

    • severity string : severity level (ERROR, WARNING, INFO, ...)

    • server string

    • process_id int

    • message string
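Assuming the fields arrive as a single space-delimited line in the order listed above (an assumption; the actual record format is not specified here), a parser can be sketched as:

```python
# Parse one log line into the fields listed above.
# The space delimiter and field order are assumptions; adjust to the real format.
def parse_log_line(line):
    ymd, hms, severity, server, process_id, message = line.split(" ", 5)
    return {"ymd": ymd, "hms": hms, "severity": severity,
            "server": server, "process_id": int(process_id),
            "message": message}

rec = parse_log_line("20240930 122618 ERROR web01 4242 disk full on /nas")
print(rec["severity"], rec["process_id"])  # ERROR 4242
```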

 
 

Troubleshooting


  • Error : org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel

    • Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
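This exception means events are being put into the channel faster than the sink drains them, so the MemoryChannel's bounded queue fills up. The failure mode can be modeled with a bounded queue (capacity 100, matching the message above):

```python
import queue

# Model of a MemoryChannel with capacity 100: when producers outpace the
# sink, put() fails, which Flume surfaces as the ChannelException above.
channel = queue.Queue(maxsize=100)

overflowed = False
for i in range(150):          # 150 incoming events vs. capacity 100
    try:
        channel.put_nowait(i)
    except queue.Full:
        overflowed = True
        break

print(channel.qsize(), overflowed)  # 100 True
```

As the message suggests, the fixes are committing more frequently (smaller batches), raising capacity/byteCapacity, or adding sink threads/sinks so the channel drains faster.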

 
 

Flume Configuration


 
 

References


 
 

Category: BigData

Last modified: 2024-09-30 12:26:18
