Skip to content

Commit

Permalink
exit after resolving multiples futures, and then exit (crossbario#227)
Browse files Browse the repository at this point in the history
* exit after resolving multiples futures, and then exit

* cleanup
  • Loading branch information
oberstet authored Jun 30, 2017
1 parent 96dfa30 commit 96f8c4b
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,96 +192,112 @@ public void onJoinHandler3(Session session, SessionDetails details) {
public void onJoinHandler4(Session session, SessionDetails details) {
System.out.println("onJoinHandler4 fired");

// no result mapping:

CompletableFuture<CallResult> f1 = mSession.call("com.example.get_person", null, null, null);

f1.thenAccept(result -> {
System.out.println("get_person() [untyped]: " + result.results.get(0));
});
f1.exceptionally(throwable -> {
throwable.printStackTrace();
return null;
});

// POJO typed result mappings:

if (true) {
TypeReference<Person> resultType = new TypeReference<Person>() {};

CompletableFuture<Person> f2 = mSession.call("com.example.get_person", null, null, resultType, null);

f2.thenAcceptAsync(person -> {
System.out.println("get_person() [typed]: " + person.firstname + " " + person.lastname);
}, mExecutor);

f2.exceptionally(throwable -> {
throwable.printStackTrace();
return null;
});
}

if (true) {
TypeReference<List<Person>> resultType = new TypeReference<List<Person>>() {};

CompletableFuture<List<Person>> f2 = mSession.call("com.example.get_all_persons", null, null, resultType, null);

f2.thenAcceptAsync(persons -> {
System.out.println("get_all_persons() [typed]:");
persons.forEach(person -> {
System.out.println(person.firstname + " " + person.lastname);
});
}, mExecutor);

f2.exceptionally(throwable -> {
throwable.printStackTrace();
return null;
});
}

if (true) {
TypeReference<List<Person>> resultType = new TypeReference<List<Person>>() {};

List<Object> args = new ArrayList<>();
args.add("development");

CompletableFuture<List<Person>> f2 = mSession.call("com.example.get_persons_by_department", args, null, resultType, null);

f2.thenAcceptAsync(persons -> {
System.out.println("get_persons_by_department('development') [typed]:");
persons.forEach(person -> {
System.out.println(person.firstname + " " + person.lastname);
});
}, mExecutor);

f2.exceptionally(throwable -> {
throwable.printStackTrace();
return null;
});
}

if (true) {
TypeReference<Map<String, List<Person>>> resultType = new TypeReference<Map<String, List<Person>>>() {};

CompletableFuture<Map<String, List<Person>>> f2 = mSession.call("com.example.get_persons_by_department", null, null, resultType, null);

f2.thenAcceptAsync(persons_by_department -> {
System.out.println("get_persons_by_department() [typed]:");

persons_by_department.forEach((department, persons) -> {
System.out.println("\ndepartment '" + department + "':");

persons.forEach(person -> {
System.out.println(" " + person.firstname + " " + person.lastname);
});
});
}, mExecutor);

f2.exceptionally(throwable -> {
throwable.printStackTrace();
return null;
});
}
// call a remote procedure that returns a Person
CompletableFuture<Void> f1 =
mSession.call("com.example.get_person", null, null, new TypeReference<Person>() {}, null)
.handleAsync(
(person, throwable) -> {
if (throwable != null) {
System.out.println("get_person() ERROR: " + throwable.getMessage());
//throwable.printStackTrace();
} else {
System.out.println("get_person() [typed]: " + person.firstname + " " + person.lastname + " (" + person.department + ")");
}
return null;
}, mExecutor
)
;

// call a remote procedure that returns a Person .. slowly (3 secs delay)
CompletableFuture<Void> f2 =
mSession.call("com.example.get_person_delayed", null, null, new TypeReference<Person>() {}, null)
.handleAsync(
(person, throwable) -> {
if (throwable != null) {
System.out.println("get_person_delayed() ERROR: " + throwable.getMessage());
//throwable.printStackTrace();
} else {
System.out.println("get_person_delayed() [typed]: " + person.firstname + " " + person.lastname + " (" + person.department + ")");
}
return null;
}, mExecutor
)
;

// call a remote procedure that returns a List<Person>
CompletableFuture<Void> f3 =
mSession.call("com.example.get_all_persons", null, null, new TypeReference<List<Person>>() {}, null)
.handleAsync(
(persons, throwable) -> {
if (throwable != null) {
System.out.println("get_all_persons() ERROR: " + throwable.getMessage());
//throwable.printStackTrace();
} else {
System.out.println("get_all_persons() [typed]:");
persons.forEach(person -> {
System.out.println(person.firstname + " " + person.lastname + " (" + person.department + ")");
});
}
return null;
}, mExecutor
)
;

// call a remote procedure that returns a List<Person>
List<Object> args = new ArrayList<>();
args.add("development");

CompletableFuture<Void> f4 =
mSession.call("com.example.get_persons_by_department", args, null, new TypeReference<List<Person>>() {}, null)
.handleAsync(
(persons, throwable) -> {
if (throwable != null) {
System.out.println("get_persons_by_department() ERROR: " + throwable.getMessage());
//throwable.printStackTrace();
} else {
System.out.println("get_persons_by_department() [typed]:");
persons.forEach(person -> {
System.out.println(person.firstname + " " + person.lastname + " (" + person.department + ")");
});
}
return null;
}, mExecutor
)
;

// call a remote procedure that returns a Map<String, List<Person>>
CompletableFuture<Void> f5 =
mSession.call("com.example.get_persons_by_department", null, null, new TypeReference<Map<String, List<Person>>>() {}, null)
.handleAsync(
(persons_by_department, throwable) -> {
if (throwable != null) {
System.out.println("get_persons_by_department() ERROR: " + throwable.getMessage());
//throwable.printStackTrace();
} else {

System.out.println("get_persons_by_department() [typed]:");

persons_by_department.forEach((department, persons) -> {
System.out.println("\ndepartment '" + department + "':");

persons.forEach(person -> {
System.out.println(" " + person.firstname + " " + person.lastname);
});
});
}
return null;
}, mExecutor
)
;

CompletableFuture.allOf(f1, f2, f3, f4, f5)
.thenRunAsync(
() -> {
System.out.println("all done!");
mSession.leave("wamp.close.normal", "all done!");
}, mExecutor
)
;
}


Expand All @@ -306,15 +322,18 @@ private Void onCounter(List<Object> args, Map<String, Object> kwargs) {
static class Person {
public String firstname;
public String lastname;
public String department;

public Person() {
this.firstname = "unknown";
this.lastname = "unknown";
this.department = "unknown";
}

public Person(String firstname, String lastname) {
public Person(String firstname, String lastname, String department) {
this.firstname = firstname;
this.lastname = lastname;
this.department = department;
}
}
}
49 changes: 43 additions & 6 deletions test_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,52 @@
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp.exception import ApplicationError

DEPARTMENTS = [u'hr', u'sales', u'development']

PERSONS = [
{
u'firstname': u'homer',
u'lastname': u'simpson',
u'department': u'hr',
},
{
u'firstname': u'joe',
u'lastname': u'doe',
u'department': u'development',
},
{
u'firstname': u'sammy',
u'lastname': u'davis',
u'department': u'development',
},
{
u'firstname': u'freddy',
u'lastname': u'krueger',
u'department': u'hr',
},
{
u'firstname': u'walter',
u'lastname': u'white',
u'department': u'sales',
},
{
u'firstname': u'jesse',
u'lastname': u'pinkman',
u'department': u'sales',
},
{
u'firstname': u'pablo',
u'lastname': u'escobar',
u'department': u'sales',
},
]

PERSONS_BY_DEPARTMENT = {
u'hr': [PERSONS[0], PERSONS[1]],
u'sales': [PERSONS[2], PERSONS[3], PERSONS[4]],
u'development': [PERSONS[5], PERSONS[4], PERSONS[0]],
}
PERSONS_BY_DEPARTMENT = {}

for person in PERSONS:
if person[u'department'] not in PERSONS_BY_DEPARTMENT:
PERSONS_BY_DEPARTMENT[person[u'department']] = []
PERSONS_BY_DEPARTMENT[person[u'department']].append(person)


class ClientSession(ApplicationSession):
Expand Down Expand Up @@ -72,6 +85,18 @@ def get_person(emp_no=None):

yield self.register(get_person, u'com.example.get_person')

@inlineCallbacks
def get_person_delayed(emp_no=None, delay=3):
self.log.info('PERSON API: get_person_delayed(emp_no={emp_no}, delay={delay}) called', emp_no=emp_no, delay=delay)
if delay:
yield sleep(delay)
if emp_no:
return PERSONS[emp_no]
else:
return PERSONS[0]

yield self.register(get_person_delayed, u'com.example.get_person_delayed')

def get_all_persons():
print('PERSON API: get_all_persons() called')
return PERSONS
Expand All @@ -87,6 +112,17 @@ def get_persons_by_department(department=None):

yield self.register(get_persons_by_department, u'com.example.get_persons_by_department')

def add_person(person):
self.log.info('PERSON API: add_person({person}) called', person=person)
department = person.get(u'department', None)
if department not in DEPARTMENTS:
raise Exception('no such department: {}'.format(department))

PERSONS.append(person)
PERSONS_BY_DEPARTMENT[department].append(person)

yield self.register(add_person, u'com.example.add_person')

self.log.info('PERSON API registered!')

@inlineCallbacks
Expand All @@ -102,6 +138,8 @@ def onJoin(self, details):

yield self._init_person_api()

@inlineCallbacks
def test(self):
# REGISTER
def add2(a, b):
print('----------------------------')
Expand Down Expand Up @@ -149,7 +187,6 @@ def oncounter(counter, id, type):

yield sleep(2)


def onLeave(self, details):
self.log.info("Router session closed ({details})", details=details)
self.disconnect()
Expand Down

0 comments on commit 96f8c4b

Please sign in to comment.