Skip to content

Commit

Permalink
Stream: UDP proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
arut committed Jan 20, 2016
1 parent c790f96 commit 2ce791f
Show file tree
Hide file tree
Showing 23 changed files with 768 additions and 107 deletions.
4 changes: 2 additions & 2 deletions auto/options
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,8 @@ cat << END
--without-mail_imap_module disable ngx_mail_imap_module
--without-mail_smtp_module disable ngx_mail_smtp_module

--with-stream enable TCP proxy module
--with-stream=dynamic enable dynamic TCP proxy module
--with-stream enable TCP/UDP proxy module
--with-stream=dynamic enable dynamic TCP/UDP proxy module
--with-stream_ssl_module enable ngx_stream_ssl_module
--without-stream_limit_conn_module disable ngx_stream_limit_conn_module
--without-stream_access_module disable ngx_stream_access_module
Expand Down
1 change: 1 addition & 0 deletions auto/sources
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \
src/os/unix/ngx_udp_recv.c \
src/os/unix/ngx_send.c \
src/os/unix/ngx_writev_chain.c \
src/os/unix/ngx_udp_send.c \
src/os/unix/ngx_channel.c \
src/os/unix/ngx_shmem.c \
src/os/unix/ngx_process.c \
Expand Down
39 changes: 39 additions & 0 deletions auto/unix
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,45 @@ ngx_feature_test="setsockopt(0, SOL_SOCKET, SO_ACCEPTFILTER, NULL, 0)"
. auto/feature


# BSD way to get IPv4 datagram destination address

ngx_feature="IP_RECVDSTADDR"
ngx_feature_name="NGX_HAVE_IP_RECVDSTADDR"
ngx_feature_run=no
ngx_feature_incs="#include <sys/socket.h>
#include <netinet/in.h>"
ngx_feature_path=
ngx_feature_libs=
ngx_feature_test="setsockopt(0, IPPROTO_IP, IP_RECVDSTADDR, NULL, 0)"
. auto/feature


# Linux way to get IPv4 datagram destination address

ngx_feature="IP_PKTINFO"
ngx_feature_name="NGX_HAVE_IP_PKTINFO"
ngx_feature_run=no
ngx_feature_incs="#include <sys/socket.h>
#include <netinet/in.h>"
ngx_feature_path=
ngx_feature_libs=
ngx_feature_test="setsockopt(0, IPPROTO_IP, IP_PKTINFO, NULL, 0)"
. auto/feature


# RFC 3542 way to get IPv6 datagram destination address

ngx_feature="IPV6_RECVPKTINFO"
ngx_feature_name="NGX_HAVE_IPV6_RECVPKTINFO"
ngx_feature_run=no
ngx_feature_incs="#include <sys/socket.h>
#include <netinet/in.h>"
ngx_feature_path=
ngx_feature_libs=
ngx_feature_test="setsockopt(0, IPPROTO_IPV6, IPV6_RECVPKTINFO, NULL, 0)"
. auto/feature


ngx_feature="TCP_DEFER_ACCEPT"
ngx_feature_name="NGX_HAVE_DEFERRED_ACCEPT"
ngx_feature_run=no
Expand Down
92 changes: 82 additions & 10 deletions src/core/ngx_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,11 @@ ngx_open_listening_sockets(ngx_cycle_t *cycle)
}
#endif

if (ls[i].type != SOCK_STREAM) {
ls[i].fd = s;
continue;
}

if (listen(s, ls[i].backlog) == -1) {
err = ngx_socket_errno;

Expand Down Expand Up @@ -865,6 +870,67 @@ ngx_configure_listening_sockets(ngx_cycle_t *cycle)
#endif

#endif /* NGX_HAVE_DEFERRED_ACCEPT */

#if (NGX_HAVE_IP_RECVDSTADDR)

if (ls[i].wildcard
&& ls[i].type == SOCK_DGRAM
&& ls[i].sockaddr->sa_family == AF_INET)
{
value = 1;

if (setsockopt(ls[i].fd, IPPROTO_IP, IP_RECVDSTADDR,
(const void *) &value, sizeof(int))
== -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno,
"setsockopt(IP_RECVDSTADDR) "
"for %V failed, ignored",
&ls[i].addr_text);
}
}

#elif (NGX_HAVE_IP_PKTINFO)

if (ls[i].wildcard
&& ls[i].type == SOCK_DGRAM
&& ls[i].sockaddr->sa_family == AF_INET)
{
value = 1;

if (setsockopt(ls[i].fd, IPPROTO_IP, IP_PKTINFO,
(const void *) &value, sizeof(int))
== -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno,
"setsockopt(IP_PKTINFO) "
"for %V failed, ignored",
&ls[i].addr_text);
}
}

#endif

#if (NGX_HAVE_INET6 && NGX_HAVE_IPV6_RECVPKTINFO)

if (ls[i].wildcard
&& ls[i].type == SOCK_DGRAM
&& ls[i].sockaddr->sa_family == AF_INET6)
{
value = 1;

if (setsockopt(ls[i].fd, IPPROTO_IPV6, IPV6_RECVPKTINFO,
(const void *) &value, sizeof(int))
== -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno,
"setsockopt(IPV6_RECVPKTINFO) "
"for %V failed, ignored",
&ls[i].addr_text);
}
}

#endif
}

return;
Expand Down Expand Up @@ -978,7 +1044,7 @@ ngx_get_connection(ngx_socket_t s, ngx_log_t *log)
ngx_cycle->free_connections = c->data;
ngx_cycle->free_connection_n--;

if (ngx_cycle->files) {
if (ngx_cycle->files && ngx_cycle->files[s] == NULL) {
ngx_cycle->files[s] = c;
}

Expand Down Expand Up @@ -1019,7 +1085,7 @@ ngx_free_connection(ngx_connection_t *c)
ngx_cycle->free_connections = c;
ngx_cycle->free_connection_n++;

if (ngx_cycle->files) {
if (ngx_cycle->files && ngx_cycle->files[c->fd] == c) {
ngx_cycle->files[c->fd] = NULL;
}
}
Expand All @@ -1045,16 +1111,18 @@ ngx_close_connection(ngx_connection_t *c)
ngx_del_timer(c->write);
}

if (ngx_del_conn) {
ngx_del_conn(c, NGX_CLOSE_EVENT);
if (!c->shared) {
if (ngx_del_conn) {
ngx_del_conn(c, NGX_CLOSE_EVENT);

} else {
if (c->read->active || c->read->disabled) {
ngx_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT);
}
} else {
if (c->read->active || c->read->disabled) {
ngx_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT);
}

if (c->write->active || c->write->disabled) {
ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT);
if (c->write->active || c->write->disabled) {
ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT);
}
}
}

Expand All @@ -1078,6 +1146,10 @@ ngx_close_connection(ngx_connection_t *c)
fd = c->fd;
c->fd = (ngx_socket_t) -1;

if (c->shared) {
return;
}

if (ngx_close_socket(fd) == -1) {

err = ngx_socket_errno;
Expand Down
4 changes: 4 additions & 0 deletions src/core/ngx_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct ngx_listening_s {
unsigned nonblocking:1;
unsigned shared:1; /* shared between threads or processes */
unsigned addr_ntop:1;
unsigned wildcard:1;

#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
unsigned ipv6only:1;
Expand Down Expand Up @@ -141,6 +142,8 @@ struct ngx_connection_s {

ngx_pool_t *pool;

int type;

struct sockaddr *sockaddr;
socklen_t socklen;
ngx_str_t addr_text;
Expand Down Expand Up @@ -174,6 +177,7 @@ struct ngx_connection_s {
unsigned idle:1;
unsigned reusable:1;
unsigned close:1;
unsigned shared:1;

unsigned sendfile:1;
unsigned sndlowat:1;
Expand Down
4 changes: 3 additions & 1 deletion src/event/ngx_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ ngx_event_process_init(ngx_cycle_t *cycle)
return NGX_ERROR;
}

c->type = ls[i].type;
c->log = &ls[i].log;

c->listening = &ls[i];
Expand Down Expand Up @@ -818,7 +819,8 @@ ngx_event_process_init(ngx_cycle_t *cycle)

#else

rev->handler = ngx_event_accept;
rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
: ngx_event_recvmsg;

if (ngx_use_accept_mutex
#if (NGX_HAVE_REUSEPORT)
Expand Down
4 changes: 4 additions & 0 deletions src/event/ngx_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ extern ngx_os_io_t ngx_io;
#define ngx_udp_recv ngx_io.udp_recv
#define ngx_send ngx_io.send
#define ngx_send_chain ngx_io.send_chain
#define ngx_udp_send ngx_io.udp_send


#define NGX_EVENT_MODULE 0x544E5645 /* "EVNT" */
Expand Down Expand Up @@ -491,6 +492,9 @@ extern ngx_module_t ngx_event_core_module;


void ngx_event_accept(ngx_event_t *ev);
#if !(NGX_WIN32)
void ngx_event_recvmsg(ngx_event_t *ev);
#endif
ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle);
u_char *ngx_accept_log_error(ngx_log_t *log, u_char *buf, size_t len);

Expand Down
Loading

0 comments on commit 2ce791f

Please sign in to comment.