Azure Service Fabric消息队列

约瑟夫·安德森

我正在尝试将一系列任务排队,并使用Azure Service Fabric异步运行它们。我当前正在将CloudMessageQueue与辅助角色一起使用。我正在尝试迁移到Service Fabric。从工作人员角色来看,这是我的代码:

    private void ExecuteTask()
    {
        CloudQueueMessage message = null;

        if (queue == null)
        {
            jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Queue for WorkerRole2 is null. Exiting.")));
            return;
        }

        try
        {
            message = queue.GetMessage();
            if (message != null)
            {
                JMATask task = GetTask(message.AsString);
                string msg = (message == null) ? string.Empty : message.AsString;
                //jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.JMA, String.Format("Executing task {0}", msg)));
                queue.DeleteMessage(message);
                PerformTask(task);
            }
        }
        catch (Exception ex)
        {
            string msg = (message == null) ? string.Empty : message.AsString;
            jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Message {0} Error removing message from queue {1}", msg, ex.ToString())));
        }
    }

我有一些问题:

  1. 如何异步运行Perform Task方法?我想同时执行大约30-40个任务。
  2. 我有一个JMATask列表。如何将列表添加到队列中?
  3. 列表是否需要添加到队列中?

    namespace Stateful1
    {
       public class JMATask
       {
         public string Name { get; set; }
       }
    
    /// <summary>
    /// An instance of this class is created for each service replica by the Service Fabric runtime.
    /// </summary>
    internal sealed class Stateful1 : StatefulService
    {
    public Stateful1(StatefulServiceContext context)
        : base(context)
    { }
    
    /// <summary>
    /// Optional override to create listeners (e.g., HTTP, Service Remoting, WCF, etc.) for this service replica to handle client or user requests.
    /// </summary>
    /// <remarks>
    /// For more information on service communication, see http://aka.ms/servicefabricservicecommunication
    /// </remarks>
    /// <returns>A collection of listeners.</returns>
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
    {
        return new ServiceReplicaListener[0];
    }
    
    /// <summary>
    /// This is the main entry point for your service replica.
    /// This method executes when this replica of your service becomes primary and has write status.
    /// </summary>
    /// <param name="cancellationToken">Canceled when Service Fabric needs to shut down this service replica.</param>
    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
        // TODO: Replace the following sample code with your own logic 
        //       or remove this RunAsync override if it's not needed in your service.
    
        IReliableQueue<JMATask> tasks = await this.StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks");
        //var myDictionary = await this.StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");
    
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
    
            using (var tx = this.StateManager.CreateTransaction())
            {
                var result = await tasks.TryDequeueAsync(tx);
    
                //how do I execute this method async?
                PerformTask(result.Value);
    
                //Create list of JMA Tasks to queue up
                await tasks.EnqueueAsync(tx, new JMATask());
    
                //var result = await myDictionary.TryGetValueAsync(tx, "Counter");
    
                //ServiceEventSource.Current.ServiceMessage(this, "Current Counter Value: {0}",
                //    result.HasValue ? result.Value.ToString() : "Value does not exist.");
    
                //await myDictionary.AddOrUpdateAsync(tx, "Counter", 0, (key, value) => ++value);
    
                // If an exception is thrown before calling CommitAsync, the transaction aborts, all changes are 
                // discarded, and nothing is saved to the secondary replicas.
                await tx.CommitAsync();
            }
    
            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }
    }
    
    private async void PerformTask(JMATask task)
    {
        //execute task
    }
    

    }

Alltej

RunAsync方法不应具有以下行代码: await tasks.EnqueueAsync(tx, new JMATask());

创建要排队的JMA任务列表应该是您的有状态服务中的另一种方法,如下所示:

    public async Task AddJMATaskAsync(JMATask task)
    {
        var tasksQueue = await StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks");
        using (var tx = StateManager.CreateTransaction())
        {
            try
            {
                await tasksQueue.EnqueueAsync(tx, request);
                await tx.CommitAsync();
            }
            catch (Exception ex)
            {
                tx.Abort();
            }
        }
    }

然后,您的PerformTask方法可以包含对无状态微服务的调用:

    public async Task PerformTask (JMATask task)
    {
        //1. resolve stateless microservice URI
        // statelessSvc

        //2. call method of the stateless microservice
        // statelessSvc.PerformTask(task);
    }

因此,基本上,有状态服务将仅对任务进行排队和出队。可以通过微服务来执行实际任务,该微服务将对集群中的所有节点可用。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

删除Azure Service Bus队列消息?

来自分类Dev

互操作性Azure Service Bus消息队列消息

来自分类Dev

Azure Service Bus中的死信队列中的消息是否过期?

来自分类Dev

没有从Azure Service Bus队列接收消息

来自分类Dev

Azure Service Fabric的使用

来自分类Dev

Azure Service Fabric 路由

来自分类Dev

Service Fabric Azure测试环境

来自分类Dev

Azure Service Fabric多租户

来自分类Dev

Azure Service Fabric更改IP

来自分类Dev

Azure Service Fabric提醒GetReminder

来自分类Dev

Azure Service Fabric v 单体

来自分类Dev

在Azure Service Bus的死信队列中完成一条消息

来自分类Dev

即使已停止,Azure App Service也会使用服务总线队列消息

来自分类Dev

重新提交Azure Service Bus队列上的消息时Messagelock令牌已过期

来自分类Dev

在Azure Service Bus死信队列上通过MessageId获取消息

来自分类Dev

Azure队列存储未发送队列消息

来自分类Dev

Azure Servicebus队列消息处理

来自分类Dev

Azure Service Fabric中可靠的Blob状态?

来自分类Dev

在Azure Service Fabric上设置TCP

来自分类Dev

Azure Service Fabric actor提醒更新?

来自分类Dev

Azure Service Fabric群集设置问题

来自分类Dev

Azure Service Fabric加入虚拟网络

来自分类Dev

在Azure Service Fabric中动态创建服务

来自分类Dev

Azure Service Fabric中的发布/订阅模式

来自分类Dev

Azure Service Fabric Actor依赖项注入

来自分类Dev

Azure Service Fabric actor微服务

来自分类Dev

将SignalR与Azure Service Fabric结合使用

来自分类Dev

Azure Service Fabric可靠的集合和内存

来自分类Dev

如何关闭Azure Service Fabric的本地实例?