这是一篇寻求帮助的博客,也是我第一次写,不太规范什么的还请不要在意。直接步入正题吧,我在用springboot
集成的kafka进行解析消费的时候遇到的问题。在我解析第一个topic的时候是可以正确解析并消费的;然后我解析第二
个topic也是可以正确消费的。但是当我想要让他们一起消费的时候就出现了这样的错误。(过后展示消费的具体代码)
这个错误我也搜索过,但是作为一个经验并不丰富的小白来讲,着实不明白。大部分出来的都是关于hadoop的文章,所以我把重点放在了订阅topic上。根据好多网上看到的博客,我是这样写,还有配置文件。
Consumer
package com.example.kafka;
import com.example.data.*;
import com.example.protobuf.CfetsMarketData;
import com.google.protobuf.InvalidProtocolBufferException;
import com.zork.sip2.UiMsgCallback;
import kafka.utils.Json;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer implements UiMsgCallback {
Logger logger = Logger.getLogger(Consumer.class);
@Autowired
MongoTemplate mongoTemplate;
// @KafkaListener(topics = "#{'${spring.kafkaListenerList}'.split(',')}") //指定订阅的topic
/*
*!!!这里是在网上借鉴的方法
* @KafkaListener(topics = "#{'${topics}'.split(',')}")
*/
@KafkaListener(topics = "topic2")
public void receiver(byte[] tags){ //以byte[]数组接受消息
logger.debug("========共收到" + tags.length + "条消息=========");
SipTagMsg sipTagMsg = new SipTagMsg();//前40个头信息
sipTagMsg.parse(tags);
short msgData = sipTagMsg.getMsgType();
QuoteBufferParse quoteBufferParse = null;
switch (msgData){
case QuoteBufferParse.MSGTYPE_CFFEX_MARKETDATA:
quoteBufferParse = new CffexData();
break;
case QuoteBufferParse.CFETS_MARKET_DATA:
quoteBufferParse = new CfetsMarketData();
break;
// default:
// logger.debug("tags==="+sipTagMsg.getCode()+"msgType="+sipTagMsg.getMsgType());
}
if(quoteBufferParse!=null){
// quoteBufferParse.parse(sipTagMsg.getMsgData());
// CffexData shfe = (CffexData) quoteBufferParse;
mongoTemplate.insert(shfe);
// System.out.println("topic1消费信息-----"+shfe.toString());
try {
// CfetsMarketData.Pb_CfetsTradeMarketData data1 = CfetsMarketData.Pb_CfetsTradeMarketData.parseFrom(sipTagMsg.getMsgData());
CfetsMarketData.Pb_CfetsTradeMarketDataSubscribeReceiveMessage data4 = CfetsMarketData.Pb_CfetsTradeMarketDataSubscribeReceiveMessage.parseFrom(sipTagMsg.getMsgData());
// CfetsMarketData.Pb_CfetsTradeMarketData data2 = CfetsMarketData.Pb_CfetsTradeMarketData.parser();
System.out.println("topic2消费消息----"+data4.toString());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
@Override
public void onTagMsgs(com.zork.sip2.SipTagMsg[] sipTagMsgs) {
}
@Override
public void onNotify(String s, int i) {
}
}
application.properties
spring.kafka.bootstrap-servers=10.187.161.251:9092
spring.kafka.template.default-topic= topic2
#这里也是借鉴的(下面这条)
#topics = topic1,topic2
#spring.kafkaListenerList = topic1,topic2
spring.data.mongodb.host= 10.187.161.251
spring.data.mongodb.port= 27017
spring.data.mongodb.database= test
server.port= 8082
spring.kafka.producer.retries= 0
spring.kafka.producer.batch-size= 16384
spring.kafka.producer.buffer-memory= 33554432
spring.kafka.producer.acks= 1
#spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer= org.apache.kafka.common.serialization.StringSerializer
#读取位置
spring.kafka.consumer.auto-offset-reset= latest
#自动提交
spring.kafka.consumer.enable-auto-commit= true
#心跳间隔
spring.kafka.consumer.auto-commit-interval= 100ms
spring.kafka.consumer.group-id= GruopId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
要强调的我在代码中加了注释,如果分别单独消费这两个主题都是没有问题的,但是一起消费就不行了。springboot的@KafkaListener中topics应该是可以多个订阅topic消费的呀。
其中第一个topic是直接转型解析的,第二个是用了那个protobuf来解析的,这是唯一的区别。
求大神告知应该怎么办,或者其他的什么方法。
自己研究解析结果在消费卡住了……
如果你有什么想法的话可以和我具体聊聊,我非常急迫的需要你呀
手机扫一扫
移动阅读更方便
你可能感兴趣的文章