
Python Threadpool With Limited Task Queue Size

My problem is the following: I have a multiprocessing.pool.ThreadPool object with worker_count workers and a main pqueue from which I feed tasks to the pool. The flow is as follows …

Solution 1:

ThreadPool is a simple tool for a common task. If you want to manage the queue yourself to get DFS behavior, you could implement the necessary functionality on top of the threading and queue modules directly.

To prevent scheduling the next root task until all tasks spawned by the current task are done ("DFS"-like order), you could use Queue.join():

#!/usr/bin/env python3
import queue
import random
import threading
import time

def worker(q, multiplicity=5, maxlevel=3, lock=threading.Lock()):
    for task in iter(q.get, None):  # blocking get until None is received
        try:
            if len(task) < maxlevel:
                for i in range(multiplicity):
                    q.put(task + str(i))  # schedule the next level
            time.sleep(random.random())  # emulate some work
            with lock:
                print(task)
        finally:
            q.task_done()

worker_count = 2
q = queue.LifoQueue()
threads = [threading.Thread(target=worker, args=[q], daemon=True)
           for _ in range(worker_count)]
for t in threads:
    t.start()

for task in "01234":  # populate the first level
    q.put(task)
    q.join()  # block until all spawned tasks are done

for _ in threads:  # signal workers to quit
    q.put(None)
for t in threads:  # wait until workers exit
    t.join()

The code example is derived from the example in the queue module documentation.

The task at each level spawns multiplicity direct child tasks that spawn their own subtasks until maxlevel is reached.
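For reference, the same spawning rule written as a plain recursive function (a sequential sketch, not part of the original answer) makes the depth-first order that the threaded version approximates easy to see:

def spawn(task, multiplicity=2, maxlevel=3):
    print(task)
    if len(task) < maxlevel:          # strings shorter than maxlevel spawn children
        for i in range(multiplicity):
            spawn(task + str(i))      # recurse into the next level

spawn("0")  # prints 0, 00, 000, 001, 01, 010, 011: strict depth-first order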

None is used to signal the workers that they should quit. t.join() is used to wait until the threads exit gracefully. If the main thread is interrupted for any reason then the daemon threads are killed, unless there are other non-daemon threads (you might want to provide a SIGINT handler to signal the workers to exit gracefully on Ctrl+C instead of just dying).
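For example, one way to get a graceful exit on Ctrl+C is to catch the KeyboardInterrupt that the default SIGINT handler raises and send the sentinels yourself. This is a sketch that reuses the q, threads and worker_count names from the example above; it is not part of the original answer:

try:
    for task in "01234":
        q.put(task)
        q.join()
except KeyboardInterrupt:
    print("interrupted, shutting down")
finally:
    for _ in range(worker_count):
        q.put(None)   # one sentinel per worker: exit after the current task
    for t in threads:
        t.join()      # wait for workers instead of killing the daemon threads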

queue.LifoQueue() is used to get "Last In, First Out" order (it is only approximate because multiple threads consume the queue concurrently).
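As a quick standalone illustration (not from the original answer), a LifoQueue with a single consumer returns items in exact reverse insertion order; several workers racing on get() only approximate it:

import queue

lifo = queue.LifoQueue()
for ch in "012":
    lifo.put(ch)

print(lifo.get(), lifo.get(), lifo.get())  # 2 1 0: last in, first out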

maxsize is not set on the queue because otherwise the workers may deadlock: a worker has to put its child tasks somewhere anyway. Only worker_count background threads are running, regardless of how many tasks are waiting in the queue.
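The deadlock is easy to reproduce with a toy example (an illustration of the problem, not the answer's code): once every worker is blocked in put() on a full queue, no thread is left to call get() and drain it.

import queue
import threading
import time

bounded = queue.Queue(maxsize=1)

def producer_worker():
    task = bounded.get()
    bounded.put(task + "0")  # fills the queue
    bounded.put(task + "1")  # blocks forever: queue is full and nobody calls get()
    bounded.task_done()      # never reached, so bounded.join() would hang as well

threading.Thread(target=producer_worker, daemon=True).start()
bounded.put("a")
time.sleep(1)
print("queue full, worker stuck in put():", bounded.full())  # True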
