Skip to content

Commit

Permalink
Change SDR data buffers to persist valid
Browse files Browse the repository at this point in the history
  • Loading branch information
zuckschwerdt committed Mar 26, 2022
1 parent 2e2c307 commit 7d7b7bf
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 76 deletions.
29 changes: 29 additions & 0 deletions include/sdr.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#include <stdint.h>

#define SDR_DEFAULT_BUF_NUMBER 15
#define SDR_DEFAULT_BUF_LENGTH 0x40000

typedef struct sdr_dev sdr_dev_t;

typedef enum sdr_event_flags {
Expand Down Expand Up @@ -48,6 +51,10 @@ int sdr_open(sdr_dev_t **out_dev, char const *dev_query, int verbose);

/** Close the device.
@note
All previous sdr_event_t buffers will be invalid after calling sdr_close().
Make sure none are in use anymore.
@param dev the device handle
@return 0 on success
*/
Expand Down Expand Up @@ -172,7 +179,29 @@ int sdr_deactivate(sdr_dev_t *dev);
*/
int sdr_reset(sdr_dev_t *dev, int verbose);

/** Start the SDR data acquisition.
@note
All previous sdr_event_t buffers will be invalid if @p buf_num or @p buf_len changed.
Make sure none are in use anymore.
@param dev the device handle
@param cb a callback for sdr_event_t messages
@param ctx a user context to be passed to @p cb
@param buf_num the number of buffers to keep
@param buf_len the size in bytes of each buffer
@return 0 on success
*/
int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len);

/** Stop the SDR data acquisition.
@note
All previous sdr_event_t buffers will remain valid until sdr_close().
@param dev the device handle
@return 0 on success
*/
int sdr_stop(sdr_dev_t *dev);

#endif /* INCLUDE_SDR_H_ */
81 changes: 20 additions & 61 deletions src/output_rtltcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,28 +86,25 @@ typedef SSIZE_T ssize_t;

// Only available if Threads are enabled.
// Currently serves a maximum of 1 client connection.
// A data backing of max_clients+1 slots is needed to write a data slot
// when each client is blocking a different data slot.
// Should use a global ring buffer or shared memory for sendfile() someday.
// The data backing from the SDR is assumed to be persistent, which is the case
// since we never restart the SDR with different parameters or close it while active.
// Should use shared memory for sendfile() someday.

#ifdef THREADS

#define DATA_SLOTS 2
typedef struct rtltcp_server {
struct sockaddr_storage addr;
socklen_t addr_len;
SOCKET sock;
int client_count; ///< number of connected clients

int data_recent; ///< the data slot with most recent data, -1 otherwise
int data_inuse[DATA_SLOTS]; ///< data slot is in use, 0 otherwise
void *data_buf[DATA_SLOTS]; ///< data slot memory of data_size bytes, NULL otherwise
int data_size[DATA_SLOTS]; ///< data slot data_buf size, 0 otherwise
int data_len[DATA_SLOTS]; ///< data slot valid bytes in data_buf, 0 otherwise
uint8_t const *data_buf; ///< data buffer with most recent data, NULL otherwise
uint32_t data_len; ///< data buffer length in bytes, 0 otherwise
unsigned data_cnt; ///< data buffer update counter

pthread_t thread;
pthread_mutex_t lock; ///< lock for data slots
pthread_cond_t cond; ///< wait for data slots
pthread_mutex_t lock; ///< lock for data buffer
pthread_cond_t cond; ///< wait for data buffer
r_cfg_t *cfg;
struct raw_output *output;
} rtltcp_server_t;
Expand Down Expand Up @@ -225,42 +222,16 @@ static int parse_command(r_cfg_t *cfg, uint8_t const *buf, int len)
}

// event handler to broadcast to all our sockets
static void rtltcp_broadcast_send(rtltcp_server_t *srv, uint8_t const *data, int len)
static void rtltcp_broadcast_send(rtltcp_server_t *srv, uint8_t const *data, uint32_t len)
{
// fprintf(stderr, "%s: %d byte frame\n", __func__, len);
pthread_mutex_lock(&srv->lock);
if (srv->client_count <= 0) {
pthread_mutex_unlock(&srv->lock);
return; // no clients, do nothing
}

// find a free slot
int slot = 0;
for (; slot < DATA_SLOTS; ++slot) {
if (srv->data_inuse[slot] == 0) {
break;
}
}
if (slot >= DATA_SLOTS) {
fprintf(stderr, "%s: all data slots in use!\n", __func__);
return; // this should never happen
}

// (re-)allocate slot buffer if needed
if (srv->data_buf[slot] == NULL || srv->data_size[slot] < len) {
//fprintf(stderr, "%s: allocating buffer of %d bytes for rtl_tcp\n", __func__, len);
free(srv->data_buf[slot]);
srv->data_buf[slot] = malloc(len);
if (!srv->data_buf[slot]) {
FATAL_MALLOC("rtltcp_broadcast_send()");
}
srv->data_size[slot] = len;
}
// update the data buffer reference
srv->data_buf = data;
srv->data_len = len;
srv->data_cnt += 1;

// transfer data to the buffer slot
memcpy(srv->data_buf[slot], data, len);
srv->data_len[slot] = len;
srv->data_recent = slot;
pthread_mutex_unlock(&srv->lock);
pthread_cond_signal(&srv->cond);
// perhaps broadcast if we want to support multiple clients
Expand Down Expand Up @@ -308,8 +279,8 @@ static THREAD_RETURN THREAD_CALL accept_thread(void *arg)

pthread_mutex_lock(&srv->lock);
srv->client_count += 1;
unsigned prev_cnt = srv->data_cnt + 9; // data sent in previous loop, random value to get the current buffer
pthread_mutex_unlock(&srv->lock);
int slot = -1; // data sent in previous loop

send_header(sock);

Expand Down Expand Up @@ -356,24 +327,19 @@ static THREAD_RETURN THREAD_CALL accept_thread(void *arg)
}

pthread_mutex_lock(&srv->lock);
if (srv->data_recent < 0 || srv->data_recent == slot)
while (srv->data_cnt == prev_cnt || srv->data_buf == NULL)
pthread_cond_wait(&srv->cond, &srv->lock);
// Maybe timeout to check recv()
// pthread_cond_timedwait(&srv->cond, &srv->lock, const struct timespec *abstime);

// Get data and mark as in use
slot = srv->data_recent;
srv->data_inuse[slot] += 1;
void const *data = srv->data_buf[slot];
int data_len = srv->data_len[slot];
// Get data buffer reference
void const *data = srv->data_buf;
int data_len = srv->data_len;
prev_cnt = srv->data_cnt;

pthread_mutex_unlock(&srv->lock);

send_all(sock, data, data_len, MSG_NOSIGNAL); // ignore SIGPIPE

// Mark data as done
pthread_mutex_lock(&srv->lock);
srv->data_inuse[slot] -= 1;
pthread_mutex_unlock(&srv->lock);
}

pthread_mutex_lock(&srv->lock);
Expand Down Expand Up @@ -429,8 +395,6 @@ static int rtltcp_server_start(rtltcp_server_t *srv, char const *host, char cons
srv->cfg = cfg;
srv->output = output;

srv->data_recent = -1;

char address[INET6_ADDRSTRLEN] = {0};
char portstr[NI_MAXSERV] = {0};

Expand Down Expand Up @@ -478,11 +442,6 @@ static int rtltcp_server_stop(rtltcp_server_t *srv)
pthread_cond_destroy(&srv->cond);

srv->client_count = 0;
for (int slot = 0; slot < DATA_SLOTS; ++slot) {
free(srv->data_buf[slot]);
srv->data_buf[slot] = NULL;
srv->data_inuse[slot] = 0;
}

// close server socket
int ret = 0;
Expand Down
66 changes: 51 additions & 15 deletions src/sdr.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ struct sdr_dev {

int running;
int polling;
void *buffer;
size_t buffer_size;
uint8_t *buffer; ///< sdr data buffer current and past frames
size_t buffer_size; ///< sdr data buffer overall size (num * len)
size_t buffer_pos; ///< sdr data buffer next write position

int sample_size;
int sample_signed;
Expand Down Expand Up @@ -284,20 +285,25 @@ static int rtltcp_close(SOCKET sock)

static int rtltcp_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
{
UNUSED(buf_num);
if (dev->buffer_size != buf_len) {
size_t buffer_size = buf_num * buf_len;
if (dev->buffer_size != buffer_size) {
free(dev->buffer);
dev->buffer = malloc(buf_len);
dev->buffer = malloc(buffer_size);
if (!dev->buffer) {
WARN_MALLOC("rtltcp_read_loop()");
return -1; // NOTE: returns error on alloc failure.
}
dev->buffer_size = buf_len;
dev->buffer_size = buffer_size;
dev->buffer_pos = 0;
}
uint8_t *buffer = dev->buffer;

dev->running = 1;
do {
if (dev->buffer_pos + buf_len > buffer_size)
dev->buffer_pos = 0;
uint8_t *buffer = &dev->buffer[dev->buffer_pos];
dev->buffer_pos += buf_len;

unsigned n_read = 0;
int r;

Expand Down Expand Up @@ -493,18 +499,38 @@ static int rtlsdr_find_tuner_gain(sdr_dev_t *dev, int centigain, int verbose)
static void rtlsdr_read_cb(unsigned char *iq_buf, uint32_t len, void *ctx)
{
sdr_dev_t *dev = ctx;

if (dev->buffer_pos + len > dev->buffer_size)
dev->buffer_pos = 0;
uint8_t *buffer = &dev->buffer[dev->buffer_pos];
dev->buffer_pos += len;

// NOTE: we need to copy the buffer, it might go away on cancel_async
memcpy(buffer, iq_buf, len);

sdr_event_t ev = {
.ev = SDR_EV_DATA,
.buf = iq_buf,
.buf = buffer,
.len = len,
};
if (len > 0) // prevent a crash in callback
dev->rtlsdr_cb(&ev, dev->rtlsdr_cb_ctx);
// NOTE: we actually need to copy the buffer to prevent it going away on cancel_async
}

static int rtlsdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
{
size_t buffer_size = buf_num * buf_len;
if (dev->buffer_size != buffer_size) {
free(dev->buffer);
dev->buffer = malloc(buffer_size);
if (!dev->buffer) {
WARN_MALLOC("rtlsdr_read_loop()");
return -1; // NOTE: returns error on alloc failure.
}
dev->buffer_size = buffer_size;
dev->buffer_pos = 0;
}

int r = 0;

dev->rtlsdr_cb = cb;
Expand Down Expand Up @@ -923,22 +949,27 @@ static int sdr_open_soapy(sdr_dev_t **out_dev, char const *dev_query, int verbos

static int soapysdr_read_loop(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, uint32_t buf_len)
{
UNUSED(buf_num);
if (dev->buffer_size != buf_len) {
size_t buffer_size = buf_num * buf_len;
if (dev->buffer_size != buffer_size) {
free(dev->buffer);
dev->buffer = malloc(buf_len);
dev->buffer = malloc(buffer_size);
if (!dev->buffer) {
WARN_CALLOC("soapysdr_read_loop()");
WARN_MALLOC("soapysdr_read_loop()");
return -1; // NOTE: returns error on alloc failure.
}
dev->buffer_size = buf_len;
dev->buffer_size = buffer_size;
dev->buffer_pos = 0;
}
int16_t *buffer = dev->buffer;

size_t buf_elems = buf_len / dev->sample_size;

dev->running = 1;
do {
if (dev->buffer_pos + buf_len > buffer_size)
dev->buffer_pos = 0;
int16_t *buffer = (void *)&dev->buffer[dev->buffer_pos];
dev->buffer_pos += buf_len;

void *buffs[] = {buffer};
int flags = 0;
long long timeNs = 0;
Expand Down Expand Up @@ -1567,6 +1598,11 @@ int sdr_start(sdr_dev_t *dev, sdr_event_cb_t cb, void *ctx, uint32_t buf_num, ui
if (!dev)
return -1;

if (buf_num == 0)
buf_num = SDR_DEFAULT_BUF_NUMBER;
if (buf_len == 0)
buf_len = SDR_DEFAULT_BUF_LENGTH;

if (dev->rtl_tcp)
return rtltcp_read_loop(dev, cb, ctx, buf_num, buf_len);

Expand Down

0 comments on commit 7d7b7bf

Please sign in to comment.