springboot 集成kafka 实现多个customer不同group
阅读原文时间:2021年04月21日阅读:1

springboot正常集成kafka

这个网上很多资料都有些集成,我就不浪费太多篇幅和时间了,笔者找了篇还算很容易理解的博客,自行学习

https://blog.csdn.net/tzs_1041218129/article/details/78988439

多个customer不同group

刚那篇博客,很容易就能上手,那么问题来了,group-id在配置文件中设置,我要是想要有不同的groupid怎么办?

莫慌,看看@KafkaListener 注解,里面就有group属性可以设置啊,感觉还是比较人性化的吗?

@KafkaListener(group = "= =!!!")

马上试试。。。。

配置文件设置consumer的group-id为myGroup。

@Component
public class KafkaConsumer {

    @KafkaListener(id = "test1", topics="test-topic", group = "test1")
    public void processMessage(String content) throws InterruptedException {
            System.out.println("收到消息 1=>" + content);
        }

    @KafkaListener(id = "test2", topics="test-topic", group = "test2")
    public void processMessage1(String content) throws InterruptedException {
            System.out.println("收到消息 2=>" + content);
        }
}

启动,额,控制台打印

[           main] xxx    : Kafka version : 0.10.1.1
[           main] xxx    : Kafka commitId : f10ef2720b03b247
[           main] xxx    : Tomcat started on port(s): 82 (http)
[           main] xxx    : Started App in 3.653 seconds (JVM running for 4.119)
[    test2-0-C-1] xxx    : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group myGroup.
[    test1-0-C-1] xxx    : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group myGroup.

最后几行???????我猜的没错的话,两个listener都被归属到myGroup去了。。

嗯哼??说好的人性化呢?TM压根没反应,没用啊?

再试多几次,好吧,我放弃了,反正是没用。

绝望,好吧,问题还是要解决的。

最后在一阵搜寻下,找到了一个办法,请看:

那就是消费者,不要用配置文件配置的方式

细心的话,会发现@KafkaListener 注解,里面有一个containerFactory参数,就是让你指定容器工厂的

动手吧。

新建一个KafkaConsumerConfig类,代码如下,指定了两个容器,也就两个group

分别为kafkaListenerContainerFactory1和kafkaListenerContainerFactory2

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class KafkaConsumerConfig {

    private String brokers = "192.168.52.130:9092,192.168.52.131:9092,192.168.52.133:9092";

    private String group1 = "test1";
    private String group2 = "test2";

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory1());
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(4000);
        return factory;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory2());
        factory.setConcurrency(4);
        factory.getContainerProperties().setPollTimeout(4000);
        return factory;
    }

    public Map<String, Object> getCommonPropertis() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return properties;
    }


    public ConsumerFactory<String, String> consumerFactory1() {
        Map<String, Object> properties = getCommonPropertis();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }

    public ConsumerFactory<String, String> consumerFactory2() {
         Map<String, Object> properties = getCommonPropertis();
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);
         return new DefaultKafkaConsumerFactory<String, String>(properties);
    }
}

上面代码中,其实,很多配置项,你也可以直接用@value的方式,从配置文件中读取过来,那么需要修改参数值的时候,就直接更改配置文件就行了,这点相信就不用教了,不懂的网上一搜一堆。

最后,在@KafkaListener 中指定容器名称

@KafkaListener(id="test1",topics = "test-topic", containerFactory="kafkaListenerContainerFactory1")
@KafkaListener(id="test2",topics = "test-topic", containerFactory="kafkaListenerContainerFactory2")

启动,你就会发现,卧槽,还真可以

[           main] xxx     : Kafka version : 0.10.1.1
[           main] xxx     : Kafka commitId : f10ef2720b03b247
[           main] xxx     : Tomcat started on port(s): 82 (http)
[           main] xxx     : Started App in 3.913 seconds (JVM running for 4.321)
[    test2-0-C-1] xxx     : Discovered coordinator 192.168.52.133:9092 (id: 2147483644 rack: null) for group test2.
[    test1-0-C-1] xxx     : Discovered coordinator 192.168.52.131:9092 (id: 2147483645 rack: null) for group test1.

至此,就实现了多个customer不同group的功能,亲测有效。

感谢您的阅读。

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器