Who Runs the Callback When Using the apply_async Method of a Multiprocessing Pool?
Solution 1:
There is indeed a hint in the docs:
callback should complete immediately since otherwise the thread which handles the results will get blocked.
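That warning is easy to observe. Here's a minimal sketch, assuming Python 3; the name time_second_result and the 0.3 second delay are made up for illustration. It submits two trivial tasks to a one-worker pool, gives the first one a deliberately slow callback, and times how long the second result takes to become available.

```python
import multiprocessing as mp
import operator
import time


def time_second_result(callback_delay=0.3):
    """Return (value, seconds) for a second task queued behind a slow callback."""
    # Assumption: prefer "fork" where the platform offers it, so no __main__
    # guard is needed; on spawn-only platforms, call this function from under
    # `if __name__ == "__main__":`.
    method = "fork" if "fork" in mp.get_all_start_methods() else None
    ctx = mp.get_context(method)

    def slow_callback(value):
        # Runs in the thread that handles results and holds it up.
        time.sleep(callback_delay)

    # One worker process, so results come back in submission order.
    with ctx.Pool(processes=1) as pool:
        start = time.monotonic()
        pool.apply_async(operator.add, (1, 2), callback=slow_callback)
        second = pool.apply_async(operator.add, (3, 4))
        value = second.get(timeout=30)
        elapsed = time.monotonic() - start
    return value, elapsed


if __name__ == "__main__":
    print(time_second_result())
```

The second task is just an addition, yet its result should not arrive sooner than the callback delay, because the thread that would deliver it is stuck inside the first callback.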
The callbacks are handled in the main process, but they run in their own separate thread. When you create a Pool, it actually creates a few Thread objects internally:
class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        ...  # stuff we don't care about
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()
The interesting thread for us is _result_handler; we'll get to why shortly.
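You can see these helper threads from the outside too. The following is a rough sketch, assuming Python 3 (the function name pool_helper_threads is hypothetical); it compares threading.enumerate() before and after a Pool is created. The exact thread names are a CPython implementation detail and may differ between versions.

```python
import multiprocessing as mp
import threading


def pool_helper_threads(processes=2):
    """Return the names of threads that appear in this process once a Pool exists."""
    # Assumption: prefer "fork" where the platform offers it, so no __main__
    # guard is needed; on spawn-only platforms, call this function from under
    # `if __name__ == "__main__":`.
    method = "fork" if "fork" in mp.get_all_start_methods() else None
    ctx = mp.get_context(method)

    before = set(threading.enumerate())
    with ctx.Pool(processes=processes):
        # Anything alive now that was not alive before was started by the Pool.
        started = [t.name for t in threading.enumerate() if t not in before]
    return started


if __name__ == "__main__":
    for name in pool_helper_threads():
        print(name)
```

The worker processes do not show up in this list, since they are processes rather than threads; only the pool's internal handler threads in the parent do.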
Switching gears for a second, when you run apply_async, it creates an ApplyResult object internally to manage getting the result from the child:
def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]
As you can see, the _set method is the one that ends up actually executing the callback passed in, assuming the task was successful. Also notice that the ApplyResult adds itself to the cache dict (the pool's _cache, which is shared with the handler threads) at the end of __init__.
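The "assuming the task was successful" part is worth seeing in action. The source quoted here is the Python 2 version; in Python 3, apply_async also accepts an error_callback for the failure case. A small sketch under that assumption (the helper names are made up for illustration):

```python
import multiprocessing as mp
import operator


def run_success_and_failure():
    """Record which callback fires for a succeeding and a failing task."""
    # Assumption: prefer "fork" where the platform offers it, so no __main__
    # guard is needed; on spawn-only platforms, call this function from under
    # `if __name__ == "__main__":`.
    method = "fork" if "fork" in mp.get_all_start_methods() else None
    ctx = mp.get_context(method)

    calls = []

    def on_success(value):
        calls.append(("callback", value))

    def on_error(exc):
        calls.append(("error_callback", type(exc).__name__))

    # One worker process, so the two tasks complete in submission order.
    with ctx.Pool(processes=1) as pool:
        ok = pool.apply_async(operator.add, (1, 2),
                              callback=on_success, error_callback=on_error)
        bad = pool.apply_async(operator.truediv, (1, 0),
                               callback=on_success, error_callback=on_error)
        ok.wait(timeout=30)
        bad.wait(timeout=30)
    return calls


if __name__ == "__main__":
    print(run_success_and_failure())
    # [('callback', 3), ('error_callback', 'ZeroDivisionError')]
```

The failing task never reaches callback; its exception goes to error_callback instead, and would also be re-raised by bad.get().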
Now, back to the _result_handler thread object. That object calls the _handle_results function, which looks like this:
while 1:
    try:
        task = get()
    except (IOError, EOFError):
        debug('result handler got EOFError/IOError -- exiting')
        return

    if thread._state:
        assert thread._state == TERMINATE
        debug('result handler found thread._state=TERMINATE')
        break

    if task is None:
        debug('result handler got sentinel')
        break

    job, i, obj = task
    try:
        cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
    except KeyError:
        pass

# More stuff
It's a loop that just pulls results from the children off the queue, finds the matching entry in cache, and calls _set, which executes our callback. It's able to run even though your main thread is busy in a loop, because the callback isn't running in the main thread.
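To check the conclusion yourself, here's a minimal sketch, assuming Python 3 (where_does_callback_run is a hypothetical name). The task is os.getpid, so the callback receives the worker's PID, and the callback records its own PID and thread ID for comparison with the caller's.

```python
import multiprocessing as mp
import os
import threading


def where_does_callback_run():
    """Return the PIDs and thread IDs seen by the caller, the worker and the callback."""
    # Assumption: prefer "fork" where the platform offers it, so no __main__
    # guard is needed; on spawn-only platforms, call this function from under
    # `if __name__ == "__main__":`.
    method = "fork" if "fork" in mp.get_all_start_methods() else None
    ctx = mp.get_context(method)

    seen = {}

    def callback(worker_pid):
        # worker_pid is the task's return value, computed in the child.
        seen["worker_pid"] = worker_pid
        seen["callback_pid"] = os.getpid()
        seen["callback_thread"] = threading.get_ident()

    with ctx.Pool(processes=1) as pool:
        # get() returns only after the callback has finished.
        pool.apply_async(os.getpid, callback=callback).get(timeout=30)

    seen["caller_pid"] = os.getpid()
    seen["caller_thread"] = threading.get_ident()
    return seen


if __name__ == "__main__":
    for key, value in where_does_callback_run().items():
        print(key, value)
```

If the explanation above holds, callback_pid matches caller_pid (same process), worker_pid differs from both, and callback_thread differs from caller_thread.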