Skip to content

Commit

Permalink
Merge pull request eclipse-vertx#1192 from eclipse/flow_control
Browse files Browse the repository at this point in the history
Event bus flow control / interceptors
  • Loading branch information
purplefox committed Nov 16, 2015
2 parents bf39131 + 8e0f2d6 commit 1fce2b1
Show file tree
Hide file tree
Showing 22 changed files with 766 additions and 218 deletions.
25 changes: 18 additions & 7 deletions src/main/asciidoc/java/eventbus.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ The `link:../../apidocs/io/vertx/core/eventbus/Message.html#body--[body]` of the

The headers of the message are available with `link:../../apidocs/io/vertx/core/eventbus/Message.html#headers--[headers]`.

==== Replying to messages
==== Acknowledging messages / sending replies

Sometimes after you send a message you want to receive a reply from the recipient.
This is known as the *request-response pattern*.
When using `link:../../apidocs/io/vertx/core/eventbus/EventBus.html#send-java.lang.String-java.lang.Object-[send]` the event bus attempts to deliver the message to a
`link:../../apidocs/io/vertx/core/eventbus/MessageConsumer.html[MessageConsumer]` registered with the event bus.

To do this you can specify a reply handler when sending the message.
In some cases it's useful for the sender to know when the consumer has received the message and "processed" it.

When the receiver receives the message they can reply to it by calling `link:../../apidocs/io/vertx/core/eventbus/Message.html#reply-java.lang.Object-[reply]`.
To acknowledge that the message has been processed the consumer can reply to the message by calling `link:../../apidocs/io/vertx/core/eventbus/Message.html#reply-java.lang.Object-[reply]`.

When this happens it causes a reply to be sent back to the sender and the reply handler is invoked with the reply.

Expand Down Expand Up @@ -246,8 +246,19 @@ eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of gra
});
----

The replies themselves can also be replied to so you can create a dialog between two different parties
consisting of multiple rounds.
The reply can contain a message body which can contain useful information.

What the "processing" actually means is application defined and depends entirely on what the message consumer does
and is not something that the Vert.x event bus itself knows or cares about.

Some examples:

* A simple message consumer which implements a service which returns the time of the day would acknowledge with a message
containing the time of day in the reply body
* A message consumer which implements a persistent queue, might acknowledge with `true` if the message was successfully
persisted in storage, or `false` if not.
* A message consumer which processes an order might acknowledge with `true` when the order has been successfully processed
so it can be deleted from the database

==== Sending with timeouts

Expand Down
16 changes: 16 additions & 0 deletions src/main/java/io/vertx/core/eventbus/BridgeInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.vertx.core.eventbus;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class BridgeInterceptor extends FilteringInterceptor {

public BridgeInterceptor(String startsWith) {
super(startsWith);
}

@Override
protected void handleContext(SendContext sendContext) {

}
}
20 changes: 17 additions & 3 deletions src/main/java/io/vertx/core/eventbus/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public interface EventBus extends Measured {
@Fluent
<T> EventBus send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);


/**
* Like {@link #send(String, Object)} but specifying {@code options} that can be used to configure the delivery.
*
Expand Down Expand Up @@ -236,15 +235,15 @@ public interface EventBus extends Measured {
* Unregister a default message codec.
* <p>
* @param clazz the class for which the codec was registered
* @return @return a reference to this, so the API can be used fluently
* @return a reference to this, so the API can be used fluently
*/
@GenIgnore
EventBus unregisterDefaultCodec(Class clazz);

/**
* Start the event bus. This would not normally be called in user code
*
* @param completionHandler
* @param completionHandler handler will be called when event bus is started
*/
@GenIgnore
void start(Handler<AsyncResult<Void>> completionHandler);
Expand All @@ -257,6 +256,21 @@ public interface EventBus extends Measured {
@GenIgnore
void close(Handler<AsyncResult<Void>> completionHandler);

/**
* Add an interceptor that will be called whenever a message is sent from Vert.x
*
* @param interceptor the interceptor
* @return a reference to this, so the API can be used fluently
*/
EventBus addInterceptor(Handler<SendContext> interceptor);

/**
* Remove an interceptor
*
* @param interceptor the interceptor
* @return a reference to this, so the API can be used fluently
*/
EventBus removeInterceptor(Handler<SendContext> interceptor);

}

29 changes: 29 additions & 0 deletions src/main/java/io/vertx/core/eventbus/FilteringInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.vertx.core.eventbus;

import io.vertx.core.Handler;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public abstract class FilteringInterceptor implements Handler<SendContext> {

private final String startsWith;

public FilteringInterceptor(String startsWith) {
this.startsWith = startsWith;
}

// TODO regex

@Override
public void handle(SendContext sendContext) {
if (sendContext.message().address().startsWith(startsWith)) {
handleContext(sendContext);
} else {
sendContext.next();
}
}

protected abstract void handleContext(SendContext sendContext);

}
14 changes: 14 additions & 0 deletions src/main/java/io/vertx/core/eventbus/MessageProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.streams.WriteStream;

Expand All @@ -30,6 +31,18 @@
@VertxGen
public interface MessageProducer<T> extends WriteStream<T> {

int DEFAULT_WRITE_QUEUE_MAX_SIZE = 1000;

/**
* Synonym for {@link #write(Object)}.
*
* @param message the message to send
* @return reference to this for fluency
*/
MessageProducer<T> send(T message);

<R> MessageProducer<T> send(T message, Handler<AsyncResult<Message<R>>> replyHandler);

@Override
MessageProducer<T> exceptionHandler(Handler<Throwable> handler);

Expand All @@ -56,4 +69,5 @@ public interface MessageProducer<T> extends WriteStream<T> {
*/
String address();

void close();
}
29 changes: 29 additions & 0 deletions src/main/java/io/vertx/core/eventbus/SendContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.vertx.core.eventbus;

import io.vertx.codegen.annotations.VertxGen;

/**
*
* Encapsulates a message being sent from Vert.x. Used with event bus interceptors
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
@VertxGen
public interface SendContext<T> {

/**
* @return The message being sent
*/
Message<T> message();

/**
* Call the next interceptor
*/
void next();

/**
*
* @return true if the message is being sent (point to point) or False if the message is being published
*/
boolean send();
}
27 changes: 0 additions & 27 deletions src/main/java/io/vertx/core/eventbus/impl/EventBusFactoryImpl.java

This file was deleted.

Loading

0 comments on commit 1fce2b1

Please sign in to comment.