Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
lh3 committed Nov 26, 2014
2 parents cb89924 + 9d8c906 commit 309d1c5
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 70 deletions.
163 changes: 101 additions & 62 deletions fastmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,83 @@ extern unsigned char nst_nt4_table[256];

void *kopen(const char *fn, int *_fd);
int kclose(void *a);
void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps);

typedef struct {
kseq_t *ks, *ks2;
mem_opt_t *opt;
mem_pestat_t *pes0;
int64_t n_processed;
int copy_comment, actual_chunk_size;
bwaidx_t *idx;
} ktp_aux_t;

typedef struct {
ktp_aux_t *aux;
int n_seqs;
bseq1_t *seqs;
} ktp_data_t;

static void *process(void *shared, int step, void *_data)
{
ktp_aux_t *aux = (ktp_aux_t*)shared;
ktp_data_t *data = (ktp_data_t*)_data;
int i;
if (step == 0) {
ktp_data_t *ret;
int64_t size = 0;
ret = calloc(1, sizeof(ktp_data_t));
ret->seqs = bseq_read(aux->actual_chunk_size, &ret->n_seqs, aux->ks, aux->ks2);
if (ret->seqs == 0) {
free(ret);
return 0;
}
if (!aux->copy_comment)
for (i = 0; i < ret->n_seqs; ++i) {
free(ret->seqs[i].comment);
ret->seqs[i].comment = 0;
}
for (i = 0; i < ret->n_seqs; ++i) size += ret->seqs[i].l_seq;
if (bwa_verbose >= 3)
fprintf(stderr, "[M::%s] read %d sequences (%ld bp)...\n", __func__, ret->n_seqs, (long)size);
return ret;
} else if (step == 1) {
const mem_opt_t *opt = aux->opt;
const bwaidx_t *idx = aux->idx;
if (opt->flag & MEM_F_SMARTPE) {
bseq1_t *sep[2];
int n_sep[2];
mem_opt_t tmp_opt = *opt;
bseq_classify(data->n_seqs, data->seqs, n_sep, sep);
if (bwa_verbose >= 3)
fprintf(stderr, "[M::%s] %d single-end sequences; %d paired-end sequences\n", __func__, n_sep[0], n_sep[1]);
if (n_sep[0]) {
tmp_opt.flag &= ~MEM_F_PE;
mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, aux->n_processed, n_sep[0], sep[0], 0);
for (i = 0; i < n_sep[0]; ++i)
data->seqs[sep[0][i].id].sam = sep[0][i].sam;
}
if (n_sep[1]) {
tmp_opt.flag |= MEM_F_PE;
mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, aux->n_processed + n_sep[0], n_sep[1], sep[1], aux->pes0);
for (i = 0; i < n_sep[1]; ++i)
data->seqs[sep[1][i].id].sam = sep[1][i].sam;
}
free(sep[0]); free(sep[1]);
} else mem_process_seqs(opt, idx->bwt, idx->bns, idx->pac, aux->n_processed, data->n_seqs, data->seqs, aux->pes0);
aux->n_processed += data->n_seqs;
return data;
} else if (step == 2) {
for (i = 0; i < data->n_seqs; ++i) {
if (data->seqs[i].sam) err_fputs(data->seqs[i].sam, stdout);
free(data->seqs[i].name); free(data->seqs[i].comment);
free(data->seqs[i].seq); free(data->seqs[i].qual); free(data->seqs[i].sam);
}
free(data->seqs); free(data);
return 0;
}
return 0;
}

static void update_a(mem_opt_t *opt, const mem_opt_t *opt0)
{
Expand All @@ -38,25 +115,24 @@ static void update_a(mem_opt_t *opt, const mem_opt_t *opt0)
int main_mem(int argc, char *argv[])
{
mem_opt_t *opt, opt0;
int fd, fd2, i, c, n, copy_comment = 0, ignore_alt = 0;
int fixed_chunk_size = -1, actual_chunk_size;
int fd, fd2, i, c, ignore_alt = 0, no_mt_io = 0;
int fixed_chunk_size = -1;
gzFile fp, fp2 = 0;
kseq_t *ks, *ks2 = 0;
bseq1_t *seqs;
bwaidx_t *idx;
char *p, *rg_line = 0, *hdr_line = 0;
const char *mode = 0;
void *ko = 0, *ko2 = 0;
int64_t n_processed = 0;
mem_pestat_t pes[4], *pes0 = 0;
mem_pestat_t pes[4];
ktp_aux_t aux;

memset(&aux, 0, sizeof(ktp_aux_t));
memset(pes, 0, 4 * sizeof(mem_pestat_t));
for (i = 0; i < 4; ++i) pes[i].failed = 1;

opt = mem_opt_init();
aux.opt = opt = mem_opt_init();
memset(&opt0, 0, sizeof(mem_opt_t));
while ((c = getopt(argc, argv, "epaFMCSPVYjk:c:v:s:r:t:R:A:B:O:E:U:w:L:d:T:Q:D:m:I:N:W:x:G:h:y:K:X:H:")) >= 0) {
while ((c = getopt(argc, argv, "1epaFMCSPVYjk:c:v:s:r:t:R:A:B:O:E:U:w:L:d:T:Q:D:m:I:N:W:x:G:h:y:K:X:H:")) >= 0) {
if (c == 'k') opt->min_seed_len = atoi(optarg), opt0.min_seed_len = 1;
else if (c == '1') no_mt_io = 1;
else if (c == 'x') mode = optarg;
else if (c == 'w') opt->w = atoi(optarg), opt0.w = 1;
else if (c == 'A') opt->a = atoi(optarg), opt0.a = 1;
Expand Down Expand Up @@ -85,7 +161,7 @@ int main_mem(int argc, char *argv[])
else if (c == 'N') opt->max_chain_extend = atoi(optarg), opt0.max_chain_extend = 1;
else if (c == 'W') opt->min_chain_weight = atoi(optarg), opt0.min_chain_weight = 1;
else if (c == 'y') opt->max_mem_intv = atol(optarg), opt0.max_mem_intv = 1;
else if (c == 'C') copy_comment = 1;
else if (c == 'C') aux.copy_comment = 1;
else if (c == 'K') fixed_chunk_size = atoi(optarg);
else if (c == 'X') opt->mask_level = atof(optarg);
else if (c == 'h') {
Expand Down Expand Up @@ -118,7 +194,7 @@ int main_mem(int argc, char *argv[])
} else if (c == 'H') {
hdr_line = bwa_insert_header(optarg, hdr_line);
} else if (c == 'I') { // specify the insert size distribution
pes0 = pes;
aux.pes0 = pes;
pes[1].failed = 0;
pes[1].avg = strtod(optarg, &p);
pes[1].std = pes[1].avg * .1;
Expand Down Expand Up @@ -238,22 +314,22 @@ int main_mem(int argc, char *argv[])
} else update_a(opt, &opt0);
bwa_fill_scmat(opt->a, opt->b, opt->mat);

idx = bwa_idx_load_from_shm(argv[optind]);
if (idx == 0) {
if ((idx = bwa_idx_load(argv[optind], BWA_IDX_ALL)) == 0) return 1; // FIXME: memory leak
aux.idx = bwa_idx_load_from_shm(argv[optind]);
if (aux.idx == 0) {
if ((aux.idx = bwa_idx_load(argv[optind], BWA_IDX_ALL)) == 0) return 1; // FIXME: memory leak
} else if (bwa_verbose >= 3)
fprintf(stderr, "[M::%s] load the bwa index from shared memory\n", __func__);
if (ignore_alt)
for (i = 0; i < idx->bns->n_seqs; ++i)
idx->bns->anns[i].is_alt = 0;
for (i = 0; i < aux.idx->bns->n_seqs; ++i)
aux.idx->bns->anns[i].is_alt = 0;

ko = kopen(argv[optind + 1], &fd);
if (ko == 0) {
if (bwa_verbose >= 1) fprintf(stderr, "[E::%s] fail to open file `%s'.\n", __func__, argv[optind + 1]);
return 1;
}
fp = gzdopen(fd, "r");
ks = kseq_init(fp);
aux.ks = kseq_init(fp);
if (optind + 2 < argc) {
if (opt->flag&MEM_F_PE) {
if (bwa_verbose >= 2)
Expand All @@ -265,58 +341,21 @@ int main_mem(int argc, char *argv[])
return 1;
}
fp2 = gzdopen(fd2, "r");
ks2 = kseq_init(fp2);
aux.ks2 = kseq_init(fp2);
opt->flag |= MEM_F_PE;
}
}
if (!(opt->flag & MEM_F_ALN_REG))
bwa_print_sam_hdr(idx->bns, hdr_line);
actual_chunk_size = fixed_chunk_size > 0? fixed_chunk_size : opt->chunk_size * opt->n_threads;
while ((seqs = bseq_read(actual_chunk_size, &n, ks, ks2)) != 0) {
int64_t size = 0;
if (!copy_comment)
for (i = 0; i < n; ++i) {
free(seqs[i].comment); seqs[i].comment = 0;
}
for (i = 0; i < n; ++i) size += seqs[i].l_seq;
if (bwa_verbose >= 3)
fprintf(stderr, "[M::%s] read %d sequences (%ld bp)...\n", __func__, n, (long)size);
if (opt->flag & MEM_F_SMARTPE) {
bseq1_t *sep[2];
int i, n_sep[2];
mem_opt_t tmp_opt = *opt;
bseq_classify(n, seqs, n_sep, sep);
if (bwa_verbose >= 3)
fprintf(stderr, "[M::%s] %d single-end sequences; %d paired-end sequences\n", __func__, n_sep[0], n_sep[1]);
if (n_sep[0]) {
tmp_opt.flag &= ~MEM_F_PE;
mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, n_processed, n_sep[0], sep[0], 0);
for (i = 0; i < n_sep[0]; ++i)
seqs[sep[0][i].id].sam = sep[0][i].sam;
}
if (n_sep[1]) {
tmp_opt.flag |= MEM_F_PE;
mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, n_processed + n_sep[0], n_sep[1], sep[1], pes0);
for (i = 0; i < n_sep[1]; ++i)
seqs[sep[1][i].id].sam = sep[1][i].sam;
}
free(sep[0]); free(sep[1]);
} else mem_process_seqs(opt, idx->bwt, idx->bns, idx->pac, n_processed, n, seqs, pes0);
n_processed += n;
for (i = 0; i < n; ++i) {
if (seqs[i].sam) err_fputs(seqs[i].sam, stdout);
free(seqs[i].name); free(seqs[i].comment); free(seqs[i].seq); free(seqs[i].qual); free(seqs[i].sam);
}
free(seqs);
}

bwa_print_sam_hdr(aux.idx->bns, hdr_line);
aux.actual_chunk_size = fixed_chunk_size > 0? fixed_chunk_size : opt->chunk_size * opt->n_threads;
kt_pipeline(no_mt_io? 1 : 2, process, &aux, 3);
free(hdr_line);
free(opt);
bwa_idx_destroy(idx);
kseq_destroy(ks);
bwa_idx_destroy(aux.idx);
kseq_destroy(aux.ks);
err_gzclose(fp); kclose(ko);
if (ks2) {
kseq_destroy(ks2);
if (aux.ks2) {
kseq_destroy(aux.ks2);
err_gzclose(fp2); kclose(ko2);
}
return 0;
Expand Down
104 changes: 97 additions & 7 deletions kthread.c
Original file line number Diff line number Diff line change
@@ -1,23 +1,30 @@
#include <pthread.h>
#include <stdlib.h>
#include <limits.h>

/************
* kt_for() *
************/

struct kt_for_t;

typedef struct {
struct kt_for_t *t;
int i;
long i;
} ktf_worker_t;

typedef struct kt_for_t {
int n_threads, n;
int n_threads;
long n;
ktf_worker_t *w;
void (*func)(void*,int,int);
void (*func)(void*,long,int);
void *data;
} kt_for_t;

static inline int steal_work(kt_for_t *t)
static inline long steal_work(kt_for_t *t)
{
int i, k, min = 0x7fffffff, min_i = -1;
int i, min_i = -1;
long k, min = LONG_MAX;
for (i = 0; i < t->n_threads; ++i)
if (min > t->w[i].i) min = t->w[i].i, min_i = i;
k = __sync_fetch_and_add(&t->w[min_i].i, t->n_threads);
Expand All @@ -27,7 +34,7 @@ static inline int steal_work(kt_for_t *t)
static void *ktf_worker(void *data)
{
ktf_worker_t *w = (ktf_worker_t*)data;
int i;
long i;
for (;;) {
i = __sync_fetch_and_add(&w->i, w->t->n_threads);
if (i >= w->t->n) break;
Expand All @@ -38,7 +45,7 @@ static void *ktf_worker(void *data)
pthread_exit(0);
}

void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n)
void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n)
{
int i;
kt_for_t t;
Expand All @@ -51,3 +58,86 @@ void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n)
for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktf_worker, &t.w[i]);
for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);
}

/*****************
* kt_pipeline() *
*****************/

struct ktp_t;

typedef struct {
struct ktp_t *pl;
int step, running;
void *data;
} ktp_worker_t;

typedef struct ktp_t {
void *shared;
void *(*func)(void*, int, void*);
int n_workers, n_steps;
ktp_worker_t *workers;
pthread_mutex_t mutex;
pthread_cond_t cv;
} ktp_t;

static void *ktp_worker(void *data)
{
ktp_worker_t *w = (ktp_worker_t*)data;
ktp_t *p = w->pl;
while (w->step < p->n_steps) {
// test whether we can kick off the job with this worker
pthread_mutex_lock(&p->mutex);
for (;;) {
int i;
// test whether another worker is doing the same step
for (i = 0; i < p->n_workers; ++i) {
if (w == &p->workers[i]) continue; // ignore itself
if (p->workers[i].running && p->workers[i].step == w->step)
break;
}
if (i == p->n_workers) break; // no other workers doing w->step; then this worker will
pthread_cond_wait(&p->cv, &p->mutex);
}
w->running = 1;
pthread_mutex_unlock(&p->mutex);

// working on w->step
w->data = p->func(p->shared, w->step, w->step? w->data : 0); // for the first step, input is NULL

// update step and let other workers know
pthread_mutex_lock(&p->mutex);
w->step = w->step == p->n_steps - 1 || w->data? (w->step + 1) % p->n_steps : p->n_steps;
w->running = 0;
pthread_cond_broadcast(&p->cv);
pthread_mutex_unlock(&p->mutex);
}
pthread_exit(0);
}

void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps)
{
ktp_t aux;
pthread_t *tid;
int i;

if (n_threads < 1) n_threads = 1;
aux.n_workers = n_threads;
aux.n_steps = n_steps;
aux.func = func;
aux.shared = shared_data;
pthread_mutex_init(&aux.mutex, 0);
pthread_cond_init(&aux.cv, 0);

aux.workers = alloca(n_threads * sizeof(ktp_worker_t));
for (i = 0; i < n_threads; ++i) {
ktp_worker_t *w = &aux.workers[i];
w->step = w->running = 0; w->pl = &aux; w->data = 0;
}

tid = alloca(n_threads * sizeof(pthread_t));
for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktp_worker, &aux.workers[i]);
for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);

pthread_mutex_destroy(&aux.mutex);
pthread_cond_destroy(&aux.cv);
}
2 changes: 1 addition & 1 deletion main.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "utils.h"

#ifndef PACKAGE_VERSION
#define PACKAGE_VERSION "0.7.10-r1005-dirty"
#define PACKAGE_VERSION "0.7.10-r1017-dirty"
#endif

int bwa_fa2pac(int argc, char *argv[]);
Expand Down

0 comments on commit 309d1c5

Please sign in to comment.