My favorites | Sign in
Project Logo
                
Search
for
Updated Jul 02, 2009 by vaclav.pech
Labels: Featured
ActorsExamples  
A few examples of how to use actors

Examples

Sleeping Barber

Problem description

import org.gparallelizer.actors.pooledActors.PooledActorGroup
import org.gparallelizer.actors.pooledActors.AbstractPooledActor
import org.gparallelizer.actors.Actor

// Actor group used to create both the barber and the waiting-room actors.
final def group = new PooledActorGroup()

// The barber: serves the customer carried by each Enter message and then
// asks the sender of that message for the next customer.
final def barber = group.actor {
    final def rnd = new Random()
    loop {
        react {msg ->
            if (msg instanceof Enter) {
                def client = msg.customer
                client.send(new Start())
                println "Barber: Processing customer ${client.name}"
                doTheWork(rnd)
                client.send(new Done())
                // The reply goes back to whoever sent the Enter message.
                msg.reply(new Next())
            } else if (msg instanceof Wait) {
                println "Barber: No customers. Going to have a sleep"
            }
        }
    }
}.start()

/**
 * Simulates a haircut by blocking the calling thread for a random whole
 * number of seconds between 0 and 9.
 *
 * @param random source of the random duration
 */
private def doTheWork(Random random) {
    final int seconds = random.nextInt(10)
    Thread.sleep(seconds * 1000)
}

// Declared before the assignment so the actor body below can refer to the
// waiting room itself.
final Actor waitingRoom

// The waiting room: queues arriving customers (up to a fixed capacity) and
// hands them over to the barber one at a time.
waitingRoom = group.actor {
    final int capacity = 5
    final List<Customer> waitingCustomers = []
    boolean barberAsleep = true

    loop {
        react {msg ->
            if (msg instanceof Enter) {
                if (waitingCustomers.size() == capacity) {
                    // No free seat left: turn the customer away.
                    reply(new Full())
                } else {
                    waitingCustomers.add(msg.customer)
                    if (!barberAsleep) {
                        reply(new Wait())
                    } else {
                        assert waitingCustomers.size() == 1
                        barberAsleep = false
                        // Send ourselves a Next so the customer is passed on to the barber.
                        waitingRoom.send(new Next())
                    }
                }
            } else if (msg instanceof Next) {
                if (waitingCustomers.isEmpty()) {
                    barber.send(new Wait())
                    barberAsleep = true
                } else {
                    def customer = waitingCustomers.remove(0)
                    barber.send(new Enter(customer: customer))
                }
            }
        }
    }

}.start()

/**
 * A customer actor that asks for a haircut and reports each step of the
 * visit on standard output.
 *
 * The customer stops itself once the visit is over: either when it is turned
 * away because the waiting room is full, or when the haircut is done.
 */
class Customer extends AbstractPooledActor {
    /** Name used in the printed messages. */
    String name
    /** Actor that receives this customer's Enter request. */
    Actor localBarbers

    void act() {
        // Announce ourselves; everything else arrives as messages handled below.
        localBarbers << new Enter(customer:this)
        loop {
            react {message ->
                switch (message) {
                    case Full:
                        println "Customer: $name: The waiting room is full. I am leaving."
                        stop()
                        break
                    case Wait:
                        println "Customer: $name: I will wait."
                        break
                    case Start:
                        println "Customer: $name: I am now being served."
                        break
                    case Done:
                        println "Customer: $name: I have been served."
                        // Being served is the end of the visit; stop instead of
                        // idling in the loop forever waiting for further messages.
                        stop()
                        break

                }
            }
        }
    }
}

// Messages exchanged between the customers, the waiting room and the barber.

/** Carries the customer who wants to enter. */
class Enter {
    Customer customer
}

/** Tells a customer that the waiting room has no free seat. */
class Full {
}

/** Tells a customer to wait; also tells the barber that nobody is waiting. */
class Wait {
}

/** Requests that the next waiting customer be handed to the barber. */
class Next {
}

/** Tells a customer that the barber has started serving them. */
class Start {
}

/** Tells a customer that the barber has finished serving them. */
class Done {
}

// Three customers are created and started, one after another.
['Joe', 'Dave', 'Alice'].each {customerName ->
    new Customer(name: customerName, localBarbers: waitingRoom).start()
}

// Block on standard input (presumably to keep the script alive while the actors run).
System.in.read()

Dining Philosophers

Problem description

import org.gparallelizer.actors.pooledActors.AbstractPooledActor
import org.gparallelizer.actors.pooledActors.PooledActors

// Resize the default pooled-actor group to 5.
PooledActors.defaultPooledActorGroup.resize(5)

/**
 * A philosopher actor that alternates between thinking and trying to eat.
 * Before eating it asks both of its forks for permission; if either fork
 * refuses, it gives back the fork it did obtain (if any) and starts over.
 */
final class Philosopher extends AbstractPooledActor {
    private Random random = new Random()

    /** Name used as the prefix of every printed line. */
    String name
    /** The forks this philosopher eats with; exactly two are required. */
    def forks = []

    void act() {
        assert 2 == forks.size()
        loop {
            think()
            // One Take request object is sent to every fork.
            forks*.send(new Take())
            // The two-argument closure receives the two answers.
            react {first, second ->
                def answers = [first, second]
                if (answers.any {it instanceof Rejected}) {
                    println "$name: \tOops, can't get my forks! Giving up."
                    // Release the fork that did accept, if there is one.
                    answers.find {it instanceof Accepted}?.reply(new Finished())
                } else {
                    eat()
                    reply(new Finished())
                }
            }
        }
    }

    /** Announces thinking, sleeps for a random time below 5000 ms, then announces the end. */
    void think() {
        println "$name: \tI'm thinking"
        final int pauseMillis = random.nextInt(5000)
        Thread.sleep(pauseMillis)
        println "$name: \tI'm done thinking"
    }

    /** Announces eating, sleeps for a random time below 2000 ms, then announces the end. */
    void eat() {
        println "$name: \tI'm EATING"
        final int pauseMillis = random.nextInt(2000)
        Thread.sleep(pauseMillis)
        println "$name: \tI'm done EATING"
    }
}

/**
 * A fork actor that can be held by at most one requester at a time.
 * It answers Take with Accepted or Rejected and becomes free again on Finished.
 */
final class Fork extends AbstractPooledActor {

    String name
    /** True while nobody holds the fork. */
    boolean available = true

    void act() {
        loop {
            react {message ->
                if (message instanceof Take) {
                    if (!available) {
                        reply(new Rejected())
                    } else {
                        // Mark the fork as taken before confirming.
                        available = false
                        reply(new Accepted())
                    }
                } else if (message instanceof Finished) {
                    // Only a fork that is currently held can be given back.
                    assert !available
                    available = true
                } else {
                    throw new IllegalStateException("Cannot process the message: $message")
                }
            }
        }
    }
}

// Messages exchanged between philosophers and forks.

/** Philosopher to fork: request to pick the fork up. */
final class Take {
}

/** Fork to philosopher: the fork was free and is now held. */
final class Accepted {
}

/** Fork to philosopher: the fork is already held. */
final class Rejected {
}

/** Philosopher to fork: the fork is being put back. */
final class Finished {
}

// Five forks named 'Fork 1' to 'Fork 5'.
def forks = (1..5).collect {number -> new Fork(name: 'Fork ' + number)}

// Each philosopher uses the fork at their own position and the following one;
// the last philosopher wraps around to the first fork.
def philosophers = []
['Joe', 'Dave', 'Alice', 'James', 'Phil'].eachWithIndex {philosopherName, seat ->
    def ownForks = [forks[seat], forks[(seat + 1) % forks.size()]]
    philosophers << new Philosopher(name: philosopherName, forks: ownForks)
}

// Forks are started before the philosophers.
forks.each {it.start()}
philosophers.each {it.start()}

// Block on standard input (presumably to keep the script alive while the actors run).
System.in.read()

Word sort

Given a folder name, the script sorts the words in every file in that folder. The SortMaster actor creates a given number of WordSortActors, distributes the files among them, and collects the sorted results.

Inspired by the Scala concurrency blog post by Michael Galpin

//Messages

/** Request to a worker: the file whose words should be sorted. */
private final class FileToSort {
    String fileName
}

/** Answer from a worker: the file name together with its sorted words. */
private final class SortResult {
    String fileName
    List<String> words
}

//Worker actor
/**
 * Worker that answers every FileToSort message with a SortResult holding
 * the words of that file, ordered by their lower-case form.
 */
final class WordSortActor extends AbstractPooledActor {

    /** Reads the words of the given file and sorts them by their lower-case form. */
    private List<String> sortedWords(String fileName) {
        List<String> words = parseFile(fileName)
        return words.sort {word -> word.toLowerCase()}
    }

    /** Collects the tokens of every line of the file, splitting each line on a single space. */
    private List<String> parseFile(String fileName) {
        List<String> words = []
        new File(fileName).splitEachLine(' ') {tokens -> words.addAll(tokens)}
        return words
    }

    void act() {
        loop {
            react {message ->
                if (message instanceof FileToSort) {
                    String target = message.fileName
                    println "Sorting file=${target} on thread ${Thread.currentThread().name}"
                    List<String> result = sortedWords(target)
                    reply(new SortResult(fileName: target, words: result))
                }
            }
        }
    }
}

//Master actor
/**
 * Master that creates the workers, sends each regular file found directly
 * under docRoot to one of them, and collects the sorted word lists that come back.
 */
final class SortMaster extends AbstractPooledActor {

    /** Folder whose files are sorted. */
    String docRoot = '/'
    /** Number of worker actors to create. */
    int numActors = 1

    /** One list of sorted words per processed file, in order of arrival. */
    List<List<String>> sorted = []
    // Released once all tasks have been sent out and doneLatch has been created.
    private CountDownLatch startupLatch = new CountDownLatch(1)
    // Counts the results still outstanding; created in beginSorting().
    private CountDownLatch doneLatch

    /** Sends out all tasks and sizes doneLatch to the number of tasks sent. */
    private void beginSorting() {
        int cnt = sendTasksToWorkers()
        doneLatch = new CountDownLatch(cnt)
    }

    /** Creates and starts numActors workers. */
    private List createWorkers() {
        return (1..numActors).collect {new WordSortActor().start()}
    }

    /**
     * Sends the files under docRoot to the workers in round-robin order.
     *
     * @return the number of files that were sent
     */
    private int sendTasksToWorkers() {
        List<PooledActor> workers = createWorkers()
        int cnt = 0
        new File(docRoot).eachFile {
            // eachFile also yields sub-directories. A directory would be counted
            // here but a worker presumably cannot read it and would never reply,
            // leaving waitUntilDone() blocked - so only regular files are sent.
            if (it.isFile()) {
                workers[cnt % numActors] << new FileToSort(fileName: it)
                cnt += 1
            }
        }
        return cnt
    }

    /** Blocks the caller until a result has been received for every file sent out. */
    public void waitUntilDone() {
        // doneLatch does not exist until start-up has completed.
        startupLatch.await()
        doneLatch.await()
    }

    void act() {
        beginSorting()
        startupLatch.countDown()
        loop {
            react {
                switch (it) {
                    case SortResult:
                        sorted << it.words
                        doneLatch.countDown()
                        println "Received results for file=${it.fileName}"
                }
            }
        }
    }
}

//start the actors to sort words
def master = new SortMaster(
        docRoot: 'C:/dev/TeamCity/logs/',
        numActors: 5
).start()

// Block until the master reports that it is done, then print the collected lists.
master.waitUntilDone()
println('Done')
println(master.sorted)

Load Balancer

Demonstrates work balancing among an adaptable set of workers. The load balancer receives tasks and queues them in a temporary task queue. When a worker finishes its assignment, it asks the load balancer for a new task.

If the load balancer doesn't have any tasks available in the task queue, the worker is stopped. If the number of tasks in the task queue reaches a certain limit, a new worker is created to increase the size of the worker pool.

import org.gparallelizer.actors.Actor
import org.gparallelizer.actors.DefaultThreadActor
import org.gparallelizer.actors.pooledActors.AbstractPooledActor

/**
 * Queues incoming WorkToDo tasks and hands them to workers that ask for more
 * work. A worker is started when there is none, or when the queue has grown
 * to QUEUE_SIZE_TRIGGER tasks; a worker asking while the queue is empty is
 * told to exit.
 */
final class LoadBalancer extends DefaultThreadActor {
    /** Number of workers currently considered active. */
    int workers = 0
    /** Tasks waiting to be picked up, oldest first. */
    List taskQueue = []
    private static final QUEUE_SIZE_TRIGGER = 10

    /** Receives and handles a single message, then prints the current statistics. */
    void act() {
        def message = receive()
        if (message instanceof NeedMoreWork) {
            if (taskQueue.isEmpty()) {
                println 'No more tasks in the task queue. Terminating the worker.'
                message.reply(DemoWorker.EXIT)
                workers--
            } else {
                // Hand out the oldest queued task.
                message.reply(taskQueue.remove(0))
            }
        } else if (message instanceof WorkToDo) {
            taskQueue.add(message)
            boolean noWorkers = workers == 0
            boolean queueTooLong = taskQueue.size() >= QUEUE_SIZE_TRIGGER
            if (noWorkers || queueTooLong) {
                println 'Need more workers. Starting one.'
                workers++
                new DemoWorker(this).start()
            }
        }
        println "Active workers=${workers}\tTasks in queue=${taskQueue.size()}"
    }
}

/**
 * Worker that repeatedly asks the balancer for work, "processes" each
 * WorkToDo task by sleeping for a random time, and stops when it is sent EXIT.
 */
final class DemoWorker extends AbstractPooledActor {
    /** Message that tells a worker to stop. */
    final static Object EXIT = new Object()
    // Shared by all workers; java.util.Random instances are thread-safe.
    private static final Random random = new Random()

    /** The actor that is asked for more work. */
    Actor balancer

    def DemoWorker(balancer) {
        this.balancer = balancer
    }

    void act() {
        loop {
            // Ask for a task, then wait for either a task or the exit signal.
            this.balancer << new NeedMoreWork()
            react {
                switch (it) {
                    case WorkToDo:
                        processMessage(it)
                        break
                    case EXIT:stop()
                }
            }
        }

    }

    /**
     * Simulates processing a task by sleeping for a random time below 5000 ms.
     *
     * @param message the task being processed (not inspected)
     */
    private void processMessage(message) {
        // No lock is taken here: sleeping while holding a lock on the shared
        // Random would let only one worker "work" at a time.
        Thread.sleep random.nextInt(5000)
    }
}
/** A task to be processed by a worker. */
final class WorkToDo {
}

/** Sent by a worker to the balancer to ask for another task. */
final class NeedMoreWork {
}

final Actor balancer = new LoadBalancer().start()

//produce tasks: 20 of them, 100 ms apart
20.times {
    Thread.sleep(100)
    balancer << new WorkToDo()
}

//produce tasks in a parallel thread: 10 of them, one second apart
Thread.start {
    10.times {
        Thread.sleep(1000)
        balancer << new WorkToDo()
    }
}

// Wait for input, submit two more tasks, then wait for input again.
System.in.read()
2.times {
    balancer << new WorkToDo()
}
System.in.read()

Sign in to add a comment
Hosted by Google Code