Background Queue with Channels

In-memory background queues are fine until you treat them like durable infrastructure. A Channel-based queue gives you backpressure and concurrency control, but it loses every queued item when the process crashes unless the work is also persisted elsewhere. This page shows a safe pattern for in-process queues.

Production incident

You add an in-memory queue to process emails and webhooks asynchronously. During a traffic spike the queue grows without bound, memory climbs, and pods get OOMKilled. When the service restarts, queued work is gone. Users miss emails, and you have no audit trail. In a second incident, the consumer outpaces the downstream service and floods it because nothing limits concurrency. The root cause in both cases is the same: an in-memory queue was used as if it were a message broker.

Symptoms

  • Memory growth correlated with traffic spikes.
  • Work disappears on crash/restart (no durability).
  • Downstream rate limiting increases due to unbounded fan-out.
  • Queue processing stops silently if the consumer throws and exits.

Root causes

  • Unbounded queue: memory becomes the queue.
  • No backpressure: producers never slow down.
  • No durability: restart loses work.
  • No visibility: no queue depth metric, no oldest-item age, no success/failure counters.

Diagnosis

# Find queue structures
grep -R "Channel\.Create" -n .
grep -R "ConcurrentQueue\|BlockingCollection" -n .
grep -R "BackgroundService" -n .

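# Compare bounded and unbounded usage; every CreateUnbounded hit needs review
grep -R "CreateBounded\|CreateUnbounded" -n .

# Check these call sites for ignored TryWrite results and missing CancellationToken arguments
grep -R "TryWrite\|WriteAsync\|ReadAllAsync" -n .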

Anti-pattern

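// Unbounded channel: grows until the process runs out of memory when producers outpace consumers
var channel = Channel.CreateUnbounded<WorkItem>();
// TryWrite never waits, so there is no backpressure; its bool result
// (false once the channel is completed) is silently discarded
channel.Writer.TryWrite(item);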

Correct pattern

Use a bounded channel with explicit full-mode behavior. Decide whether you want to block producers (apply backpressure) or drop (shed load). Always instrument.
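If you choose to shed load instead of blocking producers, the drops have to be visible. The following is a minimal sketch, assuming .NET 6 or later (for the `itemDropped` callback overload of `Channel.CreateBounded`). The capacity of 2, the string payload, and the `dropped` counter are illustrative only.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;

long dropped = 0;

// DropWrite discards the item being written when the channel is full.
var channel = Channel.CreateBounded<string>(
    new BoundedChannelOptions(2)
    {
        FullMode = BoundedChannelFullMode.DropWrite
    },
    itemDropped: _ => Interlocked.Increment(ref dropped)); // count every shed item

for (var i = 0; i < 5; i++)
{
    // In drop modes TryWrite still returns true when the item is discarded,
    // so the callback is the reliable signal that work was lost.
    channel.Writer.TryWrite($"job-{i}");
}

Console.WriteLine($"queued={channel.Reader.Count} dropped={Interlocked.Read(ref dropped)}");
// prints: queued=2 dropped=3
```

In a real service the callback would typically increment a metric rather than a local variable, so the drop rate can be alerted on.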

Bounded queue with backpressure

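using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public interface IBackgroundTaskQueue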
{
    ValueTask EnqueueAsync(WorkItem item, CancellationToken ct);
    IAsyncEnumerable<WorkItem> DequeueAllAsync(CancellationToken ct);
    int Capacity { get; }
}

public sealed class ChannelBackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<WorkItem> _channel;

    public int Capacity { get; }

    public ChannelBackgroundTaskQueue(int capacity)
    {
        Capacity = capacity;

        _channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // backpressure
            SingleReader = false,
            SingleWriter = false
        });
    }

    public ValueTask EnqueueAsync(WorkItem item, CancellationToken ct)
        => _channel.Writer.WriteAsync(item, ct);

    public async IAsyncEnumerable<WorkItem> DequeueAllAsync([System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(ct))
            yield return item;
    }
}
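With `FullMode = BoundedChannelFullMode.Wait`, `WriteAsync` waits for as long as the queue stays full, so a producer on a request path will usually want to cap that wait. A minimal sketch of one way to do this follows. `TryEnqueueWithinAsync` is a hypothetical helper, not part of the queue above, and the capacity of 1 and the 100 ms timeout are chosen only to make the behavior easy to see.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity 1 and no reader, so the second write has to wait.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait
});

var timeout = TimeSpan.FromMilliseconds(100);
var first = await TryEnqueueWithinAsync(channel.Writer, 1, timeout, CancellationToken.None);
var second = await TryEnqueueWithinAsync(channel.Writer, 2, timeout, CancellationToken.None);

Console.WriteLine($"first={first} second={second}"); // prints: first=True second=False

// Hypothetical helper: returns false when the queue stayed full for the whole
// timeout, so the caller can reject the request (for example with HTTP 503).
static async Task<bool> TryEnqueueWithinAsync<T>(
    ChannelWriter<T> writer, T item, TimeSpan timeout, CancellationToken ct)
{
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(timeout);
    try
    {
        await writer.WriteAsync(item, cts.Token);
        return true;
    }
    catch (OperationCanceledException) when (!ct.IsCancellationRequested)
    {
        return false; // timed out waiting for space; the caller did not cancel
    }
}
```

Returning a boolean keeps the decision with the caller: fail the request, fall back to a durable path, or drop the work and count it.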

Consumer with concurrency limits

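using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public sealed class QueueWorker : BackgroundService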
{
    private readonly IBackgroundTaskQueue _queue;
    private readonly SemaphoreSlim _concurrency = new(10);
    private readonly ILogger<QueueWorker> _log;

    public QueueWorker(IBackgroundTaskQueue queue, ILogger<QueueWorker> log)
    {
        _queue = queue;
        _log = log;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var item in _queue.DequeueAllAsync(stoppingToken))
        {
            await _concurrency.WaitAsync(stoppingToken);

            _ = Task.Run(async () =>
            {
                try
                {
                    using var budget = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
                    budget.CancelAfter(TimeSpan.FromSeconds(30));

                    await item.ExecuteAsync(budget.Token);
                }
                catch (Exception ex)
                {
                    _log.LogError(ex, "WorkItem failed");
                    // Decide: retry? dead-letter? persist failure?
                }
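                finally
                {
                    _concurrency.Release();
                }
                // Task.Run deliberately gets no stoppingToken: a task cancelled before
                // it starts never reaches finally, which would leak a concurrency slot.
            });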
        }
    }
}

public sealed record WorkItem(Func<CancellationToken, Task> ExecuteAsync);
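The `catch` block in the worker leaves the failure policy open. One possible shape is a bounded number of retries with exponential backoff, followed by a hand-off to a dead-letter callback. This is a sketch under assumptions: `RunWithRetryAsync`, `maxAttempts`, `baseDelay`, and `deadLetter` are hypothetical names, and the always-failing work item exists only to exercise the path.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var calls = 0;
var deadLettered = 0;

// Illustrative work that always fails.
Func<CancellationToken, Task> alwaysFails = _ =>
{
    calls++;
    return Task.FromException(new InvalidOperationException("downstream unavailable"));
};

await RunWithRetryAsync(
    alwaysFails,
    maxAttempts: 3,
    baseDelay: TimeSpan.FromMilliseconds(10),
    deadLetter: ex => { deadLettered++; return Task.CompletedTask; },
    ct: CancellationToken.None);

Console.WriteLine($"calls={calls} deadLettered={deadLettered}"); // prints: calls=3 deadLettered=1

// Hypothetical helper: retries up to maxAttempts, then dead-letters the failure.
static async Task RunWithRetryAsync(
    Func<CancellationToken, Task> work,
    int maxAttempts,
    TimeSpan baseDelay,
    Func<Exception, Task> deadLetter,
    CancellationToken ct)
{
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            await work(ct);
            return;
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            throw; // shutdown or deadline: do not retry
        }
        catch (Exception) when (attempt < maxAttempts)
        {
            // Exponential backoff: baseDelay, then 2x, then 4x, ...
            await Task.Delay(baseDelay * Math.Pow(2, attempt - 1), ct);
        }
        catch (Exception ex)
        {
            await deadLetter(ex); // attempts exhausted
            return;
        }
    }
}
```

Retries hold a concurrency slot while they back off, so keep `maxAttempts` and the delays small for an in-process queue, and make work items idempotent because a retried item may have partially completed.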

Durability warning

  • Channel queues are in-process. If the process dies, items die.
  • Use this for best-effort tasks or when you also persist work elsewhere (outbox table, durable queue).
  • If you need guaranteed delivery: use a durable broker or DB-backed outbox + worker.
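The "persist work elsewhere" option above can be sketched as persist first, enqueue second, and re-enqueue pending items on startup. In this sketch a `ConcurrentDictionary` stands in for a durable table (an assumption made to keep the example self-contained; a real implementation would use a database or broker), and the payload and capacity are illustrative.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Stand-in for a durable outbox table; NOT durable itself.
var store = new ConcurrentDictionary<Guid, (string Payload, bool Done)>();

// 1. Persist first, then enqueue only the id.
var id = Guid.NewGuid();
store[id] = ("send welcome email", false);
var channel = Channel.CreateBounded<Guid>(100);
await channel.Writer.WriteAsync(id);

// 2. Simulate a crash: the channel and everything in it is gone.
channel = Channel.CreateBounded<Guid>(100);

// 3. On startup, re-enqueue whatever the store still lists as pending.
foreach (var pending in store.Where(kv => !kv.Value.Done).Select(kv => kv.Key))
    await channel.Writer.WriteAsync(pending);

// 4. Worker: process, then mark done. A crash between those two steps means
//    the item runs again, so handlers need to be idempotent.
var recovered = await channel.Reader.ReadAsync();
store[recovered] = (store[recovered].Payload, true);

Console.WriteLine(recovered == id); // prints: True
```

The channel then acts only as a wake-up signal and concurrency buffer; the store remains the source of truth for what still has to run.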

Security and performance impact

  • Performance: bounded queues cap the memory the queue can consume and stabilize latency by applying backpressure.
  • Security: queues often handle PII and tokens (emails, webhooks). Do not log payloads. Apply size limits to prevent abuse.

Operational notes

  • Monitoring: queue depth, enqueue wait time, reject/drop count (if using drop mode), worker concurrency utilization, success/failure counts.
  • Rollout: start with conservative capacity and tune based on traffic. Validate that backpressure does not break user experience.
  • Rollback: if backpressure causes request timeouts, reduce enqueue volume via feature flags or move work to durable infrastructure.
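The monitoring items above can be wired up with `System.Diagnostics.Metrics` (in-box from .NET 6, or via the System.Diagnostics.DiagnosticSource package on older targets). This is a minimal sketch: the meter and instrument names are placeholders, and the failing item is contrived to exercise both counters. Values are only exported once a listener or exporter is attached to the meter.

```csharp
using System;
using System.Diagnostics.Metrics;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Placeholder names; pick ones that match your own conventions.
using var meter = new Meter("MyApp.BackgroundQueue");

// Observable gauge: the callback runs whenever a listener collects.
// Bounded channels support Reader.Count.
meter.CreateObservableGauge<int>("queue.depth", () => channel.Reader.Count);

var succeeded = meter.CreateCounter<long>("queue.items.succeeded");
var failed = meter.CreateCounter<long>("queue.items.failed");

channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
var depthBefore = channel.Reader.Count;

while (channel.Reader.TryRead(out var item))
{
    try
    {
        if (item == 2) throw new InvalidOperationException("simulated failure");
        succeeded.Add(1);
    }
    catch (InvalidOperationException)
    {
        failed.Add(1);
    }
}

Console.WriteLine($"depth before={depthBefore} after={channel.Reader.Count}");
// prints: depth before=2 after=0
```

Enqueue wait time can be recorded the same way with a histogram around the `WriteAsync` call in the producer.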

Checklist

  • Queue is bounded; full-mode behavior is explicit.
  • Consumer concurrency is limited and observable.
  • Every work item has a deadline and respects cancellation.
  • Failure handling is explicit (retry, dead-letter, persist).
  • Durability requirements are documented and met (or explicitly not required).