1 Star 0 Fork 43

星辰大海 / springboot-rocketmq

forked from zhaowg / springboot-rocketmq 
Create your Gitee Account
Explore and code with more than 12 million developers,Free private repositories !:)
Sign up
This repository doesn't specify license. Please pay attention to the specific project description and its upstream code dependency when using it.
Clone or Download
README.md 16.23 KB
Copy Edit Raw Blame History
zhaowg authored 2018-03-03 17:02 . README

上一篇:Spring Boot 整合 RocketMq

RocketMq消息监听程序消除大量的if..else

承接上一篇文章,如果消费端订阅了多个topic和tag,则需要在消息监听器类中添加if..else,根据topic和tag处理不同的业务逻辑,使得消息监听类职责过重。

大概思路

消息监听器类只负责监听消息,获取到消息后通过topic和tag路由到需要调用的服务,消费者只需要编写对应的topic和tag的服务。

为了监听器类可以通过topic和tag路由到需要调用的服务,自定义一个消费者注解MQConsumeService(该注解包含topic和tag的定义),消费者实现类上添加该注解,然后就可以在监听器类中通过反射获取到实现类上面注解的topic和tag,和监听到的topic和tag比较,如果相同则调用该服务。

定义MQConsumeService注解

package com.clouds.common.rocketmq.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.stereotype.Service;

import com.clouds.common.rocketmq.TopicEnum;


/**
 * 此注解用于标注消费者服务
 * .<br/>
 * 
 * Copyright: Copyright (c) 2017  zteits
 * 
 * @ClassName: MQConsumeService
 * @Description: 
 * @version: v1.0.0
 * @author: zhaowg
 * @date: 2018年3月2日 下午1:15:52
 * Modification History:
 * Date             Author          Version            Description
 *---------------------------------------------------------*
 * 2018年3月2日      zhaowg           v1.0.0               创建
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Service
public @interface MQConsumeService {
    /**
     * 消息主题
     */
     TopicEnum topic();

    /**
     * 消息标签,如果是该主题下所有的标签,使用“*”
     */
     String[] tags();


}

定义消费者返回消息Bean

package com.clouds.common.rocketmq.consumer.processor;

import java.io.Serializable;
/**
 * 消费结果
 * .<br/>
 * 
 * Copyright: Copyright (c) 2017  zteits
 * 
 * @ClassName: MQConsumeResult
 * @Description: 
 * @version: v1.0.0
 * @author: zhaowg
 * @date: 2018年3月1日 上午11:12:55
 * Modification History:
 * Date             Author          Version            Description
 *---------------------------------------------------------*
 * 2018年3月1日      zhaowg           v1.0.0               创建
 */
public class MQConsumeResult implements Serializable{

	private static final long serialVersionUID = 1L;
	/**
	 * 是否处理成功
	 */
	private boolean isSuccess;
	/**
	 * 如果处理失败,是否允许消息队列继续调用,直到处理成功,默认true
	 */
	private boolean isReconsumeLater = true;
	/**
	 * 是否需要记录消费日志,默认不记录
	 */
	private boolean isSaveConsumeLog = false;
	/**
	 * 错误Code
	 */
	private String errCode;
	/**
	 * 错误消息
	 */
	private String errMsg;
	/**
	 * 错误堆栈
	 */
	private Throwable e;
	
	//省略set和get方法
}

定义统一的消息处理接口

package com.clouds.common.rocketmq.consumer.processor;

import java.util.List;

import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * 消息队列-消息消费处理接口
 * .<br/>
 * 
 * Copyright: Copyright (c) 2017  zteits
 * 
 * @ClassName: MQMsgProcessorService
 * @Description: 
 * @version: v1.0.0
 * @author: zhaowg
 * @date: 2018年3月1日 上午9:57:57
 * Modification History:
 * Date             Author          Version            Description
 *---------------------------------------------------------*
 * 2018年3月1日      zhaowg           v1.0.0               创建
 */
public interface MQMsgProcessor {
	/**
	 * 消息处理<br/>
	 * 如果没有return true ,consumer会重新消费该消息,直到return true<br/>
	 * consumer可能重复消费该消息,请在业务端自己做是否重复调用处理,该接口设计为幂等接口
	 * @param topic 消息主题
	 * @param tag 消息标签
	 * @param msgs 消息
	 * @return
	 * 2018年3月1日 zhaowg
	 */
	MQConsumeResult handle(String topic, String tag, List<MessageExt> msgs);
}

定义抽象类实现上面的MQMsgProcessor接口

package com.clouds.common.rocketmq.consumer.processor;

import java.util.Arrays;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.common.message.MessageConst;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
 * 所有消息处理继承该类
 * .<br/>
 * 
 * Copyright: Copyright (c) 2017  zteits
 * 
 * @ClassName: AbstractMQMsgProcessorService
 * @Description: 
 * @version: v1.0.0
 * @author: zhaowg
 * @date: 2018年3月1日 上午11:14:25
 * Modification History:
 * Date             Author          Version            Description
 *---------------------------------------------------------*
 * 2018年3月1日      zhaowg           v1.0.0               创建
 */
public abstract class AbstractMQMsgProcessor implements MQMsgProcessor{
	
	protected static final Logger logger = LoggerFactory.getLogger(AbstractMQMsgProcessor.class);
    
	@Override
	public MQConsumeResult handle(String topic, String tag, List<MessageExt> msgs) {
		MQConsumeResult mqConsumeResult = new MQConsumeResult();
		/**可以增加一些其他逻辑*/
		
		for (MessageExt messageExt : msgs) {
			//消费具体的消息,抛出钩子供真正消费该消息的服务调用
			mqConsumeResult = this.consumeMessage(tag,messageExt.getKeys()==null?null:Arrays.asList(messageExt.getKeys().split(MessageConst.KEY_SEPARATOR)),messageExt);
		}
		
		/**可以增加一些其他逻辑*/
		return mqConsumeResult;
	}
	/**
	 * 消息某条消息
	 * @param tag 标签
	 * @param keys 消息关键字
	 * @param messageExt
	 * @return
	 * 2018年3月1日 zhaowg
	 */
	protected abstract MQConsumeResult consumeMessage(String tag,List<String> keys, MessageExt messageExt);

}

修改后的消费者监听器类

package com.clouds.common.rocketmq.consumer.processor;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.clouds.common.rocketmq.annotation.MQConsumeService;
import com.clouds.common.rocketmq.constants.RocketMQErrorEnum;
import com.clouds.common.rocketmq.exception.AppException;
import com.clouds.common.rocketmq.exception.RocketMQException;
/**
 * 消费者消费消息路由
 * .<br/>
 * 
 * Copyright: Copyright (c) 2017  zteits
 * 
 * @ClassName: RocketMQMessageListenerConcurrentlyProcessor
 * @Description: 
 * @version: v1.0.0
 * @author: zhaowg
 * @date: 2018年2月28日 上午11:12:32
 * Modification History:
 * Date             Author          Version            Description
 *---------------------------------------------------------*
 * 2018年2月28日      zhaowg           v1.0.0               创建
 */
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently{
	private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
    @Autowired
    private Map<String,MQMsgProcessor> mqMsgProcessorServiceMap;
	/**
	 *  默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息<br/>
	 *  不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
	 */
	@Override
	public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
		if(CollectionUtils.isEmpty(msgs)){
			logger.info("接受到的消息为空,不处理,直接返回成功");
			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		}
		ConsumeConcurrentlyStatus concurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
		try{
			//根据Topic分组
			Map<String, List<MessageExt>> topicGroups = msgs.stream().collect(Collectors.groupingBy(MessageExt::getTopic));
			for (Entry<String, List<MessageExt>> topicEntry : topicGroups.entrySet()) {
				String topic = topicEntry.getKey();
				//根据tags分组
				Map<String, List<MessageExt>> tagGroups = topicEntry.getValue().stream().collect(Collectors.groupingBy(MessageExt::getTags));
				for (Entry<String, List<MessageExt>> tagEntry : tagGroups.entrySet()) {
					String tag = tagEntry.getKey();
					//消费某个主题下,tag的消息
					this.consumeMsgForTag(topic,tag,tagEntry.getValue());
				}
			}
		}catch(Exception e){
			logger.error("处理消息失败",e);
			concurrentlyStatus = ConsumeConcurrentlyStatus.RECONSUME_LATER;
		}
        // 如果没有return success ,consumer会重新消费该消息,直到return success
        return concurrentlyStatus;
	}
	/**
	 * 根据topic 和 tags路由,查找消费消息服务
	 * @param topic
	 * @param tag
	 * @param value
	 * 2018年3月1日 zhaowg
	 */
	private void consumeMsgForTag(String topic, String tag, List<MessageExt> value) {
		//根据topic 和  tag查询具体的消费服务
		MQMsgProcessor imqMsgProcessor = selectConsumeService(topic, tag);
		
		try{
			if(imqMsgProcessor==null){
				logger.error(String.format("根据Topic:%s和Tag:%s 没有找到对应的处理消息的服务",topic,tag));
				throw new RocketMQException(RocketMQErrorEnum.NOT_FOUND_CONSUMESERVICE);
			}
			logger.info(String.format("根据Topic:%s和Tag:%s 路由到的服务为:%s,开始调用处理消息",topic,tag,imqMsgProcessor.getClass().getName()));
			//调用该类的方法,处理消息
			MQConsumeResult mqConsumeResult = imqMsgProcessor.handle(topic,tag,value);
			if(mqConsumeResult==null){
				throw new RocketMQException(RocketMQErrorEnum.HANDLE_RESULT_NULL);
			}
			if(mqConsumeResult.isSuccess()){
				logger.info("消息处理成功:"+JSON.toJSONString(mqConsumeResult));
			}else{
				throw new RocketMQException(RocketMQErrorEnum.CONSUME_FAIL,JSON.toJSONString(mqConsumeResult),false);
			}
			if(mqConsumeResult.isSaveConsumeLog()){
				logger.debug("开始记录消费日志");
				//TODO 记录消费日志
			}
		}catch(Exception e){
			if(e instanceof AppException){
				AppException mqe = (AppException)e;
				//TODO 记录消费失败日志
				throw new AppException(mqe.getErrCode(),mqe.getErrMsg(),false); 
			}else{
				//TODO 记录消费失败日志
				throw e;
			}
		}
	}
	/**
	 * 根据topic和tag查询对应的具体的消费服务
	 * @param topic
	 * @param tag
	 * @return
	 * 2018年3月3日 zhaowg
	 */
	private MQMsgProcessor selectConsumeService(String topic, String tag) {
		MQMsgProcessor imqMsgProcessor = null;
		for (Entry<String, MQMsgProcessor> entry : mqMsgProcessorServiceMap.entrySet()) {
			//获取service实现类上注解的topic和tags
			MQConsumeService consumeService = entry.getValue().getClass().getAnnotation(MQConsumeService.class);
			if(consumeService == null){
				logger.error("消费者服务:"+entry.getValue().getClass().getName()+"上没有添加MQConsumeService注解");
				continue;
			}
			String annotationTopic = consumeService.topic().getCode();
			if(!annotationTopic.equals(topic)){
				continue;
			}
			String[] tagsArr = consumeService.tags();
			//"*"号表示订阅该主题下所有的tag
			if(tagsArr[0].equals("*")){
				//获取该实例
				imqMsgProcessor = entry.getValue();
				break;
			}
			boolean isContains = Arrays.asList(tagsArr).contains(tag);
			if(isContains){
				//获取该实例
				imqMsgProcessor = entry.getValue();
				break;
			}
		}
		return imqMsgProcessor;
	}

}

至此,通过topic和tag路由服务已经写完了,现在以新增订阅一个DemoTopic主题为例,编写消息接受方。

新建消费者类,继承AbstractMQMsgProcessor类

注意:该类必须继承AbstractMQMsgProcessor,且添加MQConsumeService注解,用于定义该类是针对那个topic和tag的消费服务。新增订阅主题,还需要在application.propertis中修改rocketmq.consumer.topics。

package com.clouds.common.demo;

import java.util.List;

import com.alibaba.rocketmq.common.message.MessageExt;
import com.clouds.common.rocketmq.TopicEnum;
import com.clouds.common.rocketmq.annotation.MQConsumeService;
import com.clouds.common.rocketmq.consumer.processor.AbstractMQMsgProcessor;
import com.clouds.common.rocketmq.consumer.processor.MQConsumeResult;

@MQConsumeService(topic=TopicEnum.DemoTopic,tags={"*"})
public class DemoNewTopicConsumerMsgProcessImpl extends AbstractMQMsgProcessor{

	@Override
	protected MQConsumeResult consumeMessage(String tag, List<String> keys, MessageExt messageExt) {
		String msg = new String(messageExt.getBody());
		logger.info("获取到的消息为:"+msg);
		//TODO 判断该消息是否重复消费(RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重)
		
		//如果注解中tags数据中包含多个tag或者是全部的tag(*),则需要根据tag判断是那个业务,
		//如果注解中tags为具体的某个tag,则该服务就是单独针对tag处理的
		if(tag.equals("某个tag")){
			//做某个操作
		}
		//TODO 获取该消息重试次数
		int reconsume = messageExt.getReconsumeTimes();
		//根据消息重试次数判断是否需要继续消费
		if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功
			
		}
		MQConsumeResult result = new MQConsumeResult();
		result.setSuccess(true);
		return result;
	}

}
rocketmq.consumer.topics=DemoTopic~*;

运行测试类观察日志


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.2.RELEASE)

2018-03-03 16:54:25.079  INFO 12496 --- [           main] c.c.c.SpringBootRocketMqApplication      : Starting SpringBootRocketMqApplication on DESKTOP-HS6GR38 with PID 12496 (D:\wsforjava\springboot-rocketmq\target\classes started by suolo in D:\wsforjava\springboot-rocketmq)
2018-03-03 16:54:25.086  INFO 12496 --- [           main] c.c.c.SpringBootRocketMqApplication      : No active profile set, falling back to default profiles: default
2018-03-03 16:54:25.201  INFO 12496 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@236e3f4e: startup date [Sat Mar 03 16:54:25 CST 2018]; root of context hierarchy
2018-03-03 16:54:27.073  INFO 12496 --- [           main] c.c.c.r.p.MQProducerConfiguration        : producer is start ! groupName:[springboot-rocketmq],namesrvAddr:[47.97.8.22:9876]
2018-03-03 16:54:27.430  INFO 12496 --- [           main] c.c.c.r.c.MQConsumerConfiguration        : consumer is start !!! groupName:springboot-rocketmq,topics:DemoTopic~*;,namesrvAddr:47.97.8.22:9876
2018-03-03 16:54:27.747  INFO 12496 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2018-03-03 16:54:27.764  INFO 12496 --- [           main] c.c.c.SpringBootRocketMqApplication      : Started SpringBootRocketMqApplication in 3.213 seconds (JVM running for 3.814)
2018-03-03 16:54:27.784  INFO 12496 --- [MessageThread_1] .c.c.r.c.p.MQConsumeMsgListenerProcessor : 根据Topic:DemoTopic和Tag:DemoTag 路由到的服务为:com.clouds.common.demo.DemoNewTopicConsumerMsgProcessImpl,开始调用处理消息
2018-03-03 16:54:30.162  INFO 12496 --- [MessageThread_1] c.c.c.r.c.p.AbstractMQMsgProcessor       : 获取到的消息为:demo msg test
2018-03-03 16:54:30.194  INFO 12496 --- [MessageThread_1] .c.c.r.c.p.MQConsumeMsgListenerProcessor : 消息处理成功:{"reconsumeLater":true,"saveConsumeLog":false,"success":true}

项目源码

https://gitee.com/zhaowg3/springboot-rocketmq/tree/master/

完成。

Java
1
https://gitee.com/qidian2/springboot-rocketmq.git
git@gitee.com:qidian2/springboot-rocketmq.git
qidian2
springboot-rocketmq
springboot-rocketmq
master

Search