Export to GitHub

gparallelizer - ActorsExamples.wiki


Examples

  • Sleeping Barber
  • Dining Philosophers
  • Word Sort
  • Load Balancer

Sleeping Barber

Problem description

```groovy
import org.gparallelizer.actors.pooledActors.PooledActorGroup
import org.gparallelizer.actors.pooledActors.AbstractPooledActor
import org.gparallelizer.actors.Actor

// Actor group in which both the barber and the waiting room actors are created.
final def group = new PooledActorGroup()

// The barber actor. An Enter message makes the barber serve the enclosed
// customer (Start, simulated work, Done) and then reply Next to whoever sent
// the Enter. A Wait message only prints that the barber goes to sleep.
final def barber = group.actor {
    final def workDurations = new Random()
    loop {
        react { message ->
            if (message instanceof Enter) {
                message.customer.send(new Start())
                println "Barber: Processing customer ${message.customer.name}"
                doTheWork(workDurations)
                message.customer.send(new Done())
                message.reply(new Next())
            } else if (message instanceof Wait) {
                println "Barber: No customers. Going to have a sleep"
            }
        }
    }
}.start()

/**
 * Simulates serving a customer by blocking the calling thread for a random
 * whole number of seconds in the range 0..9.
 *
 * @param random generator the duration is drawn from
 */
private def doTheWork(Random random) {
    final int seconds = random.nextInt(10)
    Thread.sleep(seconds * 1000)
}

// Declared before it is assigned so the actor body below can send messages to itself.
final Actor waitingRoom

// The waiting room actor. It queues arriving customers (at most `seats` of
// them) and hands them to the barber one at a time.
waitingRoom = group.actor {
    final int seats = 5
    final List waitingCustomers = []
    boolean barberIdle = true

    loop {
        react { message ->
            if (message instanceof Enter) {
                if (waitingCustomers.size() == seats) {
                    // No free seat: turn the customer away.
                    reply new Full()
                } else {
                    waitingCustomers << message.customer
                    if (!barberIdle) {
                        reply new Wait()
                    } else {
                        // An idle barber implies the queue was empty before this arrival.
                        assert waitingCustomers.size() == 1
                        barberIdle = false
                        // Ask ourselves to dispatch the next customer to the barber.
                        waitingRoom.send new Next()
                    }
                }
            } else if (message instanceof Next) {
                if (waitingCustomers.isEmpty()) {
                    barber.send new Wait()
                    barberIdle = true
                } else {
                    def nextCustomer = waitingCustomers.remove(0)
                    barber.send new Enter(customer: nextCustomer)
                }
            }
        }
    }
}.start()

/**
 * A customer of the barber shop.
 *
 * When started, the customer sends an Enter request carrying itself to
 * localBarbers and then prints one line for every notification it receives.
 * A Full notification additionally stops the actor.
 */
class Customer extends AbstractPooledActor {
    /** Name used in the printed messages. */
    String name
    /** Actor that receives this customer's Enter request. */
    Actor localBarbers

    void act() {
        localBarbers << new Enter(customer: this)
        loop {
            react { notice ->
                if (notice instanceof Full) {
                    println "Customer: $name: The waiting room is full. I am leaving."
                    stop()
                } else if (notice instanceof Wait) {
                    println "Customer: $name: I will wait."
                } else if (notice instanceof Start) {
                    println "Customer: $name: I am now being served."
                } else if (notice instanceof Done) {
                    println "Customer: $name: I have been served."
                }
            }
        }
    }
}

// Message types exchanged between the customers, the waiting room and the barber.

/** A customer arrives; carries the customer concerned. */
class Enter {
    Customer customer
}

/** Reply to a customer when the waiting room has no free seat. */
class Full {}

/** Tells a customer to wait, or tells the barber that nobody is waiting. */
class Wait {}

/** Asks the waiting room to dispatch the next customer. */
class Next {}

/** Tells a customer that service has started. */
class Start {}

/** Tells a customer that service has finished. */
class Done {}

// Three customers, all directed to the same waiting room.
['Joe', 'Dave', 'Alice'].each { customerName ->
    new Customer(name: customerName, localBarbers: waitingRoom).start()
}

// Block until input arrives on stdin (presumably to keep the script alive
// while the actors run).
System.in.read()
```

Dining Philosophers

Problem description

```groovy
import org.gparallelizer.actors.pooledActors.AbstractPooledActor
import org.gparallelizer.actors.pooledActors.PooledActors

// Resize the default pooled-actor group to 5 (presumably its number of pool
// threads - confirm against the PooledActorGroup API).
PooledActors.defaultPooledActorGroup.resize(5)

/**
 * A philosopher who alternates between thinking and trying to eat.
 *
 * After thinking, the philosopher sends Take to both forks and waits for the
 * two answers. If either answer is Rejected, the philosopher gives up and
 * hands back the fork that was Accepted, if any. Otherwise the philosopher
 * eats and then sends Finished back to both forks.
 */
final class Philosopher extends AbstractPooledActor {
    private Random rng = new Random()

    /** Name used in the printed messages. */
    String name
    /** The two forks this philosopher needs; must contain exactly two entries. */
    def forks = []

    void act() {
        assert 2 == forks.size()
        loop {
            think()
            forks*.send new Take()
            react { a, b ->
                def answers = [a, b]
                boolean gotBothForks = !answers.any { it instanceof Rejected }
                if (gotBothForks) {
                    eat()
                    reply new Finished()
                } else {
                    println "$name: \tOops, can't get my forks! Giving up."
                    // Release the one fork that was granted, if there is one.
                    answers.find { it instanceof Accepted }?.reply new Finished()
                }
            }
        }
    }

    /** Prints the thinking messages around a random pause of under 5000 ms. */
    void think() {
        println "$name: \tI'm thinking"
        final int pause = rng.nextInt(5000)
        Thread.sleep(pause)
        println "$name: \tI'm done thinking"
    }

    /** Prints the eating messages around a random pause of under 2000 ms. */
    void eat() {
        println "$name: \tI'm EATING"
        final int pause = rng.nextInt(2000)
        Thread.sleep(pause)
        println "$name: \tI'm done EATING"
    }
}

/**
 * A fork that can be held by at most one philosopher at a time.
 *
 * Take is answered with Accepted when the fork is free and with Rejected
 * otherwise; Finished frees the fork again. Any other message raises an
 * IllegalStateException.
 */
final class Fork extends AbstractPooledActor {

    String name
    /** True while no philosopher holds this fork. */
    boolean available = true

    void act() {
        loop {
            react { message ->
                if (message instanceof Take) {
                    if (!available) {
                        reply new Rejected()
                    } else {
                        available = false
                        reply new Accepted()
                    }
                } else if (message instanceof Finished) {
                    // Only a fork that is currently held can be handed back.
                    assert !available
                    available = true
                } else {
                    throw new IllegalStateException("Cannot process the message: $message")
                }
            }
        }
    }
}

// Message types exchanged between philosophers and forks.

/** A philosopher asks for a fork. */
final class Take {}

/** The fork was free and is now held by the requester. */
final class Accepted {}

/** The fork is already held by someone else. */
final class Rejected {}

/** The philosopher hands the fork back. */
final class Finished {}

// Five forks named 'Fork 1' .. 'Fork 5'.
def forks = (1..5).collect { number -> new Fork(name: 'Fork ' + number) }

// Five philosophers around the table: philosopher i uses fork i and fork i+1,
// with the last philosopher wrapping around to the first fork.
def philosophers = []
['Joe', 'Dave', 'Alice', 'James', 'Phil'].eachWithIndex { who, seat ->
    def neighbouringForks = [forks[seat], forks[(seat + 1) % forks.size()]]
    philosophers << new Philosopher(name: who, forks: neighbouringForks)
}

// Start the forks before the philosophers.
forks.each { it.start() }
philosophers.each { it.start() }

// Block until input arrives on stdin (presumably to keep the script alive
// while the actors run).
System.in.read()
```

Word Sort

Given a folder name, the script sorts the words in each file in that folder. The SortMaster actor creates a given number of WordSortActors, distributes the files among them, and collects the results.

Inspired by the Scala Concurrency blog post by Michael Galpin.

```groovy
//Messages

/** Task for a worker: the file whose words are to be sorted. */
final class FileToSort {
    String fileName
}

/** A worker's answer: the file that was processed and its sorted words. */
final class SortResult {
    String fileName
    List words
}

//Worker actor

/**
 * Sorts the words of one file per FileToSort message and replies with a
 * SortResult holding the file name and the sorted words.
 */
final class WordSortActor extends AbstractPooledActor {

    /** Returns the words of the given file ordered by their lower-case form. */
    private List<String> sortedWords(String fileName) {
        return parseFile(fileName).sort { word -> word.toLowerCase() }
    }

    /** Reads the file line by line, splitting each line on ' ', and returns all tokens. */
    private List<String> parseFile(String fileName) {
        List<String> collected = []
        new File(fileName).splitEachLine(' ') { tokens ->
            collected.addAll(tokens)
        }
        return collected
    }

    void act() {
        loop {
            react { message ->
                if (message instanceof FileToSort) {
                    println "Sorting file=${message.fileName} on thread ${Thread.currentThread().name}"
                    reply new SortResult(fileName: message.fileName, words: sortedWords(message.fileName))
                }
            }
        }
    }
}

//Master actor

/**
 * Distributes the files found directly in docRoot among numActors
 * WordSortActor workers and collects their SortResult replies in sorted.
 */
final class SortMaster extends AbstractPooledActor {

    /** Folder whose files are sorted. */
    String docRoot = '/'
    /** Number of worker actors to create. */
    int numActors = 1

    /** One list of sorted words per processed file, in order of arrival. */
    List<List<String>> sorted = []
    // Released once all tasks are dispatched and doneLatch has been created.
    private CountDownLatch startupLatch = new CountDownLatch(1)
    // Counts the results still outstanding; created in beginSorting().
    private CountDownLatch doneLatch

    /** Dispatches all tasks and sizes doneLatch to the number of tasks sent. */
    private void beginSorting() {
        int cnt = sendTasksToWorkers()
        doneLatch = new CountDownLatch(cnt)
    }

    /** Creates and starts numActors workers. */
    private List createWorkers() {
        return (1..numActors).collect {new WordSortActor().start()}
    }

    /**
     * Sends one FileToSort per regular file in docRoot, assigning the files
     * to the workers in round-robin order.
     *
     * @return the number of tasks sent
     */
    private int sendTasksToWorkers() {
        List<PooledActor> workers = createWorkers()
        int cnt = 0
        new File(docRoot).eachFile {
            // eachFile also yields subdirectories. A worker cannot read a
            // directory as text and so would never reply with a SortResult,
            // leaving waitUntilDone() blocked on doneLatch forever.
            if (it.isFile()) {
                workers[cnt % numActors] << new FileToSort(fileName: it.path)
                cnt += 1
            }
        }
        return cnt
    }

    /** Blocks the caller until every dispatched file has been reported back. */
    public void waitUntilDone() {
        // doneLatch does not exist before startup has completed.
        startupLatch.await()
        doneLatch.await()
    }

    void act() {
        beginSorting()
        startupLatch.countDown()
        loop {
            react {
                switch (it) {
                    case SortResult:
                        sorted << it.words
                        doneLatch.countDown()
                        println "Received results for file=${it.fileName}"
                }
            }
        }
    }
}

//start the actors to sort words
def master = new SortMaster(docRoot: 'C:/dev/TeamCity/logs/', numActors: 5).start()

// Wait for all results, then print the collected word lists.
master.waitUntilDone()
println('Done')
println(master.sorted)
```

Load Balancer

Demonstrates work balancing among an adaptable set of workers. The load balancer receives tasks and queues them in a temporary task queue. When a worker finishes its assignment, it asks the load balancer for a new task.

If the load balancer doesn't have any tasks available in the task queue, the worker is stopped. If the number of tasks in the task queue reaches a certain limit, a new worker is created to increase the size of the worker pool.

```groovy
import org.gparallelizer.actors.Actor
import org.gparallelizer.actors.DefaultThreadActor
import org.gparallelizer.actors.pooledActors.AbstractPooledActor

/**
 * Queues incoming WorkToDo tasks and hands them out to workers on request.
 *
 * A worker asking for work while the queue is empty is told to exit. A new
 * worker is started when a task arrives and either no worker exists or the
 * queue has reached QUEUE_SIZE_TRIGGER entries.
 */
final class LoadBalancer extends DefaultThreadActor {
    /** Number of workers currently counted as active. */
    int workers = 0
    /** Tasks waiting to be handed to a worker, oldest first. */
    List taskQueue = []
    private static final QUEUE_SIZE_TRIGGER = 10

    /**
     * Receives and handles a single message, then prints the current state.
     * NOTE(review): presumably invoked repeatedly by DefaultThreadActor - confirm.
     */
    void act() {
        def message = receive()
        if (message instanceof NeedMoreWork) {
            if (taskQueue.isEmpty()) {
                println 'No more tasks in the task queue. Terminating the worker.'
                message.reply(DemoWorker.EXIT)
                workers -= 1
            } else {
                message.reply(taskQueue.remove(0))
            }
        } else if (message instanceof WorkToDo) {
            taskQueue << message
            boolean noWorkers = (workers == 0)
            boolean queueTooLong = (taskQueue.size() >= QUEUE_SIZE_TRIGGER)
            if (noWorkers || queueTooLong) {
                println 'Need more workers. Starting one.'
                workers += 1
                new DemoWorker(this).start()
            }
        }
        println "Active workers=${workers}\tTasks in queue=${taskQueue.size()}"
    }
}

/**
 * A worker that repeatedly asks its balancer for work, processes each
 * WorkToDo it receives, and stops when it receives EXIT.
 */
final class DemoWorker extends AbstractPooledActor {
    /** Sentinel message telling a worker to stop. */
    final static Object EXIT = new Object()
    // Shared by all workers; only used to pick the simulated processing time.
    private static final Random random = new Random()

    /** Actor this worker requests tasks from. */
    Actor balancer

    def DemoWorker(balancer) {
        this.balancer = balancer
    }

    void act() {
        loop {
            // Ask for the next task, then wait for the balancer's answer.
            this.balancer << new NeedMoreWork()
            react {
                switch (it) {
                    case WorkToDo:
                        processMessage(it)
                        break
                    case EXIT:stop()
                }
            }
        }

    }

    /**
     * Simulates processing by sleeping for a random time below 5000 ms.
     *
     * The sleep is deliberately not wrapped in synchronized(random): the
     * generator is shared by every worker, so sleeping while holding its
     * monitor would let only one worker process a task at a time.
     * java.util.Random is documented as thread-safe, so no lock is needed.
     */
    private void processMessage(message) {
        Thread.sleep random.nextInt(5000)
    }

}

/** A unit of work queued by the load balancer. */
final class WorkToDo{}

/** Sent by a worker to request its next task. */
final class NeedMoreWork{}

final Actor balancer = new LoadBalancer().start()

//produce tasks
(1..20).each {
    Thread.sleep(100)
    balancer << new WorkToDo()
}

//produce tasks in a parallel thread
Thread.start {
    (1..10).each {
        Thread.sleep(1000)
        balancer << new WorkToDo()
    }
}

// Wait for input on stdin, submit two more tasks, then wait for input again.
System.in.read()
2.times {
    balancer << new WorkToDo()
}
System.in.read()
```