Given a multiprocessing.Queue that is filled from different Python threads, created via ThreadPoolExecutor.submit(...):
How can that queue be accessed from asyncio / Trio / AnyIO in a safe and reliable manner (context: FastAPI)?
I am aware of Janus (https://github.com/aio-libs/janus/blob/master/janus/__init__.py) but would prefer a custom solution here.
Asked (hopefully) more concisely: how can I implement
await <something_is_in_my_multiprocessing_queue>
?
What synchronization mechanism would you suggest?
(Attention here: multiprocessing.Queue, not asyncio.Queue.)
To access a multiprocessing.Queue from asyncio, bridge it to an asyncio.Queue. The blocking multiprocessing.Queue.get() must not be called directly on the event loop; instead, run it in a worker thread (for example via asyncio.to_thread() or loop.run_in_executor()) and hand each item over to an asyncio.Queue, so that your async code can simply await asyncio.Queue.get() in a safe and reliable manner.
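In its simplest form that can look like the sketch below (mp_queue stands for the multiprocessing.Queue filled by your ThreadPoolExecutor workers; the names are illustrative, not a fixed API):

import asyncio

async def consume(mp_queue):
    # Await items from the multiprocessing.Queue without blocking the event loop.
    while True:
        # The blocking get() runs in a worker thread (asyncio.to_thread needs Python 3.9+).
        item = await asyncio.to_thread(mp_queue.get)
        if item is None:   # optional sentinel for shutdown
            break
        print("got", item)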
For synchronization, keep in mind that asyncio primitives such as asyncio.Queue and asyncio.Lock are not thread-safe by themselves: the hand-over from the reading thread into the event loop has to go through loop.call_soon_threadsafe() (or stay inside run_in_executor() / asyncio.to_thread(), which already takes care of that). The multiprocessing.Queue itself is already thread- and process-safe, so no additional lock is needed around it.
Example implementation:
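Below is a minimal, self-contained sketch of that bridge (the bridge(), consume() and main() helpers as well as the None shutdown sentinel are illustrative choices, not a fixed API):

import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor

def bridge(mp_queue, aio_queue, loop):
    # Runs in a plain thread: drain the multiprocessing.Queue and hand every
    # item to the asyncio.Queue through the thread-safe event-loop entry point.
    while True:
        item = mp_queue.get()                            # blocking, but off the event loop
        loop.call_soon_threadsafe(aio_queue.put_nowait, item)
        if item is None:                                 # None is used as a shutdown sentinel
            break

async def consume(aio_queue):
    while True:
        item = await aio_queue.get()                     # plain await, never blocks the loop
        if item is None:
            break
        print("consumed", item)

async def main():
    loop = asyncio.get_running_loop()
    mp_queue = multiprocessing.Queue()
    aio_queue = asyncio.Queue()

    # Run the bridge in the default executor thread pool.
    bridge_done = loop.run_in_executor(None, bridge, mp_queue, aio_queue, loop)

    # Producers: ordinary threads filling the multiprocessing.Queue.
    with ThreadPoolExecutor() as pool:
        for i in range(3):
            pool.submit(mp_queue.put, i)
    mp_queue.put(None)                                   # all producers are done

    await consume(aio_queue)
    await bridge_done

asyncio.run(main())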
This solution is based on asyncio, but the same concept applies to Trio or AnyIO as well.
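Under AnyIO (and similarly with trio.to_thread.run_sync under plain Trio), the blocking get() can be awaited with the thread helpers, for example like this sketch (again with mp_queue as the shared multiprocessing.Queue):

from anyio import to_thread

async def consume(mp_queue):
    while True:
        # Works on both the asyncio and the Trio backend of AnyIO; note that the
        # blocking get() will not react to cancellation while it is waiting.
        item = await to_thread.run_sync(mp_queue.get)
        if item is None:
            break
        print("consumed", item)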