
How Can You Wait For Completion Of A Callback Submitted From Another Thread?

I have two Python threads, A and B, that share some state. At one point, A submits a callback to be run by B on its loop with something like: # This line is executed by A loop.call_

Solution 1:

Callbacks are fire and (mostly) forget. They are not intended for work whose result you need to get back. This is why the handle produced when you schedule one only lets you cancel the callback (to signal that it is no longer needed), nothing more.
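
To illustrate the point about handles, here is a minimal single-threaded sketch. It uses loop.call_soon() rather than call_soon_threadsafe() purely to keep the example deterministic; both return an asyncio.Handle. The `calls` list is a hypothetical stand-in for whatever side effect the callback has.

```python
import asyncio

calls = []
loop = asyncio.new_event_loop()

# Scheduling a callback returns a Handle, not a future.
handle = loop.call_soon(calls.append, "kept")
cancelled_handle = loop.call_soon(calls.append, "dropped")

# The only thing a Handle lets you do is cancel the pending callback.
cancelled_handle.cancel()

# Give the loop a few iterations so the pending callbacks get processed.
loop.run_until_complete(asyncio.sleep(0))
loop.close()

# The return value of calls.append(...) is discarded by the loop;
# only the side effect on `calls` is observable.
print(calls, handle.cancelled(), cancelled_handle.cancelled())
```

The handle exposes cancel() and cancelled(), but nothing that would let the scheduling code wait for the callback or read its return value.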

If you need to wait in one thread for a result produced on an asyncio loop running in another thread, write the work as a coroutine and schedule it with asyncio.run_coroutine_threadsafe(); this gives you a concurrent.futures.Future instance, and calling its result() method blocks the calling thread until the coroutine is done.
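
A minimal sketch of that pattern, assuming the main thread plays the role of A and a worker thread plays B. The names `compute` and `run_loop` are hypothetical and only there for illustration.

```python
import asyncio
import threading


async def compute(x):
    # Hypothetical coroutine standing in for real work done on B's loop.
    await asyncio.sleep(0)
    return x * 2


def run_loop(loop):
    # Thread B: own the loop and run it until it is stopped.
    asyncio.set_event_loop(loop)
    loop.run_forever()


loop = asyncio.new_event_loop()
thread_b = threading.Thread(target=run_loop, args=(loop,))
thread_b.start()

# Thread A (here, the main thread): schedule the coroutine on B's loop.
future = asyncio.run_coroutine_threadsafe(compute(21), loop)
result = future.result(timeout=5)  # blocks A until the coroutine finishes

# Shut down: ask the loop to stop, wait for B to exit, then close the loop.
loop.call_soon_threadsafe(loop.stop)
thread_b.join()
loop.close()
```

Passing a timeout to result() is optional, but it keeps A from blocking forever if the loop in B never gets to run the coroutine.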

However, stopping the loop from a coroutine scheduled with run_coroutine_threadsafe() requires the loop to handle one more round of callbacks than it will actually get to run once loop.stop() has been called; without that extra round, the Future returned by run_coroutine_threadsafe() is never notified of the state change of the task it scheduled. You can remedy this by running asyncio.sleep(0) through loop.run_until_complete() in thread B before closing the loop:

import asyncio

def thread_A(loop):
    # ...
    # when done, schedule the asyncio loop to exit
    future = asyncio.run_coroutine_threadsafe(shutdown_loop(loop), loop)
    future.result()  # wait for the shutdown to complete
    print("Thread A out")

def thread_B(loop):
    print("Thread B")
    asyncio.set_event_loop(loop)
    loop.run_forever()
    # run one last noop task in the loop to clear remaining callbacks
    loop.run_until_complete(asyncio.sleep(0))
    loop.close()
    print("Thread B out")

async def shutdown_loop(loop):
    print("Stopping loop")
    loop.stop()

This is, of course, slightly hacky and depends on the internals of callback management and cross-thread task scheduling not changing. As the default asyncio implementation stands, running a single noop task is enough to flush several rounds of callbacks that schedule further callbacks, but alternative loop implementations may handle this differently.

So for shutting down the loop, you may be better off using thread-based coordination:

import asyncio
import threading

def thread_A(loop):
    # ...
    callback_event = threading.Event()
    loop.call_soon_threadsafe(callback, loop, callback_event)
    callback_event.wait()  # block until the callback has run in thread B
    print("Thread A out")

def thread_B(loop):
    print("Thread B")
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()
    print("Thread B out")

def callback(loop, callback_event):
    print("Stopping loop")
    loop.stop()
    callback_event.set()
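
For completeness, a hedged end-to-end sketch of the Event-based handshake, assuming the main thread acts as A. The timeout on wait() is an addition not present in the code above; it is there so that A does not hang forever if B's loop never runs the callback.

```python
import asyncio
import threading


def callback(loop, callback_event):
    # Runs in thread B, on the loop's thread.
    loop.stop()
    callback_event.set()


def thread_B(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
    loop.close()


loop = asyncio.new_event_loop()
b = threading.Thread(target=thread_B, args=(loop,))
b.start()

# Thread A (here, the main thread): submit the callback and wait for it.
callback_event = threading.Event()
loop.call_soon_threadsafe(callback, loop, callback_event)

# Event.wait() returns True if the event was set, False on timeout.
stopped_in_time = callback_event.wait(timeout=5)
b.join(timeout=5)
```

Note that the event only tells A that the callback has run. Joining thread B is what guarantees that run_forever() has returned and the loop has been closed.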

Solution 2:

"Is there any way (besides standard threading synchronization primitives) to make A wait for the completion of the callback?"

Normally you'd use run_coroutine_threadsafe, as Martijn initially suggested. But your use of loop.stop() makes the callback somewhat specific. Given that, you are probably best off using the standard thread synchronization primitives, which are in this case very straightforward and can be completely decoupled from the callback implementation and the rest of your code. For example:

import threading

def submit_and_wait(loop, fn, *args):
    """Submit fn(*args) to loop, and wait until the callback executes."""
    done = threading.Event()
    def wrap_fn():
        try:
            fn(*args)
        finally:
            done.set()
    loop.call_soon_threadsafe(wrap_fn)
    done.wait()

Instead of using loop.call_soon_threadsafe(callback), use submit_and_wait(loop, callback). The threading synchronization is there, but completely hidden inside submit_and_wait.
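
If the caller also needs the callback's return value or exception, one possible variation is to swap the Event for a concurrent.futures.Future. This is a sketch, not part of the answer above; `submit_and_get` is a hypothetical helper name, and the demo at the bottom assumes the main thread is A.

```python
import asyncio
import concurrent.futures
import threading


def submit_and_get(loop, fn, *args, timeout=None):
    """Run fn(*args) on loop's thread and return its result (hypothetical helper)."""
    future = concurrent.futures.Future()

    def wrap_fn():
        # Mark the future as running; bail out if it was cancelled meanwhile.
        if not future.set_running_or_notify_cancel():
            return
        try:
            future.set_result(fn(*args))
        except Exception as exc:
            # Re-raised in the waiting thread by future.result().
            future.set_exception(exc)

    loop.call_soon_threadsafe(wrap_fn)
    # Blocks the calling thread; must not be called from the loop's own thread.
    return future.result(timeout)


loop = asyncio.new_event_loop()
worker = threading.Thread(target=loop.run_forever)
worker.start()

value = submit_and_get(loop, pow, 2, 10, timeout=5)

loop.call_soon_threadsafe(loop.stop)
worker.join()
loop.close()
```

As with submit_and_wait, the synchronization stays hidden inside the helper. Calling either helper from the loop's own thread would deadlock, because the loop cannot run the callback while it is blocked waiting for it.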

