Skip to content

WorkerQueue

The WorkerQueue class provides an asynchronous, cache-aware worker queue for Python, designed to process items concurrently with automatic caching and graceful shutdown. It is ideal for scenarios where you want to avoid redundant work, manage concurrency, and ensure results are cached for future requests.


Why?

Consider the following scenario:

async def expensive_task(x):
    # Simulate a costly operation
    await asyncio.sleep(1)
    return x * 2

results = {}
async def cache_get(x):
    return results.get(x)

async def cache_set(x, value):
    results[x] = value

Managing concurrent requests, avoiding duplicate work, and handling shutdowns can be complex and error-prone. WorkerQueue abstracts these concerns, providing a robust, reusable solution:

from escudeiro.ds.worker import WorkerQueue

queue = WorkerQueue(
    worker=expensive_task,
    cache_get=cache_get,
    cache_set=cache_set,
    maxsize=5,
)

result = await queue.require(10)  # Will compute and cache
result2 = await queue.require(10) # Will use cache

Features

  • Async worker queue with concurrent processing
  • Automatic caching to avoid redundant work
  • Graceful shutdown and context manager support
  • Exception handling and error propagation
  • Global registry for managing multiple queues
  • Customizable concurrency via maxsize

Usage

Basic Example

import asyncio
from escudeiro.ds.worker import WorkerQueue

async def worker_fn(x):
    await asyncio.sleep(1)
    return x * 2

cache = {}

async def cache_get(x):
    return cache.get(x)

async def cache_set(x, value):
    cache[x] = value

queue = WorkerQueue(
    worker=worker_fn,
    cache_get=cache_get,
    cache_set=cache_set,
    maxsize=3,
)

async def main():
    print(await queue.require(5))  # Computes and caches
    print(await queue.require(5))  # Uses cache

asyncio.run(main())

Using as an Async Context Manager

async with WorkerQueue(
    worker=worker_fn,
    cache_get=cache_get,
    cache_set=cache_set,
) as queue:
    result = await queue.require(42)

Graceful Shutdown

To ensure all tasks are finished and resources are released:

await queue.aclose()

Or close all queues:

await WorkerQueue.aclose_all()

API Reference

Class: WorkerQueue[T: Hashable, R]

Constructor

WorkerQueue(
    worker: Callable[[T], Awaitable[R]],
    cache_get: Callable[[T], Awaitable[R | None]],
    cache_set: Callable[[T, R], Awaitable[None]],
    maxsize: int = 3,
    finish_timeout: float = 3.0,
)
  • worker: Async function to process items.
  • cache_get: Async function to retrieve cached results.
  • cache_set: Async function to store results in cache.
  • maxsize: Maximum number of items in the queue and concurrent tasks.
  • finish_timeout: Timeout for finishing pending tasks during shutdown.

Methods

  • await require(item: T) -> R: Request processing of an item, using cache if available.
  • await aclose(): Gracefully shutdown this worker queue.
  • @classmethod await aclose_all(): Shutdown all active queues.
  • running: bool: Property indicating if the worker is active.
  • async def __aenter__(): Start worker on entering async context.
  • async def __aexit__(): Shutdown worker on exiting async context.

Notes

  • If an item is already being processed, require will await the result instead of duplicating work.
  • The queue automatically starts the worker task as needed.
  • Use aclose() or context manager to ensure all tasks are completed and exceptions are handled.
  • The class maintains a global registry for easy management of multiple queues.

See Also