From 82f0842a8e2adc9d44c40cf2eac4449d8e3f25ed Mon Sep 17 00:00:00 2001 From: Glyph Date: Fri, 26 Dec 2014 15:35:43 -0800 Subject: [PATCH] Import from Twisted branch and change names. --- setup.py | 21 + tubes/__init__.py | 10 + tubes/_components.py | 53 ++ tubes/_siphon.py | 484 ++++++++++++++++++ tubes/fan.py | 443 +++++++++++++++++ tubes/framing.py | 200 ++++++++ tubes/itube.py | 287 +++++++++++ tubes/kit.py | 106 ++++ tubes/memory.py | 50 ++ tubes/protocol.py | 269 ++++++++++ tubes/routing.py | 172 +++++++ tubes/test/__init__.py | 7 + tubes/test/test_fan.py | 236 +++++++++ tubes/test/test_framing.py | 181 +++++++ tubes/test/test_kit.py | 95 ++++ tubes/test/test_memory.py | 160 ++++++ tubes/test/test_protocol.py | 279 +++++++++++ tubes/test/test_tube.py | 962 ++++++++++++++++++++++++++++++++++++ tubes/test/test_undefer.py | 155 ++++++ tubes/test/util.py | 313 ++++++++++++ tubes/tube.py | 344 +++++++++++++ tubes/undefer.py | 35 ++ 22 files changed, 4862 insertions(+) create mode 100644 setup.py create mode 100644 tubes/__init__.py create mode 100644 tubes/_components.py create mode 100644 tubes/_siphon.py create mode 100644 tubes/fan.py create mode 100644 tubes/framing.py create mode 100644 tubes/itube.py create mode 100644 tubes/kit.py create mode 100644 tubes/memory.py create mode 100644 tubes/protocol.py create mode 100644 tubes/routing.py create mode 100644 tubes/test/__init__.py create mode 100644 tubes/test/test_fan.py create mode 100644 tubes/test/test_framing.py create mode 100644 tubes/test/test_kit.py create mode 100644 tubes/test/test_memory.py create mode 100644 tubes/test/test_protocol.py create mode 100644 tubes/test/test_tube.py create mode 100644 tubes/test/test_undefer.py create mode 100644 tubes/test/util.py create mode 100644 tubes/tube.py create mode 100644 tubes/undefer.py diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..c93c1a1 --- /dev/null +++ b/setup.py @@ -0,0 +1,21 @@ +""" +Setup file for tubes. +""" + +from setuptools import setup, find_packages + +setup( + name='Tubes', + version='0.0.0', + description=""" + Flow control and backpressure for event-driven applications. + """, + packages=find_packages(exclude=[]), + package_dir={'tubes': 'tubes'}, + install_requires=[ + "characteristic", + "six", + ], + include_package_data=True, + license="MIT", +) diff --git a/tubes/__init__.py b/tubes/__init__.py new file mode 100644 index 0000000..f1bb477 --- /dev/null +++ b/tubes/__init__.py @@ -0,0 +1,10 @@ +# -*- test-case-name: tubes.test -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +L{tubes} offers an abstraction of data flow and backpressure for event-driven +applications. + +@see: L{tubes.tube} +""" diff --git a/tubes/_components.py b/tubes/_components.py new file mode 100644 index 0000000..61a7cab --- /dev/null +++ b/tubes/_components.py @@ -0,0 +1,53 @@ + +from zope.interface.adapter import AdapterRegistry +from twisted.python.components import _addHook, _removeHook +from contextlib import contextmanager + +@contextmanager +def _registryActive(registry): + """ + A context manager that activates and deactivates a zope adapter registry + for the duration of the call. + + For example, if you wanted to have a function that could adapt L{IFoo} to + L{IBar}, but doesn't expose that adapter outside of itself:: + + def convertToBar(maybeFoo): + with _registryActive(_registryAdapting((IFoo, IBar, fooToBar))): + return IBar(maybeFoo) + + @note: This isn't thread safe, so other threads will be affected as well. + + @param registry: The registry to activate. + @type registry: L{AdapterRegistry} + + @rtype: + """ + hook = _addHook(registry) + yield + _removeHook(hook) + + + +def _registryAdapting(*fromToAdapterTuples): + """ + Construct a Zope Interface adapter registry. + + For example, if you want to construct an adapter registry that can convert + C{IFoo} to C{IBar} with C{fooToBar}. + + @param fromToAdapterTuples: A sequence of tuples of C{(fromInterface, + toInterface, adapterCallable)}, where C{fromInterface} and + C{toInterface} are L{Interface}s, and C{adapterCallable} is a callable + that takes one argument which provides C{fromInterface} and returns an + object providing C{toInterface}. + @type fromToAdapterTuples: C{tuple} of 3-C{tuple}s of C{(Interface, + Interface, callable)} + + @rtype: L{AdapterRegistry} + """ + result = AdapterRegistry() + for From, to, adapter in fromToAdapterTuples: + result.register([From], to, '', adapter) + return result + diff --git a/tubes/_siphon.py b/tubes/_siphon.py new file mode 100644 index 0000000..c3efbfd --- /dev/null +++ b/tubes/_siphon.py @@ -0,0 +1,484 @@ +# -*- test-case-name: tubes.test.test_tube.SeriesTest -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Adapters for converting L{ITube} to L{IDrain} and L{IFount}. +""" + +from collections import deque + +from zope.interface import implementer + +from .itube import IPause, IDrain, IFount, ITube +from .kit import Pauser, beginFlowingFrom, beginFlowingTo +from ._components import _registryAdapting + +from twisted.python.failure import Failure + +from twisted.python import log + +whatever = object() +paused = object() +finished = object() +skip = object() + +class SiphonPendingValues(object): + """ + An iterable that iterates iterables by destructively traversing a mutable + deque. + """ + + def __init__(self): + self._deque = deque() + self._suspended = False + + + def suspend(self): + """ + Pretend to be empty until resume() is called. + """ + self._suspended = True + + + def resume(self): + """ + + """ + self._suspended = False + + + def prepend(self, iterator): + """ + + """ + self._deque.appendleft(iterator) + + + def append(self, iterator): + """ + + """ + self._deque.append(iterator) + + + def clear(self): + """ + + """ + self._deque.clear() + + + def popPendingValue(self, regardless=False): + """ + Get the next value in the leftmost iterator in the deque. + """ + if self._suspended and not regardless: + return paused + while self._deque: + result = next(self._deque[0], whatever) + if self._suspended and not regardless: + self.prepend(iter([result])) + return paused + if result is whatever: + self._deque.popleft() + else: + return result + return finished + + + +class _SiphonPiece(object): + """ + Shared functionality between L{_SiphonFount} and L{_SiphonDrain} + """ + def __init__(self, siphon): + self._siphon = siphon + + + @property + def _tube(self): + """ + Expose the siphon's C{_tube} directly since many things will want to + manipulate it. + + @return: L{ITube} + """ + return self._siphon._tube + + + +@implementer(IFount) +class _SiphonFount(_SiphonPiece): + """ + Implementation of L{IFount} for L{_Siphon}. + + @ivar fount: the implementation of the L{IDrain.fount} attribute. The + L{IFount} which is flowing to this L{_Siphon}'s L{IDrain} + implementation. + + @ivar drain: the implementation of the L{IFount.drain} attribute. The + L{IDrain} to which this L{_Siphon}'s L{IFount} implementation is + flowing. + """ + drain = None + + def __init__(self, siphon): + super(_SiphonFount, self).__init__(siphon) + + def _actuallyPause(): + fount = self._siphon._tdrain.fount + self._siphon._pending.suspend() + if fount is None: + return + self._siphon._pauseBecausePauseCalled = fount.pauseFlow() + + def _actuallyResume(): + fp = self._siphon._pauseBecausePauseCalled + self._siphon._pauseBecausePauseCalled = None + + self._siphon._pending.resume() + self._siphon._unbufferIterator() + + # TODO: validate that the siphon's fount is always set consistently + # with _pauseBecausePauseCalled. + if fp is not None: + fp.unpause() + + self._pauser = Pauser(_actuallyPause, _actuallyResume) + + + def __repr__(self): + """ + Nice string representation. + """ + return "".format(repr(self._siphon._tube)) + + + @property + def outputType(self): + """ + Relay the C{outputType} declared by the tube. + + @return: see L{IFount.outputType} + """ + return self._tube.outputType + + + def flowTo(self, drain): + """ + Flow data from this L{_Siphon} to the given drain. + + @param drain: see L{IFount.flowTo} + + @return: an L{IFount} that emits items of the output-type of this + siphon's tube. + """ + result = beginFlowingTo(self, drain) + if self._siphon._pauseBecauseNoDrain: + pbnd = self._siphon._pauseBecauseNoDrain + self._siphon._pauseBecauseNoDrain = None + pbnd.unpause() + self._siphon._unbufferIterator() + return result + + + def pauseFlow(self): + """ + Pause the flow from the fount, or remember to do that when the fount is + attached, if it isn't yet. + + @return: L{IPause} + """ + return self._pauser.pause() + + + def stopFlow(self): + """ + Stop the flow from the fount to this L{_Siphon}, and stop delivering + buffered items. + """ + self._siphon._noMore(input=True, output=True) + fount = self._siphon._tdrain.fount + if fount is None: + return + fount.stopFlow() + + + +@implementer(IPause) +class _PlaceholderPause(object): + """ + L{IPause} provider that does nothing. + """ + + def unpause(self): + """ + No-op. + """ + + + +@implementer(IDrain) +class _SiphonDrain(_SiphonPiece): + """ + Implementation of L{IDrain} for L{_Siphon}. + """ + fount = None + + def __repr__(self): + """ + Nice string representation. + """ + return ''.format(self._siphon._tube) + + + @property + def inputType(self): + """ + Relay the tube's declared inputType. + + @return: see L{IDrain.inputType} + """ + return self._tube.inputType + + + def flowingFrom(self, fount): + """ + This siphon will now have 'receive' called on it by the given fount. + + @param fount: see L{IDrain.flowingFrom} + + @return: see L{IDrain.flowingFrom} + """ + beginFlowingFrom(self, fount) + if self._siphon._pauseBecausePauseCalled: + pbpc = self._siphon._pauseBecausePauseCalled + self._siphon._pauseBecausePauseCalled = None + if fount is None: + pauseFlow = _PlaceholderPause + else: + pauseFlow = fount.pauseFlow + self._siphon._pauseBecausePauseCalled = pauseFlow() + pbpc.unpause() + if fount is not None: + if not self._siphon._canStillProcessInput: + fount.stopFlow() + # Is this the right place, or does this need to come after + # _pauseBecausePauseCalled's check? + if not self._siphon._everStarted: + self._siphon._everStarted = True + self._siphon._deliverFrom(self._tube.started) + nextFount = self._siphon._tfount + nextDrain = nextFount.drain + if nextDrain is None: + return nextFount + return nextFount.flowTo(nextDrain) + + + def receive(self, item): + """ + An item was received. Pass it on to the tube for processing. + + @param item: an item to deliver to the tube. + """ + def tubeReceivedItem(): + return self._tube.received(item) + self._siphon._deliverFrom(tubeReceivedItem) + + + def flowStopped(self, reason): + """ + This siphon's fount has communicated the end of the flow to this + siphon. This siphon should finish yielding its current buffer, then + yield the result of it's C{_tube}'s C{stopped} method, then communicate + the end of flow to its downstream drain. + + @param reason: the reason why our fount stopped the flow. + """ + self._siphon._noMore(input=True, output=False) + self._siphon._flowStoppingReason = reason + def tubeStopped(): + return self._tube.stopped(reason) + self._siphon._deliverFrom(tubeStopped) + + + +class _Siphon(object): + """ + A L{_Siphon} is an L{IDrain} and possibly also an L{IFount}, and provides + lots of conveniences to make it easy to implement something that does fancy + flow control with just a few methods. + + @ivar _tube: the L{Tube} which will receive values from this siphon and + call C{deliver} to deliver output to it. (When set, this will + automatically set the C{siphon} attribute of said L{Tube} as well, as + well as un-setting the C{siphon} attribute of the old tube.) + + @ivar _currentlyPaused: is this L{_Siphon} currently paused? Boolean: + C{True} if paused, C{False} if not. + + @ivar _pauseBecausePauseCalled: an L{IPause} from the upstream fount, + present because pauseFlow has been called. + + @ivar _flowStoppingReason: If this is not C{None}, then call C{flowStopped} + on the downstream L{IDrain} at the next opportunity, where "the next + opportunity" is when all buffered input (values yielded from + C{started}, C{received}, and C{stopped}) has been written to the + downstream drain and we are unpaused. + + @ivar _everStarted: Has this L{_Siphon} ever called C{started} on its + L{Tube}? + @type _everStarted: L{bool} + """ + + def __init__(self, tube): + """ + Initialize this L{_Siphon} with the given L{Tube} to control its + behavior. + """ + self._canStillProcessInput = True + self._pauseBecausePauseCalled = None + self._tube = None + self._everStarted = False + self._unbuffering = False + self._flowStoppingReason = None + self._pauseBecauseNoDrain = None + + self._tfount = _SiphonFount(self) + self._tdrain = _SiphonDrain(self) + self._tube = tube + self._pending = SiphonPendingValues() + + + def _noMore(self, input, output): + """ + I am now unable to produce further input, or output, or both. + + @param input: L{True} if I can no longer produce input. + + @param output: L{True} if I can no longer produce output. + """ + if input: + self._canStillProcessInput = False + if output: + self._pending.clear() + + + def __repr__(self): + """ + Nice string representation. + """ + return '<_Siphon for {0}>'.format(repr(self._tube)) + + + def _deliverFrom(self, deliverySource): + """ + Deliver some items from a callable that will produce an iterator. + + @param deliverySource: a 0-argument callable that will return an + iterable. + """ + try: + iterableOrNot = deliverySource() + except: + f = Failure() + log.err(f, "Exception raised when delivering from {0!r}" + .format(deliverySource)) + self._tdrain.fount.stopFlow() + downstream = self._tfount.drain + if downstream is not None: + downstream.flowStopped(f) + return + if iterableOrNot is None: + return + self._pending.append(iter(iterableOrNot)) + if self._tfount.drain is None: + if self._pauseBecauseNoDrain is None: + self._pauseBecauseNoDrain = self._tfount.pauseFlow() + + self._unbufferIterator() + + + def _unbufferIterator(self): + """ + Un-buffer some items buffered in C{self._pending} and actually deliver + them, as long as we're not paused. + """ + if self._unbuffering: + return + + self._unbuffering = True + + while True: + value = self._pending.popPendingValue() + if value is paused: + break + elif value is skip: + continue + elif value is finished: + if self._flowStoppingReason: + self._endOfLine(self._flowStoppingReason) + break + else: + self._tfount.drain.receive(value) + + self._unbuffering = False + + + def _endOfLine(self, flowStoppingReason): + """ + We've reached the end of the line. Immediately stop delivering all + buffers and notify our downstream drain why the flow has stopped. + """ + self._noMore(input=True, output=True) + self._flowStoppingReason = None + self._pending.clear() + downstream = self._tfount.drain + if downstream is not None: + self._tfount.drain.flowStopped(flowStoppingReason) + + + def ejectPending(self): + """ + Eject the entire pending buffer into a list for reassembly by a + diverter. + + @return: a L{list} of all buffered output values. + """ + result = [] + # self._suspended = False + while True: + value = self._pending.popPendingValue(regardless=True) + assert value is not paused, """ + TODO: if _pending is suspended here we + will get an infinite sequence of the + 'paused' object, we should probably be + sure to safely extract it. + """ + if value is finished: + return result + result.append(value) + + + + +def _tube2drain(tube): + """ + An adapter that can convert an L{ITube} to an L{IDrain} by wrapping it in a + L{_Siphon}. + + @param tube: L{ITube} + + @return: L{IDrain} + """ + return _Siphon(tube)._tdrain + + + +_tubeRegistry = _registryAdapting( + (ITube, IDrain, _tube2drain), +) + + + diff --git a/tubes/fan.py b/tubes/fan.py new file mode 100644 index 0000000..65df58f --- /dev/null +++ b/tubes/fan.py @@ -0,0 +1,443 @@ +# -*- test-case-name: tubes.test.test_fan -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Tools for turning L{founts } and L{drains } into multiple +founts and drains. +""" + +from itertools import count + +from zope.interface import implementer + +from twisted.python.components import proxyForInterface + +from .kit import Pauser, beginFlowingTo, beginFlowingFrom +from .itube import IDrain, IFount, IPause + + +@implementer(IDrain) +class _InDrain(object): + """ + + """ + + inputType = None + + fount = None + + def __init__(self, fanIn): + """ + + """ + self._in = fanIn + self._pauseBecauseNoDrain = None + + + def flowingFrom(self, fount): + """ + + """ + beginFlowingFrom(self, fount) + # Except the fount is having similar thoughts about us as a drain, and + # this can only happen in one order or the other. right now siphon + # takes care of it. + self._checkNoDrainPause() + return None + + + def _checkNoDrainPause(self): + """ + + """ + pbnd = self._pauseBecauseNoDrain + self._pauseBecauseNoDrain = None + # Do this _before_ unpausing the old one; if it's a new fount, the + # order doesn't matter, but if it's the old fount, then doing it in + # this order ensures it never actually unpauses, we just hand off one + # pause for the other. + if self.fount is not None and self._in.fount.drain is None: + self._pauseBecauseNoDrain = self.fount.pauseFlow() + if pbnd is not None: + pbnd.unpause() + + + def receive(self, item): + """ + + """ + return self._in.fount.drain.receive(item) + + + def flowStopped(self, reason): + """ + + """ + return self._in.fount.drain.flowStopped(reason) + + + +@implementer(IFount) +class _InFount(object): + """ + + """ + + outputType = None + + drain = None + + def __init__(self, fanIn): + """ + + """ + self._in = fanIn + + + def flowTo(self, drain): + """ + + """ + result = beginFlowingTo(self, drain) + for drain in self._in._drains: + drain._checkNoDrainPause() + return result + + + def pauseFlow(self): + """ + + """ + subPauses = [] + for drain in self._in._drains: + # XXX wrong because drains could be added and removed + subPauses.append(drain.fount.pauseFlow()) + return _AggregatePause(subPauses) + + + def stopFlow(self): + """ + + """ + for drain in self._in._drains: + drain.fount.stopFlow() + + + +@implementer(IPause) +class _AggregatePause(object): + """ + + """ + + def __init__(self, subPauses): + """ + + """ + self._subPauses = subPauses + + + def unpause(self): + """ + + """ + for subPause in self._subPauses: + subPause.unpause() + + + +class In(object): + r""" + A fan.L{In} presents a single L{fount } that delivers the inputs + from multiple L{drains }:: + + your fount ---> In.newDrain()--\ + \ + your fount ---> In.newDrain()----> In ---> In.fount ---> your drain + / + your fount ---> In.newDrain()--/ + + @ivar fount: The fount which produces all new attributes. + @type fount: L{IFount} + """ + def __init__(self): + self.fount = _InFount(self) + self._drains = [] + self._subdrain = None + + + def newDrain(self): + """ + Create a new L{drains } which will send its + inputs out via C{self.fount}. + + @return: a drain. + """ + it = _InDrain(self) + self._drains.append(it) + return it + + + +@implementer(IFount) +class _OutFount(object): + """ + The concrete fount type returned by L{Out.newFount}. + """ + drain = None + + outputType = None + + def __init__(self, upstreamPauser, stopper): + """ + @param upstreamPauser: A L{Pauser} which will pause the upstream fount + flowing into our L{Out}. + + @param stopper: A 0-argument callback to execute on + L{IFount.flowStopped} + """ + self._receivedWhilePaused = [] + self._myPause = None + self._stopper = stopper + + def actuallyPause(): + self._myPause = upstreamPauser.pause() + + def actuallyUnpause(): + aPause = self._myPause + self._myPause = None + if self._receivedWhilePaused: + self.drain.receive(self._receivedWhilePaused.pop(0)) + aPause.unpause() + + self._pauser = Pauser(actuallyPause, actuallyUnpause) + + + def flowTo(self, drain): + """ + Flow to the given drain. Don't do anything special; just set up the + drain attribute and return the appropriate value. + + @param drain: A drain to fan out values to. + + @return: the result of C{drain.flowingFrom} + """ + return beginFlowingTo(self, drain) + + + def pauseFlow(self): + """ + Pause the flow. + """ + return self._pauser.pause() + + + def stopFlow(self): + """ + Invoke the callback supplied to C{__init__} for stopping. + """ + self._stopper(self) + + + def _deliverOne(self, item): + """ + Deliver one item to this fount's drain. + + This is only invoked when the upstream is unpaused. + + @param item: An item that the upstream would like to pass on. + """ + if self.drain is None: + return + if self._myPause is not None: + self._receivedWhilePaused.append(item) + return + self.drain.receive(item) + + + +@implementer(IDrain) +class _OutDrain(object): + """ + + """ + + fount = None + + def __init__(self, founts, inputType, outputType): + """ + + """ + self._pause = None + self._paused = False + + self._founts = founts + + def _actuallyPause(): + if self._paused: + raise NotImplementedError() + self._paused = True + if self.fount is not None: + self._pause = self.fount.pauseFlow() + + def _actuallyResume(): + p = self._pause + self._pause = None + self._paused = False + if p is not None: + p.unpause() + + self._pauser = Pauser(_actuallyPause, _actuallyResume) + + self.inputType = inputType + self.outputType = outputType + + + def flowingFrom(self, fount): + """ + + """ + if self._paused: + p = self._pause + if fount is not None: + self._pause = fount.pauseFlow() + else: + self._pause = None + if p is not None: + p.unpause() + self.fount = fount + + + def receive(self, item): + """ + + """ + for fount in self._founts[:]: + fount._deliverOne(item) + + + def flowStopped(self, reason): + """ + + """ + for fount in self._founts[:]: + if fount.drain is not None: + fount.drain.flowStopped(reason) + + + +class Out(object): + r""" + A fan.L{Out} presents a single L{drain } that delivers the inputs + to multiple L{founts }:: + + /--> Out.newFount() --> your drain + / + your fount --> Out.drain --> Out <----> Out.newFount() --> your drain + \ + \--> Out.newFount() --> your drain + + @ivar drain: The fount which produces all new attributes. + @type drain: L{IDrain} + """ + + def __init__(self, inputType=None, outputType=None): + """ + + """ + self._founts = [] + self.drain = _OutDrain(self._founts, inputType=inputType, + outputType=outputType) + + + def newFount(self): + """ + + """ + f = _OutFount(self.drain._pauser, self._founts.remove) + self._founts.append(f) + return f + + + +class Thru(proxyForInterface(IDrain, "_outDrain")): + r""" + A fan.L{Thru} takes an input and fans it I{thru} multiple + drains-which-produce-founts, such as L{tubes }:: + + Your Fount + (producing "foo") + | + v + Thru + | + _/|\_ + _/ | \_ + / | \ + foo2bar foo2baz foo2qux + \_ | _/ + \_ | _/ + \|/ + | + v + Thru + | + v + Your Drain + (receiving a combination + of foo, bar, baz) + + The way you would construct such a flow in code would be:: + + yourFount.flowTo(Thru([series(foo2bar()), + series(foo2baz()), + series(foo2qux())])).flowTo(yourDrain) + """ + + def __init__(self, drains): + """ + Create a L{Thru} with an iterable of L{IDrain}. + + All of the drains in C{drains} should be drains that produce a new + L{IFount} from L{flowingFrom }, which means they + should be a L{series } of L{tubes + }, or drains that behave like that, such as L{Thru} + itself. + + @param drain: an iterable of L{IDrain} + """ + self._in = In() + self._out = Out() + + self._drains = list(drains) + self._founts = list(None for drain in self._drains) + self._outFounts = list(self._out.newFount() for drain in self._drains) + self._inDrains = list(self._in.newDrain() for drain in self._drains) + self._outDrain = self._out.drain + + + def flowingFrom(self, fount): + """ + Accept input from C{fount} and produce output filtered by all of the + C{drain}s given to this L{Thru}'s constructor. + + @param fount: + """ + super(Thru, self).flowingFrom(fount) + for idx, appDrain, outFount, inDrain in zip( + count(), self._drains, self._outFounts, self._inDrains): + appFount = outFount.flowTo(appDrain) + if appFount is None: + appFount = self._founts[idx] + else: + self._founts[idx] = appFount + appFount.flowTo(inDrain) + nextFount = self._in.fount + + # Literally copy/pasted from _SiphonDrain.flowingFrom. Hmm. + nextDrain = nextFount.drain + if nextDrain is None: + return nextFount + return nextFount.flowTo(nextDrain) + diff --git a/tubes/framing.py b/tubes/framing.py new file mode 100644 index 0000000..7f4136c --- /dev/null +++ b/tubes/framing.py @@ -0,0 +1,200 @@ +# -*- test-case-name: tubes.test.test_framing -*- +""" +Tubes that can convert streams of data into discrete chunks and back again. +""" + +from zope.interface import implementer + +from .itube import IDivertable, IFrame, ISegment +from .tube import tube, series, Diverter +from twisted.protocols.basic import ( + LineOnlyReceiver, NetstringReceiver, Int8StringReceiver, + Int16StringReceiver, Int32StringReceiver +) + +class _Transporter(object): + """ + Just enough of a mock of L{ITransport} to work with the protocols in + L{twisted.protocols.basic}, as a wrapper around a callable taking some + data. + + @ivar _dataWritten: 1-argument callable taking L{bytes}, a chunk of data + from a stream. + """ + + def __init__(self, dataWritten): + self._dataWritten = dataWritten + + + def write(self, data): + """ + Call the C{_dataWritten} callback. + """ + self._dataWritten(data) + + + def writeSequence(self, dati): + """ + Call the C{_dataWritten} callback for each element. + """ + for data in dati: + self._dataWritten(data) + + + +@tube +class _FramesToSegments(object): + """ + A tube which could convert "L{frames