Skip to content

Commit

Permalink
add new socket message WARNING
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudwu committed Aug 11, 2015
1 parent 2714898 commit 947727e
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 6 deletions.
5 changes: 5 additions & 0 deletions examples/watchdog.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ function SOCKET.error(fd, msg)
close_agent(fd)
end

function SOCKET.warning(fd, size)
-- size K bytes havn't send out in fd
print("socket warning", fd, size)
end

function SOCKET.data(fd, msg)
end

Expand Down
9 changes: 8 additions & 1 deletion lualib-src/lua-netpack.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define TYPE_ERROR 3
#define TYPE_OPEN 4
#define TYPE_CLOSE 5
#define TYPE_WARNING 6

/*
Each package is uint16 + data , uint16 (serialized in big-endian) is the number of bytes comprising the data .
Expand Down Expand Up @@ -371,6 +372,11 @@ lfilter(lua_State *L) {
lua_pushinteger(L, message->id);
pushstring(L, buffer, size);
return 4;
case SKYNET_SOCKET_TYPE_WARNING:
lua_pushvalue(L, lua_upvalueindex(TYPE_WARNING));
lua_pushinteger(L, message->id);
lua_pushinteger(L, message->ud);
return 4;
default:
// never get here
return 1;
Expand Down Expand Up @@ -537,8 +543,9 @@ luaopen_netpack(lua_State *L) {
lua_pushliteral(L, "error");
lua_pushliteral(L, "open");
lua_pushliteral(L, "close");
lua_pushliteral(L, "warning");

lua_pushcclosure(L, lfilter, 5);
lua_pushcclosure(L, lfilter, 6);
lua_setfield(L, -2, "filter");

return 1;
Expand Down
2 changes: 1 addition & 1 deletion lualib/skynet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ function skynet.sleep(ti)
end

function skynet.yield()
return skynet.sleep("0")
return skynet.sleep(0)
end

function skynet.wait()
Expand Down
6 changes: 6 additions & 0 deletions lualib/snax/gateserver.lua
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ function gateserver.start(handler)
close_fd(fd)
end

function MSG.warning(fd, size)
if handler.warning then
handler.warning(fd, size)
end
end

skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
Expand Down
25 changes: 25 additions & 0 deletions lualib/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,25 @@ socket_message[6] = function(id, size, data, address)
s.callback(str, address)
end

local function default_warning(id, size)
local s = socket_pool[id]
local last = s.warningsize or 0
if last + 64 < size then -- if size increase 64K
s.warningsize = size
skynet.error(string.format("WARNING: %d K bytes need to send out (fd = %d)", size, id))
end
s.warningsize = size
end

-- SKYNET_SOCKET_TYPE_WARNING
socket_message[7] = function(id, size)
local s = socket_pool[id]
if s then
local warning = s.warning or default_warning
warning(id, size)
end
end

skynet.register_protocol {
name = "socket",
id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6
Expand Down Expand Up @@ -404,4 +423,10 @@ end
socket.sendto = assert(driver.udp_send)
socket.udp_address = assert(driver.udp_address)

function socket.warning(id, callback)
local obj = socket_pool[id]
assert(obj)
obj.warning = callback
end

return socket
3 changes: 3 additions & 0 deletions service-src/service_gate.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ dispatch_socket_message(struct gate *g, const struct skynet_socket_message * mes
skynet_socket_start(ctx, message->ud);
}
break;
case SKYNET_SOCKET_TYPE_WARNING:
skynet_error(ctx, "fd (%d) send buffer (%d)K", message->id, message->ud);
break;
}
}

Expand Down
7 changes: 7 additions & 0 deletions service-src/service_harbor.c
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,13 @@ mainloop(struct skynet_context * context, void * ud, int type, int session, uint
case SKYNET_SOCKET_TYPE_CONNECT:
// fd forward to this service
break;
case SKYNET_SOCKET_TYPE_WARNING: {
int id = harbor_id(h, message->id);
if (id) {
skynet_error(context, "message havn't send to Harbor (%d) reach %d K", id, message->ud);
}
break;
}
default:
skynet_error(context, "recv invalid socket message type %d", type);
break;
Expand Down
4 changes: 4 additions & 0 deletions service/gate.lua
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ function handler.error(fd, msg)
skynet.send(watchdog, "lua", "socket", "error", fd, msg)
end

function handler.warning(fd, size)
skynet.send(watchdog, "lua", "socket", "warning", fd, size)
end

local CMD = {}

function CMD.forward(source, fd, client, address)
Expand Down
11 changes: 7 additions & 4 deletions skynet-src/skynet_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,13 @@ check_wsz(struct skynet_context *ctx, int id, void *buffer, int64_t wsz) {
if (wsz < 0) {
return -1;
} else if (wsz > 1024 * 1024) {
int kb4 = wsz / 1024 / 4;
if (kb4 % 256 == 0) {
skynet_error(ctx, "%d Mb bytes on socket %d need to send out", (int)(wsz / (1024 * 1024)), id);
}
struct skynet_socket_message tmp;
tmp.type = SKYNET_SOCKET_TYPE_WARNING;
tmp.id = id;
tmp.ud = (int)(wsz / 1024);
tmp.buffer = NULL;
skynet_send(ctx, 0, skynet_context_handle(ctx), PTYPE_SOCKET, 0 , &tmp, sizeof(tmp));
// skynet_error(ctx, "%d Mb bytes on socket %d need to send out", (int)(wsz / (1024 * 1024)), id);
}
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions skynet-src/skynet_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ struct skynet_context;
#define SKYNET_SOCKET_TYPE_ACCEPT 4
#define SKYNET_SOCKET_TYPE_ERROR 5
#define SKYNET_SOCKET_TYPE_UDP 6
#define SKYNET_SOCKET_TYPE_WARNING 7

struct skynet_socket_message {
int type;
Expand Down

0 comments on commit 947727e

Please sign in to comment.