0

0

利用rabbit mq.模拟dubbo,使MQ异步调用

怪我咯

怪我咯

发布时间:2017-06-26 11:22:21

|

3183人浏览过

|

来源于php中文网

原创

最近在改造老系统,遇到了需要使用rabbitmq的场景.在以前使用的过程中需要在发送端和消费端各种配置,感觉比较麻烦,然后突然想到了dubbo中@reference注解的形式,可不可以做一个类似的架子,这样调用mq的时候就像调用同步接口一样方便简单呢?于是查了相关资料和看了dubbo的源码,之后就有了思路.

总的来说,要实现的目标就是像dubbo一样,消费端暴露接口(甚至可以复用dubbo服务定义的接口,这样写一个dubbo服务即可同步也可MQ异步),发送端通过自定义的注解注入对象调用方法,通过框架内部处理之后转换成异步mq形式发送到消费端.

比如服务端有接口:

public interface MqDemoService {
    void dealById(Long id);
}

有实现:

@Slf4j
@Component("mqDemoServiceImpl")
@Service(version = "1.0.0")
public class MqDemoServiceImpl implements MqDemoService {
    @Override
    public void dealById(Long id) {
        log.info("执行findById方法");
    }
}

其中:

@Slf4j是lombok注解
@Service是dubbo服务端注解

有兴趣的同学自行查阅

然后是发送端

有自定义注解:

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface AsyncInvoker {
}

于是在调用的controller中:

@Slf4j
@Controller
public class MqDemoController {
  @AsyncInvoker
  private MqDemoService mqDemoService;

  @RequestMapping(value = "/deal", method = RequestMethod.POST)
  public void deal() {
    mqDemoService.dealById(1L);
  }
}

注意Controller中@AsyncInvoker注解的属性mqDemoService,通过这个注解注入的对象调用方法的时候会通过mq发送变为异步调用.

好了,要实现的目标很清晰了.那么要解决的问题就是以下几个方面了:

1,如何确定发送消息的格式,使消费端可以确定调用的方法
2,发送端中如何为注解@AsyncInvoker注释的对象注入实例
3,接收端中如何在接收到消息后调用对应接口的实现方法
4,多个消费服务如何区分mq队列.

1,如何确定发送消息的格式,使接收端可以确定调用的方法

这里我先按照java反射调用需要的参数简单定义了一个传输对象:

@Data
public class MqMethodMeta {
  //调用的接口名称(包括包名,用于反射)
  private String interfaceName;
  //调用的方法名
  private String methodName;
  //调用的方法的参数
  private Object[] args;
  //调用的方法的参数类型
  private String[] paramTypeNames;
}

2,发送端中如何为注解@AsyncInvoker注释的对象注入实例

在这个场景中,发送端是只会引入消费端的接口,不会引入实现的.那么@AsyncInvoker如何注入对象呢?

答案就是动态代理.

那么还有如何让Spring知道@AsyncInvoker注解的对象要注入动态代理呢?

答案就是spring的BeanPostProcessor接口了!这个接口允许spring在处理对象创建的前后插入用户自己定义的逻辑,在这里就不细细展开了,有需要的同学自行google/百度了哈.

那么思路出来了,代码如下:

@Slf4j
@Component
public class AsyncInvokerBeanProcessor implements BeanPostProcessor {
  //缓存生成的动态代理对象,用于多个Controller注入同一类型对象时使用.
  private final ConcurrentMap proxyMap = new ConcurrentHashMap<>();

  //注入spring amqp处理mq的对象
  @Autowired
  private RabbitTemplate rabbitTemplate;

  //BeanPostProcessor接口方法,在spring创建每个实例前插入的用户自定义逻辑.这里我们需要的是在每个Controller对象创建的时候为其中的@AsyncInvoker注解对象注入动态代理.
  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    //获取该实例中的有@AsyncInvoker注解的field
    Field[] fields = bean.getClass().getDeclaredFields();
    for (Field field : fields) {
      try {
        if (!field.isAccessible()) {
          field.setAccessible(true);
        }
        AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class);
        if (asyncInvoker != null) {
          //创建代理对象,赋值给该feild
          Object value = createProxy(field.getType());
          if (value != null) {
            field.set(bean, value);
          }
        }
      } catch (Throwable e) {
        log.error("Failed to init remote mq service at filed " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e);
      }
    }
    return bean;
  }

  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    return bean;
  }

  private Object createProxy(Class clz) {
    String interfaceName;
    if (clz.isInterface()) {
      interfaceName = clz.getName();
    } else {
      throw new IllegalStateException("The @MqInvoker property type " + clz.getName() + " is not a interface.");
    }

    Object proxy = proxyMap.get(interfaceName);
    if (proxy == null) {
      Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          log.debug("执行动态代理! method:{} ,args: {}", method, args);
          if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) {
            throw new IllegalAccessException("MQ Service 目前仅支持单参数Long类型方法");
          }
          //动态代理中创建mq传输对象并发送.
          MqMethodMeta mqMethodMeta = new MqMethodMeta();
          mqMethodMeta.setInterfaceName(clz.getName());
          mqMethodMeta.setMethodName(method.getName());
          mqMethodMeta.setArgs(args);
          String[] paramTypeNames = new String[args.length];
          for (int i = 0; i < args.length; i++) {
            paramTypeNames[i] = args[i].getClass().getName();
          }
          mqMethodMeta.setParamTypeNames(paramTypeNames);
          RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory());
          Exchange exchange = new TopicExchange("exchange.demo.web.adaptor");
          admin.declareExchange(exchange);
          //关注此处clz.getName(),用于处理问题4
          rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
          return null;
        }
      });
      proxyMap.putIfAbsent(interfaceName, newProxy);
      proxy = proxyMap.get(interfaceName);
    }
    return proxy;
  }
}

3,接收端中如何在接收到消息后调用对应接口的实现方法

接收端调用对应接口就很简单了,只需要拿到MqMethodMeta对象进行反射调用就好了,直接上代码:

AIPAI
AIPAI

AI视频创作智能体

下载
@Slf4j
public class AsyncMethodListener implements ApplicationContextAware {
  private ApplicationContext applicationContext;

  @RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "${demo.mq.method.queue}", durable = "true"),
      exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"),
      key = "${demo.mq.method.routekey}"
  ))
  public void messageHandle(@Payload MqMethodMeta message) {
    try {
      log.info("收到message: {}", message);
      Class clz = Class.forName(message.getInterfaceName());
      String methodName = message.getMethodName();
      Object[] args = message.getArgs();
      Class[] paramTypes = new Class[message.getParamTypeNames().length];
      for (int i = 0; i < message.getParamTypeNames().length; i++) {
        paramTypes[i] = Class.forName(message.getParamTypeNames()[i]);
      }

      //由于使用Object[]数组传送参数,所以Jackson2JsonMessageConverter会将id转换为Integer,反射调用时会报错,此处强转一下
      for (int i = 0; i < args.length; i++) {
        Class c = paramTypes[i];
        if (args[i] instanceof Integer && c.equals(Long.class)) {
          args[i] = ((Integer) args[i]).longValue();
        }
      }
      //拿到spring管理的对应接口的实现
      Object invoker = applicationContext.getBean(clz);
      Method method = clz.getMethod(methodName, paramTypes);
      method.invoke(invoker, args);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }
}

4,多个消费服务如何区分mq队列.

这里就使用到了rabbit的topic类型exchange.
首先对消费端listener中的queue和routekey进行可配置话管理:

@RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "${demo.mq.method.queue}", durable = "true"),
      exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"),
      key = "${demo.mq.method.routekey}"
  ))

注意这里的

${demo.mq.method.queue}
${demo.mq.method.routekey}

是从配置文件中读取出来的:

比如系统1中是如下配置:

demo.mq.method.queue=com.demo.service.project1.#
demo.mq.method.routekey=com.demo.service.project1.#

系统2中是如下配置:

demo.mq.method.queue=com.demo.service.project2.#
demo.mq.method.routekey=com.demo.service.project2.#

再看发送端中那段代码:

//关注此处clz.getName(),用于处理问题4
rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);

这里面的clz.getName(). 由于我们系统是有良好的分包策略,所以系统1的clz.getName()一定是以com.demo.service.project1为开头的.一定会发送到project1中的listener.比如clz.getName()值为com.demo.service.project1.MqDemoService (".#"匹配后面多个标示符,此为rabbitMQ中topic类型exchange的特性).

至此,一开始想要达成的目标已经达成.今后需要用mq做异步调用的时候可以像同步方法一样使用了.

对于mq在spring中的使用在此就不详细列举了,可以参考文档:

http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/htmlsingle/

稍后会提供一套demo代码出来供记录和参考

总结

目前这套方法中还是存在一些问题的.比如:

1,因为目前业务场景,没有考虑异步回调的问题. 需要的话可以考虑和rabbitmq本身的异步回调方式结合. 目前还没有思考.
2,因为对消费端版本更新问题的考虑,目前仅仅支持单参数(整型)方法的调用.

第一个问题等需要用到对应业务后再做考虑吧.或者有思路的通知可以探讨一下.

第二个问题主要考虑的是如果消费端更改了参数类型或者其他之类的情况下,重新发布后,对于可能残留在mq中的老消息的兼容.这个目前确实没有什么好思路,抛出来也是为了集思广益了.

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
C++ 设计模式与软件架构
C++ 设计模式与软件架构

本专题深入讲解 C++ 中的常见设计模式与架构优化,包括单例模式、工厂模式、观察者模式、策略模式、命令模式等,结合实际案例展示如何在 C++ 项目中应用这些模式提升代码可维护性与扩展性。通过案例分析,帮助开发者掌握 如何运用设计模式构建高质量的软件架构,提升系统的灵活性与可扩展性。

14

2026.01.30

c++ 字符串格式化
c++ 字符串格式化

本专题整合了c++字符串格式化用法、输出技巧、实践等等内容,阅读专题下面的文章了解更多详细内容。

9

2026.01.30

java 字符串格式化
java 字符串格式化

本专题整合了java如何进行字符串格式化相关教程、使用解析、方法详解等等内容。阅读专题下面的文章了解更多详细教程。

12

2026.01.30

python 字符串格式化
python 字符串格式化

本专题整合了python字符串格式化教程、实践、方法、进阶等等相关内容,阅读专题下面的文章了解更多详细操作。

4

2026.01.30

java入门学习合集
java入门学习合集

本专题整合了java入门学习指南、初学者项目实战、入门到精通等等内容,阅读专题下面的文章了解更多详细学习方法。

20

2026.01.29

java配置环境变量教程合集
java配置环境变量教程合集

本专题整合了java配置环境变量设置、步骤、安装jdk、避免冲突等等相关内容,阅读专题下面的文章了解更多详细操作。

18

2026.01.29

java成品学习网站推荐大全
java成品学习网站推荐大全

本专题整合了java成品网站、在线成品网站源码、源码入口等等相关内容,阅读专题下面的文章了解更多详细推荐内容。

19

2026.01.29

Java字符串处理使用教程合集
Java字符串处理使用教程合集

本专题整合了Java字符串截取、处理、使用、实战等等教程内容,阅读专题下面的文章了解详细操作教程。

3

2026.01.29

Java空对象相关教程合集
Java空对象相关教程合集

本专题整合了Java空对象相关教程,阅读专题下面的文章了解更多详细内容。

6

2026.01.29

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号