
本文探讨activemq artemis在使用openwire jms客户端时,通过选择器浏览消息成功但无法接收消息的问题。核心原因在于activemq artemis 2.18.0版本与openwire客户端存在的已知bug (artemis-3916)。文章提供了两种解决方案:切换至activemq artemis核心jms客户端或将artemis broker升级至2.25.0或更高版本,并附带代码示例进行说明。
问题描述:选择器浏览成功,接收失败
在使用ActiveMQ Artemis 2.18.0及artemis-jms-client-all:2.18.0作为客户端依赖时,开发者可能会遇到一个异常情况:能够通过QueueBrowser结合JMSMessageID选择器成功浏览到目标消息,但随后使用MessageConsumer以相同的选择器尝试接收消息时,却无法获取到消息,导致receive(timeout)方法返回null,进而抛出IllegalStateException。这种现象并非总是发生,而是在大量消息中以较低的概率(例如十万分之一三十)出现。
以下代码片段展示了这一问题:
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
import java.util.Enumeration;
public class ArtemisMessageIssueReproducer {
private static final String BROKER_URL = "tcp://localhost:61616"; // 假设Broker运行在本地61616端口
public static void main(String[] args) {
// 模拟一个JMSMessageID,实际场景中应从已发送消息中获取
String messageIdToFind = "ID:some-broker-id-12345-1-1";
// 假设消息已发送到名为 "hospital" 的队列中
Connection connection = null;
Session session = null;
String selector = "JMSMessageID='" + messageIdToFind + "'";
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue deadQueue = session.createQueue("hospital");
connection.start();
// 1. 使用QueueBrowser浏览消息
QueueBrowser browser = session.createBrowser(deadQueue, selector);
Enumeration e = browser.getEnumeration();
int foundedElements = 0;
while (e.hasMoreElements()) {
Message message = (Message) e.nextElement();
System.out.println("Browser found message: " + message.getJMSMessageID());
foundedElements++;
}
browser.close();
if (foundedElements != 1) {
throw new IllegalStateException("根据选择器找到的消息数量不为1,实际为: " + foundedElements);
}
System.out.println("Browser成功找到消息。");
// 2. 使用MessageConsumer尝试接收消息
MessageConsumer messageConsumer = session.createConsumer(deadQueue, selector);
Message receivedMessage = messageConsumer.receive(1000); // 等待1秒
if (receivedMessage == null) {
throw new IllegalStateException("MessageConsumer未能接收到消息,返回null。");
} else {
System.out.println("MessageConsumer成功接收到消息: " + receivedMessage.getJMSMessageID());
}
messageConsumer.close();
session.commit();
System.out.println("事务提交成功。");
} catch (Exception e) {
System.err.println("发生异常: " + e.getMessage());
try {
if (session != null) {
session.rollback();
System.err.println("事务回滚。");
}
} catch (JMSException e1) {
System.err.println("回滚异常: " + e1.getMessage());
}
throw new RuntimeException(e);
} finally {
if (connection != null) {
try {
connection.close();
System.out.println("连接关闭。");
} catch (JMSException e) {
System.err.println("关闭连接异常: " + e.getMessage());
throw new RuntimeException(e);
}
}
}
}
}在上述代码中,如果foundedElements为1,但receivedMessage却为null,则说明遇到了该问题。
问题根源分析:OpenWire客户端与Broker版本兼容性
经过深入分析,此问题并非JMS规范的普遍行为,而是特定于ActiveMQ Artemis在使用OpenWire JMS客户端时,与较旧的Broker版本(如2.18.0)之间存在的兼容性问题。
ActiveMQ Artemis支持多种JMS客户端协议,其中:
- ActiveMQ Artemis Core JMS Client:这是Artemis原生的、推荐的JMS客户端,通常通过artemis-jms-client或artemis-jms-client-all(但需注意其内部可能包含OpenWire依赖)引入。
- OpenWire JMS Client:这是Apache ActiveMQ Classic使用的协议,Artemis为了兼容性也提供了支持。当使用artemis-jms-client-all时,如果配置不当或默认行为,可能会隐式地使用OpenWire协议。
问题的关键在于,ActiveMQ Artemis 2.18.0版本在处理OpenWire客户端的MessageConsumer与选择器结合时的内部机制存在一个已知的Bug,编号为ARTEMIS-3916。这个bug会导致即使消息存在并能被浏览器看到,消费者也可能无法正确匹配并接收到它。而QueueBrowser只是读取消息的副本或元数据,不涉及消息的实际消费和状态改变,因此不受此bug影响。
解决方案
针对此问题,主要有两种推荐的解决方案,可以根据实际项目情况选择:
方案一:切换至ActiveMQ Artemis核心JMS客户端
这是最直接且推荐的解决方案,因为它避免了OpenWire协议带来的潜在兼容性问题。确保你的项目显式地使用Artemis Core JMS客户端。
-
检查并调整Maven/Gradle依赖: 确保你的pom.xml或build.gradle中引入的是ActiveMQ Artemis的核心JMS客户端依赖,而不是可能默认使用OpenWire的聚合包或特定OpenWire客户端。通常,artemis-jms-client是核心客户端。
org.apache.activemq artemis-jms-client 2.18.0 或者,如果使用artemis-jms-client-all,请确认其内部配置或连接工厂是否强制使用了Artemis Core协议而非OpenWire。
-
使用ActiveMQConnectionFactory创建连接: 确保你的连接工厂是org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory,它默认使用Artemis的原生协议。
以下是使用核心JMS客户端的示例代码,该代码在ActiveMQ Artemis 2.18.0上测试通过,未复现问题:
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import javax.jms.*; import java.util.Enumeration; public class ArtemisCoreClientExample { private static final String BROKER_URL = "tcp://localhost:61616"; private static final String TEST_MESSAGE_CONTENT = "This is a test message for Artemis."; public static void main(String[] args) { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); try (Connection connection = connectionFactory.createConnection()) { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Queue deadQueue = session.createQueue("hospital"); connection.start(); // 1. 发送一条消息以供测试 MessageProducer mp = session.createProducer(deadQueue); TextMessage m = session.createTextMessage(TEST_MESSAGE_CONTENT); mp.send(m); session.commit(); // 提交发送操作 String sentMessageId = m.getJMSMessageID(); System.out.println("消息发送成功,ID: " + sentMessageId); // 2. 使用QueueBrowser浏览消息 String selector = "JMSMessageID='" + sentMessageId + "'"; QueueBrowser browser = session.createBrowser(deadQueue, selector); Enumeration e = browser.getEnumeration(); int foundedElements = 0; while (e.hasMoreElements()) { e.nextElement(); // 仅遍历,不处理内容 foundedElements++; } browser.close(); if (foundedElements != 1) { throw new IllegalStateException("Browser找到的消息数量不为1,实际为: " + foundedElements); } System.out.println("Browser成功找到消息,数量: " + foundedElements); // 3. 使用MessageConsumer接收消息 MessageConsumer messageConsumer = session.createConsumer(deadQueue, selector); Message received = messageConsumer.receive(1000); // 等待1秒 if (received == null) { throw new IllegalStateException("MessageConsumer未能接收到消息,返回null。"); } else if (!(received instanceof TextMessage) || !((TextMessage) received).getText().equals(TEST_MESSAGE_CONTENT)) { throw new IllegalStateException("接收到的消息内容不匹配或类型错误。"); } System.out.println("MessageConsumer成功接收到消息,内容: " + ((TextMessage) received).getText()); messageConsumer.close(); session.commit(); // 提交接收操作 System.out.println("事务提交成功,消息已成功接收并处理。"); } catch (Exception e) { System.err.println("操作失败: " + e.getMessage()); throw new RuntimeException(e); } } }
方案二:升级ActiveMQ Artemis Broker
如果由于某些原因无法切换客户端库,那么升级ActiveMQ Artemis Broker是另一种有效的解决方案。
升级Broker版本: 将ActiveMQ Artemis Broker升级到至少2.25.0版本。ARTEMIS-3916问题在该版本中已得到修复。理想情况下,建议升级到最新稳定版本,以获得最新的bug修复、性能改进和新功能。
升级客户端依赖: 如果升级了Broker,通常也建议将客户端依赖(artemis-jms-client或artemis-jms-client-all)升级到与Broker版本兼容或相同的新版本,以确保最佳的兼容性和功能。
总结与注意事项
- 客户端选择至关重要:在ActiveMQ Artemis生态系统中,选择正确的JMS客户端库(核心客户端 vs. OpenWire客户端)对于系统的稳定性和性能至关重要。对于新项目或遇到兼容性问题时,优先考虑使用ActiveMQ Artemis的核心JMS客户端。
- 版本管理:JMS客户端库与Broker版本之间的兼容性非常重要。通常建议两者保持版本一致或客户端版本略高于Broker版本(在兼容范围内)。
- 调试策略:当遇到消息丢失或无法接收等问题时,应同时检查客户端日志和Broker日志。特别是Broker的broker.xml配置中的日志级别,可以调高以获取更详细的内部操作信息。
- 事务处理:示例代码中使用了事务会话 (session.createSession(true, Session.SESSION_TRANSACTED)),并在操作成功后进行commit(),失败时进行rollback()。这是生产环境中确保消息可靠性的标准实践。
通过理解问题根源并采取上述解决方案,可以有效解决ActiveMQ Artemis中选择器浏览与接收消息不一致的问题,确保消息系统的稳定可靠运行。










