在实际业务场景中,为了提高系统的实时性,减轻日志存储压力,需要将日志直接生产至消息中间件,减少flume或fluentd收集所导致的延时及性能压力,本文实现了以下功能:
实现了一个静态调用的异步生产者 AsyncProducer
封装了一个用于异步发送的生产器 Agent
//@description kafka代理 //@author chenbintao //@data 2017-09-27 10:30 初稿 // 2017-09-27 11:15 规范代码 // 2017-09-28 14:15 对发送逻辑进行了优化 package kafkaAgent import ( "fmt" "log" "runtime/debug" "strings" "time" "github.com/Shopify/sarama" ) const ( _BROKER_LIST_ = `localhost:9092` ) const ( _LABEL_ = "[_kafkaAgent_]" ) var ( IS_DEBUG = false _PAUSE_ = false ) func SetDebug(debug bool) { IS_DEBUG = debug } type Agent struct { flag bool BrokerList string TopicList string SendTimeOut time.Duration ReceiveTimeOut time.Duration AsyncProducer sarama.AsyncProducer } func (this *Agent) Set(BrokerList, TopicList string, SendTimeOut, ReceiveTimeOut time.Duration) bool { //只允许初始化一次 if this.flag { return false } this.flag = true this.BrokerList = BrokerList this.TopicList = TopicList this.SendTimeOut = SendTimeOut this.ReceiveTimeOut = ReceiveTimeOut this.AsyncProducer = getProducer(this.BrokerList, this.SendTimeOut, true) if nil == this.AsyncProducer { return false } return this.Check() } func (this *Agent) Check() bool { if "" == this.BrokerList || "" == this.TopicList { return false } if 0 == this.SendTimeOut && 0 == this.ReceiveTimeOut { return false } return true } func (this *Agent) Send(msg string) bool { defer func() { if e, ok := recover().(error); ok { log.Println(_LABEL_, "WARN: panic in %v", e) log.Println(_LABEL_, string(debug.Stack())) this.AsyncProducer.Close() this.AsyncProducer = getProducer(this.BrokerList, this.SendTimeOut, true) } }() if !this.Check() { return false } return asyncProducer( this.AsyncProducer, this.TopicList, msg, ) } //========================================================================= // asyncProducer 异步生产者 func AsyncProducer(kafka_list, topics, s string, timeout time.Duration) bool { if "" == kafka_list || "" == topics { return false } producer := getProducer(kafka_list, timeout, false) if nil == producer { return false } defer producer.Close() go func(p sarama.AsyncProducer) { errors := p.Errors() success := p.Successes() for { select { case 
err := <-errors: if err != nil { if IS_DEBUG { log.Println(_LABEL_, err) } return } else { return } case <-success: return } } }(producer) return asyncProducer(producer, topics, s) } func asyncProducer(p sarama.AsyncProducer, topics, s string) bool { if nil == p { return false } msg := &sarama.ProducerMessage{ Topic: topics, Value: sarama.ByteEncoder(s), } p.Input() <- msg if IS_DEBUG { fmt.Println(_LABEL_, msg) } return true } func getProducer(kafka_list string, timeout time.Duration, monitor bool) sarama.AsyncProducer { config := sarama.NewConfig() config.Producer.Return.Successes = true config.Producer.Timeout = timeout producer, err := sarama.NewAsyncProducer(strings.Split(kafka_list, ","), config) if err != nil { if IS_DEBUG { log.Println(_LABEL_, err) } } if monitor { //消费状态消息,防止死锁 go func(producer sarama.AsyncProducer) { if nil == producer { log.Println(_LABEL_, "getProducer() producer error!") return } errors := producer.Errors() success := producer.Successes() for { select { case err := <-errors: if err != nil { if IS_DEBUG { log.Println(_LABEL_, err) } continue } else { continue } case <-success: continue } } }(producer) } return producer }
评论前必须登录!
注册