-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrx.py
67 lines (55 loc) · 2.45 KB
/
rx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# AUTHOR: STEPHAN MULLER
# OWNER: VIRGINIA TECH/LOCKHEED MARTIN
# RX Module
# This class provides the framework for generating a fully reliable
# transmission system. The VTECE-4805-6-LM project's conclusion was
# reached once binary bits were recovered from the received signal.
# Next steps are to decode the FEC encoding, and restructure the packets.
# Please view test_packets.py to show an example of packet reconstruction.
import SoapySDR
from SoapySDR import SOAPY_SDR_RX
from SoapySDR import SOAPY_SDR_CF32 #SOAPY_SDR_ constants
import numpy as np #use numpy for buffers
import matplotlib.pyplot as plt
import pickle
import time
class RxController:
def __init__(self, payload_size=1024, freq=2.412e9, bw=10e6, fs=1e6, gain=30) -> None:
self.setup_sdr(freq, bw, fs, gain)
self.rx_buffer = np.array([0]*10200, np.complex64)
self.signal = np.array([], dtype=np.complex64)
def setup_sdr(self, freq, bw, fs, gain):
self.sdr = SoapySDR.Device(dict(driver='lime')) #create connection with SDR
self.sdr.setSampleRate(SOAPY_SDR_RX, 0, fs)
self.sdr.setFrequency(SOAPY_SDR_RX, 0, freq)
self.sdr.setBandwidth(SOAPY_SDR_RX, 0, bw)
self.sdr.setGain(SOAPY_SDR_RX, 0, gain)
self.rx_stream = self.sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32)
def receive(self):
self.sdr.activateStream(self.rx_stream)
try:
received = False
receiving = False
while not received:
sr = self.sdr.readStream(self.rx_stream, [self.rx_buffer], len(self.rx_buffer))
if np.mean(np.abs(self.rx_buffer)) > .05:
receiving = True
self.signal = np.append(self.signal, self.rx_buffer)
elif receiving:
received = True
print("finished receiving")
# f = open(f'rx_{time.strftime("%H_%M_%S")}.pkl', 'wb')
# pickle.dump(self.signal, f)
# self.signal = np.array([], dtype=np.complex64)
except KeyboardInterrupt:
pass
plt.plot(np.real(self.signal[:10000]), label="I")
plt.plot(np.imag(self.signal[:10000]), label="Q")
plt.legend()
plt.title("I/Q Rx Samples Received")
plt.show()
self.sdr.deactivateStream(self.rx_stream) #stop streaming
self.sdr.closeStream(self.rx_stream)
if __name__ == '__main__':
rx_controller = RxController(payload_size=15)
rx_controller.receive()