0

0

分享Java如何远程连接调用Rabbitmq?

黄舟

黄舟

发布时间:2018-05-24 11:51:25

|

3475人浏览过

|

来源于php中文网

原创

rabbitmq远程调用测试,使用外部机器192.168.174.132上的rabbitmq,使用之前需要对远程调用进行配置,操作过程见博文“解决rabbitmq远程不能访问的问题”。

SendTest:

package com.mq.rabbitmq.rabbitmqtest;
  
import java.util.Date;
  
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
  
public class ReceiveTest {
    private final static String QUEUE_NAME = "ftpAgent";
    private final static String userName = "admin";
    private final static String password = "admin";
    private final static String virtualHost = "/";
    private final static int portNumber = 5672;
    private final static String hostName = "master";
    private final static String host = "192.168.174.132";
  
    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException {
  
        ConnectionFactory factory = new ConnectionFactory();
//      factory.setHost("192.168.174.160");
        factory.setUsername(userName);
        factory.setPassword(password);
//      factory.setVirtualHost(virtualHost);
        factory.setHost(host);
        factory.setPort(portNumber);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
//      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
          
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);
  
        Date nowTime = new Date();
          
        while (true) {
          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
          String message = new String(delivery.getBody());
          System.out.println("RecieveTime: " + nowTime);
          System.out.println(" [x] Received '" + message + "'");
        }
  
    }
}

打开IDEA创建一个maven工程(Java就可以了)。 

这里写图片描述 

pom.xml文件如下


 4.0.0

 com.zhenqi
 rabbitmq-study
 1.0-SNAPSHOT
 jar

 rabbitmq-study
 http://maven.apache.org

 
  UTF-8
 

 
  
   junit
   junit
   4.12
   test
  

  
  
   com.rabbitmq
   amqp-client
   4.1.0
   
    
     org.slf4j
     slf4j-api
    
   
  

  
   org.slf4j
   slf4j-log4j12
   1.7.21
  

  
   commons-lang
   commons-lang
   2.6
  

 

为了能远程访问rabbitmq,则需要编辑 /etc/rabbitmq/rabbitmq.conf,添加以下内容。

立即学习Java免费学习笔记(深入)”;

[
  {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
 ]

添加administrator角色

rabbitmqctl set_user_tags openstack administrator

创建抽象队列 EndPoint.java

蚂蚁PPT
蚂蚁PPT

AI在线智能生成PPT

下载
package com.zhenqi;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Created by wuming on 2017/7/16.
 */
public abstract class EndPoint {

  protected Channel channel;
  protected Connection connection;
  protected String endPointName;

  public EndPoint(String endpointName) throws Exception {

    this.endPointName = endpointName;

    //创建一个连接工厂 connection factory
    ConnectionFactory factory = new ConnectionFactory();

    //设置rabbitmq-server服务IP地址
    factory.setHost("192.168.146.128");
    factory.setUsername("openstack");
    factory.setPassword("rabbitmq");
    factory.setPort(5672);
    factory.setVirtualHost("/");

    //得到 连接
    connection = factory.newConnection();

    //创建 channel实例
    channel = connection.createChannel();

    channel.queueDeclare(endpointName, false, false, false, null);
  }

  /**
   * 关闭channel和connection。并非必须,因为隐含是自动调用的。
   * @throws IOException
   */
  public void close() throws Exception{
    this.channel.close();
    this.connection.close();
  }
}

生产者Producer.java

生产者类的任务是向队列里写一条消息

package com.zhenqi;
import org.apache.commons.lang.SerializationUtils;
import java.io.Serializable;
/**
 * Created by wuming on 2017/7/16.
 */
public class Producer extends EndPoint {

  public Producer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void sendMessage(Serializable object) throws Exception {
    channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
  }
}

消费者QueueConsumer.java

消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。

package com.zhenqi;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Created by wuming on 2017/7/16.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {

  private Logger LOG=Logger.getLogger(QueueConsumer.class);

  public QueueConsumer(String endpointName) throws Exception {
    super(endpointName);
  }

  public void handleConsumeOk(String s) {

  }

  public void handleCancelOk(String s) {

  }

  public void handleCancel(String s) throws IOException {

  }

  public void handleShutdownSignal(String s, ShutdownSignalException e) {

  }

  public void handleRecoverOk(String s) {
    LOG.info("Consumer "+s +" registered");
  }

  public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
    Map map = (HashMap) SerializationUtils.deserialize(bytes);
    LOG.info("Message Number "+ map.get("message number") + " received.");
  }

  public void run() {
    try{
      channel.basicConsume(endPointName, true,this);
    }catch(IOException e){
      e.printStackTrace();
    }
  }
}

 测试

运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走

package com.zhenqi;

import java.util.HashMap;

/**
 * Created by wuming on 2017/7/16.
 */
public class TestRabbitmq {

  public static void main(String[] args){
    try{
      QueueConsumer consumer = new QueueConsumer("queue");
      Thread consumerThread = new Thread(consumer);
      consumerThread.start();

      Producer producer = new Producer("queue");

      for (int i = 0; i < 100000; i++){
        HashMap message = new HashMap();
        message.put("message number", i);
        producer.sendMessage(message);
        System.out.println("Message Number "+ i +" sent.");
      }
    }catch(Exception e){
      e.printStackTrace();
    }

  }
}

相关文章

java速学教程(入门到精通)
java速学教程(入门到精通)

java怎么学习?java怎么入门?java在哪学?java怎么学才快?不用担心,这里为大家提供了java速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

相关标签:

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
C++ 高级模板编程与元编程
C++ 高级模板编程与元编程

本专题深入讲解 C++ 中的高级模板编程与元编程技术,涵盖模板特化、SFINAE、模板递归、类型萃取、编译时常量与计算、C++17 的折叠表达式与变长模板参数等。通过多个实际示例,帮助开发者掌握 如何利用 C++ 模板机制编写高效、可扩展的通用代码,并提升代码的灵活性与性能。

10

2026.01.23

php远程文件教程合集
php远程文件教程合集

本专题整合了php远程文件相关教程,阅读专题下面的文章了解更多详细内容。

29

2026.01.22

PHP后端开发相关内容汇总
PHP后端开发相关内容汇总

本专题整合了PHP后端开发相关内容,阅读专题下面的文章了解更多详细内容。

21

2026.01.22

php会话教程合集
php会话教程合集

本专题整合了php会话教程相关合集,阅读专题下面的文章了解更多详细内容。

21

2026.01.22

宝塔PHP8.4相关教程汇总
宝塔PHP8.4相关教程汇总

本专题整合了宝塔PHP8.4相关教程,阅读专题下面的文章了解更多详细内容。

13

2026.01.22

PHP特殊符号教程合集
PHP特殊符号教程合集

本专题整合了PHP特殊符号相关处理方法,阅读专题下面的文章了解更多详细内容。

11

2026.01.22

PHP探针相关教程合集
PHP探针相关教程合集

本专题整合了PHP探针相关教程,阅读专题下面的文章了解更多详细内容。

8

2026.01.22

菜鸟裹裹入口以及教程汇总
菜鸟裹裹入口以及教程汇总

本专题整合了菜鸟裹裹入口地址及教程分享,阅读专题下面的文章了解更多详细内容。

55

2026.01.22

Golang 性能分析与pprof调优实战
Golang 性能分析与pprof调优实战

本专题系统讲解 Golang 应用的性能分析与调优方法,重点覆盖 pprof 的使用方式,包括 CPU、内存、阻塞与 goroutine 分析,火焰图解读,常见性能瓶颈定位思路,以及在真实项目中进行针对性优化的实践技巧。通过案例讲解,帮助开发者掌握 用数据驱动的方式持续提升 Go 程序性能与稳定性。

9

2026.01.22

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.8万人学习

C# 教程
C# 教程

共94课时 | 7.4万人学习

Java 教程
Java 教程

共578课时 | 50.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号