kafka的监控和告警
其实对于大多数用kafka的人来说,一般都会选择两个开源的工具:KafkaOffsetMonitor和kafka-web-console,这两款我都有用过,而且各有优缺点。
KafkaOffsetMonitor:最大的好处就是配置简单,只需要配个zookeeper的地址就能用了,坑爹的地方就是不能自动刷新,手动刷新时耗时较长,而且有时候都刷不出来,另外就是图像用了一段时间就完全显示不了了,不知道大家是不是这样。
kafka-web-console:相比与前者,数据是落地的,因此刷新较快,而且支持在前端自定义zookeeper的地址,还能列出实时的topic里的具体内容。但是搭建比较复杂,而且github上的默认数据库是H2的,像我们一般用mysql的,还得自己转化。另外在用的过程中,我遇到一个问题,在连接kafka的leader失败的时候,会一直重试,其结果就是导致我kafka的那台机子连接数过高,都到2w了,不知道是不是它的一个bug。
而且我们还得关心其他指标吧,http://kafka.apache.org/documentation.html 里的momitor部分不是列出了那么多监控项么,迫不得已,我得靠自己去另辟新法,我现在的做法是用ganglia来做监控。 哈哈,github上一搜,有戏,https://github.com/criteo/kafka-ganglia,只需要在server.properties里添加几个配置项就解决问题了,结果反复试验都没有成功,一看都一年没更新了,估计是版本问题吧,也懒得管他了。
当然还有个比较傻一些的办法,用CSVMetricsReporter,在配置文件中开启之后,就会把相应的指标分别写入到csv文件中,然后再用脚本去采集即可,这个的确是可行的,但是对资源的消耗比较大, 等等,不是还有这个嘛https://github.com/adambarthelson/kafka-ganglia ,用JMXTrans来做,修改kafka配置,将其jmx端口暴露出来,然后用JMXTrans把数据发到ganglia,你的JMXTrans的配置文件可以是这样:
{ "servers": [ { "port": "9999", "host": "xxxxxx", "queries": [ { "outputWriters": [ { "@class": "com.googlecode.jmxtrans.model.output.GangliaWriter", "settings": { "groupName": "jvmheapmemory", "port": 8649, "host": "xxxxx" } } ], "obj": "java.lang:type=Memory", "resultAlias": "heap", "attr": [ "HeapMemoryUsage", "NonHeapMemoryUsage" ] }, ................#[size=16]#其他配置[/size] ] }这里可以把http://kafka.apache.org/documentation.html 里列出来的mbean都加入进来,而且jmxtrans还支持GraphiteWriter,这样数据就落地了,你再想怎么处理就很easy啦。 若是你仅仅是想监控lag和logsize这些指标,亦如KafkaOffsetMonitor中展示的那样,这里提供两个方法:
用bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test就能列出你想要的,你可以写个脚本去定时获取lag,再制定出大于多少就发告警邮件啊什么的。一
用命令行的方式总感觉不像脚本该做的,有没有client可以去获取呢,答案是有,我就把我用KafkaClient和KazooClient获取lag的脚本贡献给大家吧:二
#!/usr/local/bin/python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kazoo.client import KazooClient
# Zookeepers - no need to add ports
zookeepers="localhost"
# Kafka broker
kafka="localhost:9092"
#consumer group
group="test"
if __name__ == '__main__':
broker = KafkaClient(kafka)
lags = {}
zk = KazooClient(hosts=zookeepers, read_only=True) #zookeeper客户端,read_only确保不会对zookeeper更改
zk.start()
logsize=0
topics = zk.get_children("/consumers/%s/owners" %(group))
for topic in topics:
logsize =0
consumer = SimpleConsumer(broker, group, str(topic))
latest_offset = consumer.pending()
partitions = zk.get_children("/consumers/%s/offsets/%s" %(group, topic))
for partition in partitions:
log = "/consumers/%s/offsets/%s/%s" % (group, topic, partition)
if zk.exists(log):
data, stat = zk.get(log)
logsize += int(data)
lag = latest_offset - logsize
lags[topic] = lag
zk.stop()
上面的lags就是一个当前topic的lag的字典咯,其实大体的逻辑就是通过SimpleConsumer获取到当前的offset,再由KazooClient对zookeeper层层剥皮,获取topic和partition的信息,得到每个partition的logsize后累加与offset比较,就能有lag信息了,之后你想干嘛干嘛了,比如发报警邮件等等。当然也可以像KafkaOffsetMonitor那样做自己的展示了。
分享阅读原文:http://www.opscoder.info/kafka_monitor.html