技术分享 使用php操作kafka

jim · 2019-04-10 19:48:54 · 热度: 54

1:php的kafka扩展安装

1:安装librdkafka库接口,项目地址:https://github.com/edenhill/librdkafka

//下载librdkafka安装包:我下载的是0.9.4版本的库
$ wget https://github.com/edenhill/librdkafka/archive/0.9.4.x.zip
//解压,安装
$ unzip librdkafka-0.9.4.x.zip
$ cd librdkafka-0.9.4
$ make && make install

2:安装php-rdkafka扩展,项目地址:https://github.com/arnaud-lb/php-rdkafka

//下载
$ wget https://github.com/arnaud-lb/php-rdkafka/archive/master.zip
//解压
$ unzip php-rdkafka-master.zip
//安装 php 的php-rdkafka扩展
$ cd php-rdkafka
/usr/local/php56/bin/phpize
$ ./configure --with-php-config=/usr/local/php56/bin/php-config
$ make && make install
//编译完成后在/usr/local/php56/lib/php/extensions/no-debug-zts-20131226目录中查看是否有rdkafka.so文件

将rdkafka的扩展写入php.ini文件

extension=rdkafka.so

使用 php -m 查看php的rdkafka扩展是否安装完成

2:使用rdkafka扩展

参考文档:https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html

1:高级consumer接口

<php
$conf = new RdKafka\Conf();
//Set a rebalance callback to log partition assignments (optional) 
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

         case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
             echo "Revoke: ";
             var_dump($partitions);
             $kafka->assign(NULL);
             break;

         default:
            throw new \Exception($err);
    }
});

//Configure the group.id. All consumer with the same group.id will consume
//different partitions.
$conf->set('group.id', 'test-consumer-group');

$conf->set('metadata.broker.list', '192.168.16.131,192.168.16.132');

$topicConf = new RdKafka\TopicConf();

//Set where to start consuming messages when there is no initial offset in
//offset store or the desired offset is out of range.
//'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

//Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

//Subscribe to topic 'test'
$consumer->subscribe(['zoot9']);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

猜你喜欢:
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册