前情回顾
前文我们完成了kafka消费逻辑实现,并将消息放入elasticsearch,然后通过kibana可视化工具查看我们的日志。
本节目标
前文只是完成了kafka消息消费以及放入elastic,这次将项目完善,使其支持热更新,就是当config.yaml中监控的日志改变,或者etcd数据有改变时,动态启动协程监控新增日志,关闭取消的日志监控协程。
新增变量控制协程自启动
kafkaconsumer.go中新增如下代码
1 | var topicMap map[string]map[int32]*TopicData |
topicMap用来存储config.yaml中直接记录的日志topic以及协程参数,etcd_topicMap用来记录etcd中记录的topic以及协程参数。
topicSet用来记录config.yaml中直接记录的日志topic, etcd_topicSet用来记录etcd中记录的topic。
topicChan当监控日志写入elastic的协程异常崩溃时,通过该chan返回topic信息,然后我们通过topicMap找到topic对应的协程重启。
etcd_topicChan协程根据etcd的保存的topic,监控etlastic处理,如果该协程崩溃,则通过etcd_topicMap中查找topic对应的协程重启。
consumer_list保存了kafka消费者列表。
etcdcli时etcd的客户端,用来处理etcd读写。
根据config中配置的日志topic生成set
1 | func ConstructTopicSet() map[string]bool { |
根据config中配置的etcd键值获取val,然后获取topic生成set
etcdconsumer.go中通过GetTopicSet从etcd中读取topic生生set
1 | func GetTopicSet(cli *clientv3.Client) (interface{}, error) { |
将集合转化为map,并配置协程然后启动
1 | func ConvertSet2Map(consumer sarama.Consumer, topicSet map[string]bool, |
从kafka中读取消息,并调用上面的函数启动协程监控es
从kafka中读取信息,然后根据配置生成set和map,启动协程监控es
1 | func ConsumeTopic(consumer sarama.Consumer) { |
go logconfig.WatchConfig 启动协程,调用vipper监控配置,当配置有更新时消费者协程处理更新的topic。
同时支持子协程异常崩溃时,消费者协程重启该协程。
通过kibana看到的日志信息
kibana中ManageMent管理,然后新增index,elastic的index我们设置的是topic,所以我们新建几个index,
etcd_log, golang_log, logdir2。
然后kibana中可以看到这几个index的日志信息了
源码下载
https://github.com/secondtonone1/golang-/tree/master/logcatchsys
感谢关注我的公众号