|
Monitors
User-defined shareable, mutable objects
Given how limiting it is to be unable to share mutable objects, a way is needed to get around the restrictions. Monitors are the primary tool for this. Essentially what a Monitor does is it creates a wall around your object — nothing unshareable gets in or out. It then lets one thread at a time inside this wall, forcing any other threads to wait until the first one leaves. # counter.py
from __future__ import shared_module
from threadtools import Monitor, monitormethod
class Counter(Monitor):
"""A simple counter, shared between threads"""
__shared__ = True # More shared_module boilerplate
def __init__(self):
self.count = 0
@monitormethod
def tick(self):
self.count += 1
@monitormethod
def value(self):
return self.countThe count attribute is hidden within the Counter instances. @monitormethod indicates that a function should run within the Monitor. Counter can be used like this: # main.py
from __future__ import shared_module
from counter import Counter
from threadtools import branch
def work(c):
for i in range(20):
c.tick()
def main():
c = Counter()
with branch() as children:
for i in range(10):
children.add(work, c)
print("Number of ticks:", c.value())When main is called it creates a Counter, then spawns 10 threads to access it. Each of those threads calls c.tick() 20 times. After the threads exit the total number of ticks is printed: 200. CaveatDue to some problems with how python runs its startup script, the __future__ import does not work. To work around this you should explicitly import main.py, then run its main function. $ ./python -c 'import main; main.main()' Number of ticks: 200 $ ConditionsIn addition to allowing you to share objects between threads, Monitors also provide a facility for waiting until you can do a certain activity on those those objects - called a condition. # queue.py
from __future__ import shared_module
from collections import deque
from threadtools import Monitor, monitormethod, condition, wait
class Queue(Monitor):
"""A simple thread-safe queue"""
__shared__ = True # More shared_module boilerplate
def __init__(self, limit=None):
self.data = deque()
self.limit = limit
@condition
def _notfull(self):
if self.limit is None:
return True
return len(self.data) < self.limit
@condition
def _notempty(self):
return bool(self.data)
@monitormethod
def put(self, value):
wait(self._notfull)
self.data.append(value)
@monitormethod
def get(self):
wait(self._notempty)
return self.data.popleft()Although similar to traditional conditions used for threading, these are defined as part of the class, which gives them some interesting properties:
Additionally, waiting on a condition is a cancellable operation. |
Sign in to add a comment