Skip to content

Commit

Permalink
TT#50652 create timer to gradually send packets without jitter
Browse files Browse the repository at this point in the history
Change-Id: I5b28f7514f54c6df9f3ee607e3bca76274b431b3
  • Loading branch information
rfuchs committed Mar 6, 2019
1 parent d639223 commit 3bdad88
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 17 deletions.
2 changes: 2 additions & 0 deletions daemon/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,7 @@ struct packet_stream *__packet_stream_new(struct call *call) {
atomic64_set_na(&stream->last_packet, rtpe_now.tv_sec);
stream->rtp_stats = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, __rtp_stats_free);
recording_init_stream(stream);
stream->send_timer = send_timer_new(stream);

return stream;
}
Expand Down Expand Up @@ -2220,6 +2221,7 @@ void call_destroy(struct call *c) {
rtpe_now.tv_sec - atomic64_get(&ps->last_packet));

statistics_update_totals(ps);
send_timer_put(&ps->send_timer);

}

Expand Down
38 changes: 34 additions & 4 deletions daemon/codec.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ struct codec_ssrc_handler {
int bytes_per_packet;
unsigned long first_ts; // for output TS scaling
unsigned long ts_in; // for DTMF dupe detection
struct timeval first_send;
unsigned long first_send_ts;
GString *sample_buffer;
};
struct transcode_packet {
Expand Down Expand Up @@ -563,7 +565,7 @@ void codec_handlers_free(struct call_media *m) {


void codec_add_raw_packet(struct media_packet *mp) {
struct codec_packet *p = g_slice_alloc(sizeof(*p));
struct codec_packet *p = g_slice_alloc0(sizeof(*p));
p->s = mp->raw;
p->free_func = NULL;
if (mp->rtp && mp->ssrc_out)
Expand Down Expand Up @@ -682,14 +684,14 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch,
struct codec_handler *handler, // normally == ch->handler except for DTMF
char *buf, // malloc'd, room for rtp_header + filled-in payload
unsigned int payload_len,
unsigned int payload_ts,
unsigned long payload_ts,
int marker, int seq, int seq_inc)
{
struct rtp_header *rh = (void *) buf;
struct ssrc_ctx *ssrc_out = mp->ssrc_out;
struct ssrc_entry_call *ssrc_out_p = ssrc_out->parent;
// reconstruct RTP header
unsigned int ts = payload_ts;
unsigned long ts = payload_ts;
ZERO(*rh);
rh->v_p_x_cc = 0x80;
rh->m_pt = handler->dest_pt.payload_type | (marker ? 0x80 : 0);
Expand All @@ -701,11 +703,39 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch,
rh->ssrc = htonl(ssrc_out_p->h.ssrc);

// add to output queue
struct codec_packet *p = g_slice_alloc(sizeof(*p));
struct codec_packet *p = g_slice_alloc0(sizeof(*p));
p->s.s = buf;
p->s.len = payload_len + sizeof(struct rtp_header);
payload_tracker_add(&ssrc_out->tracker, handler->dest_pt.payload_type);
p->free_func = free;

// this packet is dynamically allocated, so we're able to schedule it.
// determine scheduled time to send
if (ch->first_send.tv_sec) {
// scale first_send from first_send_ts to ts
p->to_send = ch->first_send;
uint32_t ts_diff = (uint32_t) ts - (uint32_t) ch->first_send_ts; // allow for wrap-around
unsigned long long ts_diff_us =
(unsigned long long) ts_diff * 1000000 / ch->encoder_format.clockrate
* ch->handler->dest_pt.codec_def->clockrate_mult;
timeval_add_usec(&p->to_send, ts_diff_us);

// how far in the future is this?
ts_diff_us = timeval_diff(&p->to_send, &rtpe_now); // negative wrap-around to positive OK

if (ts_diff_us > 1000000) // more than one second, can't be right
ch->first_send.tv_sec = 0; // fix it up below
}
if (!ch->first_send.tv_sec) {
p->to_send = ch->first_send = rtpe_now;
ch->first_send_ts = ts;
}
ilog(LOG_DEBUG, "Scheduling to send RTP packet (seq %u TS %lu) at %lu.%06lu",
ntohs(rh->seq_num),
ts,
(long unsigned) p->to_send.tv_sec,
(long unsigned) p->to_send.tv_usec);

g_queue_push_tail(&mp->packets_out, p);

atomic64_inc(&ssrc_out->packets);
Expand Down
4 changes: 3 additions & 1 deletion daemon/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -786,8 +786,10 @@ int main(int argc, char **argv) {

if (rtpe_config.media_num_threads < 0)
rtpe_config.media_num_threads = rtpe_config.num_threads;
for (idx = 0; idx < rtpe_config.media_num_threads; ++idx)
for (idx = 0; idx < rtpe_config.media_num_threads; ++idx) {
thread_create_detach_prio(media_player_loop, NULL, rtpe_config.scheduling, rtpe_config.priority);
thread_create_detach_prio(send_timer_loop, NULL, rtpe_config.scheduling, rtpe_config.priority);
}


while (!rtpe_shutdown) {
Expand Down
126 changes: 126 additions & 0 deletions daemon/media_player.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@


static struct timerthread media_player_thread;
static struct timerthread send_timer_thread;



Expand Down Expand Up @@ -83,6 +84,85 @@ struct media_player *media_player_new(struct call_monologue *ml) {
}


static void __send_timer_free(void *p) {
struct send_timer *st = p;

ilog(LOG_DEBUG, "freeing send_timer");

g_queue_clear_full(&st->packets, codec_packet_free);
mutex_destroy(&st->lock);
obj_put(st->call);
}


// call->master_lock held in W
struct send_timer *send_timer_new(struct packet_stream *ps) {
ilog(LOG_DEBUG, "creating send_timer");

struct send_timer *st = obj_alloc0("send_timer", sizeof(*st), __send_timer_free);
st->tt_obj.tt = &send_timer_thread;
mutex_init(&st->lock);
st->call = obj_get(ps->call);
st->sink = ps;
g_queue_init(&st->packets);

return st;
}


// st->stream->out_lock (or call->master_lock/W) must be held already
static int send_timer_send(struct send_timer *st, struct codec_packet *cp) {
if (cp->to_send.tv_sec && timeval_cmp(&cp->to_send, &rtpe_now) > 0)
return -1; // not yet

if (!st->sink->selected_sfd)
goto out;

struct rtp_header *rh = (void *) cp->s.s;
ilog(LOG_DEBUG, "Forward to sink endpoint: %s:%d (RTP seq %u TS %u)",
sockaddr_print_buf(&st->sink->endpoint.address),
st->sink->endpoint.port,
ntohs(rh->seq_num),
ntohl(rh->timestamp));

socket_sendto(&st->sink->selected_sfd->socket,
cp->s.s, cp->s.len, &st->sink->endpoint);

out:
codec_packet_free(cp);

return 0;
}


// st->stream->out_lock (or call->master_lock/W) must be held already
void send_timer_push(struct send_timer *st, struct codec_packet *cp) {
// can we send immediately?
if (!send_timer_send(st, cp))
return;

// queue for sending

struct rtp_header *rh = (void *) cp->s.s;
ilog(LOG_DEBUG, "queuing up packet for delivery at %lu.%06u (RTP seq %u TS %u)",
(unsigned long) cp->to_send.tv_sec,
(unsigned int) cp->to_send.tv_usec,
ntohs(rh->seq_num),
ntohl(rh->timestamp));

mutex_lock(&st->lock);
unsigned int qlen = st->packets.length;
// this hands over ownership of cp, so we must copy the timeval out
struct timeval tv_send = cp->to_send;
g_queue_push_tail(&st->packets, cp);
mutex_unlock(&st->lock);

// first packet in? we're probably not scheduled yet
if (!qlen)
timerthread_obj_schedule_abs(&st->tt_obj, &tv_send);
}


static int __ensure_codec_handler(struct media_player *mp, AVStream *avs) {
if (mp->handler)
return 0;
Expand Down Expand Up @@ -189,6 +269,10 @@ static void media_player_read_packet(struct media_player *mp) {

mp->handler->func(mp->handler, &packet);

// as this is timing sensitive and we may have spent some time decoding,
// update our global "now" timestamp
gettimeofday(&rtpe_now, NULL);

mutex_lock(&mp->sink->out_lock);
if (media_socket_dequeue(&packet, mp->sink))
ilog(LOG_ERR, "Error sending playback media to RTP sink");
Expand Down Expand Up @@ -234,6 +318,8 @@ static int media_player_play_init(struct media_player *mp) {
// call->master_lock held in W
static void media_player_play_start(struct media_player *mp) {
mp->next_run = rtpe_now;
// give ourselves a bit of a head start with decoding
timeval_add_usec(&mp->next_run, -50000);
media_player_read_packet(mp);
}

Expand Down Expand Up @@ -365,12 +451,52 @@ static void media_player_run(void *ptr) {
}


static void send_timer_run(void *ptr) {
struct send_timer *st = ptr;
struct call *call = st->call;

log_info_call(call);

ilog(LOG_DEBUG, "running scheduled send_timer");

struct timeval next_send = {0,};

rwlock_lock_r(&call->master_lock);
mutex_lock(&st->lock);

while (st->packets.length) {
struct codec_packet *cp = st->packets.head->data;
// XXX this could be made lock-free
if (!send_timer_send(st, cp)) {
g_queue_pop_head(&st->packets);
continue;
}
// couldn't send the last one. remember time to schedule
next_send = cp->to_send;
break;
}

mutex_unlock(&st->lock);
rwlock_unlock_r(&call->master_lock);

if (next_send.tv_sec)
timerthread_obj_schedule_abs(&st->tt_obj, &next_send);

log_info_clear();
}


void media_player_init(void) {
timerthread_init(&media_player_thread, media_player_run);
timerthread_init(&send_timer_thread, send_timer_run);
}


void media_player_loop(void *p) {
ilog(LOG_DEBUG, "media_player_loop");
timerthread_run(&media_player_thread);
}
void send_timer_loop(void *p) {
ilog(LOG_DEBUG, "send_timer_loop");
timerthread_run(&send_timer_thread);
}
15 changes: 3 additions & 12 deletions daemon/media_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "iptables.h"
#include "main.h"
#include "codec.h"
#include "media_player.h"


#ifndef PORT_RANDOM_MIN
Expand Down Expand Up @@ -1575,18 +1576,8 @@ static int do_rtcp(struct packet_handler_ctx *phc) {
// appropriate locks must be held
int media_socket_dequeue(struct media_packet *mp, struct packet_stream *sink) {
struct codec_packet *p;
while ((p = g_queue_pop_head(&mp->packets_out))) {
__C_DBG("Forward to sink endpoint: %s:%d", sockaddr_print_buf(&sink->endpoint.address),
sink->endpoint.port);

int ret = socket_sendto(&sink->selected_sfd->socket,
p->s.s, p->s.len, &sink->endpoint);

codec_packet_free(p);

if (ret == -1)
return -1;
}
while ((p = g_queue_pop_head(&mp->packets_out)))
send_timer_push(sink->send_timer, p);
return 0;
}

Expand Down
2 changes: 2 additions & 0 deletions include/call.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ struct ssrc_hash;
struct codec_handler;
struct rtp_payload_type;
struct media_player;
struct send_timer;


typedef bencode_buffer_t call_buffer_t;
Expand Down Expand Up @@ -281,6 +282,7 @@ struct packet_stream {
struct crypto_context crypto; /* OUT direction, LOCK: out_lock */
struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */ // XXX eliminate these
*ssrc_out; /* LOCK: out_lock */
struct send_timer *send_timer; /* RO */

struct stats stats;
struct stats kernel_stats;
Expand Down
2 changes: 2 additions & 0 deletions include/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@


#include <glib.h>
#include <sys/time.h>
#include "str.h"
#include "codeclib.h"
#include "aux.h"
Expand Down Expand Up @@ -36,6 +37,7 @@ struct codec_handler {
struct codec_packet {
str s;
void (*free_func)(void *);
struct timeval to_send;
};


Expand Down
20 changes: 20 additions & 0 deletions include/media_player.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct call_monologue;
struct codec_handler;
struct ssrc_ctx;
struct packet_stream;
struct codec_packet;


struct media_player {
Expand All @@ -38,6 +39,14 @@ struct media_player {
str read_pos;
};

struct send_timer {
struct timerthread_obj tt_obj;
mutex_t lock;
struct call *call; // main reference that keeps this alive
struct packet_stream *sink;
GQueue packets;
};


struct media_player *media_player_new(struct call_monologue *);
int media_player_play_file(struct media_player *, const str *);
Expand All @@ -47,6 +56,11 @@ void media_player_stop(struct media_player *);
void media_player_init(void);
void media_player_loop(void *);

struct send_timer *send_timer_new(struct packet_stream *);
void send_timer_push(struct send_timer *, struct codec_packet *);

void send_timer_loop(void *p);



INLINE void media_player_put(struct media_player **mp) {
Expand All @@ -55,6 +69,12 @@ INLINE void media_player_put(struct media_player **mp) {
obj_put(&(*mp)->tt_obj);
*mp = NULL;
}
INLINE void send_timer_put(struct send_timer **st) {
if (!*st)
return;
obj_put(&(*st)->tt_obj);
*st = NULL;
}


#endif

0 comments on commit 3bdad88

Please sign in to comment.