Below is a demonstration of using a condition variable. The purpose of it is waiting on some condition to be met, related to a shared resource, and doing it in a thread-safe manner.
In the below example, we demonstrate how we make sure my_set
is being accessed safely, while making it efficient to wait on different conditions (waiting for the set the have at least one element, or waiting for the set to be empty).
I will let the code speak for itself – make sure to read and understand the comments, understand the output and the timestamp differences.
NOTE: the code below is not Python 2 compatible (the missing link is the wait_for
method of the condition variable. See sample implementation below.
import threading
import logging
import time
logging.basicConfig(
level=logging.DEBUG,
# format='%(levelname)-7s [%(asctime)s] [T:%(threadName)s]: %(message)s')
format='%(levelname)-7s [%(relativeCreated)6d] [T:%(threadName)s]: %(message)s')
logger = logging.getLogger()
cv = threading.Condition()
my_set = set()
all_done = False
def run():
logger.info('started')
while not all_done:
with cv:
# acquiring the condition lock, but not waiting on it,
# thus essentially delaying anyone else from using the
# resource
logger.info('acquired')
time.sleep(2)
logger.info('waiting for elements')
# that's the correct way to use the condition - just
# wait for a specific condition to be met
cv.wait_for(lambda: my_set)
item = my_set.pop()
logger.info('found %s items in set. got "%s"',
len(my_set) + 1, item)
# we want to notify once the resource has changed,
# to make waiting on the resource more effective -
# if another thread is waiting for the resource to
# become empty, for example
cv.notify()
logger.info('done')
def locking_thread():
# this thread is simply acquiring the resource just for kicks,
# blocking everyone else
logger.info('started')
with cv:
logger.info('acquired')
time.sleep(5)
logger.info('releasing')
def main():
t1 = threading.Thread(target=run, name=f'ID-1')
t1.start()
t2 = threading.Thread(target=locking_thread, name='ID-2')
t2.start()
time.sleep(0.1)
logger.info('1. trying to acquire')
with cv:
logger.info('1. adding')
my_set.add(99)
my_set.add(88)
# notice how we only notify *once* - even though each `wait`
# call requires a `notify` to be released, the idea is that
# the other thread will check the condition before attempting
# at waiting again
cv.notify()
time.sleep(0.5)
# since we slept for short while thread-1 will probably acquire
# the condition-lock so a delay of few seconds is expected
# between trying to acquire and adding an item
logger.info('2. trying to acquire')
with cv:
logger.info('2. adding')
my_set.add(77)
cv.notify()
logger.info('3. trying to acquire')
with cv:
logger.info('waiting for empty')
cv.wait_for(lambda: not my_set)
logger.info('set is empty')
logger.info('marking all done')
global all_done
all_done = True
with cv:
logger.info('release thread')
my_set.add(None)
cv.notify()
t1.join()
t2.join()
# since we waiting on the other threads, we can now "touch"
# `my_set` and it is expected to be empty
logger.info('done. set: %s', my_set)
if __name__ == '__main__':
main()
The expected output is something like this:
INFO [ 2] [T:ID-1]: started
INFO [ 2] [T:ID-2]: started
INFO [ 107] [T:MainThread]: 1. trying to acquire
INFO [ 2006] [T:ID-1]: waiting for elements
INFO [ 2006] [T:ID-2]: acquired
INFO [ 7010] [T:ID-2]: releasing
INFO [ 7011] [T:MainThread]: 1. adding
INFO [ 7011] [T:ID-1]: found 2 items in set. got "88"
INFO [ 7515] [T:MainThread]: 2. trying to acquire
INFO [ 9014] [T:ID-1]: waiting for elements
INFO [ 9014] [T:ID-1]: found 1 items in set. got "99"
INFO [ 11019] [T:ID-1]: waiting for elements
INFO [ 11019] [T:MainThread]: 2. adding
INFO [ 11019] [T:MainThread]: 3. trying to acquire
INFO [ 11019] [T:MainThread]: waiting for empty
INFO [ 11019] [T:ID-1]: found 1 items in set. got "77"
INFO [ 13022] [T:ID-1]: waiting for elements
INFO [ 13022] [T:MainThread]: set is empty
INFO [ 13022] [T:MainThread]: marking all done
INFO [ 13022] [T:MainThread]: release thread
INFO [ 13022] [T:ID-1]: found 1 items in set. got "None"
INFO [ 13022] [T:ID-1]: done
INFO [ 13022] [T:MainThread]: done. set: set()
Appendix: Condition.wait_for
implementation (sample!)
NOTE: this isn’t thoroughly tested – free to use at your own risk.
def wait_for(cv, predicate):
# type: (threading.Condition, Callable[[], bool]) -> None
"""Python 2/3 compatible implementation"""
if six.PY2:
while not predicate():
cv.wait()
elif six.PY3:
cv.wait_for(predicate)
else:
raise NotImplementedError(
'unknown Python version for "wait_for" function')
# usage:
cv = threading.Condition()
wait_for(cv, lambda: my_set)