Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,15 @@ def _worker(executor_reference, work_queue):
_base.LOGGER.critical('Exception in worker', exc_info=True)

class ThreadPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, thread_name_prefix=''):
def __init__(self, max_workers=None, thread_name_prefix='', max_queue_size=0):
"""Initializes a new ThreadPoolExecutor instance.

Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
max_queue_size: The maximum number of work items to buffer before
submit() blocks, defaults to 0 (infinite).
"""
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
Expand All @@ -96,8 +98,12 @@ def __init__(self, max_workers=None, thread_name_prefix=''):
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")

# check that max_queue_size is a positive integer
if type(max_queue_size) is not int or max_queue_size < 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it strictly be an int? Classes that inherit from int won't pass this condition unless isinstance is used. Either way I'm not sure if doing any check for the type is a good idea since the Queue class generally doesn't (and fails when you put something in it)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also fine if we leave verification to the Queue.

raise ValueError("max_queue_size must be equal or greater 0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slight rewording to "greater or equal to 0" is possibly better.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would remove this altogether again as you suggested.


self._max_workers = max_workers
self._work_queue = queue.Queue()
self._work_queue = queue.Queue(maxsize=max_queue_size)
self._threads = set()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxsize is the sole arg to the initializer of Queue so you could just supply max_queue_size since it's also pretty descriptive.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the sole arg to the initializer

What if this changes? Is it then still safe to just pass max_queue_size as positional argument?

self._shutdown = False
self._shutdown_lock = threading.Lock()
Expand Down
19 changes: 19 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,25 @@ def test_thread_names_default(self):
self.assertRegex(t.name, r'^.*ThreadPoolExecutor.*_[0-4]$')
t.join()

def test_default_max_queue_size(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a higher-level test, i.e. create an executor with a non-zero queue size, submit a number of tasks larger than the queue size, and check that they all get executed in the end.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea :)

executor = futures.ThreadPoolExecutor()
self.assertEqual(executor._work_queue.maxsize, 0)

def test_custom_max_queue_size(self):
for i in range(0, 10):
executor = futures.ThreadPoolExecutor(max_queue_size=i)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it mean to have a negative queue size?

Copy link
Author

@prayerslayer prayerslayer Aug 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing really, but since the Queue constructor doesn't check for validity of arguments I wanted to add a test for invalid arguments. (To be sure that I didn't break this behavior.)

self.assertEqual(executor._work_queue.maxsize, i)

def test_negative_max_queue_size(self):
for i in range(-1, -10):
with self.assertRaises(ValueError):
futures.ThreadPoolExecutor(max_queue_size=i)

def test_invalid_max_queue_size(self):
for bad_value in [None, "5", False, True, 3.14]:
with self.assertRaises(ValueError):
futures.ThreadPoolExecutor(max_queue_size=bad_value)


class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
def _prime_executor(self):
Expand Down