Springboot改造之配置--Kafka配置篇
阅读原文时间:2021年04月21日阅读:1

Kafka 作为目前应用十分广泛的分布式消息中间件技术,可以实时的处理大量数据以满足各种需求场景。下面就讲一下 Sprintboot 中 Kafka 的配置 。

Kafka 的代码集成度比较高,开发时只需要引入下面这个jar包就可以了:

 <!-- kafka -->
 <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.1.3.RELEASE</version>
  </dependency>

下面看下 Kafka 的主要配置:

Producer的配置类我用红色显示,Consumer的配置类我用橙色显示。

  • 下面对 Producer 的配置进行说明:

application.properties(基础属性配置)

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288

以上是基础配置,其余参数大家可以根据实际情况添加或调整。下面是kafka 集群的配置,由于各个环境的机器不同,所以在kafka-xxx.proporties (profiles路径下)中配置。

spring.kafka.bootstrap-servers=192.168.xx.xx:9092

Kafka Producer 配置

KafkaSendUtil.java(自定义的发送Kafka消息的工具类)

import com.alibaba.fastjson.JSONObject;
import com.mljr.acs.deposit.core.utils.SpringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * KAFKA消息发送器
 */
public class KafkaSendUtil {

    private static Logger logger = LoggerFactory.getLogger(KafkaSendUtil.class);

    public static void send(String topic, String message) {
        logger.info("kafka Producer发送消息,topic="+topic);
        KafkaTemplate<String, String> kafkaTemplate = (KafkaTemplate<String, String>) SpringUtil.getBean("kafkaTemplate");
        ListenableFuture<SendResult<String, String>> listenableFuture= kafkaTemplate.send(topic, message);
        sendCallBack(listenableFuture);
    }

    private static void sendCallBack(ListenableFuture<SendResult<String, String>> listenableFuture){
        try {
            SendResult<String,String> sendResult = listenableFuture.get();
            listenableFuture.addCallback(SuccessCallback -> {
                        logger.info("kafka Producer发送消息成功!topic=" + sendResult.getRecordMetadata().topic()+",partition"+sendResult.getRecordMetadata().partition()+",offset="+sendResult.getRecordMetadata().offset());
                    },
                    FailureCallback->logger.error("kafka Producer发送消息失败!sendResult=" + JSONObject.toJSONString(sendResult.getProducerRecord())));
        } catch (Exception e) {
            logger.error("获取producer返回值失败",e);
        }
    }
}

至此,Producer的配置就完成了,发送消息直接调用:KafkaSendUtil.send(topic, message) 就可以了。

  • 下面开始对 Consumer 的配置进行说明:

    application.properties(基础属性配置)

    spring.kafka.consumer.group-id=acs-deposit
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

KafkaMessageListener.java(消息监听处理器)

import com.alibaba.fastjson.JSONObject;
import com.mljr.acs.common.kafka.KafkaMessage;
import com.mljr.acs.common.kafka.KafkaMessageHeader;
import com.mljr.acs.deposit.core.utils.consts.KafkaTopicConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * KafkaListener
 */
@Component
public class KafkaMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);

    @KafkaListener(topics = {KafkaTopicConstants.MALM_NOTICE_DEPOSIT})
    public void handle1(String message) {
        logger.info("kafka handler处理类接收消息:" + JSONObject.toJSONString(message));

        KafkaMessage kafkaMessage = com.alibaba.fastjson.JSONObject.parseObject(message, KafkaMessage.class);
        KafkaMessageHeader header = kafkaMessage.getMessageHeader();
        String messageBody = kafkaMessage.getJsonString();

        switch (header.getMessageType()) {

            case "model1":
                System.out.println("请在此处添加自定义的业务逻辑,处理messageBody...");
                break;

            case "model2":
                System.out.println("请在此处添加自定义的业务逻辑,处理messageBody...");
                break;

            default:
                logger.info("没有对应的消息类型匹配, message={}", message);
        }
    }

    @KafkaListener(topics = {KafkaTopicConstants.MALM_NOTICE_DEPOSIT})
    public void handle2(String message) {
        ... ... ... ... ...
    }
}

上面的这段代码解释一下,采用 KafkaMessage 结构体定义消息有个好处:可以复用某个topic。比如公司很多应用的kafka集群是公用的,那么随着集群里添加的微服务增多,如果每个业务消息都生成一个topic, 会对kafka集群的消息处理产生很重的负担,因为每个topic的每个分区都是一个文件,文件数增多,消息处理的I/O效率会大幅下降。
所以如果每个微服务可以将涉及到的业务消息共用一个topic,会提高资源利用率,同时在众多消息中追溯消息的来源也更加简单。
所以设计了 KafkaMessage 数据结构,可以复用topic,实现如下:

KafkaMessage.java(消息定义)

import java.io.Serializable;

/**
 * Kafka消息,包含消息头和消息体
 */
public class KafkaMessage implements Serializable,Cloneable {

    private static final long serialVersionUID = -3626421563498382832L;

    /**
     * 统一消息头
     */
    protected KafkaMessageHeader messageHeader;

    /**
     * JSON格式消息体
     */
    private String jsonString;

    public KafkaMessage(KafkaMessageHeader messageHeader, String jsonString){
        this.messageHeader = messageHeader;
        this.jsonString = jsonString;
    }

    public KafkaMessageHeader getMessageHeader() {
        return messageHeader;
    }

    public void setMessageHeader(KafkaMessageHeader messageHeader) {
        this.messageHeader = messageHeader;
    }

    public String getJsonString() {
        return jsonString;
    }

    public void setJsonString(String jsonString) {
        this.jsonString = jsonString;
    }
}

KafkaMessageHeader.java(消息头定义)

import java.io.Serializable;
import java.util.Date;

public class KafkaMessageHeader implements Serializable,Cloneable {

    private static final long serialVersionUID = -362384753498698092L;

    //消息惟一标识
    private String serialNo;

    //系统关键业务单号
    private String systemSourceId;

    //创建时间
    private Date createTime;

    //消息类型,用于区分不同业务消息
    private String messageType;

    public String getSerialNo() {
        return serialNo;
    }

    public void setSerialNo(String serialNo) {
        this.serialNo = serialNo;
    }

    public String getSystemSourceId() {
        return systemSourceId;
    }

    public void setSystemSourceId(String systemSourceId) {
        this.systemSourceId = systemSourceId;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public String getMessageType() {
        return messageType;
    }

    public void setMessageType(String messageType) {
        this.messageType = messageType;
    }
}

messageType 字段相当于之前每种业务消息对应的topic,所以复用topic的关键就在于这个 messageType。

MessageType.java(消息类型枚举,可以每个微服务定义这样一个枚举类)

public enum  MessageType {
    LOG_SETTLE
}