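I have been using the Dns.BeginGetHostEntry method to get the FQDN of hosts from their hostnames (the list of hostnames is stored in a SQL Server database). This asynchronous approach ran through nearly 150k records in under 30 minutes and wrote the FQDNs back into the same SQL table that holds the hostnames.
The problem is that it ran too fast: it exceeded the threshold of 300 requests per second. Because the number of requests a server is allowed to generate is limited, my server showed up in the list of top talkers and I was asked to stop running the application. I had to rebuild the application to run synchronously, and it now takes more than 6 hours to complete.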
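Some rough arithmetic shows what a rate limit would buy here. It treats "nearly 150k" as 150,000 and uses 250 requests/s purely as an example of a limit below the 300/s threshold; neither number comes from a measured run.

```latex
\begin{aligned}
6\ \text{h} &= 21\,600\ \text{s}
  &&\Rightarrow\ \frac{150\,000}{21\,600} \approx 6.9\ \text{requests/s (upper bound for the synchronous run)}\\
\frac{150\,000}{300\ \text{requests/s}} &= 500\ \text{s} \approx 8.3\ \text{min}
  &&\text{(fastest run allowed under the 300/s threshold)}\\
\frac{150\,000}{250\ \text{requests/s}} &= 600\ \text{s} = 10\ \text{min}
  &&\text{(illustrative limit of 250/s)}
\end{aligned}
```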
//// TotalRecords are fetched from the SQL database with the Hostname (referred to as host below)
for (int i = 0; i < TotalRecords.Rows.Count; i++)
{
    try
    {
        host = TotalRecords.Rows[i].ItemArray[0].ToString();
        Interlocked.Increment(ref requestCounter);
        string[] arr = new string[] { i.ToString(), host };
        // Fire-and-forget: nothing here limits how fast lookups are started.
        // GetHostEntryCallback (not shown) is expected to decrement requestCounter.
        Dns.BeginGetHostEntry(host, GetHostEntryCallback, arr);
    }
    catch (Exception ex)
    {
        log.Error("Unknown error occurred\n ", ex);
    }
}
// Spin until every callback has completed, then write the FQDNs back to SQL.
do
{
    Thread.Sleep(0);
} while (requestCounter > 0);
ListAdapter.Update(TotalRecords);
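Question:
Is there a way to limit the number of requests per second that this method generates?
My understanding is that ParallelOptions.MaxDegreeOfParallelism does not control threads per second. Would the TPL be a better option here? Can it limit the number of requests per second?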
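On the TPL part of the question: MaxDegreeOfParallelism (like a SemaphoreSlim) caps how many operations run at the same time, not how many start per second. If lookups finish quickly, even a small concurrency cap can start hundreds of them per second, so a per-second limit needs time-based pacing. A minimal sketch with plain TPL follows; `Pacing` and `RunPacedAsync` are hypothetical names, not part of any library, and the scheme (start job i no earlier than i / rate seconds after the first) is one simple choice among several.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class Pacing
{
    // Hypothetical helper: starts the jobs in order, launching job i (0-based)
    // no earlier than i / ratePerSecond seconds after the first one.
    // Started jobs keep running concurrently; only their start times are paced.
    public static async Task<T[]> RunPacedAsync<T>(
        IReadOnlyList<Func<Task<T>>> jobs, double ratePerSecond)
    {
        if (ratePerSecond <= 0)
            throw new ArgumentOutOfRangeException(nameof(ratePerSecond));

        var clock = Stopwatch.StartNew();
        var started = new List<Task<T>>(jobs.Count);
        for (int i = 0; i < jobs.Count; i++)
        {
            // Time remaining until job i is due; skip the delay if it is already due.
            var wait = TimeSpan.FromSeconds(i / ratePerSecond) - clock.Elapsed;
            if (wait > TimeSpan.Zero)
                await Task.Delay(wait);
            started.Add(jobs[i]());
        }
        // Results come back in the same order as the jobs.
        return await Task.WhenAll(started);
    }
}
```

For the question's scenario, the jobs could be built from a hypothetical `hosts` list of strings, e.g. `hosts.Select(h => (Func<Task<IPHostEntry>>)(() => Dns.GetHostEntryAsync(h))).ToList()`, and run with a rate such as 250. Awaiting `Task.WhenAll` rethrows the first failed lookup, so wrap each lookup in a try/catch if the remaining results should still be kept. If the loop falls behind schedule, overdue jobs start back-to-back to catch up, so short bursts above the rate are possible.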
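A purely asynchronous solution.
It uses the NuGet packages Nito.AsyncEx and System.Reactive.
It handles errors and delivers the DNS results as an IObservable<IPHostEntry>.
There is a lot going on here: you will need to understand Reactive Extensions as well as standard asynchronous programming. There are probably many ways to achieve the same result, but this is an interesting one.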
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Linq;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Nito.AsyncEx;
using System.Threading;
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
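public static class EnumerableExtensions
{
    // Wraps each element in a thunk, so the work (e.g. starting a DNS lookup)
    // runs only when the thunk is invoked, not when the sequence is enumerated.
    public static IEnumerable<Func<U>> Defer<T, U>
        ( this IEnumerable<T> source, Func<T, U> selector)
        => source.Select(s => (Func<U>)(() => selector(s)));
}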
public class Program
{
    /// <summary>
    /// Returns the time to wait before processing another item
    /// if the rate limit is to be maintained
    /// </summary>
    /// <param name="desiredRateLimit">Target rate, in items per second.</param>
    /// <param name="currentItemCount">1-based index of the item about to start.</param>
    /// <param name="elapsedTotalSeconds">Seconds elapsed since processing began.</param>
    /// <returns>Seconds to wait; zero or negative means the item can start immediately.</returns>
    private static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
    {
        // Item n is scheduled at n / rate seconds after the start.
        var timeout = currentItemCount / desiredRateLimit;
        return timeout - elapsedTotalSeconds;
    }
    /// <summary>
    /// Consume the tasks in parallel but with a rate limit. The results
    /// are returned as an observable.
    /// </summary>
    /// <typeparam name="T">Result type of each task.</typeparam>
    /// <param name="tasks">Task factories; each task is started only when its turn comes.</param>
    /// <param name="rateLimit">Maximum number of tasks to start per second.</param>
    /// <returns>An observable that emits each result, then completes, or errors with an AggregateException if any task failed.</returns>
    public static IObservable<T> RateLimit<T>(IEnumerable<Func<Task<T>>> tasks, double rateLimit)
    {
        return Observable.Create<T>
            ( observer =>
            {
                // Per-subscription state, so two subscriptions do not share counters.
                var s = Stopwatch.StartNew();
                var n = 0;
                var sem = new AsyncCountdownEvent(1);
                var errors = new ConcurrentBag<Exception>();
                // OnNext is called from several tasks at once; Rx requires serialized calls.
                var safeObserver = System.Reactive.Observer.Synchronize(observer);
                var ctx = new CancellationTokenSource();
                Task.Run
                    ( async () =>
                    {
                        foreach (var taskFn in tasks)
                        {
                            n++;
                            ctx.Token.ThrowIfCancellationRequested();
                            var delay = Delay( rateLimit, n, s.Elapsed.TotalSeconds );
                            if (delay > 0)
                                await Task.Delay( TimeSpan.FromSeconds( delay ), ctx.Token );
                            sem.AddCount( 1 );
                            Task.Run
                                ( async () =>
                                {
                                    try
                                    {
                                        safeObserver.OnNext( await taskFn() );
                                    }
                                    catch (Exception e)
                                    {
                                        errors.Add( e );
                                    }
                                    finally
                                    {
                                        sem.Signal();
                                    }
                                }
                                , ctx.Token );
                        }
                        // Release the initial count of 1, then wait for every started task.
                        sem.Signal();
                        await sem.WaitAsync( ctx.Token );
                        if (errors.Count > 0)
                            safeObserver.OnError( new AggregateException( errors ) );
                        else
                            safeObserver.OnCompleted();
                    }
                    , ctx.Token );
                return Disposable.Create( () => ctx.Cancel() );
            } );
    }
    #region hosts
    // Shortened here; the full host list is in the gist linked below.
    public static string[] Hosts = new[] { "google.com" };
    #endregion
    public static void Main()
    {
        var s = System.Diagnostics.Stopwatch.StartNew();
        var rate = 25;
        var n = Hosts.Length;
        // Integer division, so the estimate is truncated to whole seconds.
        var expectedTime = n / rate;
        IEnumerable<Func<Task<IPHostEntry>>> dnsTaskFactories = Hosts.Defer( async host =>
        {
            try
            {
                return await Dns.GetHostEntryAsync( host );
            }
            catch (Exception e)
            {
                throw new Exception( $"Can't resolve {host}", e );
            }
        } );
        IObservable<IPHostEntry> results = RateLimit( dnsTaskFactories, rate );
        results
            .Subscribe( result =>
                {
                    Console.WriteLine( "result " + DateTime.Now + " " + result.AddressList[0].ToString() );
                },
                onCompleted: () =>
                {
                    Console.WriteLine( "Completed" );
                    PrintTimes( s, expectedTime );
                },
                onError: e =>
                {
                    Console.WriteLine( "Errored" );
                    PrintTimes( s, expectedTime );
                    if (e is AggregateException ae)
                    {
                        Console.WriteLine( e.Message );
                        foreach (var innerE in ae.InnerExceptions)
                        {
                            Console.WriteLine( " " + innerE.GetType().Name + " " + innerE.Message );
                        }
                    }
                    else
                    {
                        Console.WriteLine( "got error " + e.Message );
                    }
                }
            );
        Console.WriteLine( "Press enter to exit" );
        Console.ReadLine();
    }

    private static void PrintTimes(Stopwatch s, int expectedTime)
    {
        Console.WriteLine( "Done" );
        Console.WriteLine( "Elapsed Seconds " + s.Elapsed.TotalSeconds );
        Console.WriteLine( "Expected Elapsed Seconds " + expectedTime );
    }
}
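The pacing in RateLimit comes entirely from Delay: item n is scheduled at n / rate seconds after the stopwatch starts, and the wait is that scheduled time minus the time already elapsed. Because n is incremented before Delay is called, n is 1-based, so even the first item waits 1 / rate seconds, and the last of N items becomes due at N / rate seconds, which is what Main estimates (with integer division) as the expected elapsed time. The sketch below restates the same formula in a standalone, hypothetical `PacingMath` class so it can be checked with concrete numbers.

```csharp
public static class PacingMath
{
    // Same formula as the answer's Delay: item n (1-based) is scheduled at
    // n / rate seconds; the result is how long to wait from now.
    // A value <= 0 means the item is already due and can start immediately.
    public static double Delay(double desiredRateLimit, int currentItemCount, double elapsedTotalSeconds)
        => currentItemCount / desiredRateLimit - elapsedTotalSeconds;
}
```

At 25 items/s, item 50 is due at 2.0 s, so with 1.5 s elapsed `PacingMath.Delay(25, 50, 1.5)` returns 0.5. Item 10 was due at 0.4 s, so with 1.0 s elapsed the result is negative and no wait is inserted; the loop simply catches up.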
The last few lines of the output are:
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 84.16.241.74
result 5/23/2017 3:23:36 PM 157.7.105.52
result 5/23/2017 3:23:36 PM 223.223.182.225
result 5/23/2017 3:23:36 PM 64.34.93.5
result 5/23/2017 3:23:36 PM 212.83.211.103
result 5/23/2017 3:23:36 PM 205.185.216.10
result 5/23/2017 3:23:36 PM 198.232.125.32
result 5/23/2017 3:23:36 PM 66.231.176.100
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:36 PM 54.239.34.12
result 5/23/2017 3:23:37 PM 219.84.203.116
Errored
Done
Elapsed Seconds 19.9990118
Expected Elapsed Seconds 19
One or more errors occurred.
Exception Can't resolve adv758968.ru
Exception Can't resolve fr.a3dfp.net
Exception Can't resolve ads.adwitserver.com
Exception Can't resolve www.adtrader.com
Exception Can't resolve trak-analytics.blic.rs
Exception Can't resolve ads.buzzcity.net
I can't paste the full code here, so here is a link to the code with the full host list:
https://gist.github.com/bradphelan/084e4b1ce2604bbdf858d948699cc190