Qbusbridge 是 pub-sub 消息系统的客户端 SDK,目前它支持:
用户可以通过修改配置切换到任意一个 pub-sub 消息系统。默认配置是访问 Kafka,如果想要切换到 Pulsar,需要修改配置为:
mq.type=pulsar
# Other configs for Pulsar...
更多细节见 config。
Qbusbridge-Kafka 底层基于 librdkafka, 与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的Kafka系统细节,只需调用极少量的接口,就可完成消息的生产和消费;
针对使用者比较关心的消息生产的可靠性,作了近一步的提升
确保你的系统上安装了 g++ (>= 4.8.5), boost (>= 1.41),cmake (>= 3.1),swig (>= 3.0.12)。
git clone --recursive https://github.com/Qihoo360/qbusbridge.git
此外,qbus SDK 静态链接到 libstdc++,因此必须确保 libstdc++.a
存在。对于 CentOS 用户,运行:
sudo yum install -y glibc-static libstdc++-static
运行./build_dependencies.sh
。
它会自动下载子模块,并将其安装到cxx/thirdparts/local
,即CMakeLists.txt
查找头文件和库文件的目录。
见 ./cxx/thirdparts/local
:
include/
librdkafka/
rdkafka.h
log4cplus/
logger.h
lib/
librdkafka.a
liblog4cplus.a
进入cxx
目录,执行./build.sh
,会生成以下文件:
include/
qbus_consumer.h
qbus_producer.h
lib/
debug/libQBus.so
release/libQBus.so
虽然构建 C++ SDK 需要 C++11 支持,但是 SDK 也可以被旧版本 g++ 使用。比如,使用 g++ 4.8.5 编译 qbus SDK,使用 g++ 4.4.7 使用 qbus SDK。
进入golang
目录,执行./build.sh
,会生成以下文件:
gopath/
src/
qbus/
qbus.go
libQBus_go.so
可以运行 USE_GO_MOD=1 ./build.sh
来启用 go module,此时会生成以下文件:
examples/
go.mod
qbus/
qbus.go
go.mod
libQBus_go.so
进入python
目录,执行./build.sh
,会生成以下文件:
examples/
qbus.py
_qbus.so
进入php
目录,执行./build.sh
,会生成以下文件:
examples/
qbus.php
qbus.so
进入examples
子目录,运行 ./build.sh [debug|release]
生成可执行文件,其中 debug
是使用 lib/debug
目录下的 libQBus.so
,release
是使用 lib/release
目录下的 libQBus.so
。运行make clean
删除它们。
如果要运行自己的程序,可以参考Makefile
文件。
进入examples
子目录,运行./build.sh
生成可执行文件,运行./clean.sh
删除它们。
运行可执行文件时把libQBus_go.so
路径加入LD_LIBRARY_PATH
环境变量:
export LD_LIBRARY_PATH=$PWD/gopath/src/qbus:$LD_LIBRARY_PATH
如果要运行自己的程序,将生成的gopath
目录加入GOPATH
环境变量,或者将gopath/src/qbus
目录移动到$GOPATH/src
下。
将生成的qbus.py
和_qbus.so
拷贝至要运行的Python脚本同一路径即可。
编辑php.ini
文件,添加extension=<module-path>
,<module-path>
为qbus.so
路径。
bool QbusProducer::init(const string& broker_list,
const string& log_path,
const string& config_path,
const string& topic_name);
bool QbusProducer::produce(const char* data,
size_t data_len,
const std::string& key);
void QbusProducer::uninit();
#include <string>
#include <iostream>
#include "qbus_producer.h"
int main(int argc, const char* argv[]) {
qbus::QbusProducer qbus_producer;
if (!qbus_producer.init("127.0.0.1:9092",
"./log",
"./config",
"topic_test")) {
std::cout << "Failed to init" << std::endl;
return 0;
}
std::string msg("test\n");
if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) {
std::cout << "Failed to produce" << std::endl;
}
qbus_producer.uninit();
return 0;
}
bool QbusConsumer::init(const std::string& broker_list,
const std::string& log_path,
const std::string& config_path,
const QbusConsumerCallback& callback);
bool QbusConsumer::subscribeOne(const std::string& group, const std::string& topic);
bool QbusConsumer::subscribe(const std::string& group,
const std::vector<std::string>& topics);
bool QbusConsumer::start();
void QbusConsumer::stop();
bool QbusConsumer::pause(const std::vector<std::string>& topics);
bool QbusConsumer::resume(const std::vector<std::string>& topics);
#include <iostream>
#include "qbus_consumer.h"
qbus::QbusConsumer qbus_consumer;
class MyCallback: public qbus::QbusConsumerCallback {
public:
virtual void deliveryMsg(const std::string& topic,
const char* msg,
const size_t msg_len) const {
std::cout << "topic: " << topic << " | msg: " << std::string(msg, msg_len) << std::endl;
}
};
int main(int argc, char* argv[]) {
MyCallback my_callback;
if (qbus_consumer.init("127.0.0.1:9092",
"log",
"config",
my_callback)) {
if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) {
if (!qbus_consumer.start()) {
std::cout << "Failed to start" << std::endl;
return NULL;
}
while (1) sleep(1); //可以执行其他业务逻辑
qbus_consumer.stop();
} else {
std::cout << "Failed subscribe" << std::endl;
}
} else {
std::cout << "Failed init" << std::endl;
}
return 0;
}
可以用pause()
和resume()
方法来暂停或恢复某些主题的消费,具体示例见qbus_pause_resume_example.cc。
更多API使用方法参考C examples,C++ examples,Go examples,Python examples,PHP examples目录下的示例代码。
配置文件是INI格式:
[global]
[topic]
[sdk]
global和topic配置见rdkafka 1.0.x configuration,sdk配置见sdk configuration。
通常情况下kafkabridge使用空配置文件即可工作,但是如果broker版本低于0.10.0.0,必须添加api.version相关的配置,见broker version compatibility.
例如,对0.9.0.1版本的broker,必须添加以下配置:
[global]
api.version.request=false
broker.version.fallback=0.9.0.1
当前配置和 broker 0.9.0.1 兼容。因此,如果使用了高版本的 broker,api.version.request
应该配置为 true。否则消息协议会使用旧版本,比如,没有时间戳字段。
QQ 群:876834263
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
1. 开源生态
2. 协作、人、软件
3. 评估模型