forked from f4exb/sdrangel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfreqtracking.py
executable file
·281 lines (256 loc) · 13.8 KB
/
freqtracking.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
#!/usr/bin/env python3
'''
Frequency tracker:
Listens on a TCP port for SDRangel reverse API requests.
- When the request comes from a FreqTracker channel it gets the FreqTracker channel frequency shift
- When the request comes from another channel it records the difference between the FreqTracker frequency shift and
this channel frequency shift. Then, each time the FreqTracker channel reports a new frequency shift, it sends a
center frequency change request to this channel with the FreqTracker channel frequency shift plus the difference,
thus locking this channel to the FreqTracker channel frequency.
- If the reply from the channel returns an error it will un-register the channel.
In the SDRangel instance you must activate the reverse API of the FreqTracker channel and the controlled channel(s)
reverse API giving the address and port of this instance of the script. You have to click on the small grey box at the
top left of the plugin GUI to open the channel details dialog where the reverse API can be configured.
'''
import requests
import time
import argparse
from flask import Flask
from flask import request, jsonify
# Address of the SDRangel REST API; when None the address of the calling host is used
SDRANGEL_API_ADDR = None
# Port of the SDRangel REST API (replaced by the command line value in main)
SDRANGEL_API_PORT = 8091
# Last frequency shift reported by a FreqTracker channel
TRACKER_OFFSET = 0
# Device set index of the FreqTracker channel that reported last
TRACKER_DEVICE = 0
# Registered channels keyed by (device set index, channel index)
TRACKING_DICT = {}
# Optional absolute frequency (Hz) the tracker should aim at (set from the command line)
TRACKER_FREQUENCY = None
# Optional index of the device whose transverter frequency is adjusted (set from the command line)
XVTR_DEVICE = None
# Reference corrections (Hz) whose magnitude is below this limit are not applied
REFCORR_LIMIT = 1000
# Flask application serving the reverse API endpoints
app = Flask(__name__)
# ======================================================================
def getInputOptions():
    """ Parse the command line arguments

    Returns a tuple in this order:
        (listening address, listening port, SDRangel API address, SDRangel API port,
         tracker frequency, transverter device index, reference correction limit)

    The listening address, listening port and SDRangel API port fall back to
    "0.0.0.0", 8888 and 8091 respectively. The other values are None when the
    corresponding option is not given.
    """
# ----------------------------------------------------------------------
    parser = argparse.ArgumentParser(description="Locks SDRangel channels to the frequency shift of a FreqTracker channel using the reverse API")
    parser.add_argument("-A", "--address", dest="addr", help="listening address (default 0.0.0.0)", metavar="IP", type=str, default="0.0.0.0")
    parser.add_argument("-P", "--port", dest="port", help="listening port (default 8888)", metavar="PORT", type=int, default=8888)
    parser.add_argument("-a", "--address-sdr", dest="sdrangel_address", help="SDRangel REST API address (defaults to calling address)", metavar="ADDRESS", type=str)
    parser.add_argument("-p", "--port-sdr", dest="sdrangel_port", help="SDRangel REST API port (default 8091)", metavar="PORT", type=int, default=8091)
    parser.add_argument("-f", "--tracker-frequency", dest="tracker_frequency", help="Absolute frequency the tracker should aim at (Hz, optional)", metavar="FREQ", type=int)
    parser.add_argument("-r", "--refcorr-limit", dest="refcorr_limit", help="Limit of the tracker frequency reference correction (Hz, optional, default 1000 Hz)", metavar="DFREQ", type=int)
    parser.add_argument("-d", "--transverter-device", dest="transverter_device", help="Transverter device index to use for tracker frequency correction (optional)", metavar="DEVICE", type=int)
    options = parser.parse_args()
    # refcorr_limit deliberately has no argparse default: None tells the caller
    # to keep the module level limit
    return (
        options.addr,
        options.port,
        options.sdrangel_address,
        options.sdrangel_port,
        options.tracker_frequency,
        options.transverter_device,
        options.refcorr_limit,
    )
# ======================================================================
def get_sdrangel_ip(request):
    """ Extract originator address from request

    Returns SDRANGEL_API_ADDR when it is set. Otherwise returns the first
    address of the X-Forwarded-For header when present and non empty, else
    the REMOTE_ADDR of the request.
    """
# ----------------------------------------------------------------------
    if SDRANGEL_API_ADDR is not None:
        return SDRANGEL_API_ADDR
    # NOTE(review): X-Forwarded-For is supplied by the client (or proxies) and
    # the result is used as the host of outgoing REST calls; set the SDRangel
    # address explicitly on the command line when callers are not trusted.
    forwarded_for = request.environ.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for:
        # The header may list "client, proxy1, proxy2": the originator comes first
        originator = forwarded_for.split(',')[0].strip()
        if originator:
            return originator
    return request.environ['REMOTE_ADDR']
# ======================================================================
def gen_dict_extract(key, var):
    """ Yield every value stored under `key` in a nested dictionary / list structure

    key: the dictionary key to look for
    var: the structure to search; dictionaries and lists are traversed at any
         depth (including lists nested directly inside lists), anything else
         yields nothing

    A matching value is yielded before its own content is searched, so a
    dictionary stored under `key` is yielded and then traversed.
    """
# ----------------------------------------------------------------------
    if isinstance(var, list):
        for element in var:
            yield from gen_dict_extract(key, element)
    elif hasattr(var, 'items'):
        for k, v in var.items():
            if k == key:
                yield v
            if isinstance(v, (dict, list)):
                yield from gen_dict_extract(key, v)
# ======================================================================
def update_frequency_setting(request_content, frequency_key, frequency):
    """ Set `frequency_key` to `frequency` in every first level sub-dictionary of
        `request_content` that already contains this key

        request_content: settings dictionary, modified in place; anything that is
                         not a mapping (e.g. None) is left alone
        frequency_key: name of the key to update (e.g. inputFrequencyOffset)
        frequency: new value for the key

        Only dictionaries that are direct values of `request_content` are
        considered: a key at the top level or nested deeper is not changed.
        Returns None.
    """
# ----------------------------------------------------------------------
    if not hasattr(request_content, 'values'):
        return
    for setting_item in request_content.values():
        if isinstance(setting_item, dict) and frequency_key in setting_item:
            setting_item[frequency_key] = frequency
# ======================================================================
def get_device_frequency(sdrangel_ip, sdrangel_port, device_index, timeout=10):
    """ Obtain the device center frequency from either the settings or
        the report

        sdrangel_ip, sdrangel_port: address and port of the SDRangel REST API
        device_index: index of the device set to query
        timeout: seconds to wait for each REST call before giving up

        The device settings are queried first; the device report is used when
        the settings do not provide a centerFrequency. When a reply holds
        several centerFrequency keys the last one found is retained.
        Returns None when neither source yields a frequency (HTTP error,
        connection failure, invalid JSON or missing key).
    """
# ----------------------------------------------------------------------
    base_url = f'http://{sdrangel_ip}:{sdrangel_port}/sdrangel'
    for endpoint in ('settings', 'report'):
        try:
            # Without a timeout a stalled SDRangel instance would block the caller forever
            r = requests.get(url=base_url + f'/deviceset/{device_index}/device/{endpoint}', timeout=timeout)
            if r.status_code // 100 != 2:
                continue
            device_content = r.json()
        except (requests.RequestException, ValueError):
            # connection problem or reply that is not JSON: try the next source
            continue
        device_frequency = None
        for freq in gen_dict_extract('centerFrequency', device_content):
            device_frequency = freq
        if device_frequency is not None:
            return device_frequency
    return None
# ======================================================================
def adjust_channels(sdrangel_ip, sdrangel_port, timeout=10):
    """ Adjust registered channels center frequencies
        Remove keys for channels returning error

        sdrangel_ip, sdrangel_port: address and port of the SDRangel REST API
        timeout: seconds to wait for each REST call before giving up

        Each registered channel gets its registered frequency shifted by the
        change of TRACKER_OFFSET since its registration. A channel whose PATCH
        request is answered with a non 2xx status is un-registered. A channel
        that cannot be reached at all (connection error, timeout) stays
        registered and the failure is reported on the console.
    """
# ----------------------------------------------------------------------
    global TRACKING_DICT
    base_url = f'http://{sdrangel_ip}:{sdrangel_port}/sdrangel'
    remove_keys = []
    # Work on a snapshot: the HTTP calls below block, and another request
    # registering a channel meanwhile must not break the iteration
    for k, tracking_item in list(TRACKING_DICT.items()):
        device_index, channel_index = k[0], k[1]
        frequency_correction = TRACKER_OFFSET - tracking_item['trackerFrequency']
        frequency = tracking_item['channelFrequency'] + frequency_correction
        update_frequency_setting(tracking_item['requestContent'], 'inputFrequencyOffset', frequency)
        try:
            r = requests.patch(
                url=base_url + f'/deviceset/{device_index}/channel/{channel_index}/settings',
                json=tracking_item['requestContent'],
                timeout=timeout
            )
        except requests.RequestException as ex:
            print(f'SDRangel: {sdrangel_ip}:{sdrangel_port} Adjust [{device_index}:{channel_index}] failed: {ex}')
            continue
        if r.status_code // 100 != 2:
            remove_keys.append(k)
    for k in remove_keys:
        tracking_item = TRACKING_DICT.pop(k, None)
        if not tracking_item:
            continue
        request_content = tracking_item.get('requestContent')
        channel_type = request_content.get('channelType') if request_content else 'Undefined'
        device_index, channel_index = k[0], k[1]
        print(f'SDRangel: {sdrangel_ip}:{sdrangel_port} Removed {channel_type} [{device_index}:{channel_index}]')
# ======================================================================
def _xvtr_request(send, url, timeout, **kwargs):
    """ Issue one REST request with `send` (requests.get or requests.patch) and
        return the response when its status is 2xx, None on any failure
    """
# ----------------------------------------------------------------------
    try:
        response = send(url=url, timeout=timeout, **kwargs)
    except requests.RequestException:
        return None
    if response.status_code // 100 != 2:
        return None
    return response


def adjust_xvtr(sdrangel_ip, sdrangel_port, tracker_device_index, tracker_channel_index, tracker_content, tracker_offset, timeout=10):
    """ Adjust transverter frequency so that the frequency tracker absolute frequency is loosely locked to the
        carrier it is supposed to track

        sdrangel_ip, sdrangel_port: address and port of the SDRangel REST API
        tracker_device_index, tracker_channel_index: location of the FreqTracker channel
        tracker_content: settings received from the FreqTracker channel, modified in place
        tracker_offset: frequency shift reported by the FreqTracker channel
        timeout: seconds to wait for each REST call before giving up

        The correction is TRACKER_FREQUENCY minus the tracker absolute frequency
        (tracker device center frequency plus tracker_offset). Nothing is done
        when its magnitude is below REFCORR_LIMIT. Otherwise the correction is
        added to the transverterDeltaFrequency of device XVTR_DEVICE and to the
        tracker channel frequency shift, and TRACKER_OFFSET is updated.
        Failures are reported on the console and end the adjustment.
    """
# ----------------------------------------------------------------------
    global TRACKER_OFFSET
    base_url = f'http://{sdrangel_ip}:{sdrangel_port}/sdrangel'
    log_prefix = f'SDRangel::adjust_xvtr: {sdrangel_ip}:{sdrangel_port}'
    tracker_device_frequency = get_device_frequency(sdrangel_ip, sdrangel_port, tracker_device_index)
    if tracker_device_frequency is None:
        print(f'{log_prefix} get tracker device {tracker_device_index} frequency failed')
        return
    tracker_frequency = tracker_device_frequency + tracker_offset
    correction = TRACKER_FREQUENCY - tracker_frequency
    # do not correct if correction is too small
    if abs(correction) < REFCORR_LIMIT:
        return
    # get current transverter settings
    xvtr_settings_url = base_url + f'/deviceset/{XVTR_DEVICE}/device/settings'
    r = _xvtr_request(requests.get, xvtr_settings_url, timeout)
    device_content = None
    if r is not None:
        try:
            device_content = r.json()
        except ValueError:
            device_content = None
    if device_content is None:
        print(f'{log_prefix} get transverter device {XVTR_DEVICE} settings failed')
        return
    # Apply the correction once, based on the first transverter frequency found
    xvtr_freq = next(gen_dict_extract('transverterDeltaFrequency', device_content), None)
    if xvtr_freq is None:
        print(f'{log_prefix} transverter device {XVTR_DEVICE} has no transverterDeltaFrequency setting')
        return
    # device
    update_frequency_setting(device_content, 'transverterDeltaFrequency', xvtr_freq + correction)
    if _xvtr_request(requests.patch, xvtr_settings_url, timeout, json=device_content) is None:
        print(f'{log_prefix} transverter device {XVTR_DEVICE} adjust failed')
        return
    # tracker
    TRACKER_OFFSET = tracker_offset + correction
    update_frequency_setting(tracker_content, 'inputFrequencyOffset', tracker_offset + correction)
    tracker_settings_url = base_url + f'/deviceset/{tracker_device_index}/channel/{tracker_channel_index}/settings'
    if _xvtr_request(requests.patch, tracker_settings_url, timeout, json=tracker_content) is None:
        print(f'{log_prefix} tracker [{tracker_device_index}:{tracker_channel_index}] adjust failed')
# ======================================================================
def register_channel(device_index, channel_index, channel_frequency, request_content):
    """ Store a channel in TRACKING_DICT under (device_index, channel_index),
        replacing any entry already present for that location

        The entry keeps the channel frequency, the value of TRACKER_OFFSET at
        the time of the call and the settings content received from the channel.
    """
# ----------------------------------------------------------------------
    global TRACKING_DICT
    registration_key = (device_index, channel_index)
    registration = {
        'channelFrequency': channel_frequency,
        'trackerFrequency': TRACKER_OFFSET,
        'requestContent': request_content
    }
    TRACKING_DICT[registration_key] = registration
# ======================================================================
@app.route('/ftrack/refcorr/<int:correction>', methods=['PUT'])
def ftrack_set_corr(correction):
    """ Frequency tracker set reference correction limit

        correction: new value of REFCORR_LIMIT, taken from the URL path
        Returns the plain text reply 'OK processed'.
    """
# ----------------------------------------------------------------------
    global REFCORR_LIMIT
    print(f'ftrack_set_corr: {correction}')
    REFCORR_LIMIT = correction
    return 'OK processed'
# ======================================================================
@app.route('/sdrangel')
def hello_sdrangel():
    """ Connectivity check: print the address resolved for the caller and
        reply with a greeting
    """
# ----------------------------------------------------------------------
    caller_address = get_sdrangel_ip(request)
    print(f'SDRangel IP: {caller_address}')
    return 'Hello, SDRangel!'
# ======================================================================
@app.route('/sdrangel/deviceset/<int:deviceset_index>/channel/<int:channel_index>/settings', methods=['GET', 'PATCH', 'PUT'])
def channel_settings(deviceset_index, channel_index):
    """ Receiving channel settings from reverse API

        deviceset_index, channel_index: taken from the URL path; the originator
        indexes found in the request body are the ones actually used.

        For each inputFrequencyOffset found in the body:
        - a FreqTracker channel updates TRACKER_OFFSET and TRACKER_DEVICE, triggers
          the adjustment of the registered channels and, when both TRACKER_FREQUENCY
          and XVTR_DEVICE are set, the transverter adjustment
        - any other channel is registered (or re-registered) with this offset

        Returns a plain text reply; a body without originator information is
        answered with a message asking for a more recent reverse API.
    """
# ----------------------------------------------------------------------
    global TRACKER_OFFSET
    global TRACKER_DEVICE
    content = request.get_json(silent=True)
    # A missing body or JSON that is not an object carries no originator information
    if not isinstance(content, dict):
        content = {}
    orig_device_index = content.get('originatorDeviceSetIndex')
    orig_channel_index = content.get('originatorChannelIndex')
    if orig_device_index is None or orig_channel_index is None:
        print('channel_settings: SDRangel reverse API v4.5.2 or higher required. No or invalid originator information')
        return "SDRangel reverse API v4.5.2 or higher required "
    sdrangel_ip = get_sdrangel_ip(request)
    channel_type = content.get('channelType')
    for freq_offset in gen_dict_extract('inputFrequencyOffset', content):
        if channel_type == "FreqTracker":
            print(f'SDRangel: {sdrangel_ip}:{SDRANGEL_API_PORT} Tracker [{orig_device_index}:{orig_channel_index}] at {freq_offset} Hz')
            TRACKER_OFFSET = freq_offset
            TRACKER_DEVICE = orig_device_index
            adjust_channels(sdrangel_ip, SDRANGEL_API_PORT)
            if TRACKER_FREQUENCY is not None and XVTR_DEVICE is not None: # optionally lock tracker to the beacon it is supposed to follow
                adjust_xvtr(sdrangel_ip, SDRANGEL_API_PORT, orig_device_index, orig_channel_index, content, freq_offset)
        else:
            register_channel(orig_device_index, orig_channel_index, freq_offset, content)
            print(f'SDRangel: {sdrangel_ip}:{SDRANGEL_API_PORT} {channel_type} [{orig_device_index}:{orig_channel_index}] at {freq_offset} Hz')
    return "OK processed "
# ======================================================================
def main():
    """ This is the main routine

        Stores the command line options in the module level settings and
        starts the Flask server on the requested address and port.
    """
# ----------------------------------------------------------------------
    global SDRANGEL_API_ADDR
    global SDRANGEL_API_PORT
    global TRACKER_FREQUENCY
    global XVTR_DEVICE
    global REFCORR_LIMIT
    (addr, port, SDRANGEL_API_ADDR, SDRANGEL_API_PORT,
     TRACKER_FREQUENCY, XVTR_DEVICE, refcorr_limit) = getInputOptions()
    # keep the built-in limit unless one was given on the command line
    if refcorr_limit is not None:
        REFCORR_LIMIT = refcorr_limit
    print(f'main: starting at: {addr}:{port}')
    # Debug mode is off: the interactive debugger lets a client run arbitrary
    # code and the default listening address (0.0.0.0) is reachable from the network
    app.run(debug=False, host=addr, port=port)
# ======================================================================
if __name__ == "__main__":
    # Entry point when the script is run from the command line
    main()