等待基于任务的队列
我想知道是否存在ConcurrentQueue的实现/包装器,类似于BlockingCollection ,其中从集合中获取不会阻塞,而是异步并且将导致异步等待直到将项放入队列中。
我已经提出了自己的实现,但它似乎没有按预期执行。 我想知道我是否正在重塑已经存在的东西。
这是我的实现:
public class MessageQueue { ConcurrentQueue queue = new ConcurrentQueue(); ConcurrentQueue<TaskCompletionSource> waitingQueue = new ConcurrentQueue<TaskCompletionSource>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task Dequeue() { TaskCompletionSource tcs = new TaskCompletionSource(); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; } private void ProcessQueues() { TaskCompletionSource tcs=null; T firstItem=default(T); while (true) { bool ok; lock (queueSyncLock) { ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); if (ok) { waitingQueue.TryDequeue(out tcs); queue.TryDequeue(out firstItem); } } if (!ok) break; tcs.SetResult(firstItem); } } }
我不知道无锁解决方案,但您可以查看新的Dataflow库 ,它是Async CTP的一部分。 一个简单的BufferBlock
就足够了,例如:
BufferBlock buffer = new BufferBlock ();
通过数据流块类型的扩展方法可以轻松完成生产和消费。
生产就像:
buffer.Post(13);
和消费是异步准备:
int item = await buffer.ReceiveAsync();
我建议你尽可能使用Dataflow; 使这样的缓冲区既高效又正确,比首次出现更困难。
我的尝试(在创建“promise”时会引发一个事件,外部生产者可以使用它来知道何时生成更多项目):
public class AsyncQueue { private ConcurrentQueue _bufferQueue; private ConcurrentQueue> _promisesQueue; private object _syncRoot = new object(); public AsyncQueue() { _bufferQueue = new ConcurrentQueue (); _promisesQueue = new ConcurrentQueue>(); } /// /// Enqueues the specified item. /// /// The item. public void Enqueue(T item) { TaskCompletionSource promise; do { if (_promisesQueue.TryDequeue(out promise) && !promise.Task.IsCanceled && promise.TrySetResult(item)) { return; } } while (promise != null); lock (_syncRoot) { if (_promisesQueue.TryDequeue(out promise) && !promise.Task.IsCanceled && promise.TrySetResult(item)) { return; } _bufferQueue.Enqueue(item); } } /// /// Dequeues the asynchronous. /// /// The cancellation token. /// public Task DequeueAsync(CancellationToken cancellationToken) { T item; if (!_bufferQueue.TryDequeue(out item)) { lock (_syncRoot) { if (!_bufferQueue.TryDequeue(out item)) { var promise = new TaskCompletionSource (); cancellationToken.Register(() => promise.TrySetCanceled()); _promisesQueue.Enqueue(promise); this.PromiseAdded.RaiseEvent(this, EventArgs.Empty); return promise.Task; } } } return Task.FromResult(item); } /// /// Gets a value indicating whether this instance has promises. /// /// /// true if this instance has promises; otherwise, false . /// public bool HasPromises { get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; } } /// /// Occurs when a new promise /// is generated by the queue /// public event EventHandler PromiseAdded; }
对于您的用例(考虑到学习曲线),它可能有点过分,但Reactive Extentions提供了您可能想要的异步合成的所有粘合剂 。
您基本上订阅了更改,并在它们可用时将它们推送给您,您可以让系统在单独的线程上推送更改。
这是我目前正在使用的实现。
public class MessageQueue { ConcurrentQueue queue = new ConcurrentQueue (); ConcurrentQueue> waitingQueue = new ConcurrentQueue>(); object queueSyncLock = new object(); public void Enqueue(T item) { queue.Enqueue(item); ProcessQueues(); } public async Task DequeueAsync(CancellationToken ct) { TaskCompletionSource tcs = new TaskCompletionSource (); ct.Register(() => { lock (queueSyncLock) { tcs.TrySetCanceled(); } }); waitingQueue.Enqueue(tcs); ProcessQueues(); return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; } private void ProcessQueues() { TaskCompletionSource tcs = null; T firstItem = default(T); lock (queueSyncLock) { while (true) { if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem)) { waitingQueue.TryDequeue(out tcs); if (tcs.Task.IsCanceled) { continue; } queue.TryDequeue(out firstItem); } else { break; } tcs.SetResult(firstItem); } } } }
它工作得很好,但是在queueSyncLock
上有很多争用,因为我正在大量使用CancellationToken
来取消一些等待的任务。 当然,这会导致我用BlockingCollection
看到的BlockingCollection
要少得多,但……
我想知道是否有更顺畅,无锁的方法来达到同样的目的
您可以使用BlockingCollection
(使用默认的ConcurrentQueue
)并将调用包装到Take
in a Task
以便您可以await
它:
上述就是C#学习教程:等待基于任务的队列分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注—计算机技术网(www.ctvol.com)!
var bc = new BlockingCollection(); T element = await Task.Run( () => bc.Take() );
本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。
ctvol管理联系方式QQ:251552304
本文章地址:https://www.ctvol.com/cdevelopment/1029471.html