Skip to content

Commit

Permalink
colo-compare: track connection and enqueue packet
Browse files Browse the repository at this point in the history
In this patch we use kernel jhash table to track
connection, and then enqueue net packet like this:

+ CompareState ++
|               |
+---------------+   +---------------+         +---------------+
|conn list      +--->conn           +--------->conn           |
+---------------+   +---------------+         +---------------+
|               |     |           |             |          |
+---------------+ +---v----+  +---v----+    +---v----+ +---v----+
                  |primary |  |secondary    |primary | |secondary
                  |packet  |  |packet  +    |packet  | |packet  +
                  +--------+  +--------+    +--------+ +--------+
                      |           |             |          |
                  +---v----+  +---v----+    +---v----+ +---v----+
                  |primary |  |secondary    |primary | |secondary
                  |packet  |  |packet  +    |packet  | |packet  +
                  +--------+  +--------+    +--------+ +--------+
                      |           |             |          |
                  +---v----+  +---v----+    +---v----+ +---v----+
                  |primary |  |secondary    |primary | |secondary
                  |packet  |  |packet  +    |packet  | |packet  +
                  +--------+  +--------+    +--------+ +--------+

We use conn_list to record connection info.
When we want to enqueue a packet, firstly get the
connection from connection_track_table. then push
the packet to g_queue(pri/sec) in it's own conn.

Signed-off-by: Zhang Chen <[email protected]>
Signed-off-by: Li Zhijian <[email protected]>
Signed-off-by: Wen Congyang <[email protected]>
Signed-off-by: Jason Wang <[email protected]>
  • Loading branch information
zhangckid authored and jasowang committed Sep 27, 2016
1 parent ccf0426 commit b6540d4
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 10 deletions.
53 changes: 43 additions & 10 deletions net/colo-compare.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#define COLO_COMPARE(obj) \
OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

#define MAX_QUEUE_SIZE 1024

/*
+ CompareState ++
| |
Expand Down Expand Up @@ -67,6 +69,11 @@ typedef struct CompareState {
SocketReadState pri_rs;
SocketReadState sec_rs;

/* connection list: the connections belonged to this NIC could be found
* in this list.
* element type: Connection
*/
GQueue conn_list;
/* hashtable to save connection */
GHashTable *connection_track_table;
} CompareState;
Expand Down Expand Up @@ -94,7 +101,9 @@ static int compare_chr_send(CharDriverState *out,
*/
static int packet_enqueue(CompareState *s, int mode)
{
ConnectionKey key;
Packet *pkt = NULL;
Connection *conn;

if (mode == PRIMARY_IN) {
pkt = packet_new(s->pri_rs.buf, s->pri_rs.packet_len);
Expand All @@ -107,17 +116,34 @@ static int packet_enqueue(CompareState *s, int mode)
pkt = NULL;
return -1;
}
/* TODO: get connection key from pkt */
fill_connection_key(pkt, &key);

/*
* TODO: use connection key get conn from
* connection_track_table
*/
conn = connection_get(s->connection_track_table,
&key,
&s->conn_list);

/*
* TODO: insert pkt to it's conn->primary_list
* or conn->secondary_list
*/
if (!conn->processing) {
g_queue_push_tail(&s->conn_list, conn);
conn->processing = true;
}

if (mode == PRIMARY_IN) {
if (g_queue_get_length(&conn->primary_list) <=
MAX_QUEUE_SIZE) {
g_queue_push_tail(&conn->primary_list, pkt);
} else {
error_report("colo compare primary queue size too big,"
"drop packet");
}
} else {
if (g_queue_get_length(&conn->secondary_list) <=
MAX_QUEUE_SIZE) {
g_queue_push_tail(&conn->secondary_list, pkt);
} else {
error_report("colo compare secondary queue size too big,"
"drop packet");
}
}

return 0;
}
Expand Down Expand Up @@ -308,7 +334,12 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize);
net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize);

/* use g_hash_table_new_full() to new a hashtable */
g_queue_init(&s->conn_list);

s->connection_track_table = g_hash_table_new_full(connection_key_hash,
connection_key_equal,
g_free,
connection_destroy);

return;
}
Expand Down Expand Up @@ -349,6 +380,8 @@ static void colo_compare_finalize(Object *obj)
qemu_chr_fe_release(s->chr_out);
}

g_queue_free(&s->conn_list);

g_free(s->pri_indev);
g_free(s->sec_indev);
g_free(s->outdev);
Expand Down
108 changes: 108 additions & 0 deletions net/colo.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,29 @@
#include "trace.h"
#include "net/colo.h"

uint32_t connection_key_hash(const void *opaque)
{
const ConnectionKey *key = opaque;
uint32_t a, b, c;

/* Jenkins hash */
a = b = c = JHASH_INITVAL + sizeof(*key);
a += key->src.s_addr;
b += key->dst.s_addr;
c += (key->src_port | key->dst_port << 16);
__jhash_mix(a, b, c);

a += key->ip_proto;
__jhash_final(a, b, c);

return c;
}

int connection_key_equal(const void *key1, const void *key2)
{
return memcmp(key1, key2, sizeof(ConnectionKey)) == 0;
}

int parse_packet_early(Packet *pkt)
{
int network_length;
Expand Down Expand Up @@ -59,6 +82,61 @@ int parse_packet_early(Packet *pkt)
return 0;
}

void fill_connection_key(Packet *pkt, ConnectionKey *key)
{
uint32_t tmp_ports;

memset(key, 0, sizeof(*key));
key->ip_proto = pkt->ip->ip_p;

switch (key->ip_proto) {
case IPPROTO_TCP:
case IPPROTO_UDP:
case IPPROTO_DCCP:
case IPPROTO_ESP:
case IPPROTO_SCTP:
case IPPROTO_UDPLITE:
tmp_ports = *(uint32_t *)(pkt->transport_header);
key->src = pkt->ip->ip_src;
key->dst = pkt->ip->ip_dst;
key->src_port = ntohs(tmp_ports & 0xffff);
key->dst_port = ntohs(tmp_ports >> 16);
break;
case IPPROTO_AH:
tmp_ports = *(uint32_t *)(pkt->transport_header + 4);
key->src = pkt->ip->ip_src;
key->dst = pkt->ip->ip_dst;
key->src_port = ntohs(tmp_ports & 0xffff);
key->dst_port = ntohs(tmp_ports >> 16);
break;
default:
break;
}
}

Connection *connection_new(ConnectionKey *key)
{
Connection *conn = g_slice_new(Connection);

conn->ip_proto = key->ip_proto;
conn->processing = false;
g_queue_init(&conn->primary_list);
g_queue_init(&conn->secondary_list);

return conn;
}

void connection_destroy(void *opaque)
{
Connection *conn = opaque;

g_queue_foreach(&conn->primary_list, packet_destroy, NULL);
g_queue_free(&conn->primary_list);
g_queue_foreach(&conn->secondary_list, packet_destroy, NULL);
g_queue_free(&conn->secondary_list);
g_slice_free(Connection, conn);
}

Packet *packet_new(const void *data, int size)
{
Packet *pkt = g_slice_new(Packet);
Expand All @@ -84,3 +162,33 @@ void connection_hashtable_reset(GHashTable *connection_track_table)
{
g_hash_table_remove_all(connection_track_table);
}

/* if not found, create a new connection and add to hash table */
Connection *connection_get(GHashTable *connection_track_table,
ConnectionKey *key,
GQueue *conn_list)
{
Connection *conn = g_hash_table_lookup(connection_track_table, key);

if (conn == NULL) {
ConnectionKey *new_key = g_memdup(key, sizeof(*key));

conn = connection_new(key);

if (g_hash_table_size(connection_track_table) > HASHTABLE_MAX_SIZE) {
trace_colo_proxy_main("colo proxy connection hashtable full,"
" clear it");
connection_hashtable_reset(connection_track_table);
/*
* clear the conn_list
*/
while (!g_queue_is_empty(conn_list)) {
connection_destroy(g_queue_pop_head(conn_list));
}
}

g_hash_table_insert(connection_track_table, new_key, conn);
}

return conn;
}
39 changes: 39 additions & 0 deletions net/colo.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@

#define HASHTABLE_MAX_SIZE 16384

#ifndef IPPROTO_DCCP
#define IPPROTO_DCCP 33
#endif

#ifndef IPPROTO_SCTP
#define IPPROTO_SCTP 132
#endif

#ifndef IPPROTO_UDPLITE
#define IPPROTO_UDPLITE 136
#endif

typedef struct Packet {
void *data;
union {
Expand All @@ -30,7 +42,34 @@ typedef struct Packet {
int size;
} Packet;

typedef struct ConnectionKey {
/* (src, dst) must be grouped, in the same way than in IP header */
struct in_addr src;
struct in_addr dst;
uint16_t src_port;
uint16_t dst_port;
uint8_t ip_proto;
} QEMU_PACKED ConnectionKey;

typedef struct Connection {
/* connection primary send queue: element type: Packet */
GQueue primary_list;
/* connection secondary send queue: element type: Packet */
GQueue secondary_list;
/* flag to enqueue unprocessed_connections */
bool processing;
uint8_t ip_proto;
} Connection;

uint32_t connection_key_hash(const void *opaque);
int connection_key_equal(const void *opaque1, const void *opaque2);
int parse_packet_early(Packet *pkt);
void fill_connection_key(Packet *pkt, ConnectionKey *key);
Connection *connection_new(ConnectionKey *key);
void connection_destroy(void *opaque);
Connection *connection_get(GHashTable *connection_track_table,
ConnectionKey *key,
GQueue *conn_list);
void connection_hashtable_reset(GHashTable *connection_track_table);
Packet *packet_new(const void *data, int size);
void packet_destroy(void *opaque, void *user_data);
Expand Down

0 comments on commit b6540d4

Please sign in to comment.