59 Star 201 Fork 89

shenzhanwang / Spring-activeMQ

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

Spring-activeMQ

在业务逻辑的异步处理,系统解耦,分布式通信以及控制高并发的场景下,消息队列有着广泛的应用。本项目基于Spring这一平台,整合流行的开源消息队列中间件ActiveMQ,实现一个向ActiveMQ添加和读取消息的功能。并比较了两种模式:生产者-消费者模式和发布-订阅模式的区别。 包含的特性如下:

1.开启activeMQ,访问http://localhost:8080/demo

2 在项目中,我们为消息的生产者和发布者分别注册了两个消费者和订阅者,当有消息到达activeMQ时,消费者和订阅者会自动获取对应的消息,其中两个消费者会轮流消费消息,而两个订阅者会同时订阅所有消息;

3.填入要发送的消息,点击生产消息可以向消息队列添加一条消息,我们可以试着添加了四条消息,并观察控制台结果,可以发现每个消息只被某一个消费者接收;

4.重复以上操作发布四条消息,可以看到订阅者的输出结果,表明每个发布的消息可以被两个订阅者全部接收;

5.以上结果表明,向队列生产的每条消息,只能被某一个消费者读取,而发布的消息,可以被每个订阅者重复读取,这是两种模式最大的区别,实际应用中要根据情况来选择。

输入图片说明

输入图片说明

输入图片说明

ActiveMQ集群搭建

  • 原理图:

输入图片说明

  • activeMQ集群服务器个数为单数,一主多从,主服务器选举方式依赖zookeeper,因此zookeeper集群若不可用,则activeMQ集群不可用。客户端请求与主服务器通信,从服务器不工作,但能同步主服务器的数据。主服务器宕机,自动选举一台从服务器为主服务器。集群要求至少一半以上服务器不能宕机。
  • 一个activeMQ集群中同一时刻只有一台主服务器在工作,为避免单点故障,可以建立多个activeMQ集群,然后配置将他们桥接起来,所有集群内的队列、生产者消费者都可以共享,以达到负载均衡,横向扩展集群性能的目的。

搭建第一个activeMQ集群(activeMQ版本号5.14.5)

找一台服务器,把activeMQ的安装文件夹拷贝两次,分别进行配置:

修改conf/jetty.xml

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8361"/>
</bean>

这里配置web控制台端口为8361,另外两台分别改为8362、8363。

修改conf/activemq.xml持久化适配器

<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ -->
<replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:63631" zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-mq-01" zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ -->
<replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:63632"
zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-mq-01" zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
<persistenceAdapter>
<!-- kahaDB directory="${activemq.data}/kahadb"/ -->
<replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:63633"
zkAddress="192.168.1.81:2181,192.168.1.82:2182,192.168.1.83:2183" hostname="edu-mq-01" zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>

zkAddress填zookeeper地址,ZkPath指集群选举信息在zookeeper的存放路径。

修改各节点的消息端口

<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:53531?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5662?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:53532?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5663?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1884?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61615?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:53533?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5664?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1885?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

注意端口号不要冲突。 依次启动节点,观察zookeeper内数据的值,可以判断当前主节点是哪一台服务器。可以关闭部分服务器来测试集群的高可用性。

java连接activeMQ

failover:(tcp://192.168.252.128:53531,tcp://192.168.252.128:53532,tcp://192.168.252.128:53533)?randomize=false

集群的扩展

由于一个activeMQ集群只有一个主服务器在工作,对消息的吞吐量是有限的。要想进行扩展,则需要把多个集群桥接起来,这样连接到任意一个集群,即可与整个集群的队列,生产者,消费者之间进行通信。在activemq.xml新增:

<networkConnectors>
<networkConnector
uri="static:(tcp://192.168.1.101:53531,tcp://192.168.1.101:53532,tcp://192.168.1.101:53533)"
duplex="false"/>
</networkConnectors>

以上配置即可完成到tcp://192.168.1.101:53531,tcp://192.168.1.101:53532,tcp://192.168.1.101:53533集群的链接。注意要相互配。

好书推荐

输入图片说明

购买地址
输入图片说明

专利文章

标题 技术领域
1 一种基于微服务架构的车联网大数据分析系统 微服务
2 一种流式数据场景下Elasticsearch索引的自动化扩容方法 搜索引擎
3 大数据钻取分析方法、装置、设备及存储介质 大数据分析
4 一种基于工作流引擎的自动化办公方法和系统 工作流引擎
5 一种低延迟高性能实时数据仓库搭建的方法和系统 实时数仓
6 一种基于数据治理的大数据中台架构系统 数据中台

视频教程

附录:中央技术储备仓库(Central Technique Reserve Repository)

基础篇:职业化,从做好OA系统开始

  1. Spring boot整合Mybatis实现增删改查(支持多数据源)输入图片说明
  2. Spring,SpringMVC和Hibernate的整合实现增删改查
  3. Spring boot整合activiti工作流引擎实现OA开发输入图片说明
  4. Ruoyi-boot集成工作流引擎Flowable实例输入图片说明
  5. Spring发布与调用REST风格的WebService
  6. Spring boot整合Axis调用SOAP风格的web服务
  7. Spring boot整合Apache Shiro实现RBAC权限控制
  8. 使用Spring security实现RBAC权限控制

中级篇:中间件的各种姿势

  1. Spring boot整合mongoDB文档数据库实现增删改查
  2. Spring连接Redis实现缓存
  3. Spring连接图存数据库Neo4j实现增删改查
  4. Spring boot整合列存数据库hbase实现增删改查
  5. Spring平台整合消息队列ActiveMQ实现发布订阅、生产者消费者模型(JMS)
  6. Spring boot整合消息队列RabbitMQ实现四种消息模式(AMQP)
  7. Spring boot整合kafka 2.1.0实现大数据消息管道
  8. Spring boot整合websocket实现即时通讯输入图片说明
  9. Spring security整合oauth2实现token认证
  10. Spring boot整合MinIO客户端实现文件管理
  11. 23种设计模式,源码、注释、使用场景
  12. 使用ETL工具Kettle的实例
  13. Git指南和分支管理策略
  14. 使用数据仓库进行OLAP数据分析(Mysql+Kettle+Zeppelin)

高级篇:分布式系统和大数据开发

  1. zookeeper原理、架构、使用场景和可视化
  2. Spring boot整合Apache dubbo v2.7.5实现分布式服务治理(SOA架构) 输入图片说明

包含组件Spring boot v2.2.2+Dubbo v2.7.5+Nacos v1.1.1 效果图

  1. 使用Spring Cloud Alibaba v2.2.7实现微服务架构(MSA架构)输入图片说明

包含组件Nacos+Feign+Gateway+Ribbon+Sentinel+Zipkin 效果图

  1. 使用jenkins+centos+git+maven搭建持续集成环境自动化部署分布式服务
  2. 使用docker+compose+jenkins+gitlab+spring cloud实现微服务的编排、持续集成和动态扩容
  3. 使用Spark进行分布式计算
  • Spark SQL做离线计算
  • Spark Streaming做实时计算
  • Structured Streaming做实时计算
  1. 使用Flink实现流批一体化的分布式计算
  2. 搭建高可用nginx集群和Tomcat负载均衡
  3. 使用mycat实现Mysql数据库的主从复制、读写分离、分表分库、负载均衡和高可用
  4. 《Elasticsearch数据搜索与分析实战》源码 输入图片说明

特别篇:分布式事务和并发控制

  1. 基于可靠消息最终一致性实现分布式事务(activeMQ)
  2. Spring boot dubbo整合seata实现分布式事务输入图片说明

包含组件nacos v2.0.2 + seata v1.4.2 +spring boot dubbo v2.7.5 效果图

  1. Spring cloud alibaba v2.2.7整合seata实现分布式事务 输入图片说明

包含组件nacos v2.0.2 + seata v1.4.2 +spring cloud alibaba v2.2.7 效果图

  1. 并发控制:数据库锁机制和事务隔离级别的实现输入图片说明
  2. 并发控制:使用redission实现分布式锁
  3. 并发控制:使用zookeeper实现分布式锁
  4. 并发控制:Java多线程编程实例
  5. 并发控制:使用netty实现高性能NIO通信

关注微信公众号获取更多技术文章和源码

输入图片说明

空文件

简介

Spring boot整合消息队列ActiveMQ 展开 收起
Java 等 3 种语言
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/shenzhanwang/Spring-activeMQ.git
git@gitee.com:shenzhanwang/Spring-activeMQ.git
shenzhanwang
Spring-activeMQ
Spring-activeMQ
master

搜索帮助

14c37bed 8189591 565d56ea 8189591