Skip to content

Commit

Permalink
started to scale out test classes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobias Oberstein committed Sep 19, 2013
1 parent e614fd2 commit 22f122b
Show file tree
Hide file tree
Showing 6 changed files with 371 additions and 21 deletions.
5 changes: 4 additions & 1 deletion autobahntestsuite/autobahntestsuite/wampcase/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@
## all WAMP test cases
##
Cases = []
Cases.extend(wampcase2_2_x_x.Cases)
#Cases.extend(wampcase2_2_x_x.Cases)
#Cases.extend(wampcase3_1_x_x.Cases)

import wampcase2_5_x_x
Cases.extend(wampcase2_5_x_x.Cases)


class WampCaseSet(CaseSet):

Expand Down
226 changes: 209 additions & 17 deletions autobahntestsuite/autobahntestsuite/wampcase/wampcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,221 @@
##
###############################################################################

from twisted.python import log
import json
__all__ = ('WampCase', 'WampCaseProtocol', 'WampCaseFactory',)


import json, random

from zope.interface import implementer

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, maybeDeferred

from autobahn.websocket import connectWS
from autobahn.wamp import WampClientFactory, WampCraClientProtocol

from testrun import TestResult
from util import AttributeBag, perf_counter
from interfaces import ITestCase



class WampCaseProtocol(WampCraClientProtocol):


def sendMessage(self, payload):
self.factory.log('<pre class="wamp">TX => %s</pre>' % payload)
WampCraClientProtocol.sendMessage(self, payload)


def onMessage(self, payload, binary):
self.factory.log('<pre class="wamp">RX <= %s</pre>' % payload)
WampCraClientProtocol.onMessage(self, payload, binary)


def onSessionOpen(self):
self.factory.log("WAMP session opened to <strong>%s</strong> at <strong>%s</strong>." % (self.session_server, self.peerstr))

self.factory.result.observed[self.session_id] = []

if self.factory.test.testee.auth:
d = self.authenticate(**self.factory.test.testee.auth)
d.addCallbacks(self.onAuthSuccess, self.onAuthError)
else:
self.test()


def onAuthSuccess(self, permissions):
self.factory.log("WAMP session %s authenticated with credentials: <pre>%s</pre>" % (self.session_id, self.factory.test.testee.auth))
self.test()


def onAuthError(self, e):
uri, desc, details = e.value.args
self.factory.log("WAMP authentication error: %s" % details)
self.sendClose()


def test(self):
raise Exception("not implemented")


def ready(self):
self.factory.log("Test client prepared and ready.")
self.factory.onReady.callback(self.session_id)


def onEvent(self, topic, event):
self.factory.log("Received event for topic <pre>%s</pre> and payload <pre>%s</pre>" % (topic, event))
self.factory.result.observed[self.session_id].append((topic, event))


class WampCaseParams(AttributeBag):
"""
"""

ATTRIBUTES = ['peerCount']


class WampCaseFactory(WampClientFactory):

protocol = None

def __init__(self, peerIndex, onReady, onGone, test, result):
assert(self.protocol)
WampClientFactory.__init__(self, test.testee.url)
self.peerIndex = peerIndex
self.onReady = onReady
self.onGone = onGone
self.test = test
self.result = result
self.proto = None

def buildProtocol(self, addr):
proto = self.protocol()
proto.factory = self
proto.session_id = None
self.proto = proto
return proto

def log(self, msg):
ts = perf_counter()
sessionId = self.proto.session_id if self.proto else None
self.result.log.append((ts, self.peerIndex, sessionId, msg.encode('utf8')))
return ts

def clientConnectionLost(self, connector, reason):
reason = str(reason.value)
self.log("Client connection lost: %s" % reason)
self.onGone.callback(None)

def clientConnectionFailed(self, connector, reason):
reason = str(reason.value)
self.log("Client connection failed: %s" % reason)
self.onGone.callback(reason)



@implementer(ITestCase)
class WampCase:

FAILED = "FAILED"
OK = "OK"

SUBCASES = []
factory = None
index = None
description = None
expectation = None
params = None


def __init__(self, testee, spec):
self.testee = testee
self.spec = spec

self._uriSuffix = '#' + str(random.randint(0, 1000000))

if self.testee.options.has_key('rtt'):
self._rtt = self.testee.options['rtt']
elif self.spec.has_key('options') and self.spec['options'].has_key('rtt'):
self._rtt = self.spec['options']['rtt']
else:
self._rtt = 0.2


def test(self, result, clients):
raise Exception("not implemented")


def run(self):
assert(self.factory)
assert(self.index)
assert(self.params)

result = TestResult()
finished = Deferred()

result.passed = None
result.observed = {}
result.expected = {}
result.log = []

def log(msg, sessionIndex = None, sessionId = None):
ts = perf_counter()
result.log.append((ts, sessionIndex, sessionId, msg.encode('utf8')))
return ts

result.started = log("Test started.")

clients = []
peersready = []
peersgone = []
i = 1
for peerIndex in xrange(self.params.peerCount):
ready = Deferred()
gone = Deferred()
client = self.factory(peerIndex, ready, gone, self, result)
clients.append(client)
peersready.append(ready)
peersgone.append(gone)
connectWS(client)
i += 1

def shutdown(_):
for client in clients:
client.proto.sendClose()
log("Test client closing ...", client.peerIndex, client.proto.session_id)

def launch(_):
wait = 2.5 * self._rtt

def afterwait():
log("Continuing test ..")
d = maybeDeferred(self.test, log, result, clients)
d.addCallback(shutdown)

def beforewait():
log("Sleeping for <strong>%s ms</strong> ..." % (1000. * wait))
reactor.callLater(wait, afterwait)

beforewait()

def error(err):
## FIXME
print "ERROR", err
shutdown()
finished.errback(err)


def done(res):
result.ended = log("Test ended.")

def __init__(self, protocol):
self.p = protocol
self.init()
for r in res:
if not r[0]:
log("Client error: %s" % r[1].value)

def getSubcaseCount(self):
return len(Case.SUBCASES)
#assert(result.passed is not None)

def setSubcase(self, subcase):
self.subcase = subcase
finished.callback(result)

def init(self):
pass
DeferredList(peersready).addCallbacks(launch, error)
DeferredList(peersgone).addCallbacks(done, error)

def compare(self, obj1, obj2):
return json.dumps(obj1) == json.dumps(obj2)
return finished
81 changes: 81 additions & 0 deletions autobahntestsuite/autobahntestsuite/wampcase/wampcase2_5_x_x.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
###############################################################################
##
## Copyright 2013 Tavendo GmbH
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
###############################################################################

__all__ = ['Cases']

## The set of cases we construct and export from this module.
## Everything else is private.
Cases = []

import random

from util import AttributeBag
from wampcase import WampCase, WampCaseFactory, WampCaseProtocol



class WampCase4_1_1_Params(AttributeBag):

ATTRIBUTES = ['peerCount', 'topicCount']


class WampCase4_1_1_Protocol(WampCaseProtocol):

def test(self):
## WAMP session opened and authenticated.

self._topics = []
for i in xrange(20):
topic = "http://example.com/simple#" + str(random.randint(0, self.factory.test.params.topicCount))
self.subscribe(topic, self.onEvent)
self._topics.append(topic)

## Signal the test controller our readiness.
self.ready()

def monkeyPublish(self, event):
i = random.randint(0, len(self._topics) - 1)
topic = self._topics[i]
self.publish(topic, event)


class WampCase4_1_1_Factory(WampCaseFactory):

protocol = WampCase4_1_1_Protocol


class WampCase4_1_1(WampCase):

factory = WampCase4_1_1_Factory
index = (4, 1, 1, 0)
description = "A NOP test."
expectation = "Nothing."
params = WampCase4_1_1_Params(peerCount = 10, topicCount = 100)


def test(self, log, result, clients):
msg = "NOP test running using %d sessions\n" % len(clients)
log(msg)
print msg
for i in xrange(100):
j = random.randint(0, len(clients) - 1)
clients[j].proto.monkeyPublish("Hello, world!")
result.passed = True


Cases.append(WampCase4_1_1)
Loading

0 comments on commit 22f122b

Please sign in to comment.