MycilaEvent
Mycila Event, a brand-new powerful in-process Event System
Introduction
Mycila Event is a new, powerful event framework for in-memory event management. It has many features similar to EventBus but is better designed, uses Java concurrency features, and offers many more event features than EventBus, which are really useful when you work with a complex system driven by event messaging.
Sample
import static com.mycila.event.api.topic.Topics.*;
// first create an event service
Dispatcher dispatcher = Dispatchers.synchronousSafe(ErrorHandlers.rethrowErrorsAfterPublish());
// then subscribe
TopicMatcher matcher = only("app/events/swing/button").or(topics("app/events/swing/fields/**"));
dispatcher.subscribe(matcher, String.class, new Subscriber<String>() {
public void onEvent(Event<String> event) throws Exception {
System.out.println("Received: " + event.source());
}
});
// and publish
dispatcher.publish(topic("app/events/swing/button"), "Hello !");
When you subscribe, you specify which topics to subscribe to and which event type to receive.
Download
Mycila Event is deployed in the Maven 2 Central Repository: http://repo2.maven.org/maven2/com/mycila/mycila-event/ and also in: http://mc-repo.googlecode.com/svn/maven2/releases/
<dependency>
<groupId>com.mycila</groupId>
<artifactId>mycila-event</artifactId>
<version>X.Y</version>
</dependency>
Snapshots are available at https://mc-repo.googlecode.com/svn/maven2/snapshots/com/mycila/mycila-event/
Usage
Subscribing
Subscribing is done with the Dispatcher.subscribe method, which takes the topic matcher to subscribe to, the event type, and an instance of Subscriber<T>, where T is the event type. The subscriber's onEvent method receives an Event object containing the timestamp (in nanoseconds) and the source. Example:
dispatcher.subscribe(only("prog/events/a").or(matching("prog/events/b/**")), String.class, new Subscriber<String>() {
public void onEvent(Event<String> event) throws Exception {
sequence.add(event.getSource());
}
});
Publishing
Publishing is done by simply sending an event object to a topic.
dispatcher.publish(topic("prog/events/a"), "Hello for a");
Synchronous requests
An event system is asynchronous by default, but you sometimes need to wait for an answer before proceeding. This is the well-known request/response pattern. You can create a request and wait for its response, optionally with a timeout. The request is created through Messages.createRequest, to which you can pass request parameters. Then you call MessageRequest.getResponse() to obtain the response.
MessageRequest<Integer> adddRequest = Messages.createRequest(new int[]{1, 2, 3, 4, 5}, "param2");
dispatcher.publish(topic("system/add"), adddRequest);
int sum = adddRequest.getResponse(1, TimeUnit.SECONDS);
Asynchronous requests
You can also make a request in asynchronous mode by adding listeners, which are triggered when the response is received:
MessageRequest<Integer> adddRequest = Messages.createRequest(new int[]{1, 2, 3, 4, 5}, "param2");
adddRequest.addListener(new MessageListener<Integer>() {
public void onResponse(Integer value) {
assertEquals(15, value.intValue());
}
public void onError(Throwable t) {
t.printStackTrace();
fail();
}
});
dispatcher.publish(topic("system/add"), adddRequest);
Request answers
To respond to a request on a topic, you simply subscribe with the specific event type MessageResponse:
dispatcher.subscribe(only("system/add"), MessageResponse.class, new Subscriber<MessageResponse<Integer>>() {
public void onEvent(Event<MessageResponse<Integer>> event) throws Exception {
int[] p = (int[]) event.getSource().getParameters()[0];
System.out.println(event.getSource().getParameters()[1]); // output the second parameter
int c = 0;
for (int i : p) c += i;
event.getSource().reply(c);
}
});
The event type is a special class that gives access to the request parameters and lets you send a reply. You can also respond with an exception if an error occurred in the subscriber:
event.getSource().replyError(new ArithmeticException("Overflow !"));
Features
Topics and Event types
When you subscribe, you subscribe to a topic for a given event type. Subclasses of an event type are also delivered to a subscriber that accepts the super-class. For example, if you subscribe to Topic.topic("buttons/ok") with event type ActionListener.class, you can publish any implementation of ActionListener and it will be received by subscribers accepting the type ActionListener (and its sub-types).
In-Memory event system
Mycila Event is not a JMS solution! Like EventBus, Mycila Event handles intra-process communication. For example, it can be used in a Swing GUI or in a complex modular framework to handle communication between plugins. Thus, Mycila Event must be fast, thread-safe and scalable.
Memory management
Like EventBus, Mycila Event supports hard and weak subscriptions. A hard subscription always remains and must be unregistered when it is no longer needed. A weak subscription is removed automatically when the subscriber is no longer in use. By default, if nothing is specified, Mycila Event uses a hard reference. This is very useful when you simply bind a listener like this:
dispatcher.subscribe(matcher, String.class, new Subscriber<String>() {
public void onEvent(Event<String> event) throws Exception {
System.out.println("Received: " + event.source());
}
});
Reachability can be controlled by annotating the class with @Reference. For example, suppose you have a plugin class subscribing to events. You can annotate the class like this:
@Reference(Reachability.WEAK)
public class MyPlugin implements Subscriber<String> {
public void onEvent(Event<String> event) throws Exception {
System.out.println("Received: " + event.source());
}
public void start() {
// start the plugin
}
}
MyPlugin pluginLoadedByAnotherSystem = ...;
dispatcher.subscribe(matcher, String.class, pluginLoadedByAnotherSystem);
When the plugin is registered, a weak registration is made so that if the plugin is unloaded or no longer used, the subscription can be removed automatically. @Reference can also be put on methods, when used with annotations.
Exception handling
By default, if a subscriber throws an exception, the exception is rethrown immediately, in the thread firing the event. You can change this behavior by providing your own ErrorHandler, or by using one of the existing ones.
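To make the difference between such error policies concrete, here is a minimal sketch in plain Java that does not use Mycila Event at all. The Policy enum, the publish method and the run helper are hypothetical names chosen for illustration. The sketch only assumes that "rethrow immediately", "rethrow after publish" and "ignore" behave as their names suggest; it does not reproduce the library's ErrorHandler API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration, not Mycila Event code.
final class ErrorPolicySketch {

    enum Policy { RETHROW_IMMEDIATELY, RETHROW_AFTER_PUBLISH, IGNORE }

    // Calls each subscriber in order and applies the chosen policy to failures.
    static void publish(String event, List<Consumer<String>> subscribers, Policy policy) {
        RuntimeException deferred = null;
        for (Consumer<String> subscriber : subscribers) {
            try {
                subscriber.accept(event);
            } catch (RuntimeException e) {
                if (policy == Policy.RETHROW_IMMEDIATELY) {
                    throw e; // remaining subscribers are skipped
                }
                if (policy == Policy.RETHROW_AFTER_PUBLISH) {
                    // Remember the first failure, attach later ones to it.
                    if (deferred == null) deferred = e; else deferred.addSuppressed(e);
                }
                // IGNORE: the failure is dropped and publishing continues.
            }
        }
        if (deferred != null) throw deferred;
    }

    // Publishes one event to subscribers a, b (always fails) and c, then reports what happened.
    static String run(Policy policy) {
        List<String> called = new ArrayList<>();
        List<Consumer<String>> subscribers = new ArrayList<>();
        subscribers.add(e -> called.add("a"));
        subscribers.add(e -> { throw new IllegalStateException("subscriber b failed"); });
        subscribers.add(e -> called.add("c"));
        boolean thrown = false;
        try {
            publish("event", subscribers, policy);
        } catch (RuntimeException e) {
            thrown = true;
        }
        return "called=" + called + " thrown=" + thrown;
    }

    public static void main(String[] args) {
        for (Policy p : Policy.values()) {
            System.out.println(p + ": " + run(p));
        }
    }
}
```

With the immediate policy, subscriber c never sees the event; with the other two it does, and only the deferred policy still reports the failure to the publishing thread.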
To create a Dispatcher with the appropriate error handler, you just have to pass the ErrorHandler instance when creating it:
Dispatcher dispatcher = Dispatchers.synchronousSafe(ErrorHandlers.ignoreErrors());
Topic Matchers
When you register a subscriber, you need to pass the type of event you want to receive and a matcher to match the topics you want to listen to. A TopicMatcher can be created with the Topics class. You can compose matchers.
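The idea of composable matchers can be sketched with plain java.util.function.Predicate. This is not the TopicMatcher implementation: the class TopicMatcherSketch and its only and matching methods are hypothetical, and the pattern rules are an assumption based on common Ant conventions ("**" spans any number of path segments, "*" stays within one segment). The library's exact matching rules may differ.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Mycila Event: models a topic matcher as a Predicate<String>.
final class TopicMatcherSketch {

    // Matches exactly one topic name.
    static Predicate<String> only(String topic) {
        return topic::equals;
    }

    // Matches topics against an Ant-style pattern.
    // Assumed conventions: "**" matches any characters including '/', "*" matches within one segment.
    static Predicate<String> matching(String antPattern) {
        StringBuilder regex = new StringBuilder();
        int i = 0;
        while (i < antPattern.length()) {
            if (antPattern.startsWith("**", i)) {
                regex.append(".*");
                i += 2;
            } else if (antPattern.charAt(i) == '*') {
                regex.append("[^/]*");
                i += 1;
            } else {
                // Quote literal characters so they have no special regex meaning.
                regex.append(Pattern.quote(String.valueOf(antPattern.charAt(i))));
                i += 1;
            }
        }
        Pattern compiled = Pattern.compile(regex.toString());
        return topic -> compiled.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Composition: an exact topic OR anything below a prefix.
        Predicate<String> matcher = only("app/events/swing/button")
                .or(matching("app/events/swing/fields/**"));
        System.out.println(matcher.test("app/events/swing/button"));            // true
        System.out.println(matcher.test("app/events/swing/fields/name/first")); // true
        System.out.println(matcher.test("app/events/swing/menu"));              // false
    }
}
```

Modeling a matcher as a predicate makes composition with or, and and negate available without extra code, which is presumably the kind of convenience composed matchers are meant to offer.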
For example, to set a catch-all subscriber, you could do:
dispatcher.subscribe(Topics.any(), Object.class, new Subscriber<Object>() {
public void onEvent(Event<Object> event) throws Exception {
System.out.println("Received: " + event.source());
}
});
Annotation support
Mycila Event provides annotations to create publishers and subscribers decoupled from the Dispatcher service.
Subscribers
You can annotate any method in your class with @Subscribe to receive events. The annotation takes as arguments the list of topics to listen to (an Ant-style expression matching topics) and the type of event. It must be placed on a method having one parameter: the Event to listen for. You can use the @Reference annotation to control whether the listener is weak or not. This annotation can be placed on the method or, if your class has several annotated methods listening for events, on the class directly. By default, a subscription has a HARD reference. So be VERY CAREFUL when you register listeners implemented by your classes: make them WEAK if they have a shorter lifecycle than the Dispatcher.
Subscribe to events
class MyClass1 {
@Subscribe(topics = "prog/events/a/**", eventType = String.class)
private void handle(Event<String> event) {
// do something
}
}
In your code, after having created a Dispatcher, you can use AnnotationProcessors to register annotated methods like this:
MyClass1 c1 = new ...
AnnotationProcessor processor = AnnotationProcessors.create(dispatcher);
processor.process(c1);
Publisher
Publishers can be created using the @Publish annotation. You can completely decouple your code by declaring interfaces (or abstract classes) whose implementations are generated automatically thanks to annotations. For example:
private static interface B {
@Publish(topics = "prog/events/a")
void send(String a, int b);
}
static abstract class C {
@Publish(topics = {"prog/events/a/a1", "prog/events/a/allA"})
abstract void send(String a, int b);
}
These two types define publishing methods. B publishes two events (a string and an integer) to one topic, and C publishes two events to two topics. To generate their implementations automatically, after having created a Dispatcher, you can use:
B b = annotationProcessor.proxy(B.class);
C c = annotationProcessor.proxy(C.class);
b.send("Hello for a", 1);
c.send("Hello for a1", 4);
B and C are types in your code that you can use directly to publish events. You can annotate a generated publisher method with @Multiple. If the publishing method is given an array or collection of objects, each object is published independently as an event. For example:
abstract class MyCustomPublisher2 {
@Publish(topics = "a/topic/path")
@Multiple
abstract void send(int event1, String... otherEvents);
}
[...]
myCustomPublisher2.send(1, "each", "string", "will", "be", "an", "event");
Note: you can then create abstract classes which act as event managers: abstract methods annotated with @Publish are generated, and concrete methods annotated with @Subscribe receive events.
Requests
Requesting methods act the same as publishers. They differ in the annotation, which provides a way to set a timeout while waiting for the response to come back. For example, suppose you have an adder plugin exposing its computation method on the topic system/add. You can create a requestor like this:
interface Requestor {
@Request(topic = "system/add", timeout = 1, unit = TimeUnit.SECONDS)
int addNumbers(int... p);
}
Dispatcher dispatcher = Dispatchers.synchronousSafe();
AnnotationProcessor processor = AnnotationProcessors.create(dispatcher);
Requestor req = processor.proxy(Requestor.class);
assertEquals(15, req.addNumbers(1,2,3,4,5));
Given the nature of a method call, the call is synchronous and waits for a response either indefinitely or only for the given time.
Answering requests
There are two ways of answering requests. As we have seen, we can register a simple subscriber. It can be done like this:
static class DU {
@Subscribe(topics = "system/du", eventType = MessageResponse.class)
void duRequest(Event<MessageResponse<Integer>> event) {
String folder = (String) event.getSource().getParameters()[0];
System.out.println("du request on folder " + folder);
// call du -h <folder>
if ("notFound".equals(folder))
event.getSource().replyError(new FileNotFoundException("du did not find folder " + folder));
else
event.getSource().reply(40);
}
}
Dispatcher dispatcher = Dispatchers.synchronousSafe();
AnnotationProcessor processor = AnnotationProcessors.create(dispatcher);
DU du = new DU();
processor.process(du);
Fortunately, there is a better way. The request sends a String as a parameter, and the subscriber replies with an integer. So what if we could simply call a method? This can be done like this:
static class DU {
@Answers(topics = "system/du")
int getSize(String folder) throws FileNotFoundException {
System.out.println("du request on folder " + folder);
// call du -h <folder>
if ("notFound".equals(folder))
throw new FileNotFoundException("du did not find folder " + folder);
return 40;
}
}
Dispatcher dispatcher = Dispatchers.synchronousSafe();
AnnotationProcessor processor = AnnotationProcessors.create(dispatcher);
DU du = new DU();
processor.process(du);
The method's return value (or the exception) is then used as the reply when a request is made to the topic system/du.
Event dispatching strategies
There are several strategies, depending on whether you want the order of events and the order of listeners to be respected, and on whether or not multiple threads publish events.
Synchronous Safe Dispatching
This strategy guarantees the order in which listeners are called and that only one thread hits the listeners at a time. Thus, your listeners don't need to be thread-safe. The publish method blocks until the previous publishing is finished. This behavior allows you to have stateful, non-thread-safe subscribers. When multiple threads are publishing, this strategy is slower.
Dispatcher dispatcher = Dispatchers.synchronousSafe();
Synchronous Unsafe Dispatching
This strategy guarantees the order in which listeners are called. The publish method only blocks the current thread, meaning that one thread can be publishing while another thread also starts publishing an event. Thus, your subscribers can be hit at the same time by two or more threads, and need to be stateless and thread-safe. Multiple threads can publish at the same time.
Dispatcher dispatcher = Dispatchers.synchronousUnsafe();
Asynchronous Safe Dispatching
This strategy guarantees the order in which listeners are called and the order of published events. The publish method does not block and the publisher thread returns immediately. The event is queued and waits for its turn to be processed by the background thread. This strategy is useful when your publishers must execute as fast as possible. It allows you to have stateful, non-thread-safe subscribers since only one thread dequeues and fires events. Since this strategy uses an unbounded queue, be careful to have fast subscribers and not to publish more events than your subscribers can consume.
Dispatcher dispatcher = Dispatchers.asynchronousSafe();
Asynchronous Unsafe Dispatching
This strategy guarantees the order in which listeners are called, but not the order of events. Each call to publish returns immediately, and all events in the queue are handled by a thread pool. This strategy is useful when your publishers must execute as fast as possible and events need to be processed quickly, while still respecting listener order. This dispatcher can be seen as a concurrent event dispatcher that respects listener order. Since this strategy uses an unbounded queue, be careful to have fast subscribers and not to publish more events than your subscribers can consume. Your subscribers need to be stateless and thread-safe. Multiple threads can send events at the same time.
Dispatcher dispatcher = Dispatchers.asynchronousUnsafe();
Broadcast Ordered Dispatching
This strategy guarantees the order of events but calls listeners in no particular order. The goal of broadcasting is to reach every listener in the smallest amount of time. A thread pool is used to execute subscribers. This type of dispatching is really useful when you don't care about listener ordering and want both publishing and subscriber calls to be as fast as possible. Note that when a thread publishes an event, it is enqueued. A single thread takes events from the queue one by one and fires each event concurrently to all subscribers. The publish method returns immediately, but before another event can be processed, all concurrent firings of the previous event must have finished. This guarantees that subscribers are called concurrently while all receiving the events in the same order.
Dispatcher dispatcher = Dispatchers.broadcastOrdered();
Broadcast Unordered Dispatching
This type of dispatching is really useful when you don't care about any ordering. Since the publish method does not block, any thread can publish events really fast, and a thread pool is used to process them all, unordered. Thus, subscribers that take a long time to execute don't affect the publishing of other events to other subscribers.
Dispatcher dispatcher = Dispatchers.broadcastUnordered();
Custom strategy
You can easily implement and control your own dispatching strategy: simply look at the source code of Dispatchers for more examples. You can create a custom dispatcher like this:
Dispatcher dispatcher = Dispatchers.custom(errorHandlerProvider, publishingExecutor, subscriberExecutor);
Executors are implementations of java.util.concurrent.Executor. The first one controls the concurrency of the whole publishing process and the second one controls the concurrency for calling a subscriber. The class com.mycila.event.util.Executors has two flavors that can help you for basic cases.
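As a rough illustration of how two executors can shape a dispatching strategy, here is a sketch that uses only the JDK. TwoExecutorSketch, direct and demo are hypothetical names, and the class is not the Dispatcher returned by Dispatchers.custom. It only assumes that one executor runs the publishing step and the other runs each subscriber call. Pairing a single background thread with a caller-thread executor gives ordered, non-blocking publishing, similar in spirit to the asynchronous safe strategy described earlier.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration, not Mycila Event code.
final class TwoExecutorSketch {
    private final Executor publishingExecutor;
    private final Executor subscriberExecutor;
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    TwoExecutorSketch(Executor publishingExecutor, Executor subscriberExecutor) {
        this.publishingExecutor = publishingExecutor;
        this.subscriberExecutor = subscriberExecutor;
    }

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // The first executor decides where the publishing loop runs,
    // the second decides where each subscriber call runs.
    void publish(String event) {
        publishingExecutor.execute(() -> {
            for (Consumer<String> subscriber : subscribers) {
                subscriberExecutor.execute(() -> subscriber.accept(event));
            }
        });
    }

    // Runs every task in the thread that submits it.
    static Executor direct() {
        return Runnable::run;
    }

    // Publishes three events through a single background thread and returns them in arrival order.
    static List<String> demo() {
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            TwoExecutorSketch dispatcher = new TwoExecutorSketch(background, direct());
            List<String> received = new CopyOnWriteArrayList<>();
            CountDownLatch done = new CountDownLatch(3);
            dispatcher.subscribe(event -> {
                received.add(event);
                done.countDown();
            });
            dispatcher.publish("first");
            dispatcher.publish("second");
            dispatcher.publish("third");
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for events");
            }
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            background.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second, third]
    }
}
```

Swapping the background executor for a thread pool would let events overtake each other, and swapping direct() for a pool would run subscribers concurrently, which is roughly the trade-off the strategies above describe.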
Summary and Statistics
Using this test: http://mycila.googlecode.com/svn/mycila-event/trunk/src/test/java/com/mycila/event/DefaultDispatcherTest.java
We have the following times for each strategy on my 5-year-old DELL 6400 laptop (Intel Centrino Duo).
RUN1: 1000 publishing threads each sending 1000 events to a topic chosen at random among 5 topics, each topic being watched by two consumers; pool core size set to 1000 threads.
RUN2: 1000 publishing threads each sending 1000 events to a topic chosen at random among 5 topics, each topic being watched by two consumers; pool core size set to 8 threads (fewer context switches).
These statistics are for a worst case, meaning that:
Integration
Google Guice
http://code.google.com/p/google-guice/
Mycila Event can be used without any IoC container. But thanks to the powerful annotation support and injection listeners of Google Guice, this dependency injection library integrates very well with Mycila Event.
Binding generated publishers
public final class MyModule implements Module {
@Override
public void configure(Binder binder) {
MycilaEventGuice.bindPublisher(binder, MyCustomPublisher.class).in(Singleton.class);
MycilaEventGuice.bindPublisher(binder, MyCustomPublisher2.class).in(Singleton.class);
MycilaEventGuice.bindPublisher(binder, MyCustomPublisher3.class).in(Singleton.class);
}
@Reference(Reachability.WEAK)
static interface MyCustomPublisher {
@Publish(topics = "a/topic/path")
void send(String... messages);
}
static abstract class MyCustomPublisher2 {
@Publish(topics = "a/topic/path")
@Multiple
abstract void send(int event1, String... otherEvents);
}
static abstract class MyCustomPublisher3 {
@Publish(topics = "a/topic/path")
@Multiple
abstract void send(int event1, Iterable<String> events);
}
}
injector.getInstance(MyCustomPublisher.class).send("A", "cut", "message", "containing", "bad words");
injector.getInstance(MyCustomPublisher2.class).send(1, "A", "cut", "message", "containing", "bad words", "in varg");
injector.getInstance(MyCustomPublisher3.class).send(1, Arrays.asList("A", "cut", "message", "containing", "bad words", "in list"));
Automatically inject publishers and create subscriptions
Suppose you have a class like this:
public final class MyImpl implements MyClass {
Publisher<String> publisher;
@Subscribe(topics = "a/topic/path", eventType = String.class)
void subscribe(Event<String> event) {
System.out.println("(subscribe) Got: " + event);
}
@Subscribe(topics = "a/topic/path", eventType = String[].class)
void subscribeToList(Event<String[]> event) {
System.out.println("(subscribeToList) Got: " + Arrays.toString(event.source()));
}
@Subscribe(topics = "a/topic/path", eventType = Integer.class)
void subscribeToInts(Event<Integer> event) {
System.out.println("(subscribeToInts) Got: " + event.source());
}
@Publish(topics = "a/topic/path")
void publisher(Publisher<String> publisher) {
System.out.println("Publisher injected");
publisher.publish("Hello from publisher !");
this.publisher = publisher;
}
}
When configuring Guice, simply add MycilaEventGuiceModule like this:
Injector injector = Guice.createInjector(new MycilaEventGuiceModule(), new Module() {
@Override
public void configure(Binder binder) {
binder.bind(MyClass.class).to(MyImpl.class);
}
});
All instances created by Guice are automatically scanned for Mycila Event annotations: subscribing methods are registered and publishers are injected.
Spring integration
//TODO
Apache Camel integration
//TODO
REST integration
//TODO
JMS integration
//TODO
Esper integration
http://svn.codehaus.org/esper/
//TODO
Nice! I like the simplicity of the design. Though at first blush it seems like a simpler form of Camel, the appeal of this lies in its focus on one thing.
Though I'm curious - your tests on the Laptop had 1000 threads?! Why? Just the context switching...
Yeah, that was just for fun ;) I tried with 2-3 threads since it is a dual core, then with 1000 threads. But it didn't change the results that much... To get a better benchmark, I would need access to better infrastructure, since my 5-year-old laptop doesn't make the scalability very clear (that is, that you get more processing after adding more hardware).
What are the semantics of a synchronous request-response where there are potentially multiple listeners? Is it a case of first-registered first-served? Once a response has been set on the MessageResponse event by one listener, what happens to the other listeners? Are they still serviced? Does having more than one listener for a synchronous request-response exchange even make sense?
Thanks for your feedback!
You can have several subscribers to a topic, so several subscribers can be aware of the request. But only one can respond. If another one also responds, an exception will be thrown. So this just allows you to watch for requests.
Having multiple listeners for a response can make sense, for example, in a UI application where you would like to register multiple components to be aware of a response.
Nice framework. I'm having a lot of headaches making it work with Guice 3.0 + MycilaEvent 4.2.ga via annotations. It seems some parts of the API have changed since you published the usage guidelines above. Is the AnnotationProcessor step still required when using Guice?