Skip to content

Commit

Permalink
Merge remote-tracking branch 'Tesco/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
ramk committed Dec 10, 2016
2 parents 30c62ec + a514744 commit bf21b0d
Show file tree
Hide file tree
Showing 74 changed files with 2,318 additions and 898 deletions.
4 changes: 2 additions & 2 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ pieces of technology - a messaging system and a database.
## Introducing mewbase

Mewbase provides an engine that functions as a high performance streaming event store but also
allows you to run persistent *functions* in the engine that listen to raw events and maintain multiple *views* of
allows you to run persistent *functions* in the engine that listen to raw events and maintain multiple *projections* of
the event data.

The views can then be queried in similar way to how you would with a document or graph database.
The projections can then be queried in similar way to how you would with a document or graph database.

So you can think of mewbase as combining the following:

Expand Down
12 changes: 10 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@
<artifactId>vertx-core</artifactId>
<version>3.3.3</version>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-auth-common</artifactId>
<version>3.3.3</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-auth-shiro</artifactId>
<version>3.3.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down Expand Up @@ -72,7 +81,6 @@
<scope>test</scope>
</dependency>


</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,19 @@ public static void main(String[] args) {
private void example() throws Exception {

// Setup and start a server
ServerOptions options = new ServerOptions().setChannels(new String[]{"orders"});
ServerOptions options =
new ServerOptions().setChannels(new String[]{"orders"}).setBinders(new String[]{"baskets"});
Server server = Server.newServer(options);
server.start().get();

// Install a function that will respond to add_item events and increase/decrease the quantity of the item in the basket
server.installFunction(
"basket_add", // function name
"orders", // channel name
ev -> ev.getString("eventType").equals("add_item"), // event filter
"baskets", // binder name
ev -> ev.getString("basketID"), // document id selector; how to obtain the doc id from the event bson
(basket, del) -> // function to run
BsonPath.add(basket, del.event().getInteger("quantity"), "products", del.event().getString("productID"))
);
// Register a projection that will respond to add_item events and increase/decrease the quantity of the item in the basket
server.buildProjection("maintain_basket") // projection name
.projecting("orders") // channel name
.filteredBy(ev -> ev.getString("eventType").equals("add_item")) // event filter
.onto("baskets") // binder name
.identifiedBy(ev -> ev.getString("basketID")) // document id selector; how to obtain the doc id from the event bson
.as((basket, del) -> // projection function
BsonPath.add(basket, del.event().getInteger("quantity"), "products", del.event().getString("productID"))).register();

// Create a client
Client client = Client.newClient(new ClientOptions());
Expand All @@ -59,7 +58,6 @@ private void example() throws Exception {
// Now get the basket
BsonObject basket = client.findByID("baskets", "basket1111").get();


System.out.println("Basket is: " + basket);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ private void example() throws Exception {
// Subscribe to a channel
SubDescriptor descriptor = new SubDescriptor().setChannel("orders");
client.subscribe(descriptor, del -> {
System.out.println("Received event: " + del.event().getString("foo"));
System.
out.println("Received event: " + del.event().getString("foo"));
});

// Publish to the channel
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/com/tesco/mewbase/auth/MewbaseAuthProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.tesco.mewbase.auth;

import com.tesco.mewbase.bson.BsonObject;

import java.util.concurrent.CompletableFuture;

/**
*
* User-facing interface for authenticating users.
*
*/
public interface MewbaseAuthProvider {

/**
* Authenticate a user.
* <p>
* The first argument is a Bson object containing information for authenticating the user. What this actually contains
* depends on the specific implementation. In the case of a simple username/password based
* authentication it is likely to contain a JSON object with the following structure:
* <pre>
* {
* "username": "tim",
* "password": "mypassword"
* }
* </pre>
*
* @param authInfo the auth information
* @return a CompletableFuture containing a MewbaseUser where authorization operations can be performed
* on it
*/
CompletableFuture<MewbaseUser> authenticate(BsonObject authInfo);
}
8 changes: 8 additions & 0 deletions src/main/java/com/tesco/mewbase/auth/MewbaseUser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.tesco.mewbase.auth;

/**
* Contains all authorization info in order to perform operations
* in the Mewbase code.
*/
public interface MewbaseUser {
}
6 changes: 6 additions & 0 deletions src/main/java/com/tesco/mewbase/auth/impl/DummyUser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.tesco.mewbase.auth.impl;

import com.tesco.mewbase.auth.MewbaseUser;

public class DummyUser implements MewbaseUser {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.tesco.mewbase.auth.impl;

import com.tesco.mewbase.auth.MewbaseAuthProvider;
import com.tesco.mewbase.auth.MewbaseUser;
import com.tesco.mewbase.bson.BsonObject;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.AuthProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

/**
*
* This class serves as an adapter for the {@link io.vertx.ext.auth.AuthProvider}
*
* It will container a {@link io.vertx.ext.auth.AuthProvider} attribute, which is then used
* in the authenticate method.
*
* For the authentication information, the attributes just need to be inserted in the BsonObject.
*
* The mapping to {@link io.vertx.core.json.JsonObject} is done inside the authenticate method.
*
*/
public class MewbaseVertxAuthProvider implements MewbaseAuthProvider {

private final static Logger log = LoggerFactory.getLogger(MewbaseVertxAuthProvider.class);

private AuthProvider authProvider;

public MewbaseVertxAuthProvider(AuthProvider authProvider) {
this.authProvider = authProvider;
}

@Override
public CompletableFuture<MewbaseUser> authenticate(BsonObject authInfo) {
CompletableFuture<MewbaseUser> cf = new CompletableFuture<>();

JsonObject jsonAuthInfo = new JsonObject();
authInfo.stream().forEach(entry -> {
jsonAuthInfo.put(entry.getKey(), entry.getValue());
});

authProvider.authenticate(jsonAuthInfo, vertxRes -> {
if (vertxRes.succeeded()) {
cf.complete(new VertxUser(vertxRes.result()));
} else {
cf.completeExceptionally(vertxRes.cause());
}
});

return cf;
}
}
18 changes: 18 additions & 0 deletions src/main/java/com/tesco/mewbase/auth/impl/NoAuthAuthProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.tesco.mewbase.auth.impl;

import com.tesco.mewbase.auth.MewbaseAuthProvider;
import com.tesco.mewbase.auth.MewbaseUser;
import com.tesco.mewbase.bson.BsonObject;

import java.util.concurrent.CompletableFuture;

/**
* The default auth provider implementation that does not enforce any authentication
*/
public class NoAuthAuthProvider implements MewbaseAuthProvider {

@Override
public CompletableFuture<MewbaseUser> authenticate(BsonObject authInfo) {
return CompletableFuture.completedFuture(new DummyUser());
}
}
13 changes: 13 additions & 0 deletions src/main/java/com/tesco/mewbase/auth/impl/VertxUser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.tesco.mewbase.auth.impl;

import com.tesco.mewbase.auth.MewbaseUser;
import io.vertx.ext.auth.User;

public class VertxUser implements MewbaseUser {

private User user;

public VertxUser(User user) {
this.user = user;
}
}
11 changes: 5 additions & 6 deletions src/main/java/com/tesco/mewbase/bson/Bson.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.tesco.mewbase.client.MewException;
import de.undercouch.bson4jackson.BsonFactory;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.EncodeException;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -56,20 +55,20 @@ public class Bson {
mapper.registerModule(module);
}

public static void encode(Object obj, OutputStream outputStream) throws EncodeException {
public static void encode(Object obj, OutputStream outputStream) {
try {
mapper.writeValue(outputStream, obj);
} catch (Exception e) {
throw new EncodeException("Failed to encode as BSON: " + e.getMessage());
throw new MewException(e);
}
}


public static <T> T decodeValue(InputStream inputStream, Class<T> clazz) throws DecodeException {
public static <T> T decodeValue(InputStream inputStream, Class<T> clazz) {
try {
return mapper.readValue(inputStream, clazz);
} catch (Exception e) {
throw new DecodeException("Failed to decode BSON:" + e.getMessage());
throw new MewException(e);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/tesco/mewbase/bson/BsonObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,8 @@ protected void encodeToString(StringBuilder sb) {
sb.append("\"");
sb.append(entry.getValue().toString());
sb.append("\"");
} else if (entry.getValue() == null) {
sb.append("<null>");
} else {
sb.append(entry.getValue().toString());
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/tesco/mewbase/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ static Client newClient(Vertx vertx, ClientOptions options) {
return factory.newClient(vertx, options);
}

// Error codes

int ERR_AUTHENTICATION_FAILED = 1;
int ERR_NOT_AUTHORISED = 2;
int ERR_NO_SUCH_CHANNEL = 3;
int ERR_FAILED_TO_PERSIST = 4;

ClientFactory factory = ServiceHelper.loadFactory(ClientFactory.class);

CompletableFuture<BsonObject> findByID(String binderName, String id);
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/tesco/mewbase/client/ClientOptions.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.tesco.mewbase.client;

import com.tesco.mewbase.bson.BsonObject;
import io.vertx.core.net.NetClientOptions;

/**
Expand All @@ -13,6 +14,7 @@ public class ClientOptions {
private String host = DEFAULT_HOST;
private int port = DEFAULT_PORT;
private NetClientOptions netClientOptions = new NetClientOptions();
private BsonObject authInfo;

public String getHost() {
return host;
Expand Down Expand Up @@ -41,6 +43,16 @@ public ClientOptions setNetClientOptions(NetClientOptions netClientOptions) {
return this;
}


public BsonObject getAuthInfo() {
return authInfo;
}

public ClientOptions setAuthInfo(BsonObject authInfo) {
this.authInfo = authInfo;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -69,4 +81,5 @@ public String toString() {
", port=" + port +
'}';
}

}
16 changes: 10 additions & 6 deletions src/main/java/com/tesco/mewbase/client/MewException.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,31 @@
*/
public class MewException extends RuntimeException {

private final String errorCode;
private final int errorCode;

public MewException(Exception e) {
this(null, e, null);
this(null, e, 0);
}

public MewException(String message) {
this(message, (String)null);
this(message, 0);
}

public MewException(String message, Throwable cause, String errorCode) {
public MewException(String message, Throwable cause, int errorCode) {
super(message, cause);
this.errorCode = errorCode;
}

public MewException(String message, Throwable cause) {
this(message, cause, null);
this(message, cause, 0);
}

public MewException(String message, String errorCode) {
public MewException(String message, int errorCode) {
super(message);
this.errorCode = errorCode;
}

public int getErrorCode() {
return errorCode;
}
}
8 changes: 8 additions & 0 deletions src/main/java/com/tesco/mewbase/client/Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,16 @@ public interface Subscription {

// TODO return CompletableFuture<Void> ?

/**
* Unsubscribe a durable subscription
*/
void unsubscribe();

/**
* Close the subscription
*/
void close();

// Blocking yuck!

BsonObject receive(long timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
*/
public class ClientDeliveryImpl extends DeliveryImpl implements ClientDelivery {

private final SubscriptionImpl sub;
private final int sizeBytes;
protected final SubscriptionImpl sub;
protected final int sizeBytes;

public ClientDeliveryImpl(String channel, long timestamp, long sequenceNumber, BsonObject event, SubscriptionImpl sub, int sizeBytes) {
super(channel, timestamp, sequenceNumber, event);
Expand All @@ -21,7 +21,7 @@ public ClientDeliveryImpl(String channel, long timestamp, long sequenceNumber, B

@Override
public void acknowledge() {
sub.acknowledge(sizeBytes);
sub.acknowledge(channelPos, sizeBytes);
}

@Override
Expand Down
Loading

1 comment on commit bf21b0d

@rammygit
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merging

Please sign in to comment.