Skip to content

Commit

Permalink
Sewio updates (#233)
Browse files Browse the repository at this point in the history
* adding hardware option to killerbee

* debugging goodfet read

* fixed up mypy

* add deprecation notice

* added changes from PR 148

* fixed indents

* changed built-in list types to typing.List

* fixed zbstumbler byte type mismatches

* rely on capabilities to throw errors, changed other exceptions to TypeError

* added back accidentally removed tests for jamming method not supported

Co-authored-by: Taylor <[email protected]>
Co-authored-by: Taylor <[email protected]>
Co-authored-by: Taylor <[email protected]>
  • Loading branch information
4 people authored Jul 14, 2021
1 parent 3bbf7eb commit bb71d57
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 154 deletions.
42 changes: 21 additions & 21 deletions killerbee/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def show_dev(vendor: str=None, product: str=None, gps: str=None, include: str=No
@param include: Provide device names in this argument if you would like only
these to be enumerated. Aka, include only these items.
'''
fmt: str = "{: >14} {: <20} {: >10}"
fmt: str = "{: >14} {: <30} {: >10}"
print((fmt.format("Dev", "Product String", "Serial Number")))
for dev in devlist(vendor=vendor, product=product, gps=gps, include=include):
# Using None as a format value is an TypeError in python3
Expand Down Expand Up @@ -76,42 +76,39 @@ def __init__(self, device: Optional[str]=None, hardware: Optional[str]=None, dat
else: del isSewio


if device is not None:
self.dev = device

if hardware is None:
print("Auto-detection is being deprecated - Please specify hardware")
elif self.dev is not None:
elif device is not None:
if hardware == "apimote":
from .dev_apimote import APIMOTE
self.driver = APIMOTE(self.dev)
self.driver = APIMOTE(device)
elif hardware == "rzusbstick":
from .dev_rzusbstick import RZUSBSTICK
self.driver = RZUSBSTICK(self.dev, self.__bus)
self.driver = RZUSBSTICK(device, self.__bus)
elif hardware == "cc2530":
from .dev_cc253x import CC253x
self.driver = CC253x(self.dev, self.__bus, CC253x.VARIANT_CC2530)
self.driver = CC253x(device, self.__bus, CC253x.VARIANT_CC2530)
elif hardware == "cc2531":
from .dev_cc253x import CC253x
self.driver = CC253x(self.dev, self.__bus, CC253x.VARIANT_CC2531)
self.driver = CC253x(device, self.__bus, CC253x.VARIANT_CC2531)
elif hardware == "bumblebee":
from .dev_bumblebee import Bumblebee
self.driver = Bumblebee(self.dev, self.__bus)
self.driver = Bumblebee(device, self.__bus)
elif hardware == "sl_nodetest":
from .dev_sl_nodetest import SL_NODETEST
self.driver = SL_NODETEST(self.dev)
self.driver = SL_NODETEST(device)
elif hardware == "sl_beehive":
from .dev_sl_beehive import SL_BEEHIVE
self.driver = SL_BEEHIVE(self.dev)
self.driver = SL_BEEHIVE(device)
elif hardware == "zigduino":
from .dev_zigduino import ZIGDUINO
self.driver = ZIGDUINO(self.dev)
self.driver = ZIGDUINO(device)
elif hardware == "freakdruino":
from .dev_freakduino import FREAKDUINO
self.driver = FREAKDUINO(self.dev)
self.driver = FREAKDUINO(device)
elif hardware == "telosb":
from .dev_telosb import TELOSB
self.driver = TELOSB(self.dev)
self.driver = TELOSB(device)

# Figure out a device is one is not set, trying USB devices next
if self.driver is None:
Expand Down Expand Up @@ -208,8 +205,10 @@ def __device_is(self, vendorId, productId):
@rtype: Boolean
@return: True if KillerBee class has device matching the vendor and product IDs provided.
'''
if self.dev.idVendor == vendorId and self.dev.idProduct == productId: return True
else: return False
if self.dev.idVendor == vendorId and self.dev.idProduct == productId:
return True
else:
return False

def get_dev_info(self) -> List[str]:
'''
Expand Down Expand Up @@ -442,7 +441,7 @@ def pnext(self, timeout: int=100) -> Optional[Dict[Union[int, str], Any]]:

return self.driver.pnext(timeout)

def jammer_on(self, channel: Optional[int]=None):
def jammer_on(self, channel: Optional[int]=None, method: Optional[str]=None):
'''
Attempts reflexive jamming on all 802.15.4 frames.
Targeted frames must be >12 bytes for reliable jamming in current firmware.
Expand All @@ -454,9 +453,10 @@ def jammer_on(self, channel: Optional[int]=None):
if self.driver is None:
raise KBInterfaceError("Driver not configured")

return self.driver.jammer_on(channel=channel)
return self.driver.jammer_on(channel=channel, method=method)


def jammer_off(self, channel: Optional[int]=None):
def jammer_off(self):
'''
End reflexive jamming on all 802.15.4 frames.
Targeted frames must be >12 bytes for reliable jamming in current firmware.
Expand All @@ -468,5 +468,5 @@ def jammer_off(self, channel: Optional[int]=None):
if self.driver is None:
raise KBInterfaceError("Driver not configured")

return self.driver.jammer_off(channel=channel)
return self.driver.jammer_off()

25 changes: 19 additions & 6 deletions killerbee/dev_apimote.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __set_capabilities(self) -> None:
self.capabilities.setcapab(KBCapabilities.INJECT, True)
self.capabilities.setcapab(KBCapabilities.PHYJAM_REFLEX, True)
self.capabilities.setcapab(KBCapabilities.SET_SYNC, True)
self.capabilities.setcapab(KBCapabilities.PHYJAM, True)
return

# KillerBee expects the driver to implement this function
Expand Down Expand Up @@ -259,19 +260,28 @@ def ping(self, da: Any, panid: Any, sa: Any, channel: Optional[int]=None, page:
'''
raise Exception('Not yet implemented')

def jammer_on(self, channel: Optional[int]=None, page: int=0) -> None:
def jammer_on(self, channel: Optional[int]=None, page: int=0, method: Optional[str]=None) -> None:
'''
Not yet implemented.
Implements reflexive jamming or constant carrier wave jamming.
@type channel: Integer
@param channel: Sets the channel, optional
@type page: Integer
@param page: Sets the subghz page, not supported on this device
@rtype: None
'''

if self.handle is None:
raise Exception("Handle does not exist")

self.capabilities.require(KBCapabilities.PHYJAM_REFLEX)
if method is None:
method = "constant"

if method == "reflexive":
self.capabilities.require(KBCapabilities.PHYJAM_REFLEX)
elif method == "constant":
self.capabilities.require(KBCapabilities.PHYJAM)
else:
raise ValueError('Parameter "method" must be either \'reflexive\' or \'constant\'.')

self.handle.RF_promiscuity(1)
self.handle.RF_autocrc(0)
Expand All @@ -280,8 +290,11 @@ def jammer_on(self, channel: Optional[int]=None, page: int=0) -> None:
self.set_channel(channel, page)

self.handle.CC_RFST_RX()
self.handle.RF_carrier() #constant carrier wave jamming
#self.handle.RF_reflexjam() #reflexive jamming (advanced)

if method == "reflexive":
self.handle.RF_reflexjam()
elif method == 'constant':
self.handle.RF_carrier()

#TODO maybe move sync to byte string rather than int
def set_sync(self, sync: int=0xA70F) -> Any:
Expand All @@ -294,7 +307,7 @@ def set_sync(self, sync: int=0xA70F) -> Any:
raise Exception("Sync word (%x) must be 2-bytes or less." % sync)
return self.handle.poke(CC2420_REG_SYNC, sync)

def jammer_off(self, channel: Optional[int]=None, page: int=0) -> None:
def jammer_off(self) -> None:
'''
Not yet implemented.
@return: None
Expand Down
4 changes: 2 additions & 2 deletions killerbee/dev_cc253x.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def pnext(self, timeout=100):
return ret


def jammer_on(self, channel=None, page=0):
def jammer_on(self, channel=None, page=0, method=None):
"""
Not yet implemented.
@type channel: Integer
Expand All @@ -289,7 +289,7 @@ def set_sync(self, sync=0xA7):
"""
raise Exception('Not yet implemented')

def jammer_off(self, channel=None, page=0):
def jammer_off(self):
"""
Not yet implemented.
@return: None
Expand Down
4 changes: 2 additions & 2 deletions killerbee/dev_freakduino.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def ping(self, da, panid, sa, channel=None, page=0):
'''
raise Exception('Not yet implemented')

def jammer_on(self, channel=None, page=0):
def jammer_on(self, channel=None, page=0, method=None):
'''
Not yet implemented.
@type channel: Integer
Expand All @@ -333,7 +333,7 @@ def jammer_on(self, channel=None, page=0):
#TODO implement
raise Exception('Not yet implemented')

def jammer_off(self, channel=None, page=0):
def jammer_off(self):
'''
Not yet implemented.
@return: None
Expand Down
Loading

0 comments on commit bb71d57

Please sign in to comment.