Below is a demonstration of using a condition variable. Its purpose is to let a thread wait, in a thread-safe manner, until some condition related to a shared resource is met.
In the below example, we demonstrate how we make sure my_set
is being accessed safely, while making it efficient to wait on different conditions (waiting for the set to have at least one element, or waiting for the set to be empty).
I will let the code speak for itself – make sure to read and understand the comments, understand the output and the timestamp differences.
NOTE: the code below is not Python 2 compatible (the missing link is the wait_for
method of the condition variable). See the sample implementation below.
import logging
import threading
import time

logging.basicConfig(
    level=logging.DEBUG,
    # Alternative format with wall-clock timestamps:
    # format='%(levelname)-7s [%(asctime)s] [T:%(threadName)s]: %(message)s')
    # The active format prints milliseconds since logging was loaded, which
    # makes the delays between the threads easy to compare.
    format='%(levelname)-7s [%(relativeCreated)6d] [T:%(threadName)s]: %(message)s')
logger = logging.getLogger()

# The condition variable guarding `my_set`; every access to the set by the
# worker threads happens while holding it.
cv = threading.Condition()
my_set = set()
# Set to True by `main` to make the `run` loop stop.
all_done = False


def run():
    """Consume elements from `my_set`, one per iteration, until `all_done`.

    Each iteration takes the condition lock, holds it for two seconds, waits
    until the set is non-empty, pops one element and notifies a waiter that
    the set has changed.
    """
    logger.info('started')
    while not all_done:
        with cv:
            # acquiring the condition lock, but not waiting on it,
            # thus essentially delaying anyone else from using the
            # resource
            logger.info('acquired')
            time.sleep(2)
            logger.info('waiting for elements')
            # that's the correct way to use the condition - just
            # wait for a specific condition to be met
            cv.wait_for(lambda: my_set)
            item = my_set.pop()
            # `len(my_set) + 1` is the size of the set before the pop
            logger.info('found %s items in set. got "%s"',
                        len(my_set) + 1, item)
            # we want to notify once the resource has changed,
            # to make waiting on the resource more effective -
            # if another thread is waiting for the resource to
            # become empty, for example
            cv.notify()
    logger.info('done')


def locking_thread():
    """Hold the condition lock for five seconds without touching the set."""
    # this thread is simply acquiring the resource just for kicks,
    # blocking everyone else
    logger.info('started')
    with cv:
        logger.info('acquired')
        time.sleep(5)
        logger.info('releasing')


def main():
    """Drive the demo: start both threads, feed the set, then shut down."""
    global all_done

    t1 = threading.Thread(target=run, name='ID-1')
    t1.start()
    t2 = threading.Thread(target=locking_thread, name='ID-2')
    t2.start()

    time.sleep(0.1)
    logger.info('1. trying to acquire')
    with cv:
        logger.info('1. adding')
        my_set.add(99)
        my_set.add(88)
        # notice how we only notify *once* - even though each `wait`
        # call requires a `notify` to be released, the idea is that
        # the other thread will check the condition before attempting
        # at waiting again
        cv.notify()

    time.sleep(0.5)
    # since we slept for short while thread-1 will probably acquire
    # the condition-lock so a delay of few seconds is expected
    # between trying to acquire and adding an item
    logger.info('2. trying to acquire')
    with cv:
        logger.info('2. adding')
        my_set.add(77)
        cv.notify()

    logger.info('3. trying to acquire')
    with cv:
        logger.info('waiting for empty')
        cv.wait_for(lambda: not my_set)
        logger.info('set is empty')
        logger.info('marking all done')
        all_done = True

    with cv:
        # `run` may be blocked waiting for an element, so hand it one last
        # (sentinel) element to let it wake up and re-check `all_done`.
        # NOTE(review): if `run` re-evaluates `all_done` after it was set but
        # before this element is added, it exits without consuming the
        # sentinel and the final set is `{None}` rather than empty.
        logger.info('release thread')
        my_set.add(None)
        cv.notify()

    t1.join()
    t2.join()
    # since we waiting on the other threads, we can now "touch"
    # `my_set` and it is expected to be empty
    logger.info('done. set: %s', my_set)


if __name__ == '__main__':
    main()
The expected output is something like this:
INFO    [     2] [T:ID-1]: started
INFO    [     2] [T:ID-2]: started
INFO    [   107] [T:MainThread]: 1. trying to acquire
INFO    [  2006] [T:ID-1]: waiting for elements
INFO    [  2006] [T:ID-2]: acquired
INFO    [  7010] [T:ID-2]: releasing
INFO    [  7011] [T:MainThread]: 1. adding
INFO    [  7011] [T:ID-1]: found 2 items in set. got "88"
INFO    [  7515] [T:MainThread]: 2. trying to acquire
INFO    [  9014] [T:ID-1]: waiting for elements
INFO    [  9014] [T:ID-1]: found 1 items in set. got "99"
INFO    [ 11019] [T:ID-1]: waiting for elements
INFO    [ 11019] [T:MainThread]: 2. adding
INFO    [ 11019] [T:MainThread]: 3. trying to acquire
INFO    [ 11019] [T:MainThread]: waiting for empty
INFO    [ 11019] [T:ID-1]: found 1 items in set. got "77"
INFO    [ 13022] [T:ID-1]: waiting for elements
INFO    [ 13022] [T:MainThread]: set is empty
INFO    [ 13022] [T:MainThread]: marking all done
INFO    [ 13022] [T:MainThread]: release thread
INFO    [ 13022] [T:ID-1]: found 1 items in set. got "None"
INFO    [ 13022] [T:ID-1]: done
INFO    [ 13022] [T:MainThread]: done. set: set()
Appendix: Condition.wait_for
implementation (sample!)
NOTE: this isn’t thoroughly tested – feel free to use it at your own risk.
def wait_for(cv, predicate):
    # type: (threading.Condition, Callable[[], bool]) -> None
    """Block until ``predicate()`` is truthy (Python 2/3 compatible).

    The caller must already hold the lock of ``cv`` (e.g. be inside a
    ``with cv:`` block), exactly as required by ``Condition.wait``.

    :param cv: the condition variable to wait on.
    :param predicate: zero-argument callable; waiting stops once it returns
        a truthy value.
    """
    # Detect the feature rather than the interpreter version: Python 3's
    # Condition has a native `wait_for`, Python 2's does not.
    native_wait_for = getattr(cv, 'wait_for', None)
    if native_wait_for is not None:
        native_wait_for(predicate)
        return

    # Fallback: re-check the predicate after every wake-up, since a
    # notification does not guarantee that the predicate now holds.
    while not predicate():
        cv.wait()


# usage (the lock must be held while waiting; `my_set` is the shared set
# guarded by the condition):
#
#     cv = threading.Condition()
#     with cv:
#         wait_for(cv, lambda: my_set)