我对此深感困惑。
我想订阅一个ActiveMQ主题。ActiveMQ在Centos机器上工作,而不是LOCALHOST。我可以使用tcp,http协议使用消息。代码;
public static void main(String[] args) throws JMSException {
PropertyUtils.loadPropertyFile();
Properties receiverProperties = PropertyUtils.getreceiverProperties();
// URL of the JMS server
String url = (String) receiverProperties.get("receiver.connection.url");
// Name of the queue we will receive messages from
String subject = (String) receiverProperties.get("receiver.topic.name");
// Getting JMS connection from the server
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
// Creating session for getting messages
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Getting the topic
Destination destination = session.createTopic(subject);
// MessageConsumer is used for receiving (consuming) messages
MessageConsumer consumer = session.createConsumer(destination);
Message message = null;
// Here we receive the message.
while (true) {
message = consumer.receive();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
System.out.println("Received message '" + textMessage.getText() + "'");
}
}
// We will be using TestMessage in our example. MessageProducer sent us a
// TextMessage
// so we must cast to it to get access to its .getText() method.
// connection.close();
}
我想使用wss协议。这对我来说是必须的。当我用wss:// host:port更改url时;
Could not create Transport. Reason: java.io.IOException: createTransport() method not implemented!
所以我检查了替代方案。人们通过Stomp over WS解决了这个问题。我的第一个成就是wss connection。
任何建议将不胜感激!
我试图为STOMP编写一个侦听器;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
public class StompListener {
public static void main(String[] args) {
StompConnection connection = new StompConnection();
try {
connection.open("host", 61613);
connection.connect("admin", "admin", "test");
connection.subscribe("TEST_TOPIC", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
connection.begin("test");
while (true) {
try {
StompFrame message = connection.receive(10000);
System.out.println(String.format("%s - Receiver: received '%s'", new Date(), message.getBody()));
} catch (SocketTimeoutException e) {
// ignore
e.printStackTrace();
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
此代码在“ connection.receive”行上引发异常。
java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.net.SocketInputStream.read(SocketInputStream.java:224)
at java.io.DataInputStream.readByte(DataInputStream.java:265)
at org.apache.activemq.transport.stomp.StompWireFormat.readHeaderLine(StompWireFormat.java:174)
at org.apache.activemq.transport.stomp.StompWireFormat.readLine(StompWireFormat.java:167)
at org.apache.activemq.transport.stomp.StompWireFormat.parseAction(StompWireFormat.java:200)
at org.apache.activemq.transport.stomp.StompWireFormat.unmarshal(StompWireFormat.java:112)
at org.apache.activemq.transport.stomp.StompConnection.receive(StompConnection.java:77)
at tr.com.estherial.stomplistener.StompListener.main(StompListener.java:25)
您所看到的异常是预料之中的,因为您使用的OpenWire JMS客户端不支持WebSocket连接,并且实际上并不需要。WebSocket连接实际上仅与在有限环境(例如Web浏览器)中运行的客户端有关。Web浏览器不支持运行Java客户端。
如果您确实想在WebSockets上使用STOMP,则必须使用支持WebSockets的STOMP客户端实现(大多数这样做)。
请记住,ActiveMQ是经纪人。它不为其支持的所有协议提供客户端。它仅提供JMS客户端,因为实现JMS需要它。例如,STOMP是一种标准化协议,任何人都可以实现一个客户端,该客户端将与实现STOMP的任何代理一起工作。对于许多不同的平台,有许多可用许多不同语言编写的STOMP客户端实现。任何好的搜索引擎都应该帮助您找到一个适合您的需求。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句