Здравствуйте, Kolesiki, Вы писали:
K>НЕТ! Мы не качаем "блоками по 5 файлов", а держим постоянную очередь из 5 (максимум) работяг. Если хотя бы один закончил работу, то на его место приходит новое задание на закачку. А у тебя ждёт всех 5-ти, а потом снова берёт 5.
Тогда можно за отправную точку взять вот этот код:
| Scheduler.cs |
| class ServiceRequestScheduler : IDisposable
{
private readonly ServiceJobFactory jobFactory;
private readonly ConcurrentQueue<ServiceJob> jobQueue;
private readonly Timer timer;
private readonly TimeSpan runPeriod;
private readonly int maxNumConcurrentJobs;
private int numConcurrentJobs;
public ServiceRequestScheduler(ServiceJobFactory jobFactory)
: this(2, TimeSpan.FromSeconds(30))
{
this.jobFactory = jobFactory;
}
private ServiceRequestScheduler(int numConcurrentJobs, TimeSpan runPeriod)
{
this.maxNumConcurrentJobs = numConcurrentJobs;
this.runPeriod = runPeriod;
this.jobQueue = new ConcurrentQueue<ServiceJob>();
this.timer = new Timer(RunJobs);
this.timer.Change(TimeSpan.Zero, this.runPeriod);
}
public Task Schedule(ServiceRequest request, CancellationToken cancelToken)
{
var job = this.jobFactory.CreateJob(request, cancelToken);
job.Started += HandleJobStarted;
job.Stopped += HandleJobStopped;
this.jobQueue.Enqueue(job);
return job.Task;
}
public void Dispose()
{
this.timer.Dispose();
}
private void HandleJobStopped(object sender, EventArgs e)
{
Interlocked.Decrement(ref this.numConcurrentJobs);
}
private void HandleJobStarted(object sender, EventArgs e)
{
Interlocked.Increment(ref this.numConcurrentJobs);
}
private void RunJobs(object state)
{
var numJobs = this.maxNumConcurrentJobs - this.numConcurrentJobs;
for (var i = 0; i < numJobs; ++i)
{
ServiceJob job;
if (this.jobQueue.TryDequeue(out job))
job.RunAsync();
}
}
}
abstract class ServiceJob
{
private readonly TaskCompletionSource<ServiceJob> taskSource;
protected ServiceJob()
{
this.taskSource = new TaskCompletionSource<ServiceJob>();
}
public Task Task { get { return this.taskSource.Task; } }
public event EventHandler Started;
public event EventHandler Stopped;
public async Task RunAsync()
{
try
{
OnStarted(EventArgs.Empty);
await RunOverrideAsync().ConfigureAwait(false);
this.taskSource.SetResult(this);
}
catch (OperationCanceledException)
{
this.taskSource.TrySetCanceled();
}
catch (Exception x)
{
this.taskSource.TrySetException(x);
}
finally
{
OnStopped(EventArgs.Empty);
}
}
protected virtual void OnStarted(EventArgs e)
{
var started = this.Started;
if (started != null)
{
started(this, e);
}
}
protected virtual void OnStopped(EventArgs e)
{
var stopped = this.Stopped;
if (stopped != null)
{
stopped(this, e);
}
}
protected abstract Task RunOverrideAsync();
}
|
| |