限制编号 Dns.BeginGetHostEntry方法每秒生成的请求数或使用任务并行库(TPL)

斯瓦蒂·古普塔

我已经使用Dns.BeginGetHostEntry基于主机名获取主机FQDN的方法(主机名列表存储在SQL Server数据库中)。此方法(异步)在不到30分钟的时间内完成了将近150k记录的运行,并在存储主机名的同一张SQL表中更新了FQDN。

该解决方案运行速度太快(超过了每秒300个请求的阈值)。由于允许的编号。服务器生成请求的数量受到限制,我的服务器在顶部对话者中列出,并被要求停止运行该应用程序。我必须重建此应用程序以使其同步运行,现在需要花费6个多小时才能完成。

//// TotalRecords are fetched from SQL database with the Hostname (referred as host further)
for (int i = 0; i < TotalRecords.Rows.Count; i++)
{
    try
    {
        host = TotalRecords.Rows[i].ItemArray[0].ToString();
        Interlocked.Increment(ref requestCounter);
        string[] arr = new string[] { i.ToString(), host }; 
        Dns.BeginGetHostEntry(host, GetHostEntryCallback,arr);
    }
    catch (Exception ex)
    {
        log.Error("Unknown error occurred\n ", ex);
    }
}
do
{
    Thread.Sleep(0);

} while (requestCounter>0);

ListAdapter.Update(TotalRecords);

问题:

  1. 有没有办法每秒限制此方法生成的请求数量?

  2. 我的理解是ParallelOptions.MaxDegreeOfParallelism不控制每秒的线程,所以有什么办法TPL可以是更好的选择吗?可以将其限制为否。每秒的请求数?

bradgone冲浪

纯粹的异步解决方案。

它使用一个nuget程序包Nite.AsyncExSystem.Reactive执行错误处理,并提供DNS结果,这些结果作为IObservable<IPHostEntry>

这里有很多事情。您将需要将反应式扩展理解为标准的异步编程可能有很多方法可以达到以下效果,但这是一个有趣的解决方案。

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Threading;

#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed

public static class EnumerableExtensions
{
    public static IEnumerable<Func<U>> Defer<T, U>
        ( this IEnumerable<T> source, Func<T, U> selector) 
        => source.Select(s => (Func<U>)(() => selector(s)));
}


public class Program
{
    /// <summary>
    /// Returns the time to wait before processing another item
    /// if the rate limit is to be maintained
    /// </summary>
    /// <param name="desiredRateLimit"></param>
    /// <param name="currentItemCount"></param>
    /// <param name="elapsedTotalSeconds"></param>
    /// <returns></returns>
    private static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
    {
        var time = elapsedTotalSeconds;
        var timeout = currentItemCount / desiredRateLimit;
        return timeout - time;
    }

    /// <summary>
    /// Consume the tasks in parallel but with a rate limit. The results
    /// are returned as an observable.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="tasks"></param>
    /// <param name="rateLimit"></param>
    /// <returns></returns>
    public static IObservable<T> RateLimit<T>(IEnumerable<Func<Task<T>>> tasks, double rateLimit){
        var s = System.Diagnostics.Stopwatch.StartNew();
        var n = 0;
        var sem = new  AsyncCountdownEvent(1);

        var errors = new ConcurrentBag<Exception>();

        return Observable.Create<T>
            ( observer =>
            {

                var ctx = new CancellationTokenSource();
                Task.Run
                    ( async () =>
                    {
                        foreach (var taskFn in tasks)
                        {
                            n++;
                            ctx.Token.ThrowIfCancellationRequested();

                            var elapsedTotalSeconds = s.Elapsed.TotalSeconds;
                            var delay = Delay( rateLimit, n, elapsedTotalSeconds );
                            if (delay > 0)
                                await Task.Delay( TimeSpan.FromSeconds( delay ), ctx.Token );

                            sem.AddCount( 1 );
                            Task.Run
                                ( async () =>
                                {
                                    try
                                    {
                                        observer.OnNext( await taskFn() );
                                    }
                                    catch (Exception e)
                                    {
                                        errors.Add( e );
                                    }
                                    finally
                                    {
                                        sem.Signal();
                                    }
                                }
                                , ctx.Token );
                        }
                        sem.Signal();
                        await sem.WaitAsync( ctx.Token );
                        if(errors.Count>0)
                            observer.OnError(new AggregateException(errors));
                        else
                            observer.OnCompleted();
                    }
                      , ctx.Token );

                return Disposable.Create( () => ctx.Cancel() );
            } );
    }

    #region hosts



    public static string [] Hosts = new [] { "google.com" }

    #endregion


    public static void Main()
    {
        var s = System.Diagnostics.Stopwatch.StartNew();

        var rate = 25;

        var n = Hosts.Length;

        var expectedTime = n/rate;

        IEnumerable<Func<Task<IPHostEntry>>> dnsTaskFactories = Hosts.Defer( async host =>
        {
            try
            {
                return await Dns.GetHostEntryAsync( host );
            }
            catch (Exception e)
            {
                throw new Exception($"Can't resolve {host}", e);
            }
        } );

        IObservable<IPHostEntry> results = RateLimit( dnsTaskFactories, rate );

        results
            .Subscribe( result =>
            {
                Console.WriteLine( "result " + DateTime.Now + " " + result.AddressList[0].ToString() );
            },
            onCompleted: () =>
            {
                Console.WriteLine( "Completed" );

                PrintTimes( s, expectedTime );
            },
            onError: e =>
            {
                Console.WriteLine( "Errored" );

                PrintTimes( s, expectedTime );

                if (e is AggregateException ae)
                {
                    Console.WriteLine( e.Message );
                    foreach (var innerE in ae.InnerExceptions)
                    {
                        Console.WriteLine( $"     " + innerE.GetType().Name + " " + innerE.Message );
                    }
                }
                else
                {
                        Console.WriteLine( $"got error " + e.Message );
                }
            }

            );

        Console.WriteLine("Press enter to exit");
        Console.ReadLine();
    }

    private static void PrintTimes(Stopwatch s, int expectedTime)
    {
        Console.WriteLine( "Done" );
        Console.WriteLine( "Elapsed Seconds " + s.Elapsed.TotalSeconds );
        Console.WriteLine( "Expected Elapsed Seconds " + expectedTime );
    }
}

输出的最后几行是

result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 157.7.105.52
result 5/23/2017 3:23:36 PM 223.223.182.225
result 5/23/2017 3:23:36 PM 64.34.93.5
result 5/23/2017 3:23:36 PM 212.83.211.103
result 5/23/2017 3:23:36 PM 205.185.216.10
result 5/23/2017 3:23:36 PM 198.232.125.32
result 5/23/2017 3:23:36 PM 66.231.176.100
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:37 PM 219.84.203.116
Errored
Done
Elapsed Seconds 19.9990118
Expected Elapsed Seconds 19
One or more errors occurred.
     Exception Can't resolve adv758968.ru
     Exception Can't resolve fr.a3dfp.net
     Exception Can't resolve ads.adwitserver.com
     Exception Can't resolve www.adtrader.com
     Exception Can't resolve trak-analytics.blic.rs
     Exception Can't resolve ads.buzzcity.net

我无法粘贴完整的代码,因此这是带有主机列表的代码链接。

https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Linux

使用getaddrinfo缓存DNS

来自分类Dev

在Python上每秒限制HTTP请求数量

来自分类Dev

采访任务:使用令牌限制并发请求数

来自分类Dev

通过时间限制Dns.GetHostAddresses

来自分类Dev

错误“超出了与DNS交互的最大次数限制(10)”

来自分类Dev

NodeMCU UDP DNS请求格式

来自分类Dev

服务编号限制

来自分类Dev

在Node.js中限制对Cassandra DB的并行请求数

来自分类Dev

Scapy DNS请求

来自分类Dev

如何使用minikube的DNS?

来自分类Dev

每秒HTTPS请求数限制

来自分类Dev

限制等待DNS反向函数js的执行时间

来自分类Dev

使用脚本更改DNS

来自分类Dev

为什么对dns标签施加63个字节的限制?

来自分类Dev

Chrome:带有随机DNS名称的DNS请求:恶意软件?

来自分类Dev

将Web App限制为IP地址或DNS名称

来自分类Dev

任务并行库中的线程数限制

来自分类Dev

如何收集DNS A记录请求?

来自分类Dev

每个IP使用绑定DNS的不同DNS答案

来自分类Dev

我可以使用joomla htacess一次限制网址中的多个文件吗(路径的结尾是用户编号)

来自分类Dev

跟踪DNS请求

来自分类Dev

SoftLayer API:是否可以使用SoftLayer API(而非SoftLayer门户)来限制用户对DNS管理的访问?

来自分类Dev

使用终端更改DNS-公共DNS

来自分类Dev

限制最大编号 使用Javascript的表格行数

来自分类Dev

是否可以轮询DNS请求?

来自分类Dev

设置 DNS 缓存的最佳方法?

来自分类Dev

AWS API Gateway IP 流量限制 - 使用 DNS 而不是 IP 地址

来自分类Dev

如何在异步任务 C# 中限制每秒请求数

来自分类Dev

如何使用多个dns?