Examples
- Sleeping Barber
- Dining Philosophers
- Word Sort
- Load Balancer
Sleeping Barber
```
import org.gparallelizer.actors.pooledActors.PooledActorGroup
import org.gparallelizer.actors.pooledActors.AbstractPooledActor
import org.gparallelizer.actors.Actor
// Actor group used to create both the barber and the waiting room actors.
final def group = new PooledActorGroup()

/*
 * The barber actor.
 * - Enter: notifies the customer with Start, simulates the haircut, notifies the customer with
 *   Done and replies Next to the sender of the Enter message to ask for another customer.
 * - Wait: nobody is waiting, so the barber only reports that he goes to sleep.
 * Any other message is ignored.
 */
final def barber = group.actor {
    final def random = new Random()

    loop {
        react {message ->
            if (message instanceof Enter) {
                message.customer.send new Start()
                println "Barber: Processing customer ${message.customer.name}"
                doTheWork(random)
                message.customer.send new Done()
                message.reply new Next()
            } else if (message instanceof Wait) {
                println "Barber: No customers. Going to have a sleep"
            }
        }
    }
}.start()
/**
 * Simulates a haircut by putting the calling thread to sleep for a random whole number
 * of seconds between 0 and 9.
 *
 * @param random source of the random duration
 */
private def doTheWork(Random random) {
    final int seconds = random.nextInt(10)
    Thread.sleep(seconds * 1000)
}
/*
 * The waiting room actor. It queues up to `capacity` customers and remembers whether the
 * barber is currently asleep.
 * - Enter: replies Full when every seat is taken; otherwise queues the customer and either
 *   wakes the barber (by sending itself a Next message) or replies Wait.
 * - Next: hands the longest-waiting customer over to the barber, or tells the barber to
 *   Wait and records that he is asleep when nobody is queued.
 * Any other message is ignored.
 */
final Actor waitingRoom
waitingRoom = group.actor {
    final int capacity = 5
    final List waitingCustomers = []
    boolean barberAsleep = true

    loop {
        react {message ->
            if (message instanceof Enter) {
                if (waitingCustomers.size() == capacity) {
                    reply new Full()
                } else {
                    waitingCustomers << message.customer
                    if (!barberAsleep) {
                        reply new Wait()
                    } else {
                        // A sleeping barber implies an empty room, so this customer is the only one queued.
                        assert waitingCustomers.size() == 1
                        barberAsleep = false
                        waitingRoom.send new Next()
                    }
                }
            } else if (message instanceof Next) {
                if (waitingCustomers.size() > 0) {
                    def customer = waitingCustomers.remove(0)
                    barber.send new Enter(customer:customer)
                } else {
                    barber.send new Wait()
                    barberAsleep = true
                }
            }
        }
    }
}.start()
/**
 * A customer actor. When started it sends an Enter message carrying itself to
 * {@code localBarbers} and then reacts to the notifications it receives until its visit
 * ends: either it is turned away (Full) or it has been served (Done).
 */
class Customer extends AbstractPooledActor {
    /** Name printed in this customer's log lines. */
    String name
    /** Actor that receives this customer's Enter message. */
    Actor localBarbers

    void act() {
        localBarbers << new Enter(customer:this)
        loop {
            react {message ->
                switch (message) {
                    case Full:
                        println "Customer: $name: The waiting room is full. I am leaving."
                        stop()
                        break
                    case Wait:
                        println "Customer: $name: I will wait."
                        break
                    case Start:
                        println "Customer: $name: I am now being served."
                        break
                    case Done:
                        println "Customer: $name: I have been served."
                        // The visit is over: stop the actor, as the Full branch does. Without this
                        // the customer would keep waiting for messages that are never sent.
                        stop()
                        break
                }
            }
        }
    }
}
// Messages exchanged between the customers, the waiting room and the barber.

/** Carries the customer who enters the waiting room or is handed over to the barber. */
class Enter {
    Customer customer
}

/** Reply to a customer when the waiting room has no free seat. */
class Full {}

/** Tells a customer to wait; also tells the barber that nobody is waiting. */
class Wait {}

/** Asks the waiting room for the next customer. */
class Next {}

/** Tells a customer that the haircut has started. */
class Start {}

/** Tells a customer that the haircut is finished. */
class Done {}
// Create and start three customers, in this order; each one targets the waiting room.
['Joe', 'Dave', 'Alice'].each {customerName ->
    new Customer(name:customerName, localBarbers:waitingRoom).start()
}

// Block the main thread until a byte can be read from standard input
// (presumably to keep the script running while the actors work).
System.in.read()
```
Dining Philosophers
```
import org.gparallelizer.actors.pooledActors.AbstractPooledActor
import org.gparallelizer.actors.pooledActors.PooledActors
// Resize the default pooled actor group to 5 (presumably the number of pool threads).
PooledActors.defaultPooledActorGroup.resize(5)
/**
 * A philosopher actor that keeps alternating between thinking and trying to eat.
 * After thinking it sends Take to both of its forks and waits for the two answers.
 * If either fork answers Rejected it gives up and releases the fork that accepted (if any);
 * otherwise it eats and then sends Finished back in reply.
 */
final class Philosopher extends AbstractPooledActor {
    private Random random = new Random()

    /** Name printed in the log output. */
    String name

    /** The forks this philosopher uses; act() asserts there are exactly two. */
    def forks = []

    void act() {
        assert 2 == forks.size()
        loop {
            think()
            forks*.send new Take()
            react {first, second ->
                def answers = [first, second]
                boolean refused = answers.any { it instanceof Rejected }
                if (!refused) {
                    eat()
                    reply new Finished()
                } else {
                    println "$name: \tOops, can't get my forks! Giving up."
                    // Hand back the fork that did accept, if there is one.
                    answers.find { it instanceof Accepted }?.reply(new Finished())
                }
            }
        }
    }

    /** Sleeps for a random time below 5000 ms, logging before and after. */
    void think() {
        println "$name: \tI'm thinking"
        Thread.sleep(random.nextInt(5000))
        println "$name: \tI'm done thinking"
    }

    /** Sleeps for a random time below 2000 ms, logging before and after. */
    void eat() {
        println "$name: \tI'm EATING"
        Thread.sleep(random.nextInt(2000))
        println "$name: \tI'm done EATING"
    }
}
/**
 * A fork actor that can be held by one philosopher at a time.
 * - Take: replies Accepted and becomes unavailable when free, otherwise replies Rejected.
 * - Finished: becomes available again (asserts that it was actually taken).
 * Any other message raises an IllegalStateException.
 */
final class Fork extends AbstractPooledActor {
    String name
    boolean available = true

    void act() {
        loop {
            react {message ->
                if (message instanceof Take) {
                    if (!available) {
                        reply new Rejected()
                    } else {
                        available = false
                        reply new Accepted()
                    }
                } else if (message instanceof Finished) {
                    assert !available
                    available = true
                } else {
                    throw new IllegalStateException("Cannot process the message: $message")
                }
            }
        }
    }
}
// Messages exchanged between philosophers and forks.

/** Sent by a philosopher to request a fork. */
final class Take {}

/** Fork's answer when it was free and is now held by the requester. */
final class Accepted {}

/** Fork's answer when it is already taken. */
final class Rejected {}

/** Sent by a philosopher to release a fork. */
final class Finished {}
// Five forks named 'Fork 1' .. 'Fork 5'.
def forks = (1..5).collect {number -> new Fork(name:'Fork ' + number) }

// Five philosophers around the table: philosopher i uses fork i and the next fork,
// with the last philosopher wrapping around to the first fork.
def philosopherNames = ['Joe', 'Dave', 'Alice', 'James', 'Phil']
def philosophers = []
philosopherNames.eachWithIndex {philosopherName, seat ->
    def ownForks = [forks[seat], forks[(seat + 1) % forks.size()]]
    philosophers << new Philosopher(name:philosopherName, forks:ownForks)
}

// Start all forks first, then all philosophers.
forks*.start()
philosophers*.start()

// Block the main thread until a byte can be read from standard input.
System.in.read()
```
Word Sort
Given a folder name, the script sorts the words in every file in that folder. The SortMaster actor creates a given number of WordSortActors, distributes the files among them, and collects the results.
Inspired by the Scala Concurrency blog post by Michael Galpin.
```
//Messages

/** Task sent to a worker: the name of the file whose words should be sorted. */
private final class FileToSort {
    String fileName
}

/** Worker's answer: the processed file name together with its sorted words. */
private final class SortResult {
    String fileName
    List words
}
//Worker actor
/**
 * Worker that sorts the words of one file per FileToSort message and replies with a
 * SortResult carrying the file name and the sorted words. Other messages are ignored.
 *
 * The superclass is fully qualified because this snippet has no import section.
 */
final class WordSortActor extends org.gparallelizer.actors.pooledActors.AbstractPooledActor {

    /**
     * Returns the words of the given file sorted by their lower-case form.
     *
     * @param fileName path of the file to read
     */
    private List<String> sortedWords(String fileName) {
        parseFile(fileName).sort {it.toLowerCase()}
    }

    /**
     * Reads the file line by line and splits every line on single spaces.
     * Empty tokens, produced by blank lines or by consecutive spaces, are not words
     * and are left out of the result.
     *
     * @param fileName path of the file to read
     * @return the words in file order
     */
    private List<String> parseFile(String fileName) {
        List<String> words = []
        new File(fileName).splitEachLine(' ') {tokens ->
            words.addAll(tokens.findAll {it})
        }
        return words
    }

    void act() {
        loop {
            react {message ->
                switch (message) {
                    case FileToSort:
                        println "Sorting file=${message.fileName} on thread ${Thread.currentThread().name}"
                        reply new SortResult(fileName: message.fileName, words: sortedWords(message.fileName))
                }
            }
        }
    }
}
//Master actor
/**
 * Master actor: creates {@code numActors} WordSortActor workers, hands the regular files
 * found directly in {@code docRoot} to them in round-robin order and collects the sorted
 * word lists from the SortResult replies into {@code sorted}.
 *
 * Library and JDK types that this snippet does not import are written fully qualified.
 */
final class SortMaster extends org.gparallelizer.actors.pooledActors.AbstractPooledActor {

    /** Folder whose files are sorted. */
    String docRoot = '/'
    /** Number of worker actors to create. */
    int numActors = 1
    /** One sorted word list per processed file, in the order the results arrived. */
    List<List<String>> sorted = []

    /** Released once all tasks are dispatched and doneLatch has been created. */
    private java.util.concurrent.CountDownLatch startupLatch = new java.util.concurrent.CountDownLatch(1)
    /** Counts down once per received SortResult; created in beginSorting(). */
    private java.util.concurrent.CountDownLatch doneLatch

    /** Dispatches all tasks and sizes doneLatch to the number of tasks sent. */
    private void beginSorting() {
        int cnt = sendTasksToWorkers()
        doneLatch = new java.util.concurrent.CountDownLatch(cnt)
    }

    /** Creates and starts numActors workers. */
    private List createWorkers() {
        return (1..numActors).collect {new WordSortActor().start()}
    }

    /**
     * Sends one FileToSort message per regular file in docRoot, cycling through the workers.
     *
     * @return the number of tasks sent
     */
    private int sendTasksToWorkers() {
        List workers = createWorkers()
        int cnt = 0
        new File(docRoot).eachFile {
            // eachFile also yields sub-directories. A worker cannot read those as text, so no
            // SortResult would come back for them and waitUntilDone() would block forever.
            if (it.isFile()) {
                workers[cnt % numActors] << new FileToSort(fileName: it.path)
                cnt += 1
            }
        }
        return cnt
    }

    /** Blocks the caller until every dispatched file has produced a SortResult. */
    public void waitUntilDone() {
        // doneLatch only exists after start-up has finished, so wait for that first.
        startupLatch.await()
        doneLatch.await()
    }

    void act() {
        beginSorting()
        startupLatch.countDown()
        loop {
            react {
                switch (it) {
                    case SortResult:
                        sorted << it.words
                        doneLatch.countDown()
                        println "Received results for file=${it.fileName}"
                }
            }
        }
    }
}
//start the actors to sort words
def master = new SortMaster(docRoot: 'C:/dev/TeamCity/logs/', numActors: 5).start()

// Block until a result has arrived for every dispatched file, then print the outcome.
master.waitUntilDone()
println('Done')
println(master.sorted)
```
Load Balancer
Demonstrates work balancing among an adaptable set of workers. The load balancer receives tasks and queues them in a temporary task queue. When a worker finishes its assignment, it asks the load balancer for a new task.
If the load balancer doesn't have any tasks available in the task queue, the worker is stopped. If the number of tasks in the task queue reaches a certain limit, a new worker is created to increase the size of the worker pool. A new worker is also started when a task arrives and no worker is currently active.
```
import org.gparallelizer.actors.Actor
import org.gparallelizer.actors.DefaultThreadActor
import org.gparallelizer.actors.pooledActors.AbstractPooledActor
/**
 * The load balancer actor. Each call of act() receives and handles exactly one message
 * (presumably DefaultThreadActor invokes act() repeatedly - confirm against the library):
 * - NeedMoreWork: replies with the oldest queued task, or with DemoWorker.EXIT when the
 *   queue is empty, in which case the worker count is decremented.
 * - WorkToDo: queues the task and starts a new worker when there is no worker at all or the
 *   queue has reached QUEUE_SIZE_TRIGGER entries.
 * A status line is printed after every message.
 */
final class LoadBalancer extends DefaultThreadActor {
    /** Number of workers considered active. */
    int workers = 0
    /** Tasks waiting to be picked up, oldest first. */
    List taskQueue = []
    /** Queue length from which every additional task also starts a new worker. */
    private static final QUEUE_SIZE_TRIGGER = 10

    void act() {
        def message = receive()
        if (message instanceof NeedMoreWork) {
            if (taskQueue.size() == 0) {
                println 'No more tasks in the task queue. Terminating the worker.'
                message.reply DemoWorker.EXIT
                workers -= 1
            } else {
                message.reply taskQueue.remove(0)
            }
        } else if (message instanceof WorkToDo) {
            taskQueue << message
            boolean noWorkers = (workers == 0)
            if (noWorkers || (taskQueue.size() >= QUEUE_SIZE_TRIGGER)) {
                println 'Need more workers. Starting one.'
                workers += 1
                new DemoWorker(this).start()
            }
        }
        println "Active workers=${workers}\tTasks in queue=${taskQueue.size()}"
    }
}
/**
 * A worker actor. In every loop iteration it asks the balancer for work by sending
 * NeedMoreWork and then reacts to the answer: a WorkToDo task is processed, the EXIT
 * marker stops the worker.
 */
final class DemoWorker extends AbstractPooledActor {
    /** Marker message the balancer sends to tell a worker to stop. */
    final static Object EXIT = new Object()
    /** Source of the simulated processing times, shared by all workers. */
    private static final Random random = new Random()

    /** The load balancer this worker requests tasks from. */
    Actor balancer

    def DemoWorker(balancer) {
        this.balancer = balancer
    }

    void act() {
        loop {
            this.balancer << new NeedMoreWork()
            react {
                switch (it) {
                    case WorkToDo:
                        processMessage(it)
                        break
                    case EXIT:stop()
                }
            }
        }
    }

    /**
     * Simulates processing of a task by sleeping for a random time below 5000 ms.
     *
     * The sleep must not happen while holding a lock on the shared static Random: that would
     * let only one worker "work" at a time and defeat the purpose of balancing the load.
     * java.util.Random is documented as thread-safe, so drawing the delay needs no lock at all.
     *
     * @param message the task to process (its content is not used by this demo)
     */
    private void processMessage(message) {
        final int delay = random.nextInt(5000)
        Thread.sleep delay
    }
}

/** A unit of work submitted to the load balancer. */
final class WorkToDo{}

/** Sent by a worker to ask the load balancer for its next task. */
final class NeedMoreWork{}
final Actor balancer = new LoadBalancer().start()

//produce tasks
(1..20).each {
    Thread.sleep 100
    balancer << new WorkToDo()
}

//produce tasks in a parallel thread
Thread.start {
    (1..10).each {
        Thread.sleep 1000
        balancer << new WorkToDo()
    }
}

// Wait for input on stdin, submit two more tasks, then wait for input once more.
System.in.read()
balancer << new WorkToDo()
balancer << new WorkToDo()
System.in.read()
```