KafkaProducer is the client that users rely on to send messages to Kafka servers. The official documentation describes the producer as follows:
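Every Kafka node can answer a request for metadata, and that metadata tells the producer which broker is the leader for each partition. The producer can therefore send data directly to the broker that leads the target partition, without any intervening routing tier.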
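Here is a minimal sketch of that lookup in plain Java, with no Kafka dependency. The producer keeps a table that maps (topic, partition) to the leader broker. The table is filled from metadata responses, and each record is routed straight to the broker found there. `ClusterMetadata`, `TopicPartitionKey` and the broker addresses are hypothetical names for illustration. They are not Kafka's classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical (topic, partition) key, loosely modeled on Kafka's TopicPartition. */
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

/** Hypothetical cache of the leader information returned by a metadata request. */
final class ClusterMetadata {
    private final Map<TopicPartitionKey, String> leaders = new HashMap<>();

    /** Record that `broker` currently leads the given partition. */
    void updateLeader(String topic, int partition, String broker) {
        leaders.put(new TopicPartitionKey(topic, partition), broker);
    }

    /**
     * Broker to send to directly, or null if the leader is unknown
     * (a real client would then request a metadata refresh).
     */
    String brokerFor(String topic, int partition) {
        return leaders.get(new TopicPartitionKey(topic, partition));
    }
}
```

A `null` result is the sketch's equivalent of the "unknown leader" case that `Sender.run` handles later by forcing a metadata update.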
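The client controls which partition it publishes to by implementing an interface (`Partitioner`). Records can be spread randomly, as a simple form of load balancing, or sent to a specific partition by some semantic partitioning function, for example one based on the record key.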
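These strategies can be sketched as follows. Kafka's real extension point is the `Partitioner` interface in `org.apache.kafka.clients.producer`. The class below is a hypothetical, dependency-free stand-in. Where Kafka's default partitioner applies a murmur2 hash to the serialized key, this sketch uses `String.hashCode()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical partitioner sketch; mimics common strategies without kafka-clients. */
final class SketchPartitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * @param explicitPartition partition chosen by the caller, or null
     * @param key               record key, or null
     * @param numPartitions     number of partitions in the topic (must be > 0)
     */
    int partition(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;  // the caller asked for a specific partition
        }
        if (key == null) {
            // no key: spread records round-robin across partitions
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // keyed: the same key always maps to the same partition
        // (String.hashCode() stands in for Kafka's murmur2 hash)
        return toPositive(key.hashCode()) % numPartitions;
    }

    /** Clear the sign bit so the modulo result is never negative. */
    private static int toPositive(int n) {
        return n & 0x7fffffff;
    }
}
```

The keyed branch is what makes per-key ordering possible. Every record with the same key lands in one partition, and Kafka keeps records ordered within a single partition.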
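Batching is one of the main ways to improve efficiency. KafkaProducer accumulates data in memory and then sends it in a single request. Both the size of a batch and the maximum time to wait for it to fill are configurable (`batch.size` and `linger.ms`).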
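Assuming records arrive one at a time, the way these two limits interact can be sketched like this. `LingerBatcher` is a hypothetical class that plays the roles of `batch.size` and `linger.ms`. It is not how `RecordAccumulator` is implemented, and it uses string length as a stand-in for serialized size.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical size- and time-bounded batcher (not Kafka's implementation). */
final class LingerBatcher {
    private final int maxBatchBytes;   // plays the role of batch.size
    private final long lingerMs;       // plays the role of linger.ms
    private final List<String> current = new ArrayList<>();
    private int currentBytes = 0;
    private long firstAppendMs = -1;

    LingerBatcher(int maxBatchBytes, long lingerMs) {
        this.maxBatchBytes = maxBatchBytes;
        this.lingerMs = lingerMs;
    }

    /** Add a record; returns the batch to send if it just became full, else null. */
    List<String> append(String record, long nowMs) {
        if (current.isEmpty()) firstAppendMs = nowMs;
        current.add(record);
        currentBytes += record.length();  // string length stands in for serialized size
        return currentBytes >= maxBatchBytes ? drain() : null;
    }

    /** Returns the pending batch if its first record has waited at least lingerMs, else null. */
    List<String> pollExpired(long nowMs) {
        if (!current.isEmpty() && nowMs - firstAppendMs >= lingerMs) return drain();
        return null;
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(current);
        current.clear();
        currentBytes = 0;
        firstAppendMs = -1;
        return batch;
    }
}
```

A larger size limit or a longer linger time yields fuller batches and fewer requests, at the cost of extra latency for each record.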
KafkaProducer lives in the org.apache.kafka.clients.producer package. It is easy to use if you follow the official documentation. Here is a simple example.
```java
package com.zjl.play;

import org.apache.kafka.clients.producer.*;
import org.apache.log4j.BasicConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import static com.zjl.play.ProducerConstant.*;

public class Producer {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private KafkaProducer<String, String> producer;
    private final Properties kafkaProperties = new Properties();
    private List<Future<RecordMetadata>> kafkaFutures;
    private String topic;

    public void process(List<String> events) {
        if (events == null) {
            logger.error("process list is null");
            return;
        }
        if (events.isEmpty()) {
            logger.info("the number of process events is zero");
            return;
        }
        kafkaFutures.clear();
        try {
            for (String event : events) {
                long startTime = System.currentTimeMillis();
                Integer partitionId = null;   // null: let the partitioner choose the partition
                String eventKey = null;       // null key: no key-based partitioning
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic, partitionId, eventKey, event);
                // send() is asynchronous: it appends to the accumulator and returns a Future
                kafkaFutures.add(producer.send(record, new ProducerCallback(startTime)));
            }
        } catch (Exception e) {
            logger.error("failed to send records", e);
        }
        // block until every record has been acknowledged (or has failed)
        for (Future<RecordMetadata> future : kafkaFutures) {
            try {
                future.get();
            } catch (Exception e) {
                logger.error("record was not acknowledged", e);
            }
        }
    }

    public void start() {
        kafkaFutures = new LinkedList<>();
        producer = new KafkaProducer<>(kafkaProperties);
    }

    public void stop() {
        if (producer != null) {
            producer.close();  // flushes pending records and stops the I/O thread
        }
    }

    public void loadKafkaProperties(String bootStrapServers) {
        kafkaProperties.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);
        // Defaults overridden based on config
        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
        // kafkaProperties.putAll(context.getSubProperties(KAFKA_PRODUCER_PREFIX));
        kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        topic = DEFAULT_TOPIC;
    }

    public static void main(String[] args) {
        // log4j.configuration only takes effect if set before log4j initializes
        // (e.g. -Dlog4j.configuration=conf/log4j.properties), so configure the console here
        BasicConfigurator.configure();
        Producer producer = new Producer();
        producer.loadKafkaProperties("sha2hb06:9092");
        producer.start();
        try {
            List<String> testList = new ArrayList<>();
            testList.add("123");
            testList.add("456");
            testList.add("789");
            producer.process(testList);
        } finally {
            producer.stop();
        }
    }
}

class ProducerCallback implements Callback {
    private static final Logger logger = LoggerFactory.getLogger(ProducerCallback.class);
    private final long startTime;

    public ProducerCallback(long startTime) {
        this.startTime = startTime;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // metadata may be null when the send failed, so do not dereference it
            logger.error("Error sending message to Kafka: {}", exception.getMessage());
            return;
        }
        if (logger.isDebugEnabled()) {
            long eventElapsedTime = System.currentTimeMillis() - startTime;
            logger.debug("Acked message partition:{} offset:{}", metadata.partition(), metadata.offset());
            logger.debug("Elapsed time for send: {}", eventElapsedTime);
        }
    }
}
```
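Judging from the code above, the producer's work falls into two main parts, and reading the KafkaProducer source confirms this. One part fetches the cluster metadata. The other sends messages to the corresponding brokers. All network transport is implemented with NIO.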
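NetworkClient is built on Kafka's own `Selector` wrapper around `java.nio`. The sketch below only illustrates the underlying readiness-selection model. A channel is made non-blocking, registered with a `java.nio.channels.Selector`, and read only when `select(timeout)` reports it ready. An in-process `Pipe` stands in for a broker socket, so the sketch runs without a cluster. `NioPipeDemo` is a hypothetical name, and the sketch assumes an ASCII message.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

/** Minimal java.nio readiness-selection demo; a Pipe stands in for a broker socket. */
final class NioPipeDemo {
    /** Writes an ASCII message into a pipe and reads it back through a Selector. */
    static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);                 // selectable channels must be non-blocking
                source.register(selector, SelectionKey.OP_READ); // be notified when data is readable

                sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                StringBuilder received = new StringBuilder();
                ByteBuffer buf = ByteBuffer.allocate(64);
                // poll loop, similar in spirit to a client's poll(timeout)
                while (received.length() < message.length()) {
                    if (selector.select(1000) == 0) continue;    // timed out, nothing ready yet
                    selector.selectedKeys().clear();             // only one key; reset the ready set
                    buf.clear();
                    source.read(buf);
                    buf.flip();
                    received.append(StandardCharsets.UTF_8.decode(buf));
                }
                return received.toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A single thread can watch many channels this way, which is why one producer I/O thread is enough to serve connections to all brokers.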
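Figure 1
In Figure 1, Metadata holds the cluster's topic information. RecordAccumulator works like a queue that holds the records waiting to be sent. Sender takes messages out of RecordAccumulator and hands them to NetworkClient for sending.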
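To make the division of labor in Figure 1 concrete, here is a hypothetical, single-threaded model of it. A map of leaders plays Metadata, and per-partition queues play RecordAccumulator. `runOnce()` plays one Sender pass, and a `Transport` callback plays NetworkClient. None of these names are Kafka's.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, single-threaded model of the Figure 1 pipeline (not Kafka's classes). */
final class MiniPipeline {
    /** Stand-in for NetworkClient: receives (broker, records) pairs to put on the wire. */
    interface Transport {
        void send(String broker, List<String> records);
    }

    private final Map<Integer, String> leaderByPartition;               // role of Metadata
    private final Map<Integer, Deque<String>> queues = new HashMap<>();  // role of RecordAccumulator
    private final Transport transport;

    MiniPipeline(Map<Integer, String> leaderByPartition, Transport transport) {
        this.leaderByPartition = leaderByPartition;
        this.transport = transport;
    }

    /** Producer side: append to the partition's queue and return immediately. */
    void append(int partition, String record) {
        queues.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(record);
    }

    /** Sender side: drain each non-empty queue whose leader is known and hand it to the transport. */
    void runOnce() {
        for (Map.Entry<Integer, Deque<String>> e : queues.entrySet()) {
            Deque<String> q = e.getValue();
            if (q.isEmpty()) continue;
            String leader = leaderByPartition.get(e.getKey());
            if (leader == null) continue;  // unknown leader: keep the data until metadata is refreshed
            List<String> batch = new ArrayList<>(q);
            q.clear();
            transport.send(leader, batch);
        }
    }
}
```

Because `append` never touches the network, callers are decoupled from broker latency. Only `runOnce` talks to the transport.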
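Figure 2
Figure 2 shows the layers of the producer code, each layer wrapping the one below it.
We will first take a brief look at the code structure of each layer and then dig deeper.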
org.apache.kafka.clients.producer
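The producer package contains the producer client implementation together with the client-facing interfaces. Users implement these interfaces to customize the producer's behavior.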
org.apache.kafka.clients.producer.internals
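This package holds the implementation classes the producer is built on, such as RecordAccumulator and Sender. KafkaProducer itself acts much like a builder. It initializes the interceptors, accumulator, metadata, NetworkClient, Sender and other objects. It also starts a daemon thread that keeps running Sender's run method.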
```java
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        ....
        this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
        ....
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
        ....
        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                ....
                time);
        ....
        NetworkClient client = new NetworkClient(
                ....
                this.requestTimeoutMs, time);
        this.sender = new Sender(client,
                ....
                this.requestTimeoutMs);
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        // run the Sender on its own daemon I/O thread
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        ....
    } catch (Throwable t) {
        ....
    }
}
```
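The daemon-thread pattern in that constructor can be sketched on its own. `SenderLoop` and `MiniProducer` are hypothetical names. A plain `Thread` with `setDaemon(true)` stands in for Kafka's `KafkaThread`, and the loop body is a placeholder for one `Sender.run(now)` pass.

```java
/** Hypothetical sender loop: runs passes until asked to stop. */
final class SenderLoop implements Runnable {
    private volatile boolean running = true;  // volatile so the I/O thread sees close requests

    @Override
    public void run() {
        while (running) {
            // placeholder for one Sender.run(now) pass
            try {
                Thread.sleep(5);  // placeholder for client.poll(pollTimeout, now)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void initiateClose() {
        running = false;
    }
}

/** Hypothetical "builder": wires the loop and starts it on a named daemon thread. */
final class MiniProducer implements AutoCloseable {
    final SenderLoop sender = new SenderLoop();
    final Thread ioThread;

    MiniProducer(String clientId) {
        String name = "kafka-producer-network-thread" + (clientId.isEmpty() ? "" : " | " + clientId);
        ioThread = new Thread(sender, name);
        ioThread.setDaemon(true);  // a daemon thread does not keep the JVM alive by itself
        ioThread.start();
    }

    @Override
    public void close() {
        sender.initiateClose();
        try {
            ioThread.join();       // wait for the loop to observe the flag and exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Since the thread is a daemon, an application that exits without calling `close()` can lose buffered records. That is one reason the example above calls `producer.close()` in `stop()`.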
KafkaProducer's send method does not transmit anything itself. It appends the record to the accumulator queue and returns a Future.
```java
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        ....
        // append the record to the accumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        ....
        // return the future, which will hold the result of sending the record
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        .... // handle the various exceptions
    }
}
```
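The key point of `doSend` is that the caller gets a `Future` back immediately, and the sender thread completes it later. The hypothetical sketch below illustrates this with `CompletableFuture`, whereas Kafka itself returns its own `FutureRecordMetadata`. `completeBatch` simulates a broker acknowledgement that assigns consecutive offsets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical "send = append + return a future" sketch; not Kafka's accumulator. */
final class FutureAccumulator {
    private final List<String> records = new ArrayList<>();                   // queued, unsent data
    private final List<CompletableFuture<Long>> futures = new ArrayList<>();  // one per queued record

    /** Non-blocking: queue the record and hand back a future for its offset. */
    synchronized CompletableFuture<Long> append(String record) {
        CompletableFuture<Long> f = new CompletableFuture<>();
        records.add(record);
        futures.add(f);
        return f;
    }

    /** Sender side: pretend the broker appended the batch starting at baseOffset. */
    synchronized void completeBatch(long baseOffset) {
        for (int i = 0; i < futures.size(); i++) {
            futures.get(i).complete(baseOffset + i);  // each record's offset within the batch
        }
        records.clear();
        futures.clear();
    }

    /** Sender side: the request failed, so every pending future fails with the same error. */
    synchronized void failBatch(Exception error) {
        for (CompletableFuture<Long> f : futures) f.completeExceptionally(error);
        records.clear();
        futures.clear();
    }
}
```

This mirrors why `future.get()` in the example blocks. The value only exists once the sender has heard back from the broker.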
The actual transmission happens in Sender's run(long now) method, which the I/O thread calls in a loop.
```java
void run(long now) {
    // get the current cluster information
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send; a partition is ready when:
    //   1. its record batch is full
    //   2. a record has waited at least linger.ms
    //   3. the accumulator has run out of memory
    //   4. the accumulator is being closed
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to; a node is kept only if
    //   1. the information about it can be trusted, and
    //   2. the client is able to send to it
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests: drain the batches to send, grouped by node id
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
            result.readyNodes,
            this.maxRequestSize,
            now);
    // preserve send order (max.in.flight.requests.per.connection = 1):
    // mute every drained partition so it has at most one batch in flight
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // abort batches that have expired while waiting in the accumulator
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    sensors.updateProduceRequestMetrics(batches);

    // build the produce requests
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    // hand each request to the client, which queues it on the node's channel
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    // this is where the messages are actually written to the network
    this.client.poll(pollTimeout, now);
}
```
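Two decisions in `run` are easy to restate on their own. The first is whether a partition's data is sendable, and the second is how long `poll` may block. The hypothetical helper below simplifies both rules and assumes each condition is already known as a flag or a duration. The real `RecordAccumulator.ready` also tracks retry backoff and other state.

```java
/** Hypothetical restatement of the readiness and poll-timeout rules in Sender.run. */
final class ReadyCheck {
    /** A partition's data is sendable when any of the four listed conditions holds. */
    static boolean sendable(boolean batchFull, long waitedMs, long lingerMs,
                            boolean memoryExhausted, boolean closing) {
        boolean lingerExpired = waitedMs >= lingerMs;
        return batchFull || lingerExpired || memoryExhausted || closing;
    }

    /** For a lingering partition: how long until its linger time expires. */
    static long timeLeftMs(long waitedMs, long lingerMs) {
        return Math.max(lingerMs - waitedMs, 0);
    }

    /**
     * Mirrors the pollTimeout computation: return 0 if some node already has data
     * to send, otherwise block until the earliest moment something may become sendable.
     */
    static long pollTimeout(int readyNodes, long nextReadyCheckDelayMs, long notReadyTimeoutMs) {
        long timeout = Math.min(nextReadyCheckDelayMs, notReadyTimeoutMs);
        return readyNodes > 0 ? 0 : timeout;
    }
}
```

Blocking for the computed timeout instead of spinning keeps the I/O thread idle while batches linger, yet lets it react at once when data becomes sendable.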
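The above describes how messages are sent at the producer level. The producer puts messages into a queue, and a dedicated thread keeps taking them out and sending them to the server. At this level there is no trace of NIO at all. Every send request goes through functions in the org.apache.kafka.clients package (NetworkClient), which encapsulates the networking cleanly.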