1 Star 9 Fork 1

tym_hmm / rabbitmq-pool-router-path-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

go rabbitMq 队列路由处理

最新版本 v1.1.8

描述

1、rabbitmq 使用同一交换机,队列, routekey, 监听队列后通过消息路由routePath 进行区分任务逻辑
一个消息队列可根据自定义路由处响应不通事件。 支持不同队列不同路由处理, 基于rabbitmq连接池

2、兼容rabbitmq链接池自定义 VirtualHosts

使用案例

生产项目已使用,每日处理上亿数据,每秒可处理1000+队列消息,rabbitmq集群及服务应用未出现较高压力

组件说明

基于rabbitmq连接池工具=>

接入说明

go get -u gitee.com/tym_hmm/rabbitmq-pool-router-path-go

生产者声明

var wg sync.WaitGroup
exchangeName := "test-data-center-exchange-name"
exchangeType := RabbitmqPool.EXCHANGE_TYPE_TOPIC
queueName := "test-data-center-queue-name"
routeKey := "test-data"
routePath := "/test/a"


wg.Add(2)
go func() {
  defer wg.Done()
  data:="这是一个数据test/a"

  //使用默认VirtualHosts 发送
  //product:=RabbitmqRoute.NewProductClient(host, port, user, pwd)
  //使用自定义VirtualHosts 发送
  product:=RabbitmqRoute.NewProductClientVirtualHosts(host, port, user, pwd, "/temptest1")

  err:=product.Publish(exchangeName, exchangeType, queueName, routeKey, routePath, data)
  if err !=nil{
    fmt.Println(err)
  }
}()

go func() {
  defer wg.Done()
  routePatsh := "/testaa"
  datas:="这是一个数据test"
  //使用默认VirtualHosts 发送
  //products:=RabbitmqRoute.NewProductClient(host, port, user, pwd)
  //使用自定义VirtualHosts 发送
  products:=RabbitmqRoute.NewProductClientVirtualHosts(host, port, user, pwd, "/temptest1")
  errs:=products.Publish(exchangeName, exchangeType, queueName, routeKey, routePatsh, datas)
  if errs !=nil{
    fmt.Println(errs)
  }
}()

  wg.Wait()

消费者声明


consumer := RabbitmqRoute.NewTask(host, port, user, pwd)

  //设置加载的节点信息(暂时只支持单节点加入)
consumer.SetHandleNode(&RabbitmqRoute.NodeInfo{
  NodeName:     "dataCenter",
  ExchangeName: "test-data-center-exchange-name",
  ExchangeType: RabbitmqPool.EXCHANGE_TYPE_TOPIC,
  Route:        "test-data",
  QueueName:    "test-data-center-queue-name",
  IsTry:        false,
  MaxReTry:     1,
})

  //注册加载的路由
consumer.RouteRegister(func(engine *RabbitmqRoute.TaskEngine) {
  //fmt.Println(engine)
  engine.AddRoute("/test", func(c *RabbitmqRoute.TaskContext) {
     fmt.Println(c.Request.Data)
   //c.Request.Data
  })
  engine.AddRoute("/test/a", func(c *RabbitmqRoute.TaskContext) {
    fmt.Println(c.Request.Data)
  })
})
  err := consumer.Enter()
  if err != nil {
    fmt.Println(err)
  }

image

空文件

简介

基于rabbbitmq 连接池 任务消费路由器 一个消息队列可根据自定义路由处响应不通事件。 支持不同队列不同路由处理, 基于rabbitmq连接池 展开 收起
Go
取消

发行版 (11)

全部

贡献者

全部

近期动态

加载更多
不能加载更多了
Go
1
https://gitee.com/tym_hmm/rabbitmq-pool-router-path-go.git
git@gitee.com:tym_hmm/rabbitmq-pool-router-path-go.git
tym_hmm
rabbitmq-pool-router-path-go
rabbitmq-pool-router-path-go
master

搜索帮助