Skip to content

Commit

Permalink
migration: Add the framework of multi-thread decompression
Browse files Browse the repository at this point in the history
Add the code to create and destroy the multiple threads those will be
used to do data decompression. Left some functions empty just to keep
clearness, and the code will be added later.

Signed-off-by: Liang Li <[email protected]>
Signed-off-by: Yang Zhang <[email protected]>
Reviewed-by: Dr.David Alan Gilbert <[email protected]>
Reviewed-by: Juan Quintela <[email protected]>
Signed-off-by: Juan Quintela <[email protected]>
  • Loading branch information
Liang Li authored and Juan Quintela committed May 6, 2015
1 parent 8706d2d commit 3fcb38c
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 0 deletions.
77 changes: 77 additions & 0 deletions arch_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <stdint.h>
#include <stdarg.h>
#include <stdlib.h>
#include <zlib.h>
#ifndef _WIN32
#include <sys/types.h>
#include <sys/mman.h>
Expand Down Expand Up @@ -127,6 +128,7 @@ static uint64_t bitmap_sync_count;
#define RAM_SAVE_FLAG_CONTINUE 0x20
#define RAM_SAVE_FLAG_XBZRLE 0x40
/* 0x80 is reserved in migration.h start with 0x100 next */
#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100

static struct defconfig_file {
const char *filename;
Expand Down Expand Up @@ -321,9 +323,18 @@ struct CompressParam {
};
typedef struct CompressParam CompressParam;

struct DecompressParam {
/* To be done */
};
typedef struct DecompressParam DecompressParam;

static CompressParam *comp_param;
static QemuThread *compress_threads;
static bool quit_comp_thread;
static bool quit_decomp_thread;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static uint8_t *compressed_data_buf;

static void *do_data_compress(void *opaque)
{
Expand Down Expand Up @@ -1203,10 +1214,59 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
}
}

static void *do_data_decompress(void *opaque)
{
while (!quit_decomp_thread) {
/* To be done */
}

return NULL;
}

void migrate_decompress_threads_create(void)
{
int i, thread_count;

thread_count = migrate_decompress_threads();
decompress_threads = g_new0(QemuThread, thread_count);
decomp_param = g_new0(DecompressParam, thread_count);
compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
quit_decomp_thread = false;
for (i = 0; i < thread_count; i++) {
qemu_thread_create(decompress_threads + i, "decompress",
do_data_decompress, decomp_param + i,
QEMU_THREAD_JOINABLE);
}
}

void migrate_decompress_threads_join(void)
{
int i, thread_count;

quit_decomp_thread = true;
thread_count = migrate_decompress_threads();
for (i = 0; i < thread_count; i++) {
qemu_thread_join(decompress_threads + i);
}
g_free(decompress_threads);
g_free(decomp_param);
g_free(compressed_data_buf);
decompress_threads = NULL;
decomp_param = NULL;
compressed_data_buf = NULL;
}

static void decompress_data_with_multi_threads(uint8_t *compbuf,
void *host, int len)
{
/* To be done */
}

static int ram_load(QEMUFile *f, void *opaque, int version_id)
{
int flags = 0, ret = 0;
static uint64_t seq_iter;
int len = 0;

seq_iter++;

Expand Down Expand Up @@ -1286,6 +1346,23 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
}
qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
break;
case RAM_SAVE_FLAG_COMPRESS_PAGE:
host = host_from_stream_offset(f, addr, flags);
if (!host) {
error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
ret = -EINVAL;
break;
}

len = qemu_get_be32(f);
if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
error_report("Invalid compressed data length: %d", len);
ret = -EINVAL;
break;
}
qemu_get_buffer(f, compressed_data_buf, len);
decompress_data_with_multi_threads(compressed_data_buf, host, len);
break;
case RAM_SAVE_FLAG_XBZRLE:
host = host_from_stream_offset(f, addr, flags);
if (!host) {
Expand Down
4 changes: 4 additions & 0 deletions include/migration/migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct MigrationState
QEMUBH *cleanup_bh;
QEMUFile *file;
int compress_thread_count;
int decompress_thread_count;
int compress_level;

int state;
Expand Down Expand Up @@ -108,6 +109,8 @@ MigrationState *migrate_get_current(void);

void migrate_compress_threads_create(void);
void migrate_compress_threads_join(void);
void migrate_decompress_threads_create(void);
void migrate_decompress_threads_join(void);
uint64_t ram_bytes_remaining(void);
uint64_t ram_bytes_transferred(void);
uint64_t ram_bytes_total(void);
Expand Down Expand Up @@ -159,6 +162,7 @@ int64_t xbzrle_cache_resize(int64_t new_size);
bool migrate_use_compression(void);
int migrate_compress_level(void);
int migrate_compress_threads(void);
int migrate_decompress_threads(void);

void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
Expand Down
19 changes: 19 additions & 0 deletions migration/migration.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@

/* Default compression thread count */
#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
/* Default decompression thread count, usually decompression is at
* least 4 times as fast as compression.*/
#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
/*0: means nocompress, 1: best speed, ... 9: best compress ratio */
#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1

Expand All @@ -58,6 +61,7 @@ MigrationState *migrate_get_current(void)
.xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
.mbps = -1,
.compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
.decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
.compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
};

Expand Down Expand Up @@ -113,6 +117,7 @@ static void process_incoming_migration_co(void *opaque)
free_xbzrle_decoded_buf();
if (ret < 0) {
error_report("load of migration failed: %s", strerror(-ret));
migrate_decompress_threads_join();
exit(EXIT_FAILURE);
}
qemu_announce_self();
Expand All @@ -121,6 +126,7 @@ static void process_incoming_migration_co(void *opaque)
bdrv_invalidate_cache_all(&local_err);
if (local_err) {
error_report_err(local_err);
migrate_decompress_threads_join();
exit(EXIT_FAILURE);
}

Expand All @@ -129,6 +135,7 @@ static void process_incoming_migration_co(void *opaque)
} else {
runstate_set(RUN_STATE_PAUSED);
}
migrate_decompress_threads_join();
}

void process_incoming_migration(QEMUFile *f)
Expand All @@ -137,6 +144,7 @@ void process_incoming_migration(QEMUFile *f)
int fd = qemu_get_fd(f);

assert(fd != -1);
migrate_decompress_threads_create();
qemu_set_nonblock(fd);
qemu_coroutine_enter(co, f);
}
Expand Down Expand Up @@ -400,6 +408,7 @@ static MigrationState *migrate_init(const MigrationParams *params)
int64_t xbzrle_cache_size = s->xbzrle_cache_size;
int compress_level = s->compress_level;
int compress_thread_count = s->compress_thread_count;
int decompress_thread_count = s->decompress_thread_count;

memcpy(enabled_capabilities, s->enabled_capabilities,
sizeof(enabled_capabilities));
Expand All @@ -412,6 +421,7 @@ static MigrationState *migrate_init(const MigrationParams *params)

s->compress_level = compress_level;
s->compress_thread_count = compress_thread_count;
s->decompress_thread_count = decompress_thread_count;
s->bandwidth_limit = bandwidth_limit;
s->state = MIGRATION_STATUS_SETUP;
trace_migrate_set_state(MIGRATION_STATUS_SETUP);
Expand Down Expand Up @@ -623,6 +633,15 @@ int migrate_compress_threads(void)
return s->compress_thread_count;
}

int migrate_decompress_threads(void)
{
MigrationState *s;

s = migrate_get_current();

return s->decompress_thread_count;
}

int migrate_use_xbzrle(void)
{
MigrationState *s;
Expand Down

0 comments on commit 3fcb38c

Please sign in to comment.