3 Star 3 Fork 2

Gitee 极速下载 / KafkaBridge

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/Qihoo360/kafkabridge
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
MIT

简介 English

  • Qbusbridge 是 pub-sub 消息系统的客户端 SDK,目前它支持:

    用户可以通过修改配置切换到任意一个 pub-sub 消息系统。默认配置是访问 Kafka,如果想要切换到 Pulsar,需要修改配置为:

    mq.type=pulsar
    # Other configs for Pulsar...

    更多细节见 config

  • Qbusbridge-Kafka 底层基于 librdkafka, 与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的Kafka系统细节,只需调用极少量的接口,就可完成消息的生产和消费;

  • 针对使用者比较关心的消息生产的可靠性,作了近一步的提升

特点

  • 支持多种语言:c++、php、python、golang, 且各语言接口完全统一;
  • 接口少,简单易用;
  • 针对高级用户,支持通过配置文件调整所有的librdkafka的配置;
  • 在非按key写入数据的情况下,尽最大努力将消息成功写入;
  • 支持同步和异步两种数据写入方式;
  • 在消费时,除默认自动提交offset外,允许用户通过配置手动提交offset;
  • 在php-fpm场景中,复用长连接生产消息,避免频繁创建断开连接的开销;

编译

确保你的系统上安装了 g++ (>= 4.8.5), boost (>= 1.41),cmake (>= 3.1),swig (>= 3.0.12)。

git clone

git clone --recursive https://github.com/Qihoo360/qbusbridge.git

此外,qbus SDK 静态链接到 libstdc++,因此必须确保 libstdc++.a 存在。对于 CentOS 用户,运行:

sudo yum install -y glibc-static libstdc++-static

1. 安装子模块

运行./build_dependencies.sh

它会自动下载子模块,并将其安装到cxx/thirdparts/local,即CMakeLists.txt查找头文件和库文件的目录。

./cxx/thirdparts/local

include/
  librdkafka/
    rdkafka.h
  log4cplus/
    logger.h
lib/
  librdkafka.a
  liblog4cplus.a

2. 编译SDK

C/C++

进入cxx目录,执行./build.sh,会生成以下文件:

include/
  qbus_consumer.h
  qbus_producer.h
lib/
  debug/libQBus.so
  release/libQBus.so

虽然构建 C++ SDK 需要 C++11 支持,但是 SDK 也可以被旧版本 g++ 使用。比如,使用 g++ 4.8.5 编译 qbus SDK,使用 g++ 4.4.7 使用 qbus SDK。

Go

进入golang目录,执行./build.sh,会生成以下文件:

gopath/
  src/
    qbus/
      qbus.go
      libQBus_go.so

可以运行 USE_GO_MOD=1 ./build.sh 来启用 go module,此时会生成以下文件:

examples/
  go.mod
  qbus/
    qbus.go
    go.mod
    libQBus_go.so

Python

进入python目录,执行./build.sh,会生成以下文件:

examples/
  qbus.py
  _qbus.so

PHP

进入php目录,执行./build.sh,会生成以下文件:

examples/
  qbus.php
  qbus.so

3. 编译示例程序

C/C++

进入examples子目录,运行 ./build.sh [debug|release] 生成可执行文件,其中 debug 是使用 lib/debug 目录下的 libQBus.sorelease 是使用 lib/release 目录下的 libQBus.so。运行make clean删除它们。

如果要运行自己的程序,可以参考Makefile文件。

Go

进入examples子目录,运行./build.sh生成可执行文件,运行./clean.sh删除它们。

运行可执行文件时把libQBus_go.so路径加入LD_LIBRARY_PATH环境变量:

export LD_LIBRARY_PATH=$PWD/gopath/src/qbus:$LD_LIBRARY_PATH

如果要运行自己的程序,将生成的gopath目录加入GOPATH环境变量,或者将gopath/src/qbus目录移动到$GOPATH/src下。

Python

将生成的qbus.py_qbus.so拷贝至要运行的Python脚本同一路径即可。

PHP

编辑php.ini文件,添加extension=<module-path><module-path>qbus.so路径。

使用

数据生产

  • 在非按key写入的情况下,sdk尽最大努力提交每一条消息,只要Kafka集群存有一台broker正常,就会重试发送;
  • 每次写入数据只需要调用produce接口,在异步发送的场景下,通过返回值可以判断发送队列是否填满,发送队列可通过配置文件调整;
  • 在同步发送的场景中,produce接口返回当前消息是否写入成功,但是写入性能会有所下降,CPU使用率会有所上升,推荐还是使用异步写入方式;。
  • 下面是生产接口,以c++为例:
bool QbusProducer::init(const string& broker_list,
                        const string& log_path,
                        const string& config_path,
                        const string& topic_name);
bool QbusProducer::produce(const char* data,
                           size_t data_len,
                           const std::string& key);
void QbusProducer::uninit();
  • c++ sdk的使用范例:
#include <string>
#include <iostream>
#include "qbus_producer.h"

int main(int argc, const char* argv[]) {
    qbus::QbusProducer qbus_producer;
    if (!qbus_producer.init("127.0.0.1:9092",
                    "./log",
                    "./config",
                    "topic_test")) {
        std::cout << "Failed to init" << std::endl;
        return 0;
    }

    std::string msg("test\n");
    if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) {
        std::cout << "Failed to produce" << std::endl;
    }

    qbus_producer.uninit();

    return 0;
}

数据消费

  • 消费只需调用subscribeOne订阅topic(也支持同时订阅多个topic),然后执行start就开始消费,当前进程非阻塞,每条消息通过callback接口回调给使用者;
  • sdk还支持用户手动提交offset方式,用户可以通过callback中返回的消息体,在代码其他逻辑中进行提交。
  • 下面是消费接口,以c++为例:
bool QbusConsumer::init(const std::string& broker_list,
                        const std::string& log_path,
                        const std::string& config_path,
                        const QbusConsumerCallback& callback);
bool QbusConsumer::subscribeOne(const std::string& group, const std::string& topic);
bool QbusConsumer::subscribe(const std::string& group,
                             const std::vector<std::string>& topics);
bool QbusConsumer::start();
void QbusConsumer::stop();
bool QbusConsumer::pause(const std::vector<std::string>& topics);
bool QbusConsumer::resume(const std::vector<std::string>& topics);
  • c++ sdk的使用范例:
#include <iostream>
#include "qbus_consumer.h"

qbus::QbusConsumer qbus_consumer;
class MyCallback: public qbus::QbusConsumerCallback {
    public:
        virtual void deliveryMsg(const std::string& topic,
                    const char* msg,
                    const size_t msg_len) const {
            std::cout << "topic: " << topic << " | msg: " << std::string(msg, msg_len) << std::endl;
        }

};

int main(int argc, char* argv[]) {
    MyCallback my_callback;
    if (qbus_consumer.init("127.0.0.1:9092",
                    "log",
                    "config",
                    my_callback)) {
        if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) {
            if (!qbus_consumer.start()) {
                std::cout << "Failed to start" << std::endl;
                return NULL;
            }

            while (1) sleep(1);  //可以执行其他业务逻辑

            qbus_consumer.stop();
        } else {
            std::cout << "Failed subscribe" << std::endl;
        }
    } else {
        std::cout << "Failed init" << std::endl;
    }
    return 0;
}

可以用pause()resume()方法来暂停或恢复某些主题的消费,具体示例见qbus_pause_resume_example.cc

更多API使用方法参考C examplesC++ examplesGo examplesPython examplesPHP examples目录下的示例代码。

配置

配置文件是INI格式:

[global]

[topic]

[sdk]

globaltopic配置见rdkafka 1.0.x configurationsdk配置见sdk configuration

通常情况下kafkabridge使用空配置文件即可工作,但是如果broker版本低于0.10.0.0,必须添加api.version相关的配置,见broker version compatibility.

例如,对0.9.0.1版本的broker,必须添加以下配置:

[global]
api.version.request=false
broker.version.fallback=0.9.0.1

当前配置和 broker 0.9.0.1 兼容。因此,如果使用了高版本的 broker,api.version.request 应该配置为 true。否则消息协议会使用旧版本,比如,没有时间戳字段。

联系我们

QQ 群:876834263

MIT License Copyright (c) 2018 Qihoo 360 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

简介

KafkaBridge 是奇虎 360 开源的 Kafka 客户端 SDK ,底层基于 librdkafka ,与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的 Kaf 展开 收起
C/C++ 等 6 种语言
MIT
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
C/C++
1
https://gitee.com/mirrors/KafkaBridge.git
git@gitee.com:mirrors/KafkaBridge.git
mirrors
KafkaBridge
KafkaBridge
master

搜索帮助