如何从远程 IBM MQ 不断获取消息

拉特纳

我创建了一个 Windows 服务,它将连接到远程 MQ 并以 MQSTR 格式获取消息,但在获取消息后我没有关闭与远程 MQ 的连接。我的 Windows 服务将不断检查远程 MQ 中是否有数据可用,但在收到一条消息后,我需要重新启动我的服务以从远程 MQ 获取另一条消息。谁能告诉我需要做什么才能不断从远程 MQ 获取消息。任何线索或任何链接都可以。请帮忙

我的 C# windows 服务代码是这样的:

程序.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceProcess;
using System.Text;
using System.Threading.Tasks;

namespace MQ_listner
{
    static class Program
    {
        static void Main()
        {
            ServiceBase[] ServicesToRun;
            ServicesToRun = new ServiceBase[]
            {
                new Service1()
            };
            ServiceBase.Run(ServicesToRun);


        }
    }
}

服务1.cs

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;


namespace MQ_listner
{
    public partial class Service1 : ServiceBase
    {
        private MQReader MQReader;
        private string _serviceName = "MQ_Listener";
        private DateTime _TimeStart;
        private bool _run = true; 
        private Thread _thread;
        int WaitWhenStop = 0;
        private DateTime _TimeEnd;
        private TimeSpan _TimeDifference;
        private TimeSpan _TimeElasped = new TimeSpan(0);



        public Service1()
        {
            InitializeComponent();
        }

        protected override void OnStart(string[] args)
        {
            try
            {
                EventLog.WriteEntry(_serviceName + "was started at" + _TimeStart.ToString());
                _run = true;

                _thread = new Thread(new ThreadStart(StartMQListenerService));
                _thread.IsBackground = true;
                _thread.Start();
            }
            catch (Exception ex)
            {
                EventLog.WriteEntry(_serviceName + "was not started . Error Message : " + ex.ToString());
            }


        }

        protected override void OnStop()
        {
            _run = false;
            _thread.Join(WaitWhenStop);

            _TimeEnd = DateTime.Now;
            _TimeDifference = _TimeEnd.Subtract(_TimeStart); 
            _TimeElasped = _TimeElasped.Add(_TimeDifference);
            EventLog.WriteEntry(_serviceName + "was stopped at " + _TimeEnd.ToString() + "\r\n ran for total time :" + _TimeElasped.ToString());
        }


        // MQ connection service 

        public void StartMQListenerService()
        {
            try
            {
                if (_run)
                {
                    if (MQReader == null)
                    {
                        MQReader = new MQReader();
                        MQReader.InitializeConnections();
                        EventLog.WriteEntry(_serviceName + "MQ connection is established");
                    }
                }
            }
            catch (Exception ex)
            {
                System.Diagnostics.EventLog.WriteEntry(_serviceName, ex.ToString());
                System.Diagnostics.ProcessStartInfo startinfo = new System.Diagnostics.ProcessStartInfo();
                startinfo.WindowStyle = System.Diagnostics.ProcessWindowStyle.Hidden;
                startinfo.FileName = "NET";
                startinfo.Arguments = "stop" + this.ServiceName;
                Process.Start(startinfo);
            }
        }
    }
}


****MQReader.cs****

using System;
using IBM.WMQ;
using System.Diagnostics;
using System.IO;
using System.Xml;
using System.Linq;
using System.Xml.Linq;
using System.Configuration;

namespace MQ_listner
{
    internal class MQReader
    {
        public MQReader()
        {
        }
        public void InitializeConnections()
        {

            MQQueueManager queueManager;
            MQMessage queueMessage;
            MQGetMessageOptions queueGetMessageOptions;
            MQQueue queue;


            string QueueName;
            string QueueManagerName;
            string ChannelInfo;
            string channelName;
            string PortNumber;
            string transportType;
            string connectionName;

            QueueManagerName = ConfigurationManager.AppSettings["QueueManager"]; 
            QueueName = ConfigurationManager.AppSettings["Queuename"];
            ChannelInfo = ConfigurationManager.AppSettings["ChannelInformation"];
            PortNumber = ConfigurationManager.AppSettings["Port"];
            char[] separator = { '/' };
            string[] ChannelParams;
            ChannelParams = ChannelInfo.Split(separator);
            channelName = ConfigurationManager.AppSettings["Channel"];
            transportType = ConfigurationManager.AppSettings["TransportType"];
            connectionName = ConfigurationManager.AppSettings["ConnectionName"];
            String strReturn = "";

            try
            {
                queueManager = new MQQueueManager(QueueManagerName,
                channelName, connectionName);
                strReturn = "Connected Successfully";

                queue = queueManager.AccessQueue(QueueName,
                MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);
                queueMessage = new MQMessage();
                queueMessage.Format = MQC.MQFMT_STRING;
                queueGetMessageOptions = new MQGetMessageOptions();
                queue.Get(queueMessage, queueGetMessageOptions);
                strReturn = queueMessage.ReadString(queueMessage.MessageLength);
            }
            catch (MQException exp)
            {
                strReturn = "Exception: " + exp.Message;
            }

            string path1 = @"C:\documents\Example.txt";
            System.IO.File.WriteAllText(path1, strReturn);

        }
    }
}

谁能告诉我我的代码有什么问题?我是否需要在此处添加任何内容以不断从远程 MQ 获取消息。请帮忙 。任何链接或线索都可以。

编辑

一定时间后,我需要重新启动我的服务以从远程 mq 获取数据。你能告诉我为什么 Windows 服务需要重新启动才能获取数据。有什么线索吗?任何想法 ?

罗杰

您的队列关闭和队列管理器断开连接的位置在哪里?如果您连接和/或打开某物,您必须确保关闭并断开连接。我强烈建议您参加 MQ 编程课程。或者前往MQ 技术会议,那里有关于 MQ 编程的会议。

我发布了一个功能齐全的 C# MQ 程序,该程序在MQQueueManager 消息池中检索队列中的所有消息

这是您的 MQReader 类的更新版本,应该会给您正确的想法。注意:我没有测试它。我把那个留给你。:)

此外,您应该将连接信息放在一个 Hashtable 中,并将该 Hashtable 传递给 MQQueueManager 类。

using System;
using IBM.WMQ;
using System.Diagnostics;
using System.IO;
using System.Xml;
using System.Linq;
using System.Xml.Linq;
using System.Configuration;

namespace MQ_listner
{
    internal class MQReader
    {
        private MQQueueManager qManager = null;
        private MQMessage      inQ = null;
        private bool           running = true;

        public MQReader()
        {
        }

        public bool InitQMgrAndQueue()
        {
            bool flag = true;
            Hashtable qMgrProp = new Hashtable();
            qMgrProp.Add(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES_MANAGED);
            qMgrProp.Add(MQC.HOST_NAME_PROPERTY, ConfigurationManager.AppSettings["ConnectionName"]);
            qMgrProp.Add(MQC.CHANNEL_PROPERTY, ConfigurationManager.AppSettings["Channel"]);

            try
            {
               if (ConfigurationManager.AppSettings["Port"] != null)
                  qMgrProp.Add(MQC.PORT_PROPERTY, System.Int32.Parse(ConfigurationManager.AppSettings["Port"]));
               else
                  qMgrProp.Add(MQC.PORT_PROPERTY, 1414);
            }
            catch (System.FormatException e)
            {
               qMgrProp.Add(MQC.PORT_PROPERTY, 1414);
            }

            if (ConfigurationManager.AppSettings["UserID"] != null)
               qMgrProp.Add(MQC.USER_ID_PROPERTY, ConfigurationManager.AppSettings["UserID"]);

            if (ConfigurationManager.AppSettings["Password"] != null)
               qMgrProp.Add(ConfigurationManager.AppSettings["Password"]);

            try
            {
                qManager = new MQQueueManager(ConfigurationManager.AppSettings["QueueManager"],
                                              qMgrProp);
                System.Console.Out.WriteLine("Connected Successfully");

                inQ = qManager.AccessQueue(ConfigurationManager.AppSettings["Queuename"],
                                              MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);
                System.Console.Out.WriteLine("Open queue Successfully");
            }
            catch (MQException exp)
            {
                System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
                flag = false;
            }

            return flag;
        }

        public void LoopThruMessages()
        {
            MQGetMessageOptions gmo = new MQGetMessageOptions();
            gmo.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING;
            gmo.WaitInterval = 2500;  // 2.5 seconds wait time or use MQC.MQEI_UNLIMITED to wait forever
            MQMessage msg = null;

            while (running)
            {
                try
                {
                   msg = new MQMessage();
                   inQ.Get(msg, gmo);
                   System.Console.Out.WriteLine("Message Data: " + msg.ReadString(msg.MessageLength));
                }
                catch (MQException mqex)
                {
                   if (mqex.Reason == MQC.MQRC_NO_MSG_AVAILABLE)
                   {
                      // no meesage - life is good - loop again
                   }
                   else
                   {
                      running = false;  // severe error - time to exit
                      System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
                   }
                }
                catch (System.IO.IOException ioex)
                {
                   System.Console.Out.WriteLine("ioex=" + ioex);
                }
            }

            try
            {
               if (inQ != null)
               {
                  inQ.Close();
                  System.Console.Out.WriteLine("Closed queue");
               }
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
            }

            try
            {
               if (qMgr != null)
               {
                  qMgr.Disconnect();
                  System.Console.Out.WriteLine("disconnected from queue manager");
               }
            }
            catch (MQException mqex)
            {
                System.Console.Out.WriteLine("MQException CC=" + mqex.CompletionCode + " : RC=" + mqex.ReasonCode);
            }
        }

        public void StopIt()
        {
            running = false;
        }
    }
}

每当您停止服务时,请确保它调用 MQReader 中的 StopIt 方法。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

春季集成中如何回滚从IBM MQ获取的消息

来自分类Dev

部署在JBOSS上的MDB从IBM MQ获取消息

来自分类Dev

无法从IBM Websphere MQ的队列中获取消息

来自分类Dev

在JMS中获取消息类型-IBM MQ

来自分类Dev

如何使用Spring Batch创建到活动MQ和IBM MQ的通用JMS消息传递

来自分类Dev

如何从IBM MQ集群中另一个队列管理器中托管的队列中获取消息

来自分类Dev

如何在IBM Websphere Queue上获取消息计数

来自分类Dev

如何更改IBM MQ的SDR通道定义

来自分类Dev

如何使用IBM MQ-在Linux中的队列上备份和清除消息?

来自分类Dev

IBM MQ 中如何实现消息交换:推送还是拉取?

来自分类Dev

如何在Linux和UNIX中获取IBM MQ的默认安装目录?

来自分类Dev

在JMS中一次处理来自IBM MQ的消息

来自分类Dev

从启用了AMS的IBM Web Sphere MQ检索消息

来自分类Dev

C#:使用 Read() 方法时的 IBM MQ“解锁”消息

来自分类Dev

什么是IBM MQ节?

来自分类Dev

IBM MQ限制

来自分类Dev

针对多线程场景从IBM MQ获取基于corretionID的响应

来自分类Dev

如何在IBM MQ集群中动态创建预订?

来自分类Dev

如何在IBM MQ 7.0中禁用授权

来自分类Dev

如何更改ibm mq qm以与用户连接并通过

来自分类Dev

IBM MQ-MQ Explorer中可浏览的SSL加密消息

来自分类Dev

从Java类初始化IBM MQ时获取错误消息

来自分类Dev

如何将 JVM 参数`-Dcom.ibm.mq.cfg.useIBMCipherMappings=false` 添加到 IBM MQ?

来自分类Dev

如何使用IBM MQ库从WebSphrere MQ MQMD标头中读取值

来自分类Dev

IBM-MQ 创建 MQ 目标

来自分类Dev

IBM MQ认证和授权

来自分类Dev

IBM MQ 失败错误 2058

来自分类Dev

MQ服务器停机时如何获取丢失的消息

来自分类Dev

IBM MQ发布/订阅向一个订阅者发送消息

Related 相关文章

热门标签

归档