Threading Condition Variable

Below is a demonstration of using a condition variable. Its purpose is to wait, in a thread-safe manner, for some condition on a shared resource to be met.

In the example below, we make sure my_set is accessed safely, while waiting efficiently on different conditions (waiting for the set to have at least one element, or waiting for the set to be empty).
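
Before the full example, here is a minimal sketch of the two kinds of wait mentioned above, sharing one condition variable. The names items and received are hypothetical stand-ins (items plays the role of my_set), and the sketch assumes Python 3 for wait_for.

```python
import threading

cv = threading.Condition()
items = set()      # hypothetical shared resource, standing in for my_set
received = []      # hypothetical list collecting what the consumer got


def consumer():
    with cv:
        # wait_for releases the lock while blocked and re-acquires it
        # before returning, so `items` is only touched under the lock
        cv.wait_for(lambda: items)
        received.append(items.pop())
        # the set changed, so wake whoever waits for it to be empty
        cv.notify()


t = threading.Thread(target=consumer)
t.start()

with cv:
    items.add(42)
    cv.notify()
    # the lock is released while waiting, letting the consumer run
    cv.wait_for(lambda: not items)

t.join()
print(received, items)
```

Both sides wait on a predicate rather than on a bare notification, so the sketch behaves the same whichever thread acquires the lock first.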

I will let the code speak for itself – make sure to read the comments, and study the output and the timestamp differences.

NOTE: the code below is not Python 2 compatible (the missing link is the wait_for method of the condition variable). See the sample implementation in the appendix.

import threading
import logging
import time

logging.basicConfig(
    level=logging.DEBUG,
    # format='%(levelname)-7s [%(asctime)s] [T:%(threadName)s]: %(message)s')
    format='%(levelname)-7s [%(relativeCreated)6d] [T:%(threadName)s]: %(message)s')

logger = logging.getLogger()

cv = threading.Condition()
my_set = set()
all_done = False

def run():
    logger.info('started')
    while not all_done:
        with cv:
            # acquiring the condition lock, but not waiting on it,
            # thus essentially delaying anyone else from using the
            # resource
            logger.info('acquired')
            time.sleep(2)
            logger.info('waiting for elements')
            # that's the correct way to use the condition - just
            # wait for a specific condition to be met
            cv.wait_for(lambda: my_set)
            item = my_set.pop()
            logger.info('found %s items in set. got "%s"',
                        len(my_set) + 1, item)
            # we want to notify once the resource has changed,
            # to make waiting on the resource more effective -
            # if another thread is waiting for the resource to
            # become empty, for example
            cv.notify()

    logger.info('done')


def locking_thread():
    # this thread is simply acquiring the resource just for kicks, 
    # blocking everyone else
    logger.info('started')
    with cv:
        logger.info('acquired')
        time.sleep(5)
        logger.info('releasing')


def main():
    t1 = threading.Thread(target=run, name='ID-1')
    t1.start()
    t2 = threading.Thread(target=locking_thread, name='ID-2')
    t2.start()

    time.sleep(0.1)
    logger.info('1. trying to acquire')
    with cv:
        logger.info('1. adding')
        my_set.add(99)
        my_set.add(88)
        # notice how we only notify *once* - even though each `wait`
        # call requires a `notify` to be released, the other thread
        # re-checks the condition before attempting to wait again,
        # so it finds the second item without another notification
        cv.notify()

    time.sleep(0.5)
    # since we slept for a short while, thread ID-1 will probably
    # have acquired the condition lock, so a delay of a few seconds
    # is expected between trying to acquire and adding an item
    logger.info('2. trying to acquire')
    with cv:
        logger.info('2. adding')
        my_set.add(77)
        cv.notify()

    logger.info('3. trying to acquire')
    with cv:
        logger.info('waiting for empty')
        cv.wait_for(lambda: not my_set)
        logger.info('set is empty')

    logger.info('marking all done')
    global all_done
    all_done = True
    with cv:
        logger.info('release thread')
        my_set.add(None)
        cv.notify()

    t1.join()
    t2.join()
    # since we waited for the other threads to finish, we can now
    # "touch" `my_set` without the lock, and it is expected to be empty
    logger.info('done. set: %s', my_set)


if __name__ == '__main__':
    main()
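
The "notify once" remark in the comments can be checked in isolation. The following is a small sketch, not part of the example above: pending, drained and drain are hypothetical names, and it assumes Python 3. Two items are added under a single notify(), and the worker still gets both because wait_for checks the predicate before it ever blocks.

```python
import threading

cv = threading.Condition()
pending = set()    # hypothetical stand-in for my_set
drained = []       # hypothetical record of what the worker popped


def drain(expected):
    while len(drained) < expected:
        with cv:
            # on the second pass the predicate is already true,
            # so wait_for returns without waiting for a notify
            cv.wait_for(lambda: pending)
            drained.append(pending.pop())


worker = threading.Thread(target=drain, args=(2,))
worker.start()

with cv:
    pending.add(99)
    pending.add(88)
    cv.notify()    # a single notification for two items

worker.join(timeout=5)
print(sorted(drained))  # [88, 99]
```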

The expected output is something like this:

INFO    [     2] [T:ID-1]: started
INFO    [     2] [T:ID-2]: started
INFO    [   107] [T:MainThread]: 1. trying to acquire
INFO    [  2006] [T:ID-1]: waiting for elements
INFO    [  2006] [T:ID-2]: acquired
INFO    [  7010] [T:ID-2]: releasing
INFO    [  7011] [T:MainThread]: 1. adding
INFO    [  7011] [T:ID-1]: found 2 items in set. got "88"
INFO    [  7515] [T:MainThread]: 2. trying to acquire
INFO    [  9014] [T:ID-1]: waiting for elements
INFO    [  9014] [T:ID-1]: found 1 items in set. got "99"
INFO    [ 11019] [T:ID-1]: waiting for elements
INFO    [ 11019] [T:MainThread]: 2. adding
INFO    [ 11019] [T:MainThread]: 3. trying to acquire
INFO    [ 11019] [T:MainThread]: waiting for empty
INFO    [ 11019] [T:ID-1]: found 1 items in set. got "77"
INFO    [ 13022] [T:ID-1]: waiting for elements
INFO    [ 13022] [T:MainThread]: set is empty
INFO    [ 13022] [T:MainThread]: marking all done
INFO    [ 13022] [T:MainThread]: release thread
INFO    [ 13022] [T:ID-1]: found 1 items in set. got "None"
INFO    [ 13022] [T:ID-1]: done
INFO    [ 13022] [T:MainThread]: done. set: set()

Appendix: Condition.wait_for implementation (sample!)

NOTE: this isn’t thoroughly tested – feel free to use it at your own risk.

import threading

import six


def wait_for(cv, predicate):
    # type: (threading.Condition, Callable[[], bool]) -> None
    """Python 2/3 compatible implementation"""

    if six.PY2:
        while not predicate():
            cv.wait()
    elif six.PY3:
        cv.wait_for(predicate)
    else:
        raise NotImplementedError(
            'unknown Python version for "wait_for" function')


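# usage (the condition's lock must be held while waiting,
# otherwise `wait` raises a RuntimeError):
cv = threading.Condition()
with cv:
    wait_for(cv, lambda: my_set)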
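
If depending on six is undesirable, one possible alternative is to detect the native method instead of the interpreter version. This is only a sketch under that assumption: wait_for_compat is a hypothetical name, and, unlike the helper above, it returns the predicate's value the way Python 3's Condition.wait_for does.

```python
import threading


def wait_for_compat(cv, predicate):
    """Hypothetical six-free variant: use the native method if present."""
    native = getattr(cv, 'wait_for', None)
    if native is not None:
        return native(predicate)
    # fallback for conditions without `wait_for` (e.g. Python 2):
    # re-check the predicate after every wake-up
    result = predicate()
    while not result:
        cv.wait()
        result = predicate()
    return result


cv = threading.Condition()
my_set = set()


def producer():
    with cv:
        my_set.add(1)
        cv.notify()


t = threading.Thread(target=producer)
t.start()
with cv:
    # the lock must be held while waiting
    got = wait_for_compat(cv, lambda: my_set)
t.join()
```

Here got is whatever the predicate last returned, which in this usage is the non-empty set itself.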
