diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 07495891f..3067b9d2b 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -214,6 +214,7 @@ typedef struct { /* auto-pushed? */ unsigned auto_pushed:1; unsigned relay:1; + unsigned static_relay:1; /* input stream 0 (reserved by RTMP spec) * is used as free chain link */ diff --git a/ngx_rtmp_auto_push_module.c b/ngx_rtmp_auto_push_module.c index de2481841..19e197c2c 100644 --- a/ngx_rtmp_auto_push_module.c +++ b/ngx_rtmp_auto_push_module.c @@ -401,7 +401,7 @@ ngx_rtmp_auto_push_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) ngx_rtmp_auto_push_conf_t *apcf; ngx_rtmp_auto_push_ctx_t *ctx; - if (s->auto_pushed || s->relay) { + if (s->auto_pushed || (s->relay && !s->static_relay)) { goto next; } diff --git a/ngx_rtmp_relay_module.c b/ngx_rtmp_relay_module.c index bf6dd4871..5cb144c6e 100644 --- a/ngx_rtmp_relay_module.c +++ b/ngx_rtmp_relay_module.c @@ -12,14 +12,18 @@ static ngx_rtmp_play_pt next_play; static ngx_rtmp_delete_stream_pt next_delete_stream; +static ngx_int_t ngx_rtmp_relay_init_process(ngx_cycle_t *cycle); static ngx_int_t ngx_rtmp_relay_postconfiguration(ngx_conf_t *cf); static void * ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf); static char * ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, - void *parent, void *child); + void *parent, void *child); static char * ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, - void *conf); + void *conf); static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, - ngx_rtmp_publish_t *v); + ngx_rtmp_publish_t *v); +static ngx_rtmp_relay_ctx_t * ngx_rtmp_relay_create_connection( + ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name, + ngx_rtmp_relay_target_t *target); /* _____ @@ -37,16 +41,25 @@ static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, typedef struct { - ngx_array_t pulls; /* ngx_rtmp_relay_target_t * */ - ngx_array_t pushes; /* ngx_rtmp_relay_target_t * */ - ngx_log_t *log; - ngx_uint_t nbuckets; - ngx_msec_t buflen; - ngx_msec_t push_reconnect; - ngx_rtmp_relay_ctx_t **ctx; + ngx_array_t pulls; /* ngx_rtmp_relay_target_t * */ + ngx_array_t pushes; /* ngx_rtmp_relay_target_t * */ + ngx_array_t static_pulls; /* ngx_rtmp_relay_target_t * */ + ngx_array_t static_events; /* ngx_event_t * */ + ngx_log_t *log; + ngx_uint_t nbuckets; + ngx_msec_t buflen; + ngx_msec_t push_reconnect; + ngx_msec_t pull_reconnect; + ngx_rtmp_relay_ctx_t **ctx; } ngx_rtmp_relay_app_conf_t; +typedef struct { + ngx_rtmp_conf_ctx_t cctx; + ngx_rtmp_relay_target_t *target; +} ngx_rtmp_relay_static_t; + + #define NGX_RTMP_RELAY_CONNECT_TRANS 1 #define NGX_RTMP_RELAY_CREATE_STREAM_TRANS 2 @@ -89,6 +102,13 @@ static ngx_command_t ngx_rtmp_relay_commands[] = { NGX_RTMP_APP_CONF_OFFSET, offsetof(ngx_rtmp_relay_app_conf_t, push_reconnect), NULL }, + + { ngx_string("pull_reconnect"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, + ngx_conf_set_msec_slot, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_relay_app_conf_t, pull_reconnect), + NULL }, ngx_null_command @@ -114,7 +134,7 @@ ngx_module_t ngx_rtmp_relay_module = { NGX_RTMP_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ - NULL, /* init process */ + ngx_rtmp_relay_init_process, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ @@ -133,14 +153,31 @@ ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf) return NULL; } - ngx_array_init(&racf->pushes, cf->pool, 1, - sizeof(ngx_rtmp_relay_target_t *)); - ngx_array_init(&racf->pulls, cf->pool, 1, - sizeof(ngx_rtmp_relay_target_t *)); + if (ngx_array_init(&racf->pushes, cf->pool, 1, sizeof(void *)) != NGX_OK) { + return NULL; + } + + if (ngx_array_init(&racf->pulls, cf->pool, 1, sizeof(void *)) != NGX_OK) { + return NULL; + } + + if (ngx_array_init(&racf->static_pulls, cf->pool, 1, sizeof(void *)) + != NGX_OK) + { + return NULL; + } + + if (ngx_array_init(&racf->static_events, cf->pool, 1, sizeof(void *)) + != NGX_OK) + { + return NULL; + } + racf->nbuckets = 1024; racf->log = &cf->cycle->new_log; racf->buflen = NGX_CONF_UNSET; racf->push_reconnect = NGX_CONF_UNSET; + racf->pull_reconnect = NGX_CONF_UNSET; return racf; } @@ -158,13 +195,40 @@ ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_msec_value(conf->buflen, prev->buflen, 5000); ngx_conf_merge_msec_value(conf->push_reconnect, prev->push_reconnect, 3000); + ngx_conf_merge_msec_value(conf->pull_reconnect, prev->pull_reconnect, + 3000); return NGX_CONF_OK; } static void -ngx_rtmp_relay_reconnect(ngx_event_t *ev) +ngx_rtmp_relay_static_pull_reconnect(ngx_event_t *ev) +{ + ngx_rtmp_relay_static_t *rs = ev->data; + + ngx_rtmp_relay_ctx_t *ctx; + ngx_rtmp_relay_app_conf_t *racf; + + racf = ngx_rtmp_get_module_app_conf(&rs->cctx, ngx_rtmp_relay_module); + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0, + "relay: reconnecting static pull"); + + ctx = ngx_rtmp_relay_create_connection(&rs->cctx, &rs->target->name, + rs->target); + if (ctx) { + ctx->session->static_relay = 1; + ctx->static_evt = ev; + return; + } + + ngx_add_timer(ev, racf->pull_reconnect); +} + + +static void +ngx_rtmp_relay_push_reconnect(ngx_event_t *ev) { ngx_rtmp_session_t *s = ev->data; @@ -257,14 +321,14 @@ ngx_rtmp_relay_copy_str(ngx_pool_t *pool, ngx_str_t *dst, ngx_str_t *src) static ngx_rtmp_relay_ctx_t * -ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name, +ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name, ngx_rtmp_relay_target_t *target) { + ngx_rtmp_relay_app_conf_t *racf; ngx_rtmp_relay_ctx_t *rctx; ngx_rtmp_addr_conf_t *addr_conf; ngx_rtmp_conf_ctx_t *addr_ctx; ngx_rtmp_session_t *rs; - ngx_rtmp_relay_app_conf_t *racf; ngx_peer_connection_t *pc; ngx_connection_t *c; ngx_pool_t *pool; @@ -272,10 +336,10 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name, ngx_str_t v, *uri; u_char *first, *last, *p; - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "relay: create remote context"); + racf = ngx_rtmp_get_module_app_conf(cctx, ngx_rtmp_relay_module); - racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0, + "relay: create remote context"); pool = NULL; pool = ngx_create_pool(4096, racf->log); @@ -288,9 +352,11 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name, goto clear; } - if (ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK || - ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) - { + if (name && ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK) { + goto clear; + } + + if (ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) { goto clear; } @@ -357,8 +423,6 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name, } } - rctx->relay = 1; - pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t)); if (pc == NULL) { goto clear; @@ -395,8 +459,8 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name, goto clear; } addr_conf->ctx = addr_ctx; - addr_ctx->main_conf = s->main_conf; - addr_ctx->srv_conf = s->srv_conf; + addr_ctx->main_conf = cctx->main_conf; + addr_ctx->srv_conf = cctx->srv_conf; ngx_str_set(&addr_conf->addr_text, "ngx-relay"); rs = ngx_rtmp_init_session(c, addr_conf); @@ -404,7 +468,7 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name, /* no need to destroy pool */ return NULL; } - rs->app_conf = s->app_conf; + rs->app_conf = cctx->app_conf; rs->relay = 1; rctx->session = rs; ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module); @@ -421,6 +485,20 @@ ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name, } +static ngx_rtmp_relay_ctx_t * +ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name, + ngx_rtmp_relay_target_t *target) +{ + ngx_rtmp_conf_ctx_t cctx; + + cctx.app_conf = s->app_conf; + cctx.srv_conf = s->srv_conf; + cctx.main_conf = s->main_conf; + + return ngx_rtmp_relay_create_connection(&cctx, name, target); +} + + static ngx_rtmp_relay_ctx_t * ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_rtmp_relay_target_t *target) @@ -442,7 +520,7 @@ ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name, ctx->push_evt.data = s; ctx->push_evt.log = s->connection->log; - ctx->push_evt.handler = ngx_rtmp_relay_reconnect; + ctx->push_evt.handler = ngx_rtmp_relay_push_reconnect; if (ctx->publish) { return NULL; @@ -554,7 +632,7 @@ ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) } ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx && ctx->relay) { + if (ctx && s->relay) { goto next; } @@ -606,7 +684,7 @@ ngx_rtmp_relay_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) ngx_rtmp_relay_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx && ctx->relay) { + if (ctx && s->relay) { goto next; } @@ -1022,7 +1100,7 @@ ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx == NULL || !ctx->relay) { + if (ctx == NULL || !s->relay) { return NGX_OK; } @@ -1042,7 +1120,7 @@ ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, return ngx_rtmp_relay_send_create_stream(s); case NGX_RTMP_RELAY_CREATE_STREAM_TRANS: - if (ctx->publish != ctx) { + if (ctx->publish != ctx && !s->static_relay) { if (ngx_rtmp_relay_send_publish(s) != NGX_OK) { return NGX_ERROR; } @@ -1105,7 +1183,7 @@ ngx_rtmp_relay_on_error(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx == NULL || !ctx->relay) { + if (ctx == NULL || !s->relay) { return NGX_OK; } @@ -1175,7 +1253,7 @@ ngx_rtmp_relay_on_status(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx == NULL || !ctx->relay) { + if (ctx == NULL || !s->relay) { return NGX_OK; } @@ -1200,10 +1278,10 @@ static ngx_int_t ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { - ngx_rtmp_relay_ctx_t *ctx; + ngx_rtmp_relay_ctx_t *ctx; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx == NULL || ctx->publish == NULL) { + if (ctx == NULL || !s->relay) { return NGX_OK; } @@ -1221,7 +1299,16 @@ ngx_rtmp_relay_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module); - if (ctx == NULL || ctx->publish == NULL) { + if (ctx == NULL) { + goto next; + } + + if (s->static_relay) { + ngx_add_timer(ctx->static_evt, racf->pull_reconnect); + goto next; + } + + if (ctx->publish == NULL) { goto next; } @@ -1239,7 +1326,7 @@ ngx_rtmp_relay_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) &ctx->app, &ctx->name); /* push reconnect */ - if (ctx->relay && ctx->tag == &ngx_rtmp_relay_module && + if (s->relay && ctx->tag == &ngx_rtmp_relay_module && !ctx->publish->push_evt.timer_set) { ngx_add_timer(&ctx->publish->push_evt, racf->push_reconnect); @@ -1255,7 +1342,7 @@ ngx_rtmp_relay_delete_stream(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v) } #endif - if (ctx->publish->play == NULL && ctx->publish->relay) { + if (ctx->publish->play == NULL && ctx->publish->session->relay) { ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ctx->publish->session->connection->log, 0, "relay: publish disconnect empty app='%V' name='%V'", @@ -1306,28 +1393,23 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) ngx_rtmp_relay_target_t *target, **t; ngx_url_t *u; ngx_uint_t i; + ngx_int_t is_pull, is_static; + ngx_event_t **ee, *e; + ngx_rtmp_relay_static_t *rs; u_char *p; value = cf->args->elts; racf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_relay_module); - t = ngx_array_push(value[0].data[3] == 'h' - ? &racf->pushes /* push */ - : &racf->pulls /* pull */ - ); - - if (t == NULL) { - return NGX_CONF_ERROR; - } + is_pull = (value[0].data[3] == 'l'); + is_static = 0; target = ngx_pcalloc(cf->pool, sizeof(*target)); if (target == NULL) { return NGX_CONF_ERROR; } - *t = target; - target->tag = &ngx_rtmp_relay_module; target->data = target; @@ -1352,57 +1434,169 @@ ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) value += 2; for (i = 2; i < cf->args->nelts; ++i, ++value) { p = ngx_strlchr(value->data, value->data + value->len, '='); - if (p == NULL) { - return "key=value expected"; - } - if (p == value->data + value->len - 1) { - continue; - } + if (p == NULL) { + n = *value; + ngx_str_set(&v, "1"); - n.data = value->data; - n.len = p - value->data; + } else { + n.data = value->data; + n.len = p - value->data; - v.data = p + 1; - v.len = value->data + value->len - p - 1; + v.data = p + 1; + v.len = value->data + value->len - p - 1; + } #define NGX_RTMP_RELAY_STR_PAR(name, var) \ - if (n.len == sizeof(#name) - 1 \ - && ngx_strncasecmp(n.data, (u_char *)#name, n.len) == 0) \ + if (n.len == sizeof(name) - 1 \ + && ngx_strncasecmp(n.data, (u_char *) name, n.len) == 0) \ { \ target->var = v; \ continue; \ } #define NGX_RTMP_RELAY_NUM_PAR(name, var) \ - if (n.len == sizeof(#name) - 1 \ - && ngx_strncasecmp(n.data, (u_char *)#name, n.len) == 0) \ + if (n.len == sizeof(name) - 1 \ + && ngx_strncasecmp(n.data, (u_char *) name, n.len) == 0) \ { \ target->var = ngx_atoi(v.data, v.len); \ continue; \ } - NGX_RTMP_RELAY_STR_PAR(app, app); - NGX_RTMP_RELAY_STR_PAR(name, name); - NGX_RTMP_RELAY_STR_PAR(tcUrl, tc_url); - NGX_RTMP_RELAY_STR_PAR(pageUrl, page_url); - NGX_RTMP_RELAY_STR_PAR(swfUrl, swf_url); - NGX_RTMP_RELAY_STR_PAR(flashVer, flash_ver); - NGX_RTMP_RELAY_STR_PAR(playPath, play_path); - NGX_RTMP_RELAY_NUM_PAR(live, live); - NGX_RTMP_RELAY_NUM_PAR(start, start); - NGX_RTMP_RELAY_NUM_PAR(stop, stop); + NGX_RTMP_RELAY_STR_PAR("app", app); + NGX_RTMP_RELAY_STR_PAR("name", name); + NGX_RTMP_RELAY_STR_PAR("tcUrl", tc_url); + NGX_RTMP_RELAY_STR_PAR("pageUrl", page_url); + NGX_RTMP_RELAY_STR_PAR("swfUrl", swf_url); + NGX_RTMP_RELAY_STR_PAR("flashVer", flash_ver); + NGX_RTMP_RELAY_STR_PAR("playPath", play_path); + NGX_RTMP_RELAY_NUM_PAR("live", live); + NGX_RTMP_RELAY_NUM_PAR("start", start); + NGX_RTMP_RELAY_NUM_PAR("stop", stop); #undef NGX_RTMP_RELAY_STR_PAR #undef NGX_RTMP_RELAY_NUM_PAR + if (n.len == sizeof("static") - 1 && + ngx_strncasecmp(n.data, (u_char *) "static", n.len) == 0 && + ngx_atoi(v.data, v.len)) + { + is_static = 1; + continue; + } + return "unsuppored parameter"; } + if (is_static) { + + if (!is_pull) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "static push is not allowed"); + return NGX_CONF_ERROR; + } + + if (target->name.len == 0) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "stream name missing in static pull " + "declaration"); + return NGX_CONF_ERROR; + } + + ee = ngx_array_push(&racf->static_events); + if (ee == NULL) { + return NGX_CONF_ERROR; + } + + e = ngx_pcalloc(cf->pool, sizeof(ngx_event_t)); + if (e == NULL) { + return NGX_CONF_ERROR; + } + + *ee = e; + + rs = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_relay_static_t)); + if (rs == NULL) { + return NGX_CONF_ERROR; + } + + rs->target = target; + + e->data = rs; + e->log = &cf->cycle->new_log; + e->handler = ngx_rtmp_relay_static_pull_reconnect; + + t = ngx_array_push(&racf->static_pulls); + + } else if (is_pull) { + t = ngx_array_push(&racf->pulls); + + } else { + t = ngx_array_push(&racf->pushes); + } + + if (t == NULL) { + return NGX_CONF_ERROR; + } + + *t = target; + return NGX_CONF_OK; } +static ngx_int_t +ngx_rtmp_relay_init_process(ngx_cycle_t *cycle) +{ + ngx_rtmp_core_main_conf_t *cmcf = ngx_rtmp_core_main_conf; + ngx_rtmp_core_srv_conf_t **pcscf, *cscf; + ngx_rtmp_core_app_conf_t **pcacf, *cacf; + ngx_rtmp_relay_app_conf_t *racf; + ngx_uint_t n, m, k; + ngx_rtmp_relay_static_t *rs; + ngx_rtmp_listen_t *lst; + ngx_event_t **pevent, *event; + + if (cmcf->listen.nelts == 0) { + return NGX_OK; + } + + /* only first worker does static pulling */ + + if (ngx_process_slot) { + return NGX_OK; + } + + lst = cmcf->listen.elts; + + pcscf = cmcf->servers.elts; + for (n = 0; n < cmcf->servers.nelts; ++n, ++pcscf) { + + cscf = *pcscf; + pcacf = cscf->applications.elts; + + for (m = 0; m < cscf->applications.nelts; ++m, ++pcacf) { + + cacf = *pcacf; + racf = cacf->app_conf[ngx_rtmp_relay_module.ctx_index]; + pevent = racf->static_events.elts; + + for (k = 0; k < racf->static_events.nelts; ++k, ++pevent) { + event = *pevent; + + rs = event->data; + rs->cctx = *lst->ctx; + rs->cctx.app_conf = cacf->app_conf; + + ngx_post_event(event, &ngx_posted_events); + } + } + } + + return NGX_OK; +} + + static ngx_int_t ngx_rtmp_relay_postconfiguration(ngx_conf_t *cf) { diff --git a/ngx_rtmp_relay_module.h b/ngx_rtmp_relay_module.h index 7e676bf19..cec70e086 100644 --- a/ngx_rtmp_relay_module.h +++ b/ngx_rtmp_relay_module.h @@ -38,7 +38,6 @@ struct ngx_rtmp_relay_ctx_s { ngx_rtmp_relay_ctx_t *publish; ngx_rtmp_relay_ctx_t *play; ngx_rtmp_relay_ctx_t *next; - unsigned relay:1; ngx_str_t app; ngx_str_t tc_url; @@ -51,6 +50,7 @@ struct ngx_rtmp_relay_ctx_s { ngx_int_t stop; ngx_event_t push_evt; + ngx_event_t *static_evt; void *tag; void *data; };