Sign Up

Have an account? Sign In Now

Sign In

Forgot Password?

Don't have account, Sign Up Here

Forgot Password

Lost your password? Please enter your email address. You will receive a link and will create a new password via email.

Have an account? Sign In Now

You must login to ask question.

Forgot Password?

Need An Account, Sign Up Here

Please briefly explain why you feel this question should be reported.

Please briefly explain why you feel this answer should be reported.

Sign InSign Up

Softans

Softans Logo Softans Logo
Search
Ask A Question

Mobile menu

Close
Ask a Question
  • Home
  • Add group
  • Groups page
  • Communities
  • Questions
    • New Questions
    • Trending Questions
    • Must read Questions
    • Hot Questions
  • Polls
  • Tags
  • Badges
  • Users
  • Help
Home/ Questions/Q 4583
Answered
Ghulam Nabi
Ghulam Nabi
Asked: February 1, 20232023-02-01T14:31:00+00:00 2023-02-01T14:31:00+00:00

Make multiprocessing.Queue accessible from asyncio

Given a multiprocessing.Queue that is filled from different Python threads, created via ThreadPoolExecutor.submit(...).

How to access that Queue with asyncio / Trio / Anyio in a safe manner (context FastAPI) and reliable manner?

I am aware of Janus https://github.com/aio-libs/janus/blob/master/janus/__init__.py but rather prefer a custom solution here.

Asked (hopefully) more concisely: How can I do/implement the

await <something_is_in_my_multiprocessing_queue>

?

What synchronization mechanism would you suggest?

(Attention here: multiprocessing.Queue not asyncio.Queue)

make multiprocessing.queue accessible from asyncio
  • 0
  • 1 1 Answer
  • 10 Views
  • 0 Followers
  • 0
Answer
Share
  • Facebook
  • Report

1 Answer

  • Voted
  • Oldest
  • Recent
  1. Best Answer
    Ghulam Nabi
    2023-02-01T14:32:31+00:00Added an answer on February 1, 2023 at 2:32 pm

    To access a multiprocessing.Queue from asyncio, you can use the asyncio.Queue. The simplest way to do this is to wrap the multiprocessing.Queue in an asyncio.Queue and use the asyncio.Queue methods to access the data in a safe and reliable manner. You can use asyncio.Queue.put() to insert data into the queue, and asyncio.Queue.get() to retrieve data from the queue.

    For synchronization, you can use asyncio.Lock, which provides a simple way to lock access to shared resources. When a task acquires the lock, no other tasks can access the shared resource until the lock is released.

    Example implementation:

    import asyncio
    import multiprocessing
    
    async def producer(queue, lock):
        while True:
            # produce some data
            data = ...
            async with lock:
                await queue.put(data)
    
    async def consumer(queue, lock):
        while True:
            async with lock:
                data = await queue.get()
            # consume the data
            ...
    
    queue = asyncio.Queue()
    lock = asyncio.Lock()
    asyncio.run(asyncio.gather(producer(queue, lock), consumer(queue, lock)))
    

    This solution is based on asyncio, but the concept can be applied to Trio or Anyio as well.

    • 0
    • Reply
    • Share
      Share
      • Share on Facebook
      • Share on Twitter
      • Share on LinkedIn
      • Share on WhatsApp
      • Report

Leave an answer
Cancel reply

You must login to add an answer.

Forgot Password?

Need An Account, Sign Up Here

Sidebar

Ask A Question
  • Popular
  • Answers
  • Ghulam Nabi

    Why are the British confused about us calling bread rolls ...

    • 5 Answers
  • Jerry

    Add file to native target programmatically via tuist/XcodeProj

    • 4 Answers
  • Ghulam Nabi

    Is this statement, “i see him last night” can be ...

    • 4 Answers
  • Ghulam Nabi
    Ghulam Nabi added an answer To resolve the NullPointerException, you need to identify the variable… March 15, 2023 at 8:25 am
  • Ghulam Nabi
    Ghulam Nabi added an answer You can replace the PnP code in your Azure Function… February 13, 2023 at 7:11 am
  • Ghulam Nabi
    Ghulam Nabi added an answer You can use the $match stage in the aggregate pipeline… February 10, 2023 at 6:20 am

Trending Tags

android c++ cypress flutter java javascript python selenium testng webdriver

Top Members

Robert

Robert

  • 3 Questions
  • 1k Points
Luci

Luci

  • 5 Questions
  • 1k Points
Kevin O Brien

Kevin O Brien

  • 2 Questions
  • 1k Points

Explore

  • Home
  • Add group
  • Groups page
  • Communities
  • Questions
    • New Questions
    • Trending Questions
    • Must read Questions
    • Hot Questions
  • Polls
  • Tags
  • Badges
  • Users
  • Help

Footer

Softans

Softans is a social questions & Answers Engine which will help you establish your community and connect with other people.

About Us

  • Blog
  • Jobs
  • About Us
  • Meet The Team
  • Contact Us

Legal Stuff

Help

Follow

© 2021 Softans. All Rights Reserved
With Love by Softans.

Insert/edit link

Enter the destination URL

Or link to existing content

    No search term specified. Showing recent items. Search or use up and down arrow keys to select an item.