springboot集成的kafka如何订阅多个topic呢??
阅读原文时间:2021年04月21日阅读:1

@springboot集成的kafka消费遇到的问题

       这是一篇寻求帮助的博客,也是我第一次写,不太规范什么的还请不要在意。直接步入正题吧,我在用springboot
   集成的kafka进行解析消费的时候遇到的问题。在我解析第一个topic的时候是可以正确解析并消费的;然后我解析第二
   个topic也是可以正确消费的。但是当我想要让他们一起消费的时候就出现了这样的错误。(过后展示消费的具体代码)

这个错误我也搜索过,但是作为一个经验并不丰富的小白来讲,着实不明白。大部分出来的都是关于hadoop的文章,所以我把重点放在了订阅topic上。根据好多网上看到的博客,我是这样写,还有配置文件。

Consumer

package com.example.kafka;


import com.example.data.*;
import com.example.protobuf.CfetsMarketData;
import com.google.protobuf.InvalidProtocolBufferException;
import com.zork.sip2.UiMsgCallback;

import kafka.utils.Json;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer implements UiMsgCallback {

    Logger logger = Logger.getLogger(Consumer.class);

    @Autowired
    MongoTemplate mongoTemplate;
//    @KafkaListener(topics = "#{'${spring.kafkaListenerList}'.split(',')}")  //指定订阅的topic
/*
*!!!这里是在网上借鉴的方法
*      @KafkaListener(topics = "#{'${topics}'.split(',')}")
*/

    @KafkaListener(topics = "topic2")
    public void receiver(byte[] tags){   //以byte[]数组接受消息

        logger.debug("========共收到" + tags.length + "条消息=========");           

            SipTagMsg sipTagMsg =  new SipTagMsg();//前40个头信息
            sipTagMsg.parse(tags);
            short msgData = sipTagMsg.getMsgType();
            QuoteBufferParse quoteBufferParse = null;
            switch (msgData){
                 case QuoteBufferParse.MSGTYPE_CFFEX_MARKETDATA:
                    quoteBufferParse = new CffexData();
                 break;

                 case QuoteBufferParse.CFETS_MARKET_DATA:
                    quoteBufferParse = new CfetsMarketData();
                    break;
//               default:
//               logger.debug("tags==="+sipTagMsg.getCode()+"msgType="+sipTagMsg.getMsgType());
            }

            if(quoteBufferParse!=null){
//                quoteBufferParse.parse(sipTagMsg.getMsgData());
//                CffexData shfe = (CffexData) quoteBufferParse;
                mongoTemplate.insert(shfe);
//                System.out.println("topic1消费信息-----"+shfe.toString());

                try {
//                    CfetsMarketData.Pb_CfetsTradeMarketData data1 = CfetsMarketData.Pb_CfetsTradeMarketData.parseFrom(sipTagMsg.getMsgData());
                    CfetsMarketData.Pb_CfetsTradeMarketDataSubscribeReceiveMessage data4 =  CfetsMarketData.Pb_CfetsTradeMarketDataSubscribeReceiveMessage.parseFrom(sipTagMsg.getMsgData());
//                    CfetsMarketData.Pb_CfetsTradeMarketData data2 = CfetsMarketData.Pb_CfetsTradeMarketData.parser();
                    System.out.println("topic2消费消息----"+data4.toString());
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }
            }

    @Override
    public void onTagMsgs(com.zork.sip2.SipTagMsg[] sipTagMsgs) {

    }

    @Override
    public void onNotify(String s, int i) {

    }

}

application.properties

spring.kafka.bootstrap-servers=10.187.161.251:9092
spring.kafka.template.default-topic= topic2
#这里也是借鉴的(下面这条)
#topics = topic1,topic2
#spring.kafkaListenerList = topic1,topic2

spring.data.mongodb.host= 10.187.161.251
spring.data.mongodb.port= 27017
spring.data.mongodb.database= test

server.port= 8082


spring.kafka.producer.retries= 0
spring.kafka.producer.batch-size= 16384
spring.kafka.producer.buffer-memory= 33554432
spring.kafka.producer.acks= 1
#spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer= org.apache.kafka.common.serialization.StringSerializer

#读取位置
spring.kafka.consumer.auto-offset-reset= latest
#自动提交
spring.kafka.consumer.enable-auto-commit= true
#心跳间隔
spring.kafka.consumer.auto-commit-interval= 100ms
spring.kafka.consumer.group-id= GruopId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

要强调的我在代码中加了注释,如果分别单独消费这两个主题都是没有问题的,但是一起消费就不行了。springboot的@KafkaListener中topics应该是可以多个订阅topic消费的呀。
其中第一个topic是直接转型解析的,第二个是用了那个protobuf来解析的,这是唯一的区别。
求大神告知应该怎么办,或者其他的什么方法。
自己研究解析结果在消费卡住了……
如果你有什么想法的话可以和我具体聊聊,我非常急迫的需要你呀