Skip to content

Commit

Permalink
Add description of dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
Bruce Robbins committed Jan 19, 2011
1 parent 514785b commit c1bb522
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 5 deletions.
3 changes: 3 additions & 0 deletions source/manual.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ Manual
manual/overview
manual/getting_events_into_s4
manual/joining_streams
manual/dispatcher
manual/specifying_keys
manual/windowing
Installing S4 core locally <manual/installing_core_locally>

97 changes: 96 additions & 1 deletion source/manual/dispatcher.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,99 @@
.. index:: dispatcher, keys, emit

Dispatcher
==========

*Coming soon.*
Overview
--------

PE instances use a dispatcher to get events to other PE instances; That is, PEs use a dispatcher to emit events. A dispatcher delivers events to PE instances, even if those instances reside on other S4 nodes in the cluster.

Using the dispatcher
--------------------

One example of a PE that uses a dispatcher is ``io.s4.processor.ReroutePE``, a platform-provided class. It uses the dispatcher as `follows <https://github.com/s4/core/blob/master/src/main/java/io/s4/processor/ReroutePE.java#L105>`_:

.. code-block:: java
dispatcher.dispatchEvent(outputStreamName, newEvent);
where ``dispatcher`` is a reference to some configured dispatcher.

The caller of Dispatcher#dispatchEvent passes the event and the stream name, but does not specify the keys on which the event will be dispatched. That decision is made by the dispatcher based on its configuration.

You configure the dispatcher with the keys on which it will dispatch events. You can specify a different set of keys for each stream.

For example, in the `speech002 <https://github.com/s4/examples/tree/master/speech02>`_ application, the dispatcher is configured to dispatch events as follows:

======================== =============================
Stream name key on which to dispatch event
======================== =============================
Sentence, SentenceJoined speechId
Speech id
======================== =============================

That is, if the the event is passed to dispatcher#dispatchEvent with a stream name of ``Sentence`` or ``SentenceJoined``, the dispatcher will dispatch the event based on the content of the event's ``speechId`` field. If the event does not have a ``speechId`` field, or the field's value is null, the event is ignored.

The dispatcher will annotate the event with information about the key that was used for dispatching. Then the dispatcher will send the event to the S4 node base on the value of the key. The receiving S4 node uses the annotation to determine which local PE instances should receive the event.

Here's how the dispatcher configuration looks in the `speech002 <https://github.com/s4/examples/tree/master/speech02>`_ application:

.. code-block:: xml
<bean id="dispatcher" class="io.s4.dispatcher.Dispatcher" init-method="init">
<property name="partitioners">
<list>
<ref bean="sentenceSpeechIdPartitioner"/>
<ref bean="speechIdPartitioner"/>
</list>
</property>
<property name="eventEmitter" ref="commLayerEmitter"/>
<property name="loggerName" value="s4"/>
</bean>
The configuration for the dispatcher itself is quite short: It specifies a list of partitioner objects (each of which you also configure), and a reference to the commLayerEmitter component. You do not need to create the commLayerEmitter component: it is instantiated by S4 runtime on startup.

Here's the configuration for the two partitioners:

.. code-block:: xml
<bean id="sentenceSpeechIdPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
<property name="streamNames">
<list>
<value>Sentence</value>
<value>SentenceJoined</value>
</list>
</property>
<property name="hashKey">
<list>
<value>speechId</value>
</list>
</property>
<property name="hasher" ref="hasher"/>
<property name="debug" value="false"/>
</bean>
<bean id="speechIdPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
<property name="streamNames">
<list>
<value>Speech</value>
</list>
</property>
<property name="hashKey">
<list>
<value>id</value>
</list>
</property>
<property name="hasher" ref="hasher"/>
<property name="debug" value="false"/>
</bean>
The partitioners specify the keys on which the associated dispatcher will dispatch events. Each partitioner is an instance of ``DefaultPartitioner``, a class that is provided by the platform. Besides specifying the stream names and keys, you also specify a reference to a hasher. The S4 runtime provides a hasher instance with the bean name ``hasher``. This hasher uses the FNV1-64 algorithm. However, you can swap it out for you own instance (as long as it implements the ``Hasher`` interface).

Compound keys, nested keys, and list keys are discussed in :doc:`specifying_keys`.






42 changes: 38 additions & 4 deletions source/manual/joining_streams.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,41 @@ In the `speech02 example application <https://github.com/s4/examples/tree/master
A note on speeches.txt
----------------------

In :doc:`/manual/getting_events_into_s4`, you used :file:`speeches.txt` as input to the load generator. :file:`speeches.txt` contains a series of Speech, Sentence, and Highlight events that can be replayed using :file:`generate_load.sh`. The story of the contents is as follows:
In :doc:`/manual/getting_events_into_s4`, you used :file:`speeches.txt` as input to the load generator. :file:`speeches.txt` contains a series of Speech, Sentence, and Highlight events that can be replayed using :file:`generate_load.sh`. The events have these attributes:

*Speech*

============== ======= ====================================================
attribute name type notes
============== ======= ====================================================
id long The id of the speech
location String The original location where the speech was delivered
speaker String The original deliverer of the speech
time long The start time of the recitation of the speech
============== ======= ====================================================

*Sentence*

============== ======= ====================================================
attribute name type notes
============== ======= ====================================================
id long The id of the sentence
speechId long The id of the speech to which this sentence belongs
time long The start time of the recitation of the sentence
location String The original location where the associated speech was
delivered (not set in :file:`speeches.txt`)
============== ======= ====================================================

*Highlight*

============== ======= ====================================================
attribute name type notes
============== ======= ====================================================
sentenceId long The id of the sentence being highlighted
time long The time at which the sentence was highlighted
============== ======= ====================================================

The story of the events in :file:`speeches.txt` is as follows:

* Anonymous actors read famous speeches out loud. Each actor chooses a speech and recites it. These recitations are captured live and the audio is streamed to listeners.
* When the audio stream for a given speech starts, a ``Speech`` event is generated. The ``time`` field in the event indicates the time at which the audio for that speech began streaming, which is also roughly the time the event was generated.
Expand Down Expand Up @@ -174,19 +208,19 @@ Here's a typical flow for the sentenceJoinPE:
#. 10 seconds later, a ``Sentence`` event for speech id 11 arrives on the Sentence stream.
#. S4 locates the sentenceJoinPE instance for speech id 11.
#. S4 calls the instance's processEvent() method.
#. The PE instance stores the event in the slot for stream ``Sentence``. Because the all slots are full, the PE instance does the following:
#. The PE instance stores the event in the slot for stream ``Sentence``. Because all slots are full, the PE instance does the following:

#. Creates a new ``Sentence`` object.
#. Copies all fields from the old ``Sentence`` event into the new ``Sentence`` event.
#. Copies the ``location`` field from the ``Speech`` event into the new ``Sentence`` event.
#. Emits the new ``Sentence`` event onto the SentenceJoined stream.
9. 4 seconds later, another ``Sentence`` event for speech id 11 arrives on the Sentence stream.
9. Four seconds later, another ``Sentence`` event for speech id 11 arrives on the Sentence stream.
#. S4 locates the sentenceJoinPE instance for speech id 11.
#. S4 calls the instance's processEvent() method.
#. The PE instance replaces the existing event in the slot for stream Sentence with the newly arrived event. Because all slots are full, the PE instance repeats the above steps for emitting a new event.


sentenceJoinPE's ``ttl`` property is set to 600 seconds (10 minutes). The framework will consider the PE instance for speech id *n* dead if that instance receives no events for 10 minutes. If an event for speech id *n* arrives after that 10 minute period of idleness, then a new instance for value *n* will be created with all slots reset. Therefore, a join succeeds only if the related events arrive within 10 minutes of each other.
sentenceJoinPE's ``ttl`` property is set to 600 seconds (10 minutes). The framework will consider the PE instance for speech id *n* dead if that instance receives no events for 10 minutes. If an event for speech id *n* arrives after that 10-minute period of idleness, then a new instance for value *n* will be created with all slots reset. Therefore, a join succeeds only if the related events arrive within 10 minutes of each other.

The sentenceJoinPE uses the configured dispatcher to dispatch the events to the appropriate nodes. The dispatcher is described in :doc:`dispatcher`.

Expand Down
2 changes: 2 additions & 0 deletions source/manual/specifying_keys.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Specifying Keys (to do)
=======================
2 changes: 2 additions & 0 deletions source/manual/windowing.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Windowing (to do)
=================

0 comments on commit c1bb522

Please sign in to comment.