Flume + Kafka + Storm 部署指南

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
当前Flume有两个版本Flume 0.9X版本的统称Flume-og,Flume1.X版本的统称Flume-ng

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

Storm是一个分布式的、容错的实时计算系统,为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息并更新数据库。这是管理队列及工作者集群的另一种方式。 Storm也可被用于连续计算(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。

环境要求

  • JDK 1.7+ JAVA环境
  • Maven 3.* 项目构建工具
  • Zookeeper 分布式开放源码的分布式应用程序协调服务,Kafka与Storm必须

JDK

MAC

查看JDK版本 java -version

Mac 自带的 JDK 为1.6版本,到 Oracle 官方网站下载:

http://www.oracle.com/technetwork/java/javase/downloads/java-archive-downloads-javase7-521261.html

根据苹果的官方说明,Mac OS X 10.5 及以后的版本应该使用 /usr/libexec/java_home 命令来确定 JAVA_HOME

CentOS

  • CentOS 系统自带JDK的情况

CentOS自带的应该是OpenJDK,是JDK的开源版本,通过yum install java 安装的:

java -version
openjdk version "1.8.0_51"
OpenJDK Runtime Environment (build 1.8.0_51-b16)
OpenJDK 64-Bit Server VM (build 25.51-b03, mixed mode)

进一步查看JDK rpm包信息,输出类似:

rpm -qa | grep java
tzdata-java-2015e-1.el6.noarch
java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64
java-1.8.0-openjdk-headless-1.8.0.51-0.b16.el6_6.x86_64

卸载系统自带的OpenJDK,可执行以下操作:yum remove java

  • 可能安装过 Oracle JDK 的旧版本
java -version
java version "1.7.0_80"
Java(TM) SE Runtime Environment (build 1.7.0_80-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)

卸载后安装:

rpm -qa | grep jdk
rpm -e jdk
rpm -ivh jdk-7-linux-x64.rpm
java -version

JDK环境变量

vi /etc/profile

JAVA_HOME=/usr/java/jdk1.7.0
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib;
export JAVA_HOME PATH CLASSPATH

source /etc/profile

echo $JAVA_HOME

PS:

  1. CLASSPATH 这个环境变量在Storm下貌似没有用

  2. 有个神奇的工具 alternatives 还可以切换java版本:

alternatives --config java

  选择    命令
*  1           /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/bin/java
 + 2           /usr/java/jdk1.7.0_80/bin/java

按 Enter 来保存当前选择[+],或键入选择号码:

Maven

Maven是基于项目对象模型(POM),可以通过一小段描述信息来管理项目的构建,即pom.xml配置文件,报告和文档的软件项目管理工具。

个人理解类似C下面的Makefile,用于编译打包jar,解决包依赖。

官网:

http://maven.apache.org/

文档:

http://maven.apache.org/install.html

下载:

http://apache.fayea.com/maven/maven-3/3.3.3/binaries/apache-maven-3.3.3-bin.tar.gz

解压到/usr/local/server/maven

将/usr/local/server/maven/bin写入PATH

检测安装成功与否,这个需要在项目目录下执行可能才正常:

mvn -v
Apache Maven 3.3.3 (7994120775791599e205a5524ec3e0dfe41d4a06; 2015-04-22T19:57:37+08:00)
Maven home: /usr/local/server/maven
Java version: 1.7.0_80, vendor: Oracle Corporation
Java home: /Library/Java/JavaVirtualMachines/jdk1.7.0_80.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: ISO8859-1
OS name: "mac os x", version: "10.10.2", arch: "x86_64", family: "mac"

Flume

官网:

http://flume.apache.org/

用户手册:

http://flume.apache.org/FlumeUserGuide.html
https://cwiki.apache.org/confluence/display/FLUME/Getting+Started

应当重点关注手册的 source, sink, 以及过滤器部分

扩展阅读:

Flume NG:Flume 发展史上的第一次革命:http://www.ibm.com/developerworks/cn/data/library/bd-1404flumerevolution/index.html

安装

Flume源码编译需要JDK 1.6+, Apache Maven 3.x,一般使用二进制包即可。

wget http://www.apache.org/dyn/closer.cgi/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz

解压到/usr/local/server/flume

cp conf/flume-conf.properties.template conf/flume.conf

cp conf/flume-env.sh.template conf/flume-env.sh

简单示例

令avro监听作为数据来源,内存作为管道,file_roll作为去向。运行结果是从avro读取的数据输出到文件中:

vi example.conf
agent.channels = memory
agent.sources = avro
agent.sinks = file

agent.channels.memory.type = memory

agent.sources.avro.channels = memory
agent.sources.avro.type = avro
agent.sources.avro.bind = 0.0.0.0
agent.sources.avro.port = 41414

agent.sinks.file.channel = memory
agent.sinks.file.type = file_roll
agent.sinks.file.sink.directory = /tmp/flume

在客户端运行agent,其中

  • flume-ng agent 是客户端指令
  • -n 表示agent 名称
  • -Dproperty 用于传递 java option 参数
/usr/local/server/flume/bin/flume-ng agent -n agent --conf /usr/local/server/flume/conf/ -f /usr/local/server/flume/conf/example.conf -Dflume.root.logger=DEBUG,console

可通过 netstat -tunlp flume自动启动了一个监听在41414端口的avro服务端

flume 自带了avro客户端脚本,用于向服务端写数据: avro-client

下面运行avro客户端并输出 /etc/passwd 文件内容到avro服务端

/usr/local/server/flume/bin/flume-ng avro-client --conf /usr/local/server/flume/conf/ -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console

运行正常的话。查看/tmp/flume目录下是否含有/etc/passwd的内容。

Kafka

官网:

http://kafka.apache.org/

文档:

http://kafka.apache.org/08/documentation.html

扩展阅读:

http://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html

安装

下载:

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz

解压到/usr/local/server/kafka

Kafka 依赖zookeeper 运行,本身有内置zookeeper,直接分别启动:

/usr/local/server/kafka/bin/zookeeper-server-start.sh /usr/local/server/kafka/config/zookeeper.properties

/usr/local/server/kafka/bin/kafka-server-start.sh /usr/local/server/kafka/config/server.properties

默认 Zookeeper 监听 2181,Kafka 监听 9092

如果是自行安装的zookeeper,则需要修改从server.properties中的 Zookeeper 相关的配置段

简单示例

测试 Kafka 单机是否正常,命令行创建一个话题 log:

/usr/local/server/kafka/bin/kafka-topics.sh --create --topic log --partitions 1 --zookeeper localhost:2181 --replication-factor 1

列出当前话题:

/usr/local/server/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

向话题log发送内容,提示符后面随便输入字符:

/usr/local/server/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic log

接收话题log的消息:

/usr/local/server/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic log --from-beginning

如果有专门的日志汇总服务器,大概是flume->avro->kafka情景:

collecter:
/usr/local/server/flume/bin/flume-ng agent -n collecter --conf /usr/local/server/flume/conf/ -f /usr/local/server/flume/conf/collecter.conf -Dflume.root.logger=DEBUG,console

agent:
/usr/local/server/flume/bin/flume-ng agent -n agent --conf /usr/local/server/flume/conf/ -f /usr/local/server/flume/conf/agent.conf -Dflume.root.logger=DEBUG,console

Storm

官网:

http://storm.apache.org/

下载:
http://www.apache.org/dyn/closer.cgi/storm/apache-storm-0.9.4/apache-storm-0.9.4.tar.gz
解压到/usr/local/server/storm

文档:
http://storm.apache.org/documentation/Home.html

安装

Storm 提供了本机部署的方法,配置文件几乎不需要做任何更改

Storm 默认配置文件会使用默认的 Zookeeper 端口

因为我们在启动 Kafka 的时候启动了一个默认的,所以直接就可以运行

依次启动:

storm nimbus
storm supervisor
storm ui
  • nimbus 主节点。用于分配代码、布置任务及故障检测。
  • supervisor 工作节点。用于监听工作,开始并终止工作进程。Nimbus和Supervisor都能快速失败,而且是无状态的,这样一来它们就变得十分健壮,两者的协调工作是由ZooKeeper来完成的,节点一般是/storm
  • ui 可视化管理界面,可用于查看管理topology,以及运行日志

storm ui 默认运行在 8080 端口

http://localhost:8080/

示例

storm 程序包中自带了example目录,现在我们来尝试运行一下。示例文档:
https://github.com/apache/storm/tree/master/examples/storm-starter

进入到storm的example目录,使用mvn构建storm本地库,这些jar包会自动下载生成到external/目录下

cd /usr/local/server/storm/examples/storm-starter
mvn clean install -DskipTests=true

没有用到多语言特性的example,可以直接在Storm的本地编译并运行

mvn compile exec:java -Dstorm.topology=storm.starter.ExclamationTopology

提交到Storm集群,我们需要生成jar,并提交到Storm集群

// 生成jar包
mvn clean package
// 本地运行
storm jar storm-starter-*.jar storm.starter.RollingTopWords
// 提交到远程Storm运行
storm jar storm-starter-*.jar storm.starter.RollingTopWords topology-name

一般本地运行和远程运行是通过传递的参数来控制的

运行 logserver Topology

logserver 当前源码地址为:https://github.com/langwan/storm_monitor/tree/master/log

我们需要熟悉一下 Eclipse,了解如何调试编写Java代码,和构建 Maven 项目

进入到log源码目录,重复上面的几个步骤:

mvn compile exec:java -Dstorm.topology=com.cmstop.logserver.App
// 打包
mvn package
// 本地运行
storm jar logserver-*.jar com.cmstop.logserver.App
// 远程运行
storm jar logserver-*.jar com.cmstop.logserver.App logserver [config DIR]

logserver的远程运行需要额外两个参数,分别是topology name与config path

提交到Storm运行后,我们可以通过Storm UI 查看运行状态

基于docker 环境下部署 Storm

docker hub 与Storm相关的镜像主要有以下几个,这些镜像可以通过docker-compose来进行运行管理:

wurstmeister/base 
wurstmeister/zookeeper
wurstmeister/storm-ui 
wurstmeister/storm  
wurstmeister/storm-nimbus
wurstmeister/storm-supervisor
wurstmeister/kafka

wurstmeister 的 storm docker 使用supervisor来进行进程管理,别与storm-supervisor搞混了,他在wurstmeister/base中安装,然后配置路径是/etc/supervisor/,详情参看这里:

http://supervisord.org/index.html

上面说了storm的东西是无状态的,分布式消息依靠zookeeper,所以supervisor的功能就是监控进程状态,如果退出了,自动重启。

最后看一下docker-compose.yml,这个是在一个机器上运行所有docker并且实现在容器内部通信情况:

vi docker-compose.yml

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"
    - "22"
kafka:
  image: wurstmeister/kafka
  ports:
    - "9092:9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: 10.171.200.57
    KAFKA_CREATE_TOPICS: "log:1:1"
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
nimbus:
  image: wurstmeister/storm-nimbus
  ports:
    - "49773:3773"
    - "49772:3772"
    - "6627:6627"
    - "22"
  links:
    - zookeeper:zk
  volumes:
    - /data/logserver:/data/logserver
supervisor:
  image: wurstmeister/storm-supervisor
  ports:
    - "8000"
    - "22"
  links:
    - nimbus:nimbus
    - zookeeper:zk
  volumes:
    - /data/logserver:/data/logserver
ui:
  image: wurstmeister/storm-ui
  ports:
    - "8080:8080"
    - "22"
  links:
    - nimbus:nimbus
    - zookeeper:zk
  volumes:
    - /data/logserver:/data/logserver

使用以下命令进行管理:

docker-compose up     创建并启动 -d 后台运行
docker-compose start  启动
docker-compose stop   停止
docker-compose ps
docker-compose logs

检验docker-compose是否成功

Kafka 会自动创建一个topic=log的话题,通过下面查看

$KAFKA_HOME/bin/kafka-topics.sh  --list --zookeeper localhost:2181  

然后运行访问宿主IP:8080查看 storm ui 界面。

线上环境部署

工作目录:/data/logserver/


├── logs
│   └── store     # 日志收集目录
└── storm-docker  # docker-compose运行目录
    ├── config    # 配置文件,如果提交到Storm集群,需要将该路径作为参数
    │   ├── kafka.properties
    │   ├── main.properties
    │   ├── mysql.properties
    │   └── store.properties
    ├── docker-compose.yml
    └── logserver-0.1.0-jar-with-dependencies.jar

MySQL运行在宿主机器,数据库为cloud,相关表为host, monitor_log。

日志收集目录为:/data/logserver/logs/store

运行docker-compose,启动Storm以及相关组件

cd /data/logserver/storm-docker

docker-compose up -d

客户端机器运行flume:

/usr/local/server/flume/bin/flume-ng agent -n agent --conf /usr/local/server/flume/conf/ -f /usr/local/server/flume/conf/cmstop-log.conf  -Dflume.root.logger=DEBUG,console

cmstop-log.conf 大概是这样

agent.sources = nginx mysql php redis mysqlslow phpslow
agent.channels = memory
agent.sinks = kafka

agent.channels.memory.type = memory
agent.channels.memory.capacity = 1000
agent.channels.memory.transactionCapacity = 1000

agent.sinks.kafka.type = com.cmstop.flume.sink.kafka.KafkaSink
agent.sinks.kafka.channel = memory
agent.sinks.kafka.topic = log
agent.sinks.kafka.brokerList = 10.171.200.57:9092
agent.sinks.kafka.requiredAcks = 1
agent.sinks.kafka.batchSize = 20

agent.sources.nginx.command = tail -F /data/weblogs/tengine/nginx_error.log
agent.sources.mysql.command = tail -F /data/mysql/logs/mysql_error.log
agent.sources.php.command = tail -F /data/weblogs/php/error.log
agent.sources.redis.command = tail -F /data/weblogs/redis/redis.log
agent.sources.mysqlslow.command = tail -F /data/logs/mysql/slow.log
agent.sources.phpslow.command = tail -F /data/weblogs/php/site.log.slow

agent.sources.nginx.type = exec
agent.sources.nginx.channels = memory
agent.sources.nginx.interceptors = i1 i2
agent.sources.nginx.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.nginx.interceptors.i1.preserveExisting = true
agent.sources.nginx.interceptors.i1.useIP = false
agent.sources.nginx.interceptors.i1.hostHeader = hostname
agent.sources.nginx.interceptors.i2.type = static
agent.sources.nginx.interceptors.i2.key = type
agent.sources.nginx.interceptors.i2.value = nginx.error

agent.sources.mysql.type = exec
agent.sources.mysql.channels = memory
agent.sources.mysql.interceptors = i1 i2
agent.sources.mysql.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.mysql.interceptors.i1.preserveExisting = true
agent.sources.mysql.interceptors.i1.useIP = false
agent.sources.mysql.interceptors.i1.hostHeader = hostname
agent.sources.mysql.interceptors.i2.type = static
agent.sources.mysql.interceptors.i2.key = type
agent.sources.mysql.interceptors.i2.value = mysql.error

agent.sources.php.type = exec
agent.sources.php.channels = memory
agent.sources.php.interceptors = i1 i2
agent.sources.php.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.php.interceptors.i1.preserveExisting = true
agent.sources.php.interceptors.i1.useIP = false
agent.sources.php.interceptors.i1.hostHeader = hostname
agent.sources.php.interceptors.i2.type = static
agent.sources.php.interceptors.i2.key = type
agent.sources.php.interceptors.i2.value = php.error

agent.sources.redis.type = exec
agent.sources.redis.channels = memory
agent.sources.redis.interceptors = i1 i2
agent.sources.redis.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.redis.interceptors.i1.preserveExisting = true
agent.sources.redis.interceptors.i1.useIP = false
agent.sources.redis.interceptors.i1.hostHeader = hostname
agent.sources.redis.interceptors.i2.type = static
agent.sources.redis.interceptors.i2.key = type
agent.sources.redis.interceptors.i2.value = redis.error

agent.sources.mysqlslow.type = exec
agent.sources.mysqlslow.channels = memory
agent.sources.mysqlslow.interceptors = i1 i2
agent.sources.mysqlslow.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.mysqlslow.interceptors.i1.preserveExisting = true
agent.sources.mysqlslow.interceptors.i1.useIP = false
agent.sources.mysqlslow.interceptors.i1.hostHeader = hostname
agent.sources.mysqlslow.interceptors.i2.type = static
agent.sources.mysqlslow.interceptors.i2.key = type
agent.sources.mysqlslow.interceptors.i2.value = mysql.slow

agent.sources.phpslow.type = exec
agent.sources.phpslow.channels = memory
agent.sources.phpslow.interceptors = i1 i2
agent.sources.phpslow.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.phpslow.interceptors.i1.preserveExisting = true
agent.sources.phpslow.interceptors.i1.useIP = false
agent.sources.phpslow.interceptors.i1.hostHeader = hostname
agent.sources.phpslow.interceptors.i2.type = static
agent.sources.phpslow.interceptors.i2.key = type
agent.sources.phpslow.interceptors.i2.value = php.slow

可通过kafka消费者查看话题:

/data/server/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic log --from-beginning

提交 Topology 到Storm

提交 Topology,其中192.168.42.62为docker中nimbus的IP地址,可通过UI查看:

本地测试运行与远程提交:

// 本地
storm jar logserver-0.1.0-jar-with-dependencies.jar com.cmstop.logserver.App -c nimbus.host=192.168.42.62
// 远程
storm jar logserver-0.1.0-jar-with-dependencies.jar com.cmstop.logserver.App logserver /data/logserver/storm-docker/config/ -c nimbus.host=192.168.42.62

因为我们在宿主机器上运行,所以需要 -c参数来指定提交位置

问题总结

  1. 客户端 flume 接收到的hostname目前是localhost,但是客户端机器的hostname明明是cmstop-cloud

  2. 日志里面有一下几个情况,nginx.error,php.error是以行来记录的,而mysql.error的日志可能是一大段,另外php.slow一般会根据进程池来配置多个slow文件,现在只监控了后台的slow

  3. docker 会自动分配容器的IP地址,这点不爽。docker-compose 虽然提供了方便,但是由于下载的那个源里面的脚本有点问题,不能重复运行。如果停止了需要执行一下步骤。

docker-compose stop   # 假如停止了
docker-compose start  # 直接启动不行,因为他的脚本写的不严谨
解决办法是:
docker-compose stop
docker-compose rm # 直接删掉
docker-compose up # 重新运行