58 Star 430 Fork 129

GVPwinshining / nginx-http-flv-module

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ngx_rtmp_auto_push_module.c 20.77 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
/*
* Copyright (C) Roman Arutyunyan
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include "ngx_rtmp_cmd_module.h"
#include "ngx_rtmp_relay_module.h"
static ngx_rtmp_publish_pt next_publish;
static ngx_rtmp_delete_stream_pt next_delete_stream;
static ngx_int_t ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle);
static void ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle);
static void * ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cf);
static char * ngx_rtmp_auto_push_init_conf(ngx_cycle_t *cycle, void *conf);
#if (NGX_HAVE_UNIX_DOMAIN)
static ngx_int_t ngx_rtmp_auto_push_publish(ngx_rtmp_session_t *s,
ngx_rtmp_publish_t *v);
static ngx_int_t ngx_rtmp_auto_push_delete_stream(ngx_rtmp_session_t *s,
ngx_rtmp_delete_stream_t *v);
#endif
typedef struct ngx_rtmp_auto_push_ctx_s ngx_rtmp_auto_push_ctx_t;
struct ngx_rtmp_auto_push_ctx_s {
ngx_int_t *slots; /* NGX_MAX_PROCESSES */
u_char name[NGX_RTMP_MAX_NAME];
u_char args[NGX_RTMP_MAX_ARGS];
ngx_event_t push_evt;
};
typedef struct {
ngx_flag_t auto_push;
ngx_str_t socket_dir;
ngx_msec_t push_reconnect;
} ngx_rtmp_auto_push_conf_t;
static ngx_command_t ngx_rtmp_auto_push_commands[] = {
{ ngx_string("rtmp_auto_push"),
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
0,
offsetof(ngx_rtmp_auto_push_conf_t, auto_push),
NULL },
{ ngx_string("rtmp_auto_push_reconnect"),
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
0,
offsetof(ngx_rtmp_auto_push_conf_t, push_reconnect),
NULL },
{ ngx_string("rtmp_socket_dir"),
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
0,
offsetof(ngx_rtmp_auto_push_conf_t, socket_dir),
NULL },
ngx_null_command
};
static ngx_core_module_t ngx_rtmp_auto_push_module_ctx = {
ngx_string("rtmp_auto_push"),
ngx_rtmp_auto_push_create_conf, /* create conf */
ngx_rtmp_auto_push_init_conf /* init conf */
};
ngx_module_t ngx_rtmp_auto_push_module = {
NGX_MODULE_V1,
&ngx_rtmp_auto_push_module_ctx, /* module context */
ngx_rtmp_auto_push_commands, /* module directives */
NGX_CORE_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
ngx_rtmp_auto_push_init_process, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
ngx_rtmp_auto_push_exit_process, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static ngx_rtmp_module_t ngx_rtmp_auto_push_index_module_ctx = {
NULL, /* preconfiguration */
NULL, /* postconfiguration */
NULL, /* create main configuration */
NULL, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
NULL, /* create app configuration */
NULL /* merge app configuration */
};
ngx_module_t ngx_rtmp_auto_push_index_module = {
NGX_MODULE_V1,
&ngx_rtmp_auto_push_index_module_ctx, /* module context */
NULL, /* module directives */
NGX_RTMP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
#define NGX_RTMP_AUTO_PUSH_SOCKNAME "nginx-http-flv"
static ngx_int_t
ngx_rtmp_auto_push_init_process(ngx_cycle_t *cycle)
{
#if (NGX_HAVE_UNIX_DOMAIN)
ngx_rtmp_auto_push_conf_t *apcf;
ngx_listening_t *ls, *lss;
struct sockaddr_un *saun;
#if (nginx_version >= 1009011)
ngx_event_t *rev;
ngx_connection_t *c;
ngx_module_t **modules;
ngx_int_t i, auto_push_index, event_core_index;
#endif
int reuseaddr;
ngx_socket_t s;
size_t n;
ngx_file_info_t fi;
ngx_pid_t pid;
if (ngx_process != NGX_PROCESS_WORKER) {
return NGX_OK;
}
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_rtmp_auto_push_module);
if (apcf->auto_push == 0) {
return NGX_OK;
}
next_publish = ngx_rtmp_publish;
ngx_rtmp_publish = ngx_rtmp_auto_push_publish;
next_delete_stream = ngx_rtmp_delete_stream;
ngx_rtmp_delete_stream = ngx_rtmp_auto_push_delete_stream;
reuseaddr = 1;
s = (ngx_socket_t) -1;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, cycle->log, 0,
"auto_push: creating sockets");
/*TODO: clone all RTMP listenings? */
ls = cycle->listening.elts;
lss = NULL;
for (n = 0; n < cycle->listening.nelts; ++n, ++ls) {
if (ls->handler == ngx_rtmp_init_connection) {
lss = ls;
break;
}
}
if (lss == NULL) {
return NGX_OK;
}
ls = ngx_array_push(&cycle->listening);
if (ls == NULL) {
return NGX_ERROR;
}
*ls = *lss;
/* Disable unix socket client address extraction
* from accept call
* Nginx generates bad addr_text with this enabled */
ls->addr_ntop = 0;
ls->socklen = sizeof(struct sockaddr_un);
saun = ngx_pcalloc(cycle->pool, ls->socklen);
ls->sockaddr = (struct sockaddr *) saun;
if (ls->sockaddr == NULL) {
return NGX_ERROR;
}
saun->sun_family = AF_UNIX;
pid = ngx_getpid();
*ngx_snprintf((u_char *) saun->sun_path, sizeof(saun->sun_path),
"%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%P",
&apcf->socket_dir, pid)
= 0;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0,
"auto_push: create socket '%s'",
saun->sun_path);
if (ngx_file_info(saun->sun_path, &fi) != ENOENT) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, cycle->log, 0,
"auto_push: delete existing socket '%s'",
saun->sun_path);
ngx_delete_file(saun->sun_path);
}
ls->addr_text.data = (u_char *) saun->sun_path;
ls->addr_text.len = ngx_strlen(saun->sun_path);
s = ngx_socket(AF_UNIX, SOCK_STREAM, 0);
if (s == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
ngx_socket_n " %s failed", saun->sun_path);
return NGX_ERROR;
}
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const void *) &reuseaddr, sizeof(int))
== -1)
{
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
"setsockopt(SO_REUSEADDR) %s failed", saun->sun_path);
goto sock_error;
}
if (!(ngx_event_flags & NGX_USE_AIO_EVENT)) {
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
ngx_nonblocking_n " %s failed", saun->sun_path);
return NGX_ERROR;
}
}
if (bind(s, (struct sockaddr *) saun, sizeof(*saun)) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
ngx_nonblocking_n " %s bind failed", saun->sun_path);
goto sock_error;
}
if (listen(s, NGX_LISTEN_BACKLOG) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
"listen() to %s, backlog %d failed",
saun->sun_path, NGX_LISTEN_BACKLOG);
goto sock_error;
}
ls->fd = s;
ls->listen = 1;
/* Socket option `SO_REUSEPORT` has been supported since nginx-1.9.1,
* if option `reuseport` is added for the directive `listen`, listening
* structure of unix domain socket in the non-first process will not be
* initialized, in fact `reuseport` is useless for a unix domain socket
* on which there is only a process listening */
#if (NGX_HAVE_REUSEPORT)
ls->reuseport = 0;
#endif
/* for dynamic module */
#if (nginx_version >= 1009011)
auto_push_index = -1;
event_core_index = -1;
modules = cycle->modules;
for (i = 0; modules[i]; ++i) {
if (ngx_strcmp(modules[i]->name, "ngx_event_core_module") == 0) {
event_core_index = i;
}
if (ngx_strcmp(modules[i]->name, "ngx_rtmp_auto_push_module") == 0) {
auto_push_index = i;
}
if (auto_push_index != -1 && event_core_index != -1) {
break;
}
}
if (auto_push_index > event_core_index) {
c = ngx_get_connection(ls->fd, cycle->log);
if (c == NULL) {
goto sock_error;
}
rev = c->read;
#if (nginx_version >= 1009013)
c->type = ls->type;
#endif
c->log = &ls->log;
c->listening = ls;
ls->connection = c;
rev->log = c->log;
rev->accept = 1;
#if (NGX_HAVE_DEFERRED_ACCEPT)
rev->deferred_accept = ls->deferred_accept;
#endif
#if (nginx_version >= 1009013)
rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
: ngx_event_recvmsg;
#else
rev->handler = ngx_event_accept;
#endif
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
}
#endif
return NGX_OK;
sock_error:
if (s != (ngx_socket_t) -1 && ngx_close_socket(s) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
ngx_close_socket_n " %s failed", saun->sun_path);
}
ngx_delete_file(saun->sun_path);
return NGX_ERROR;
#else /* NGX_HAVE_UNIX_DOMAIN */
return NGX_OK;
#endif /* NGX_HAVE_UNIX_DOMAIN */
}
static void
ngx_rtmp_auto_push_exit_process(ngx_cycle_t *cycle)
{
#if (NGX_HAVE_UNIX_DOMAIN)
ngx_rtmp_auto_push_conf_t *apcf;
u_char path[NGX_MAX_PATH];
ngx_listening_t *ls;
ngx_connection_t *c;
size_t n;
ngx_pid_t pid;
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(cycle->conf_ctx,
ngx_rtmp_auto_push_module);
if (apcf->auto_push == 0) {
return;
}
ls = cycle->listening.elts;
for (n = 0; n < cycle->listening.nelts; ++n, ++ls) {
if ((ls->handler == ngx_rtmp_init_connection) &&
(ls->sockaddr && ls->sockaddr->sa_family == AF_UNIX))
{
c = ls->connection;
if (c) {
if (c->read->active) {
if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
/*
* delete the old accept events that were bound to
* the old cycle read events array
*/
ngx_del_event(c->read,
NGX_READ_EVENT, NGX_CLOSE_EVENT);
ngx_free_connection(c);
c->fd = (ngx_socket_t) -1;
}
}
}
if (ngx_close_socket(ls->fd) == -1) {
ngx_log_error(NGX_LOG_ERR, cycle->log, ngx_socket_errno,
ngx_close_socket_n "%V failed",
&ls->addr_text);
}
ls->fd = (ngx_socket_t) -1;
break;
}
}
pid = ngx_getpid();
*ngx_snprintf(path, sizeof(path),
"%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%P",
&apcf->socket_dir, pid)
= 0;
ngx_delete_file(path);
#endif
}
static void *
ngx_rtmp_auto_push_create_conf(ngx_cycle_t *cycle)
{
ngx_rtmp_auto_push_conf_t *apcf;
apcf = ngx_pcalloc(cycle->pool, sizeof(ngx_rtmp_auto_push_conf_t));
if (apcf == NULL) {
return NULL;
}
apcf->auto_push = NGX_CONF_UNSET;
apcf->push_reconnect = NGX_CONF_UNSET_MSEC;
return apcf;
}
static char *
ngx_rtmp_auto_push_init_conf(ngx_cycle_t *cycle, void *conf)
{
ngx_rtmp_auto_push_conf_t *apcf = conf;
ngx_conf_init_value(apcf->auto_push, 0);
ngx_conf_init_msec_value(apcf->push_reconnect, 100);
if (apcf->socket_dir.len == 0) {
ngx_str_set(&apcf->socket_dir, "/tmp");
}
return NGX_CONF_OK;
}
#if (NGX_HAVE_UNIX_DOMAIN)
static void
ngx_rtmp_auto_push_reconnect(ngx_event_t *ev)
{
ngx_rtmp_session_t *s = ev->data;
ngx_rtmp_auto_push_conf_t *apcf;
ngx_rtmp_auto_push_ctx_t *ctx;
ngx_int_t *slot;
ngx_int_t n;
ngx_rtmp_relay_target_t at;
u_char path[sizeof("unix:") + NGX_MAX_PATH];
u_char flash_ver[sizeof("APSH ,") +
NGX_INT_T_LEN * 2];
u_char play_path[NGX_RTMP_MAX_NAME];
ngx_str_t name;
u_char *p;
ngx_str_t *u;
ngx_pid_t pid;
ngx_int_t npushed;
ngx_core_conf_t *ccf;
ngx_file_info_t fi;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"auto_push: reconnect");
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
ngx_rtmp_auto_push_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_index_module);
if (ctx == NULL) {
return;
}
name.data = ctx->name;
name.len = ngx_strlen(name.data);
ngx_memzero(&at, sizeof(at));
ngx_str_set(&at.page_url, "nginx-auto-push");
at.tag = &ngx_rtmp_auto_push_module;
if (ctx->args[0]) {
at.play_path.data = play_path;
at.play_path.len = ngx_snprintf(play_path, sizeof(play_path),
"%s?%s", ctx->name, ctx->args) -
play_path;
}
slot = ctx->slots;
npushed = 0;
for (n = 0; n < NGX_MAX_PROCESSES; ++n, ++slot) {
if (n == ngx_process_slot) {
continue;
}
pid = ngx_processes[n].pid;
if (pid == 0 || pid == NGX_INVALID_PID) {
continue;
}
if (*slot) {
npushed++;
continue;
}
at.data = &ngx_processes[n];
ngx_memzero(&at.url, sizeof(at.url));
u = &at.url.url;
p = ngx_snprintf(path, sizeof(path) - 1,
"unix:%V/" NGX_RTMP_AUTO_PUSH_SOCKNAME ".%P",
&apcf->socket_dir, pid);
*p = 0;
if (ngx_file_info(path + sizeof("unix:") - 1, &fi) != NGX_OK) {
ngx_log_debug5(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"auto_push: " ngx_file_info_n " failed: "
"slot=%i pid=%P socket='%s'" "url='%V' name='%s'",
n, pid, path, u, ctx->name);
continue;
}
u->data = path;
u->len = p - path;
if (ngx_parse_url(s->connection->pool, &at.url) != NGX_OK) {
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"auto_push: auto-push parse_url failed "
"url='%V' name='%s'",
u, ctx->name);
continue;
}
p = ngx_snprintf(flash_ver, sizeof(flash_ver) - 1, "APSH %i,%i",
(ngx_int_t) ngx_process_slot, (ngx_int_t) ngx_pid);
at.flash_ver.data = flash_ver;
at.flash_ver.len = p - flash_ver;
ngx_log_debug4(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"auto_push: connect slot=%i pid=%P socket='%s' name='%s'",
n, pid, path, ctx->name);
if (ngx_rtmp_relay_push(s, &name, &at) == NGX_OK) {
*slot = 1;
npushed++;
continue;
}
ngx_log_debug5(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"auto_push: connect failed: slot=%i pid=%P socket='%s'"
"url='%V' name='%s'",
n, pid, path, u, ctx->name);
}
ccf = (ngx_core_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
ngx_core_module);
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"auto_push: pushed=%i total=%i failed=%i",
npushed, ccf->worker_processes,
ccf->worker_processes - 1 - npushed);
if (ccf->worker_processes == npushed + 1) {
return;
}
/* several workers failed */
slot = ctx->slots;
for (n = 0; n < NGX_MAX_PROCESSES; ++n, ++slot) {
pid = ngx_processes[n].pid;
if (n == ngx_process_slot || *slot == 1 ||
pid == 0 || pid == NGX_INVALID_PID)
{
continue;
}
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"auto_push: connect failed: slot=%i pid=%P name='%s'",
n, pid, ctx->name);
}
if (!ctx->push_evt.timer_set) {
ngx_add_timer(&ctx->push_evt, apcf->push_reconnect);
}
}
static ngx_int_t
ngx_rtmp_auto_push_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
ngx_rtmp_auto_push_conf_t *apcf;
ngx_rtmp_auto_push_ctx_t *ctx;
if (s->auto_pushed || (s->relay && !s->static_relay)) {
goto next;
}
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
ngx_rtmp_auto_push_module);
if (apcf->auto_push == 0) {
goto next;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_index_module);
if (ctx == NULL) {
ctx = ngx_palloc(s->connection->pool,
sizeof(ngx_rtmp_auto_push_ctx_t));
if (ctx == NULL) {
goto next;
}
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_auto_push_index_module);
}
ngx_memzero(ctx, sizeof(*ctx));
ctx->push_evt.data = s;
ctx->push_evt.log = s->connection->log;
ctx->push_evt.handler = ngx_rtmp_auto_push_reconnect;
ctx->slots = ngx_pcalloc(s->connection->pool,
sizeof(ngx_int_t) * NGX_MAX_PROCESSES);
if (ctx->slots == NULL) {
goto next;
}
ngx_memcpy(ctx->name, v->name, sizeof(ctx->name));
ngx_memcpy(ctx->args, v->args, sizeof(ctx->args));
ngx_rtmp_auto_push_reconnect(&ctx->push_evt);
next:
return next_publish(s, v);
}
static ngx_int_t
ngx_rtmp_auto_push_delete_stream(ngx_rtmp_session_t *s,
ngx_rtmp_delete_stream_t *v)
{
ngx_rtmp_auto_push_conf_t *apcf;
ngx_rtmp_auto_push_ctx_t *ctx, *pctx;
ngx_rtmp_relay_ctx_t *rctx;
ngx_int_t slot;
apcf = (ngx_rtmp_auto_push_conf_t *) ngx_get_conf(ngx_cycle->conf_ctx,
ngx_rtmp_auto_push_module);
if (apcf->auto_push == 0) {
goto next;
}
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_auto_push_index_module);
if (ctx) {
if (ctx->push_evt.timer_set) {
ngx_del_timer(&ctx->push_evt);
}
goto next;
}
/* skip non-relays & publishers */
rctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (rctx == NULL ||
rctx->tag != &ngx_rtmp_auto_push_module ||
rctx->publish == NULL)
{
goto next;
}
slot = (ngx_process_t *) rctx->data - &ngx_processes[0];
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"auto_push: disconnect slot=%i app='%V' name='%V'",
slot, &rctx->app, &rctx->name);
pctx = ngx_rtmp_get_module_ctx(rctx->publish->session,
ngx_rtmp_auto_push_index_module);
if (pctx == NULL) {
goto next;
}
pctx->slots[slot] = 0;
/* push reconnect */
if (!pctx->push_evt.timer_set) {
ngx_add_timer(&pctx->push_evt, apcf->push_reconnect);
}
next:
return next_delete_stream(s, v);
}
#endif /* NGX_HAVE_UNIX_DOMAIN */
C
1
https://gitee.com/winshining/nginx-http-flv-module.git
git@gitee.com:winshining/nginx-http-flv-module.git
winshining
nginx-http-flv-module
nginx-http-flv-module
master

搜索帮助