KafkaProducer的整体逻辑
阅读原文时间:2023年09月30日阅读:2

KafkaProducer是用户向kafka servers发送消息的客户端。官网上对producer的记载如下:

  1. Kafka所有的节点都可以应答metadata的请求,这些metadata中包含了分区所对应的leader信息,而这些leader允许生产者直接将数据发送到分区leader所在的broker。这样子客户端就可以直接将数据发送给这些leader对应的broker中,而不用经过路由。

  2. 客户端可以通过继承接口来控制将消息发送到哪一个分区。用户可以随机发送,也可以通过特定的方式指定发送到某个特定的分区。

  3. 批处理是提升效率的一种方式,kafkaProducer可以在内存中积累数据,然后在通过一个请求将这些数据发送出去。并且数据量的大小和积累时间的长短都是可以控制的。

KafkaProducer包含在org.apache.kafka.clients这个包内。参照官方文档使用的时候也比较容易,下面是一个简单的例子。

package com.zjl.play;

import org.apache.kafka.clients.producer.*;
import org.apache.log4j.BasicConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import static com.zjl.play.ProducerConstant.*;

public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(com.zjl.play.Producer.class);
    private KafkaProducer<String,String> producer;
    private Properties kafkaProperties = new Properties();
    private List<Future<RecordMetadata>> kafkaFutures;
    private String topic;

    public void process(List<String> events) {
        if (events == null) {
            logger.error("process list is null");
            return;
        }
        int processEvents = events.size();

        if (processEvents == 0) {
            logger.info("the number of process event is zero");
        }

        try {
            ProducerRecord<String, String> record;
            kafkaFutures.clear();
            for (String event : events) {
                long startTime = System.currentTimeMillis();
                Integer partitionId = null;
                String eventKey = null;
                record = new ProducerRecord(topic, partitionId, eventKey, event);
                kafkaFutures.add(producer.send(record, new ProducerCallback(startTime)));
            }
        } catch (Exception e) {
            logger.error("get exception: " + e.toString());
        }

        try {
            if (processEvents > 0) {
                for (Future<RecordMetadata> future : kafkaFutures) {
                    future.get();
                }
            }
        } catch (Exception e) {
            logger.error(e.toString());
        }

    }

    public void start() {
        kafkaFutures = new LinkedList<Future<RecordMetadata>>();
        producer = new KafkaProducer<String, String>(kafkaProperties);
    }

    public void stop() {
        producer.close();
    }

    public void loadKafkaProperties(String bootStrapServers) {
        kafkaProperties.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
        //Defaults overridden based on config
        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
//        kafkaProperties.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
        kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        topic = DEFAULT_TOPIC;
    }

    public static void main(String args[]) {
        BasicConfigurator.configure();
        System.setProperty("log4j.configuration", "conf/log4j.properties");
        Producer producer = new Producer();
        producer.loadKafkaProperties("sha2hb06:9092");
        producer.start();
        List<String> testList = new ArrayList<String>();
        testList.add("123");
        testList.add("456");
        testList.add("789");
        producer.process(testList);
        producer.stop();
    }
}

class ProducerCallback implements Callback {
    private static final Logger logger = LoggerFactory.getLogger(ProducerCallback.class);
    private long startTime;

    public ProducerCallback(long startTime) {
        this.startTime = startTime;
    }

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            logger.debug("Error sending message to Kafka {} ", exception.getMessage());
        }

        if (logger.isDebugEnabled()) {
            long eventElapsedTime = System.currentTimeMillis() - startTime;
            logger.debug("Acked message partition:{} ofset:{}",  metadata.partition(), metadata.offset());
            logger.debug("Elapsed time for send: {}", eventElapsedTime);
        }
    }
}

从上面的代码看,整个过程主要包含两部分。

  1. 创建KafkaProducer实例。
  2. 调用send()函数异步发送。

总体

看了KafkaProducer的源码,主要包含两个部分,获取集群的metadata,将消息发送到对应的broker中。整个消息的网络传输是通过NIO来实现的。

图1

图1中,Metadata里面保存了集群的topic信息,RecordAccmulator 类似一个队列,里面保存了要发送的内容,Sender会从RecordAccmulator队列中取出消息,并交给NetworkClient进行发送。

图2

图2表示了producer的代码层次,从下往上层层封装。

首先简要看下每层里面的代码结构,然后不断深入。

  • org.apache.kafka.clients.producer

    整个producer包里面是放着producer的客户端实现,以及和客户端相关的接口,用户实现接口来完成不同的功能。

    • KafkaProducer: producer客户端。
    • Partitioner: 分区接口,可以实现它来制定不同的分区策略。
    • ProducerInterceptor: 过滤接口,可以实现它来对数据进行过滤。
    • ProducerRecord: 封装发送到kafka的数据,里面除了消息,还有其他的一些相关属性值,例如topic,partition。
    • RecordMetadata: 封装了kafka server 返回的数据信息。
    • Callback: 回调接口
  • org.apache.kafka.clients.producer.internals

    • BufferPool: 一个ByteBuffers资源池,用来分配内存
    • DefaultPartitioner: 实现一个默认的分区方式,如果指定了partition,就使用它,然后如果指定了key,就使用hash,然后如果都没有,就轮训使用
    • ErrorLoggingCallback: 一个Callback实现方式
    • FutureRecordMetadata: The future result of a record send
    • ProducerRequestResult: 一个类封装了将一条信息发送到对应的一个partition后的返回结果。这里面有一个done 函数,调用它之后会提示对应的线程这条record已经处理完毕。
    • producerInterceptors:这个类是是一个容器,里面包含了一个的list 对象,list中是用户自定义的 ProducerInterceptor。 每条 record 在 序列化之前都会被list 中的每个 ProducerInterceptor 进行预处理。
    • RecordAccmulator:这个类维护了一个队列,保存了将要发送的records
    • RecordBatch: 一个类保存了一批将要发送的record
    • Sender: 这个类不断的从accumulator 里面获取records,并发送

Kafka producer

KafkaProducer 这个类相当于一个builder,它初始化了interceptors, accumulator, metadata, NetworkClient, Sender等多个对象。并启动了一个守护线程来不断地跑Sender.run函数。

    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            ....

            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

            ....
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);

            ....
            this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE,
            ....
                    time);
            ....
            NetworkClient client = new NetworkClient(
                    ....
                    this.requestTimeoutMs, time);

            this.sender = new Sender(client,
                    ....
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            ....
        } catch (Throwable t) {
            ....
        }
    }

KafkaProducer 的 send 函数实际上是将record 添加到 accumulator 队列中。

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

    /**
     * Implementation of asynchronously send a record to a topic.
     */
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available

            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            ....添加到accumulator中
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            ....返回future 对象,里面保存了record发送的结果。
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
        ....处理各种异常
        }
    }

而实际的发送是通过Sender的run 函数实现的。

    void run(long now) {
        获取到当前的集群信息
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        获取当前准备发送的partitions,获取的条件如下:
        1.record set 满了
        2.record 等待的时间达到了 lingerms
        3.accumulator 的内存满了
        4.accumulator 要关闭了
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        如果有些partition没有leader信息,更新metadata
        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }
        去掉那些不能发送信息的节点,能够发送的原因有:
        1.当前节点的信息是可以信赖的
        2.能够往这些节点发送信息
        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        获取要发送的records
        // create produce requests
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        保证发送的顺序
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        检查那些过期的records
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);

        构建request并发送
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        将这些requests加入channel中
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        真正的发送消息
        this.client.poll(pollTimeout, now);
    }

上面的内容描述了producer这个层面消息发送的整体情况。通过上面的内容,我们知道producer是将消息放到了一个队列中,并通过一个线程不断的从这个队列中取内容,然后发送到服务器。在这个层面中,我们没有看到nio的一点影子,所有的发送请求都是通过调用org.apache.kafka.clients.client 这个包里面的函数进行的,实现了很好的封装。