Skip to content

Commit

Permalink
support listen at multiple ports.
Browse files Browse the repository at this point in the history
  • Loading branch information
winlinvip committed May 28, 2015
1 parent ff75239 commit ff5760d
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 24 deletions.
14 changes: 14 additions & 0 deletions trunk/src/core/dlp_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,17 @@ string dlp_get_peer_ip(int fd)

return ip;
}

void dlp_close_stfd(st_netfd_t& stfd)
{
if (stfd) {
int fd = st_netfd_fileno(stfd);
st_netfd_close(stfd);
stfd = NULL;

// st does not close it sometimes,
// close it manually.
close(fd);
}
}

5 changes: 4 additions & 1 deletion trunk/src/core/dlp_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
#include <inttypes.h>

#include <assert.h>
#define srs_assert(expression) assert(expression)
#define dlp_assert(expression) assert(expression)

#include <stddef.h>
#include <sys/types.h>
Expand Down Expand Up @@ -80,13 +80,16 @@ extern int dlp_master_id;
#define ERROR_FORK_WORKER 1001
#define ERROR_ST_INITIALIZE 1002
#define ERROR_ST_OPEN_FD 1003
#define ERROR_ST_TRHEAD 1004

// utilies.
#include <string>
#include <vector>
#include <st.h>
extern std::vector<int> dlp_list_to_ints(std::string str_list);
extern int dlp_listen_tcp(int port, int& fd);
extern int dlp_st_init();
extern std::string dlp_get_peer_ip(int fd);
extern void dlp_close_stfd(st_netfd_t& stfd);

#endif
147 changes: 131 additions & 16 deletions trunk/src/core/dlp_core_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,69 @@ using namespace std;

#include <st.h>

int dlp_proxy(st_netfd_t stfd)
DlpProxyContext::DlpProxyContext()
{
_port = -1;
_fd = -1;
}

DlpProxyContext::~DlpProxyContext()
{
::close(_fd);
}

int DlpProxyContext::initialize(int p, int f, vector<int> sps)
{
int ret = ERROR_SUCCESS;

_port = p;
_fd = f;
sports = sps;

return ret;
}

int DlpProxyContext::fd()
{
return _fd;
}

int DlpProxyContext::port()
{
return _port;
}

DlpProxyConnection::DlpProxyConnection()
{
context = NULL;
stfd = NULL;
}

DlpProxyConnection::~DlpProxyConnection()
{
dlp_close_stfd(stfd);
}

int DlpProxyConnection::initilaize(DlpProxyContext* c, st_netfd_t s)
{
int ret = ERROR_SUCCESS;

context = c;
stfd = s;

return ret;
}

int DlpProxyConnection::fd()
{
return st_netfd_fileno(stfd);
}

int dlp_proxy(DlpProxyConnection* conn)
{
int ret = ERROR_SUCCESS;

int fd = st_netfd_fileno(stfd);
int fd = conn->fd();
std::string ip = dlp_get_peer_ip(fd);
dlp_trace("woker serve fd=%d, %s", fd, ip.c_str());

Expand All @@ -45,51 +103,108 @@ int dlp_proxy(st_netfd_t stfd)

void* dlp_proxy_pfn(void* arg)
{
st_netfd_t stfd = (st_netfd_t)arg;
DlpProxyConnection* conn = (DlpProxyConnection*)arg;
dlp_assert(conn);

int ret = ERROR_SUCCESS;
if ((ret = dlp_proxy(stfd)) != ERROR_SUCCESS) {
dlp_warn("worker proxy failed, ret=%d", ret);
if ((ret = dlp_proxy(conn)) != ERROR_SUCCESS) {
dlp_warn("worker proxy connection failed, ret=%d", ret);
} else {
dlp_trace("worker proxy completed.");
dlp_trace("worker proxy connection completed.");
}

dlp_freep(conn);

return NULL;
}

int dlp_run_proxyer(int port, int fd)
int dlp_run_proxyer_for(DlpProxyContext* context)
{
int ret = ERROR_SUCCESS;

if ((ret = dlp_st_init()) != ERROR_SUCCESS) {
return ret;
}

dlp_trace("dolphin worker serve port=%d, fd=%d", port, fd);
dlp_trace("dolphin worker serve port=%d, fd=%d", context->port(), context->fd());

st_netfd_t stfd = NULL;
if ((stfd = st_netfd_open_socket(fd)) == NULL) {
if ((stfd = st_netfd_open_socket(context->fd())) == NULL) {
ret = ERROR_ST_OPEN_FD;
dlp_error("worker open stfd failed. ret=%d", ret);
return ret;
}
dlp_info("worker open fd ok, fd=%d", fd);
dlp_info("worker open fd ok, fd=%d", context->fd());

for (;;) {
dlp_verbose("worker proecess serve at port %d", port);
dlp_verbose("worker proecess serve at port %d", context->port());
st_netfd_t cfd = NULL;

if ((cfd = st_accept(stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT)) == NULL) {
dlp_warn("ignore worker accept client error.");
continue;
}

DlpProxyConnection* conn = new DlpProxyConnection();
if ((ret = conn->initilaize(context, cfd)) != ERROR_SUCCESS) {
return ret;
}

st_thread_t trd = NULL;
if ((trd = st_thread_create(dlp_proxy_pfn, cfd, 0, 0)) == NULL) {
if ((trd = st_thread_create(dlp_proxy_pfn, conn, 0, 0)) == NULL) {
dlp_freep(conn);

dlp_warn("ignore worker thread create error.");
continue;
}
}

return ret;
}

void* dlp_run_proxyer_pfn(void* arg)
{
DlpProxyContext* context = (DlpProxyContext*)arg;
dlp_assert(context);

int ret = ERROR_SUCCESS;
if ((ret = dlp_run_proxyer_for(context)) != ERROR_SUCCESS) {
dlp_warn("worker proxy context failed, ret=%d", ret);
} else {
dlp_trace("worker proxy context completed.");
}

dlp_freep(context);

return NULL;
}

int dlp_run_proxyer(vector<int> ports, std::vector<int> fds, std::vector<int> sports)
{
int ret = ERROR_SUCCESS;

if ((ret = dlp_st_init()) != ERROR_SUCCESS) {
return ret;
}

dlp_assert(ports.size() == fds.size());
for (int i = 0; i < (int)ports.size(); i++) {
int port = ports.at(i);
int fd = fds.at(i);

DlpProxyContext* context = new DlpProxyContext();
if ((ret = context->initialize(port, fd, sports)) != ERROR_SUCCESS) {
dlp_freep(context);
return ret;
}

st_thread_t trd = NULL;
if ((trd = st_thread_create(dlp_run_proxyer_pfn, context, 0, 0)) == NULL) {
dlp_freep(context);

ret = ERROR_ST_TRHEAD;
dlp_warn("worker thread create error. ret=%d", ret);
return ret;
}
}

st_thread_exit(NULL);

return ret;
}
34 changes: 33 additions & 1 deletion trunk/src/core/dlp_core_proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,38 @@

#include <dlp_core.hpp>

extern int dlp_run_proxyer(int port, int fd);
#include <vector>

#include <st.h>

class DlpProxyContext
{
private:
int _port;
int _fd;
std::vector<int> sports;
public:
DlpProxyContext();
virtual ~DlpProxyContext();
public:
virtual int initialize(int p, int f, std::vector<int> sps);
virtual int fd();
virtual int port();
};

class DlpProxyConnection
{
private:
DlpProxyContext* context;
st_netfd_t stfd;
public:
DlpProxyConnection();
virtual ~DlpProxyConnection();
public:
virtual int initilaize(DlpProxyContext* c, st_netfd_t s);
virtual int fd();
};

extern int dlp_run_proxyer(std::vector<int> ports, std::vector<int> fds, std::vector<int> sports);

#endif
12 changes: 6 additions & 6 deletions trunk/src/main/dlp_main_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ int dlp_listen_rtmp(vector<int> ports, vector<int>& fds)
return ret;
}

int dlp_fork_workers(vector<int> ports, vector<int> fds, int workers, vector<int>& pids)
int dlp_fork_workers(vector<int> ports, vector<int> fds, int workers, vector<int>& pids, vector<int> sports)
{
int ret = ERROR_SUCCESS;

Expand All @@ -181,8 +181,7 @@ int dlp_fork_workers(vector<int> ports, vector<int> fds, int workers, vector<int

// child process: worker proxy engine.
if (pid == 0) {
// TODO: FIXME: support multiple ports.
ret = dlp_run_proxyer(ports.at(0), fds.at(0));
ret = dlp_run_proxyer(ports, fds, sports);
exit(ret);
}

Expand Down Expand Up @@ -247,22 +246,23 @@ int main(int argc, char** argv)
dlp_trace("dolphin use srs binary at %s", srs_binary.c_str());
dlp_trace("dolphin use config file %s for srs", srs_config_file.c_str());

std::vector<int> rtmp_service_ports = dlp_list_to_ints(srs_service_ports);
std::vector<int> rtmp_proxy_ports = dlp_list_to_ints(dlp_proxy_ports);

// listen the serve socket for workers.
std:vector<int> rtmp_fds;
std::vector<int> rtmp_proxy_ports = dlp_list_to_ints(dlp_proxy_ports);
if ((ret = dlp_listen_rtmp(rtmp_proxy_ports, rtmp_fds)) != ERROR_SUCCESS) {
return ret;
}

// fork all workers.
std::vector<int> worker_pids;
if ((ret = dlp_fork_workers(rtmp_proxy_ports, rtmp_fds, dlp_worker_process, worker_pids)) != ERROR_SUCCESS) {
if ((ret = dlp_fork_workers(rtmp_proxy_ports, rtmp_fds, dlp_worker_process, worker_pids, rtmp_service_ports)) != ERROR_SUCCESS) {
return ret;
}

// fork all srs servers.
std::vector<int> srs_pids;
std::vector<int> rtmp_service_ports = dlp_list_to_ints(srs_service_ports);
if ((ret = dlp_fork_srs(rtmp_service_ports, srs_binary, srs_config_file, srs_pids)) != ERROR_SUCCESS) {
return ret;
}
Expand Down

0 comments on commit ff5760d

Please sign in to comment.