Skip to content

Commit

Permalink
[Add] Add event engine
Browse files Browse the repository at this point in the history
  • Loading branch information
vnpy committed Jan 5, 2019
1 parent e3c22e3 commit 9d3ec12
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 4 deletions.
8 changes: 4 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Python
*.pyc

# IDE
.vscode
.idea

# Python
.pyc
.pyo

# Temp
build
dist
Empty file added vnpy/__init__.py
Empty file.
1 change: 1 addition & 0 deletions vnpy/event/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from engine import Event, EventEngine, EVENT_TIMER
139 changes: 139 additions & 0 deletions vnpy/event/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from queue import Queue, Empty
from threading import Thread, Timer
from collections import defaultdict
from time import sleep
from typing import Any, Callable

EVENT_TIMER = "eTimer"


class Event:
"""
Event object consists of a type string which is used
by event engine for distributing event, and a data
object which contains the real data.
"""

def __init__(self, type: str, data: Any = None):
""""""
self.type = type
self.data = data


# Defines handler function to be used in event engine.
HandlerType = Callable[[Event], None]


class EventEngine:
"""
Event engine distributes event object based on its type
to those handlers registered.
It also generates timer event by every interval seconds,
which can be used for timing purpose.
"""

def __init__(self, interval: int = 1):
"""
Timer event is generated every 1 second by default, if
interval not specified.
"""
self._interval = interval
self._queue = Queue()
self._active = False
self._thread = Thread(target=self._run)
self._timer = Thread(target=self._run_timer)
self._handlers = defaultdict(list)
self._general_handlers = []

def _run(self):
"""
Get event from queue and then process it.
"""
while self._active:
try:
event = self._queue.get(block=True, timeout=1)
self._process(event)
except Empty:
pass

def _process(self, event: Event):
"""
First ditribute event to those handlers registered listening
to this type.
Then distrubute event to those general handlers which listens
to all types.
"""
if event.type in self._handlers:
[handler(event) for handler in self._handlers[event.type]]

if self._general_handlers:
[handler(event) for handler in self._general_handlers]

def _run_timer(self):
"""
Sleep by interval second(s) and then generate a timer event.
"""
while self._active:
sleep(self._interval)
event = Event(EVENT_TIMER)
self.put(event)

def start(self):
"""
Start event engine to process events and generate timer events.
"""
self._active = True
self._thread.start()
self._timer.start()

def stop(self):
"""
Stop event engine.
"""
self._active = False
self._timer.join()
self._thread.join()

def put(self, event: Event):
"""
Put an event object into event queue.
"""
self._queue.put(event)

def register(self, type: str, handler: HandlerType):
"""
Register a new handler function for a specific event type. Every
function can only be registered once for each event type.
"""
handler_list = self._handlers[type]
if handler not in handler_list:
handler_list.append(handler)

def unregister(self, type: str, handler: HandlerType):
"""
Unregister an existing handler function from event engine.
"""
handler_list = self._handlers[type]

if handler in handler_list:
handler_list.remove(handler)

if not handler_list:
self._handlers.pop(type)

def register_general(self, handler: HandlerType):
"""
Register a new handler function for all event types. Every
function can only be registered once for each event type.
"""
if handler not in self._general_handlers:
self._general_handlers.append(handler)

def unregister_general(self, handler: HandlerType):
"""
Unregister an existing general handler function.
"""
if handler in self._general_handlers:
self._general_handlers.remove(handler)

0 comments on commit 9d3ec12

Please sign in to comment.