PHP处理kafka消息队列
阅读原文时间:2023年07月15日阅读:1

安装php-kafka 扩展后,就可以开始编写 php 消费消息的脚本了,php-rdkafka 扩展提供了几种消息处理的方式

这种方式没有消费组的概念

setLogLevel(LOG\_DEBUG); // 指定 broker 地址,多个地址用"," 分割 $rk->addBrokers("192.168.33.1:9092"); $topic = $rk->newTopic("test"); $topic->consumeStart(0, RD\_KAFKA\_OFFSET\_BEGINNING); while (true) { // 第一个参数是分区号 // 第二个参数是超时时间 $msg = $topic->consume(0, 1000); if ($msg->err) { echo $msg->errstr(), "\\n"; break; } else { echo $msg->payload, "\\n"; } } 这种方式可以指定消费组,一个消费组内,一个consumer 进程只能读取一个分区, setRebalanceCb(function (RdKafka\\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD\_KAFKA\_RESP\_ERR\_\_ASSIGN\_PARTITIONS: echo "Assign: "; var\_dump($partitions); $kafka->assign($partitions); break; case RD\_KAFKA\_RESP\_ERR\_\_REVOKE\_PARTITIONS: echo "Revoke: "; var\_dump($partitions); $kafka->assign(NULL); break; default: throw new \\Exception($err); } }); // 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。 $conf->set('group.id', 'myConsumerGroup1'); //添加 kafka集群服务器地址 $conf->set('metadata.broker.list', '192.168.33.1:9092'); $topicConf = new RdKafka\\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning //当没有初始偏移量时,从哪里开始读取 $topicConf->set('auto.offset.reset', 'smallest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\\KafkaConsumer($conf); // 让消费者订阅log 主题 $consumer->subscribe(\['log'\]); while (true) { $message = $consumer->consume(120\*1000); switch ($message->err) { case RD\_KAFKA\_RESP\_ERR\_NO\_ERROR: var\_dump($message); break; case RD\_KAFKA\_RESP\_ERR\_\_PARTITION\_EOF: echo "No more messages; will wait for more\\n"; break; case RD\_KAFKA\_RESP\_ERR\_\_TIMED\_OUT: echo "Timed out\\n"; break; default: throw new \\Exception($message->errstr(), $message->err); break; } } ?>