Connecting Go to Kafka
github.com/Shopify/sarama     // the main Kafka client library
github.com/bsm/sarama-cluster // consumer-group support
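
The demo below also imports a small kafkaDemo/define package for shared constants that this post does not show. A minimal sketch of what it might contain; the broker address and topic name are assumptions, adjust them to your environment:

    package define

    // SERVER_LIST is the Kafka broker address (assumed value).
    const SERVER_LIST = "127.0.0.1:9092"

    // TOPIC is the topic shared by the demo producer and consumer (assumed value).
    const TOPIC = "test_log"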


package producer

import (
    "fmt"
    "github.com/HappyTeemo7569/teemoKit/tlog"
    "github.com/Shopify/sarama"
    "kafkaDemo/define"
)

var (
    ProducerId = 1 // global auto-incrementing producer ID
)

type Producer struct {
    Producer   sarama.SyncProducer
    Topic      string // topic to produce to
    ProducerID int    // producer ID
    MessageId  int    // auto-incrementing message ID
}

func (p *Producer) InitProducer() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll          // wait for the leader and all followers to ack
    config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
    config.Producer.Return.Successes = true                   // delivered messages are returned on the Successes channel

    // connect to Kafka
    client, err := sarama.NewSyncProducer([]string{define.SERVER_LIST}, config)
    if err != nil {
        tlog.Error("failed to create producer, err:", err)
        return
    }

    p.Producer = client
    p.Topic = define.TOPIC
    p.ProducerID = ProducerId
    p.MessageId = 1

    ProducerId++ // note: not goroutine-safe; acceptable for this single-threaded demo
}

func (p *Producer) SendMessage() {
    // build a message
    msg := &sarama.ProducerMessage{}
    msg.Topic = p.Topic
    txt := fmt.Sprintf("ProducerID:%d  this is a test log %d",
        p.ProducerID, p.MessageId)
    msg.Value = sarama.StringEncoder(txt)

    // send the message
    pid, offset, err := p.Producer.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    tlog.Info(fmt.Sprintf("ProducerID:%d pid:%v offset:%v msg:%s",
        p.ProducerID, pid, offset, txt))

    p.MessageId++
}

func (p *Producer) Close() {
    p.Producer.Close()
}
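
The config above picks a random partition for every message. If some messages must be consumed in order, as the notes at the end of this post suggest, sarama's hash partitioner keeps all messages with the same key on the same partition. A minimal sketch; orderID is a hypothetical business key:

    // in InitProducer, use the hash partitioner instead:
    config.Producer.Partitioner = sarama.NewHashPartitioner

    // in SendMessage, key the message so every message with the same
    // orderID lands on one partition and stays in order within it
    msg := &sarama.ProducerMessage{
        Topic: p.Topic,
        Key:   sarama.StringEncoder(orderID),
        Value: sarama.StringEncoder(txt),
    }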



package consumer

import (
    "github.com/HappyTeemo7569/teemoKit/tlog"
    "github.com/Shopify/sarama"
    "kafkaDemo/define"
)

var ConsumerId = 1 // global auto-incrementing consumer ID

type Consumer struct {
    Consumer   sarama.Consumer
    Topic      string
    ConsumerId int // consumer ID
}

func (c *Consumer) InitConsumer() error {
    consumer, err := sarama.NewConsumer([]string{define.SERVER_LIST}, nil)
    if err != nil {
        return err
    }
    c.Consumer = consumer
    c.Topic = define.TOPIC
    c.ConsumerId = ConsumerId
    ConsumerId++
    return nil
}

// GetMessage consumes a single partition.
// The offset can be given explicitly; pass -1 to start from the newest offset.
func (c *Consumer) GetMessage(partitionId int32, offset int64) {
    if offset == -1 {
        offset = sarama.OffsetNewest
    }
    pc, err := c.Consumer.ConsumePartition(c.Topic, partitionId, offset)
    if err != nil {
        tlog.Error("failed to start consumer for partition %d,err:%v", partitionId, err)
        //That topic/partition is already being consumed
        return
    }

    // consume this partition's messages asynchronously
    go func(pc sarama.PartitionConsumer) {
        for msg := range pc.Messages() {
            tlog.Info("ConsumerId:%d Partition:%d Offset:%d Key:%v Value:%v", c.ConsumerId, msg.Partition, msg.Offset, msg.Key, string(msg.Value))
        }
    }(pc)
}
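
For example, to replay a partition from the beginning versus only tailing new messages (a usage sketch against the method above; sarama.OffsetNewest is literally -1, which is why passing -1 works):

    c.GetMessage(0, sarama.OffsetOldest) // replay partition 0 from the earliest retained message
    c.GetMessage(0, -1)                  // -1 == sarama.OffsetNewest: only messages produced from now on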

// GetMessageToAll consumes every partition of the topic.
func (c *Consumer) GetMessageToAll(offset int64) {
    partitionList, err := c.Consumer.Partitions(c.Topic) // fetch all partition IDs for the topic
    if err != nil {
        tlog.Error("failed to get partition list, err:%v", err)
        return
    }
    tlog.Info("all partitions:", partitionList)

    for _, partition := range partitionList { // consume each partition
        c.GetMessage(partition, offset)
    }
}
}



package main

import (
    "github.com/HappyTeemo7569/teemoKit/tlog"
    "kafkaDemo/consumer"
    "kafkaDemo/producer"
)

func main() {
    tlog.Info("starting")

    go producer.Put()
    go consumer.Get()

    select {} // block forever so the goroutines keep running
}

// Put (in package producer) creates a producer and sends one message per second.
func Put() {
    producer := new(Producer)
    producer.InitProducer()
    go func() {
        for {
            producer.SendMessage()
            time.Sleep(1 * time.Second)
        }
    }()
}

// Get (in package consumer) initializes a consumer and consumes every partition.
func Get() {

    offset := int64(0)

    consumer := new(Consumer)
    err := consumer.InitConsumer()
    if err != nil {
        tlog.Error("fail to init consumer, err:%v", err)
        return
    }
    consumer.GetMessageToAll(offset)
}
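
The demo above tracks partitions and offsets by hand. A consumer group lets Kafka do both: offsets are committed back to the broker and partitions are rebalanced across instances automatically. bsm/sarama-cluster (listed at the top) used to provide this, but it is deprecated now that sarama ships its own ConsumerGroup API. A minimal sketch, assuming a reasonably recent sarama and the same define constants:

    package consumer

    import (
        "context"
        "log"

        "github.com/Shopify/sarama"
        "kafkaDemo/define"
    )

    // groupHandler implements sarama.ConsumerGroupHandler.
    type groupHandler struct{}

    func (groupHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
    func (groupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

    func (groupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for msg := range claim.Messages() {
            log.Printf("partition:%d offset:%d value:%s", msg.Partition, msg.Offset, msg.Value)
            sess.MarkMessage(msg, "") // commit the offset back to Kafka
        }
        return nil
    }

    // GetWithGroup is a hypothetical alternative to Get that consumes via a group.
    func GetWithGroup() {
        config := sarama.NewConfig()
        config.Version = sarama.V2_1_0_0 // consumer groups require at least V0_10_2_0

        group, err := sarama.NewConsumerGroup([]string{define.SERVER_LIST}, "demo-group", config)
        if err != nil {
            log.Fatal("NewConsumerGroup err: ", err)
        }
        defer group.Close()

        for {
            // Consume blocks for one session and returns on rebalance; loop to rejoin
            if err := group.Consume(context.Background(), []string{define.TOPIC}, groupHandler{}); err != nil {
                log.Fatal("Consume err: ", err)
            }
        }
    }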

The full source code is available here:

kafka_demo

  • Store the consumed offset for each partition in Redis so consumption can resume where it left off after a restart.

  • Messages that must be consumed in order should be sent to a single partition, or routed by hashing a key (see the keyed-producer sketch after the producer code above).

  • Pass in a channel to decouple business logic from the Kafka plumbing, as the package below does.

    package kafka

    import (
        "crypto/tls"
        "crypto/x509"
        "fmt"
        "io/ioutil"
        "log"
        "sync"

        "github.com/Shopify/sarama"
        "vliao.com/stellar/internal/core"
    )

    type KafkaConsumer struct {
        Node         []string
        Consumer     sarama.Consumer
        Topic        string
        MessageQueue chan []byte
    }

    func NewKafkaConsumer(topic string) KafkaConsumer {
        return KafkaConsumer{
            Node:  core.GetKafkaConn().Conn,
            Topic: core.GetServerMode() + "_" + topic,
        }
    }

    // Consume consumes every partition of the topic.
    func (c *KafkaConsumer) Consume() {
        config := sarama.NewConfig()

        config.Net.SASL.Enable = true
        config.Net.SASL.User = core.GetKafkaConn().SASLUser
        config.Net.SASL.Password = core.GetKafkaConn().SASLPassword
        config.Net.SASL.Handshake = true

        certBytes, err := ioutil.ReadFile(GetFullPath("only-4096-ca-cert"))
        if err != nil {
            fmt.Println("kafka client read cert file failed ", err.Error())
            return
        }
        clientCertPool := x509.NewCertPool()
        ok := clientCertPool.AppendCertsFromPEM(certBytes)
        if !ok {
            fmt.Println("kafka client failed to parse root certificate")
            return
        }
        config.Net.TLS.Config = &tls.Config{
            RootCAs:            clientCertPool,
            InsecureSkipVerify: true,
        }
        config.Net.TLS.Enable = true

        consumer, err := sarama.NewConsumer(c.Node, config)
        if err != nil {
            log.Fatal("NewConsumer err: ", err)
        }
        defer consumer.Close()

        // first, find out how many partitions the topic has
        partitions, err := consumer.Partitions(c.Topic)
        if err != nil {
            log.Fatal("Partitions err: ", err)
        }
        var wg sync.WaitGroup
        wg.Add(len(partitions))
        // then consume each partition in its own goroutine; without goroutines,
        // one partition would only be consumed after another finished
        for _, partitionId := range partitions {
            go c.consumeByPartition(consumer, c.Topic, partitionId, &wg)
        }
        wg.Wait()
    }

    // For now the mapping is one-to-one: a producer's message never triggers
    // more than one business change, but multiple consumers can be started
    // to increase processing capacity.
    func getOffsetCacheKey(topic string, partitionId int32) string {
        return fmt.Sprintf("kafka_offset_%s_%d", topic, partitionId)
    }

    func setConsumeOffset(topic string, partitionId int32, offset int64) {
        core.RedisBy(core.RedisTypeServer).SetInt64(getOffsetCacheKey(topic, partitionId), offset)
    }

    func getConsumeOffset(topic string, partitionId int32) (offset int64) {
        key := getOffsetCacheKey(topic, partitionId)
        if core.RedisBy(core.RedisTypeServer).Exists(key) {
            return core.RedisBy(core.RedisTypeServer).GetInt64(key) + 1
        }

        // default: start from the newest offset
        setConsumeOffset(topic, partitionId, sarama.OffsetNewest)
        return sarama.OffsetNewest
    }

    func (c *KafkaConsumer) consumeByPartition(consumer sarama.Consumer, topic string, partitionId int32, wg *sync.WaitGroup) {
        defer wg.Done()
        offset := getConsumeOffset(topic, partitionId)
        partitionConsumer, err := consumer.ConsumePartition(topic, partitionId, offset)
        if err != nil {
            log.Fatal("ConsumePartition err: ", err)
        }
        defer partitionConsumer.Close()
        for message := range partitionConsumer.Messages() {
            log.Printf("[Consumer] topic: %s; partition: %d; offset: %d; value: %s\n", topic, message.Partition, message.Offset, string(message.Value))
            setConsumeOffset(topic, partitionId, message.Offset)
            c.MessageQueue <- message.Value
        }
    }

    package kafka

    import (
        "log"
        "testing"

        "vliao.com/stellar/internal/core"
    )

    func Test_Get(t *testing.T) {
        core.TestMain()
        topic := "test_log"
        var kafkaConsumer = NewKafkaConsumer(topic)

        kafkaConsumer.MessageQueue = make(chan []byte, 1000)
        go kafkaConsumer.Consume()

        for {
            msg := <-kafkaConsumer.MessageQueue
            deal(msg)
        }
    }

    func deal(msg []byte) {
        log.Printf("%s", msg) // avoid using message content as a format string
    }