具有Akka和多线程的JMS

克鲁蒂克
public class QueueListener implements MessageListener {

    public static final ExecutorService executor = Executors.newWorkStealingPool();

    public static boolean isActorinit=false;
    public static ActorSystem system=null;
    private ActorRef myActor=null;
    private String _queueName=null; 

    public QueueListener(String qName){
        this._queueName = qName;
        if(!isActorinit){
            system=ActorSystem.create("Controller");

            try {
            myActor=system.actorOf(Props.create(MessageExecutor.class.getConstructor(String.class).newInstance(_queueName).getClass()),"mysysActor");
            } catch (Exception e) {
                // TODO Auto-generated catch block
            }
            isActorinit=true;
        }
    }

    /* 
     * (non-Javadoc)
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    @Override
    public void onMessage(Message msg) {

//      processRequest(msg);
        executeRequest(msg);
    }

    /** This method will process the message fetch by the listener.
     *   
     * @param msg - javax.jms.Messages parameter get queue message
     */
    private void processRequest(Message msg){

        String requestData=null;
        try {

            if(msg instanceof TextMessage){
                TextMessage textMessage= (TextMessage) msg;
                requestData = textMessage.getText().toString();
            }else if(msg instanceof ObjectMessage){
                ObjectMessage objMsg = (ObjectMessage) msg; 
                requestData = objMsg.getObject().toString();
            }


            MessageProcessor msgProcessor = new MessageProcessor(_queueName, requestData);
            executor.submit(msgProcessor);
        } catch (JMSException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

    }

    private void executeRequest(Message msg){

        String requestData=null;
        try {

            if(msg instanceof TextMessage){
                TextMessage textMessage= (TextMessage) msg;
                requestData = textMessage.getText().toString();
            }else if(msg instanceof ObjectMessage){
                ObjectMessage objMsg = (ObjectMessage) msg; 
                requestData = objMsg.getObject().toString();
            }
//           MessageExecutor objMessageExecutor=new MessageExecutor(_queueName);
            myActor.tell(requestData, ActorRef.noSender()); 

        } catch (JMSException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

    }

}

使用ExecutorService执行ProcessRequst方法时,此代码可以正常工作。但是,面对akka actor系统实现的以下问题。

Exception in thread "Thread-4" java.lang.NullPointerException
    at com.syn.jms.listener.QueueListener.executeRequest(QueueListener.java:102)
    at com.syn.jms.listener.QueueListener.onMessage(QueueListener.java:59)
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:942)
    at java.lang.Thread.run(Thread.java:745)

请注意,我将Apache qpid APi用于带有activeMQ的AMQP协议。

我无法理解该问题。

克鲁蒂克

我发现解决方案是由于Actor ref的NPE,而每个进程队列使用唯一的actorRef处理多个输入时并未解决该对象。我找到了这个解决方案。

public QueueListener(String actorId,String qName){
        this._queueName = qName;
         if(!isActorinit){
                system=ActorSystem.create(actorId);

                isActorinit=true;
            }

          myActor=system.actorOf( Props.create(MessageExecutor.class, qName),qName);
    }

但是,我很感谢您的意见,可以帮助我解决问题。恶作剧

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

具有QThread和线程模块的Python多线程

来自分类Dev

具有队列和多线程的python中的分段错误

来自分类Dev

具有互斥和semapohres的多线程程序

来自分类Dev

具有ObservesProperty的多线程

来自分类Dev

具有多线程的printf()

来自分类Dev

具有ObservesProperty的多线程

来自分类Dev

具有LinkedList的多线程

来自分类Dev

具有多线程的ConcurrentQueue

来自分类Dev

使用 PipedOutputStream 和 PipedInputStream 的具有单线程读取器的多线程编写器

来自分类Dev

具有很多线程的休眠用法

来自分类Dev

具有对齐的int的多线程读写

来自分类Dev

具有非空函数的多线程

来自分类Dev

具有多线程或任务的进程队列

来自分类Dev

具有重载成员函数的多线程

来自分类Dev

具有非空函数的多线程

来自分类Dev

具有多线程的 Python 装饰器

来自分类Dev

具有单核处理器的多线程代码和具有多核处理器的单线程代码

来自分类Dev

引发ValueError(“不能具有多线程和多进程服务器。”)ValueError:不能具有多线程和多进程服务器

来自分类Dev

如何在Akka和Scala中实现多线程?

来自分类Dev

具有多线程持久化和刷新功能的Doctrine实体管理器

来自分类Dev

具有多线程持久化和刷新功能的Doctrine实体管理器

来自分类Dev

具有相互独立线程的多线程开销

来自分类Dev

具有相互独立线程的多线程开销

来自分类Dev

Java-具有同步方法的多线程练习

来自分类Dev

具有类成员函数的C ++ 11多线程

来自分类Dev

具有多线程的Scala单例对象

来自分类Dev

具有模板化类成员函数的多线程

来自分类Dev

具有固定键的字典上的多线程

来自分类Dev

原子加法与具有多线程的多个变量(在C中)

Related 相关文章

热门标签

归档