Background Queue with Channels
Production incident
You add an in-memory queue to process emails and webhooks asynchronously. During a traffic spike the queue grows without bound, memory climbs, and pods get OOMKilled. When the service restarts, the queued work is gone: users miss emails, and you have no audit trail of what was lost. In a second incident, the consumer outpaces a downstream service and floods it, because nothing limits how many items run at once. The root cause is simple: you used an in-memory queue as if it were a message broker.
Symptoms
- Memory growth correlated with traffic spikes.
- Work disappears on crash/restart (no durability).
- More downstream rate limiting, because fan-out to downstream services is unbounded.
- Queue processing stops silently if the consumer throws and exits.
Root causes
- Unbounded queue: memory becomes the queue.
- No backpressure: producers never slow down.
- No durability: restart loses work.
- No visibility: no queue depth metric, no oldest-item age, no success/failure counters.
Diagnosis
# Find queue structures
grep -R "Channel\.Create" -n .
grep -R "ConcurrentQueue\|BlockingCollection" -n .
grep -R "BackgroundService" -n .
# Look for missing bounded options and missing ct usage
grep -R "CreateUnbounded" -n .
grep -R "CreateBounded" -n .
grep -R "WriteAsync(\|ReadAsync(" -n .
Anti-pattern
// Unbounded channel: will OOM under sustained load
var channel = Channel.CreateUnbounded<WorkItem>();

// Producer ignores backpressure and drops errors on the floor
channel.Writer.TryWrite(item);
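Why this has no brake: an unbounded channel's TryWrite succeeds for every item (it only returns false once the writer is completed), so the producer never learns that the consumer is behind, and the whole backlog lives in memory. A minimal sketch, assuming .NET 5 or later (for ChannelReader<T>.Count); UnboundedFloodDemo and the 1 KiB payload are illustrative stand-ins.

```csharp
using System.Threading.Channels;

// Hypothetical demo type: shows that an unbounded channel never pushes back on producers.
public static class UnboundedFloodDemo
{
    public static (bool AllAccepted, int Buffered) Flood(int writes)
    {
        var channel = Channel.CreateUnbounded<byte[]>();
        bool allAccepted = true;
        for (int i = 0; i < writes; i++)
        {
            // Each 1 KiB array stands in for a queued payload; none is ever rejected.
            allAccepted &= channel.Writer.TryWrite(new byte[1024]);
        }
        // Nothing has been consumed, so every payload is still held in memory.
        return (allAccepted, channel.Reader.Count);
    }
}
```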
Correct pattern
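Use a bounded channel and make its full-mode behavior explicit. Decide whether producers should wait for space (BoundedChannelFullMode.Wait, which applies backpressure) or whether the channel should discard items when full (DropWrite, DropNewest, or DropOldest, which shed load). Whichever you choose, instrument it.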
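A sketch of the two explicit choices on a tiny channel, assuming .NET 6 or later (the itemDropped overload of Channel.CreateBounded arrived in .NET 6). FullModeDemo and its method names are hypothetical. DropOldest always accepts the write and reports each evicted item through the callback, which is where a drop counter belongs; Wait mode combined with TryWrite rejects the write instead, so the caller gets a false return it can act on.

```csharp
using System.Threading;
using System.Threading.Channels;

// Hypothetical demo type comparing "shed load" and "refuse instead of waiting".
public static class FullModeDemo
{
    // Shed load: DropOldest evicts the oldest buffered item and passes it to itemDropped.
    public static (int Buffered, int Dropped) FillWithDropOldest(int capacity, int writes)
    {
        int dropped = 0;
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.DropOldest },
            itemDropped: _ => Interlocked.Increment(ref dropped));

        for (int i = 0; i < writes; i++)
            channel.Writer.TryWrite(i); // always true in drop modes; the callback reports the loss

        return (channel.Reader.Count, dropped);
    }

    // Wait mode, but the caller will not wait: TryWrite returns false while the channel is full.
    public static (int Accepted, int Rejected) FillWithWaitModeTryWrite(int capacity, int writes)
    {
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.Wait });

        int accepted = 0, rejected = 0;
        for (int i = 0; i < writes; i++)
        {
            if (channel.Writer.TryWrite(i)) accepted++;
            else rejected++;
        }
        return (accepted, rejected);
    }
}
```

With a capacity of 3 and 5 writes, the first method ends with 3 items buffered and 2 dropped; the second accepts 3 and rejects 2.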
Bounded queue with backpressure
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public interface IBackgroundTaskQueue
{
    ValueTask EnqueueAsync(WorkItem item, CancellationToken ct);
    IAsyncEnumerable<WorkItem> DequeueAllAsync(CancellationToken ct);
    int Capacity { get; }
}

public sealed class ChannelBackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<WorkItem> _channel;
    public int Capacity { get; }

    public ChannelBackgroundTaskQueue(int capacity)
    {
        Capacity = capacity;
        _channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // backpressure: WriteAsync waits for free space
            SingleReader = false,
            SingleWriter = false
        });
    }

    // Completes immediately when there is space; otherwise waits until a consumer frees a slot or ct fires.
    public ValueTask EnqueueAsync(WorkItem item, CancellationToken ct)
        => _channel.Writer.WriteAsync(item, ct);

    public async IAsyncEnumerable<WorkItem> DequeueAllAsync([EnumeratorCancellation] CancellationToken ct)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(ct))
            yield return item;
    }
}
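With FullMode.Wait, EnqueueAsync suspends the caller until a consumer frees a slot. On a request path you usually want that wait bounded. A sketch, assuming you can reach the channel's writer; TryEnqueueWithinAsync is a hypothetical helper, and the same linked-token approach works by passing the token to EnqueueAsync(item, ct).

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class TimedEnqueue
{
    // Waits for space like EnqueueAsync, but gives up after maxWait so a request
    // is not parked indefinitely behind a saturated queue. Hypothetical helper.
    public static async Task<bool> TryEnqueueWithinAsync<T>(
        ChannelWriter<T> writer, T item, TimeSpan maxWait, CancellationToken requestAborted)
    {
        using var timeout = CancellationTokenSource.CreateLinkedTokenSource(requestAborted);
        timeout.CancelAfter(maxWait);
        try
        {
            await writer.WriteAsync(item, timeout.Token);
            return true;
        }
        catch (OperationCanceledException) when (!requestAborted.IsCancellationRequested)
        {
            // The queue stayed full for maxWait: the caller can shed the work or return 503/429.
            return false;
        }
    }
}
```

If the request itself is aborted, the exception still propagates, so a cancelled request is not confused with a full queue.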
Consumer with concurrency limits
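using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public sealed class QueueWorker : BackgroundService
{
    private const int MaxConcurrency = 10;
    private readonly IBackgroundTaskQueue _queue;
    private readonly SemaphoreSlim _concurrency = new(MaxConcurrency);
    private readonly ILogger<QueueWorker> _log;

    public QueueWorker(IBackgroundTaskQueue queue, ILogger<QueueWorker> log)
    {
        _queue = queue;
        _log = log;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            await foreach (var item in _queue.DequeueAllAsync(stoppingToken))
            {
                // Wait for a free slot before taking more work: caps downstream fan-out.
                await _concurrency.WaitAsync(stoppingToken);

                // No token on Task.Run: if it were cancelled before the delegate started,
                // the finally below would never run and the slot would leak.
                _ = Task.Run(async () =>
                {
                    try
                    {
                        // Per-item deadline, also cancelled on shutdown.
                        using var budget = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
                        budget.CancelAfter(TimeSpan.FromSeconds(30));
                        await item.ExecuteAsync(budget.Token);
                    }
                    catch (Exception ex)
                    {
                        _log.LogError(ex, "WorkItem failed");
                        // Decide: retry? dead-letter? persist failure?
                    }
                    finally
                    {
                        _concurrency.Release();
                    }
                });
            }
        }
        finally
        {
            // Graceful shutdown: reacquire every slot so in-flight items can finish
            // (they observe the cancelled stoppingToken), but only wait a bounded time.
            using var drain = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            try
            {
                for (var i = 0; i < MaxConcurrency; i++)
                    await _concurrency.WaitAsync(drain.Token);
            }
            catch (OperationCanceledException)
            {
                _log.LogWarning("Shutdown drain timed out; some work items were still running");
            }
        }
    }
}

public sealed record WorkItem(Func<CancellationToken, Task> ExecuteAsync);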
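The gating logic can be exercised without the hosting packages. This sketch (ConcurrencyLimitDemo is hypothetical, and Task.Delay stands in for real work) runs the same loop: acquire a semaphore slot, start the item, release in finally. It also records the peak number of items in flight so you can check that the limit holds, and it completes the writer and awaits the running tasks, which is the drain step a clean shutdown needs.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ConcurrencyLimitDemo
{
    public static async Task<(int Processed, int PeakInFlight)> RunAsync(int items, int maxConcurrency)
    {
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(4) { FullMode = BoundedChannelFullMode.Wait });
        using var gate = new SemaphoreSlim(maxConcurrency);
        int inFlight = 0, peak = 0, processed = 0;
        var running = new List<Task>();

        // Producer honours backpressure, then completes the writer so ReadAllAsync can finish.
        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i < items; i++)
                await channel.Writer.WriteAsync(i);
            channel.Writer.Complete();
        });

        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            await gate.WaitAsync(); // the reader stalls once maxConcurrency items are running
            running.Add(Task.Run(async () =>
            {
                try
                {
                    int now = Interlocked.Increment(ref inFlight);
                    // Lock-free "max" update for the peak-concurrency measurement.
                    int seen;
                    while (now > (seen = Volatile.Read(ref peak)) &&
                           Interlocked.CompareExchange(ref peak, now, seen) != seen)
                    {
                    }
                    await Task.Delay(10); // stand-in for calling the downstream service
                    Interlocked.Increment(ref processed);
                }
                finally
                {
                    Interlocked.Decrement(ref inFlight);
                    gate.Release();
                }
            }));
        }

        await producer;
        await Task.WhenAll(running); // drain: wait for in-flight items before reporting
        return (processed, Volatile.Read(ref peak));
    }
}
```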
Durability warning
- Channel queues are in-process. If the process dies, items die.
- Use this for best-effort tasks or when you also persist work elsewhere (outbox table, durable queue).
- If you need guaranteed delivery: use a durable broker or DB-backed outbox + worker.
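One way to combine a channel with durability is to persist the work first and use the channel only as a wake-up signal. The sketch below assumes a hypothetical IOutboxStore; InMemoryOutboxStore exists only so the example runs and is not durable. In a real system the store would be an outbox table written in the same transaction as the business change.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical store abstraction over an outbox table.
public interface IOutboxStore
{
    Task SaveAsync(Guid id, string payload, CancellationToken ct);
    Task<IReadOnlyList<Guid>> LoadPendingAsync(CancellationToken ct);
    Task MarkDoneAsync(Guid id, CancellationToken ct);
}

// In-memory stand-in so the sketch runs. It is NOT durable.
public sealed class InMemoryOutboxStore : IOutboxStore
{
    private readonly ConcurrentDictionary<Guid, string> _pending = new();

    public Task SaveAsync(Guid id, string payload, CancellationToken ct)
    {
        _pending[id] = payload;
        return Task.CompletedTask;
    }

    public Task<IReadOnlyList<Guid>> LoadPendingAsync(CancellationToken ct)
        => Task.FromResult<IReadOnlyList<Guid>>(_pending.Keys.ToList());

    public Task MarkDoneAsync(Guid id, CancellationToken ct)
    {
        _pending.TryRemove(id, out _);
        return Task.CompletedTask;
    }
}

// Persist first; the channel only carries ids as a wake-up signal for the worker.
public sealed class OutboxBackedQueue
{
    private readonly IOutboxStore _store;
    private readonly Channel<Guid> _signals = Channel.CreateBounded<Guid>(
        new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait });

    public OutboxBackedQueue(IOutboxStore store) => _store = store;

    public ChannelReader<Guid> Reader => _signals.Reader;

    public async Task<Guid> EnqueueAsync(string payload, CancellationToken ct)
    {
        var id = Guid.NewGuid();
        await _store.SaveAsync(id, payload, ct);  // durable record first
        await _signals.Writer.WriteAsync(id, ct); // then the in-memory nudge
        return id;
    }

    // Call on startup: re-signal everything persisted but not yet marked done.
    // Start the consumer first if the backlog can exceed the channel capacity.
    public async Task<int> RecoverAsync(CancellationToken ct)
    {
        var pending = await _store.LoadPendingAsync(ct);
        foreach (var id in pending)
            await _signals.Writer.WriteAsync(id, ct);
        return pending.Count;
    }
}
```

The worker would load the payload by id, execute it, and call MarkDoneAsync only after success. A crash between execution and MarkDoneAsync means the item runs again after recovery, so handlers need to be idempotent (at-least-once delivery).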
Security and performance impact
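- Performance: a bounded queue caps the memory the backlog can use, and backpressure keeps queueing delay from growing without limit, which stabilizes latency.
- Security: queues often carry PII and tokens (email addresses, webhook payloads). Do not log payloads. Enforce a maximum payload size at enqueue time so a single caller cannot exhaust queue capacity or memory.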
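A sketch of both security rules at the enqueue boundary. WebhookJob, EnqueueGuard, and the 64 KiB limit are assumptions for illustration; size the limit to your real payloads. The check counts UTF-8 bytes, on the assumption that the body is sent as UTF-8.

```csharp
#nullable enable
using System;
using System.Text;

// Hypothetical job shape for a queued webhook.
public sealed record WebhookJob(Guid Id, Uri Target, string Body);

public static class EnqueueGuard
{
    public const int MaxBodyBytes = 64 * 1024; // assumed limit

    // Reject oversized payloads before they consume queue capacity and memory.
    public static bool TryValidate(WebhookJob job, out string? error)
    {
        int bytes = Encoding.UTF8.GetByteCount(job.Body);
        if (bytes > MaxBodyBytes)
        {
            error = $"Body is {bytes} bytes; limit is {MaxBodyBytes}.";
            return false;
        }
        error = null;
        return true;
    }

    // Log-safe description: identifiers and sizes only, never the body or any tokens in it.
    public static string Describe(WebhookJob job) =>
        $"WebhookJob {job.Id} -> {job.Target.Host} ({Encoding.UTF8.GetByteCount(job.Body)} bytes)";
}
```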
Operational notes
- Monitoring: queue depth, enqueue wait time, reject/drop count (if using drop mode), worker concurrency utilization, success/failure counts.
- Rollout: start with a conservative capacity and tune it based on observed traffic. Verify that enqueue waits under backpressure do not push request latency past client timeouts.
- Rollback: if backpressure causes request timeouts, reduce enqueue volume via feature flags or move work to durable infrastructure.
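Several of these monitoring signals map onto System.Diagnostics.Metrics instruments (.NET 6+). This sketch wraps a bounded channel and publishes depth as an observable gauge, rejected writes as a counter, and time-in-queue as a histogram, which also covers the missing oldest-item-age signal from the root causes. The meter and instrument names are hypothetical; an exporter such as OpenTelemetry would subscribe to them by meter name.

```csharp
using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class InstrumentedQueue<T> : IDisposable
{
    private readonly Channel<(T Item, long EnqueuedAt)> _channel;
    private readonly Meter _meter = new("Sample.BackgroundQueue"); // hypothetical meter name
    private readonly Counter<long> _rejected;
    private readonly Histogram<double> _timeInQueueMs;

    public InstrumentedQueue(int capacity)
    {
        _channel = Channel.CreateBounded<(T Item, long EnqueuedAt)>(
            new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.Wait });
        // Sampled on each collection, so exporters see the current depth.
        _meter.CreateObservableGauge<int>("queue.depth", () => _channel.Reader.Count);
        _rejected = _meter.CreateCounter<long>("queue.rejected");
        _timeInQueueMs = _meter.CreateHistogram<double>("queue.time_in_queue", unit: "ms");
    }

    public int Depth => _channel.Reader.Count;

    // Non-blocking enqueue for callers that shed load instead of waiting.
    public bool TryEnqueue(T item)
    {
        if (_channel.Writer.TryWrite((item, Stopwatch.GetTimestamp())))
            return true;
        _rejected.Add(1);
        return false;
    }

    public async ValueTask<T> DequeueAsync(CancellationToken ct)
    {
        var (item, enqueuedAt) = await _channel.Reader.ReadAsync(ct);
        // How long this item waited; a rising tail means consumers are falling behind.
        double waitedMs = (Stopwatch.GetTimestamp() - enqueuedAt) * 1000.0 / Stopwatch.Frequency;
        _timeInQueueMs.Record(waitedMs);
        return item;
    }

    public void Dispose() => _meter.Dispose();
}
```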
Checklist
- Queue is bounded; full-mode behavior is explicit.
- Consumer concurrency is limited and observable.
- Every work item has a deadline and respects cancellation.
- Failure handling is explicit (retry, dead-letter, persist).
- Durability requirements are documented and met (or explicitly not required).