Skip to content

Commit

Permalink
r338: pemerge - fixed memory leaks; multithreading
Browse files Browse the repository at this point in the history
pemerge is actually quite slow.
  • Loading branch information
lh3 committed Mar 7, 2013
1 parent 3e3236d commit 1cadfa1
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 84 deletions.
2 changes: 1 addition & 1 deletion main.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "utils.h"

#ifndef PACKAGE_VERSION
#define PACKAGE_VERSION "0.7.0-r337-beta"
#define PACKAGE_VERSION "0.7.0-r338-beta"
#endif

int bwa_fa2pac(int argc, char *argv[]);
Expand Down
195 changes: 112 additions & 83 deletions pemerge.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
#include <unistd.h>
#include <string.h>
#include <zlib.h>
#include <pthread.h>
#include "ksw.h"
#include "kseq.h"
#include "kstring.h"
#include "bwa.h"
KSEQ_DECLARE(gzFile)

Expand All @@ -23,14 +25,13 @@ static const char *err_msg[MAX_ERR+1] = {
"pairs with high sum of errors"
};

typedef struct {
kstring_t n, s, q;
} mseq_t;

typedef struct {
int a, b, q, r, w;
int q_def, q_thres;
int T;
int chunk_size;
int n_threads;
int flag; // bit 1: print merged; 2: print unmerged
int8_t mat[25];
} pem_opt_t;

Expand All @@ -42,67 +43,70 @@ pem_opt_t *pem_opt_init()
opt->T = opt->a * 10;
opt->q_def = 20;
opt->q_thres = 70;
opt->chunk_size = 10000000;
opt->n_threads = 1;
opt->flag = 3;
bwa_fill_scmat(opt->a, opt->b, opt->mat);
return opt;
}

int bwa_pemerge(const pem_opt_t *opt, mseq_t x[2], uint8_t **seq_, uint8_t **qual_)
int bwa_pemerge(const pem_opt_t *opt, bseq1_t x[2])
{
uint8_t *s[2], *q[2], *seq, *qual;
int i, xtra, l, l_seq, sum_q;
int i, xtra, l, l_seq, sum_q, ret = 0;
kswr_t r;

*seq_ = *qual_ = 0;
s[0] = malloc(x[0].s.l); q[0] = malloc(x[0].s.l);
s[1] = malloc(x[1].s.l); q[1] = malloc(x[1].s.l);
for (i = 0; i < x[0].s.l; ++i) {
int c = x[0].s.s[i];
s[0] = malloc(x[0].l_seq); q[0] = malloc(x[0].l_seq);
s[1] = malloc(x[1].l_seq); q[1] = malloc(x[1].l_seq);
for (i = 0; i < x[0].l_seq; ++i) {
int c = x[0].seq[i];
s[0][i] = c < 0 || c > 127? 4 : c <= 4? c : nst_nt4_table[c];
q[0][i] = x[0].q.l? x[0].q.s[i] - 33 : opt->q_def;
q[0][i] = x[0].qual? x[0].qual[i] - 33 : opt->q_def;
}
for (i = 0; i < x[1].s.l; ++i) {
int c = x[1].s.s[x[1].s.l - 1 - i];
for (i = 0; i < x[1].l_seq; ++i) {
int c = x[1].seq[x[1].l_seq - 1 - i];
c = c < 0 || c > 127? 4 : c < 4? c : nst_nt4_table[c];
s[1][i] = c < 4? 3 - c : 4;
q[1][i] = x[1].q.l? x[1].q.s[x[1].q.l - 1 - i] - 33 : opt->q_def;
q[1][i] = x[1].qual? x[1].qual[x[1].l_seq - 1 - i] - 33 : opt->q_def;
}

xtra = KSW_XSTART | KSW_XSUBO;
r = ksw_align(x[1].s.l, s[1], x[0].s.l, s[0], 5, opt->mat, opt->q, opt->r, xtra, 0);
r = ksw_align(x[1].l_seq, s[1], x[0].l_seq, s[0], 5, opt->mat, opt->q, opt->r, xtra, 0);
++r.qe; ++r.te; // change to the half-close-half-open coordinates

if (r.score < opt->T) return -1; // poor alignment
if (r.tb < r.qb) return -2; // no enough space for the left end
if (x[0].s.l - r.te > x[1].s.l - r.qe) return -3; // no enough space for the right end
if ((double)r.score2 / r.score >= MAX_SCORE_RATIO) return -4; // the second best score is too large
if (r.qe - r.qb != r.te - r.tb) return -5; // we do not allow gaps
if (r.score < opt->T) { ret = -1; goto pem_ret; } // poor alignment
if (r.tb < r.qb) { ret = -2; goto pem_ret; } // no enough space for the left end
if (x[0].l_seq - r.te > x[1].l_seq - r.qe) { ret = -3; goto pem_ret; } // no enough space for the right end
if ((double)r.score2 / r.score >= MAX_SCORE_RATIO) { ret = -4; goto pem_ret; } // the second best score is too large
if (r.qe - r.qb != r.te - r.tb) { ret = -5; goto pem_ret; } // we do not allow gaps

{ // test tandem match; O(n^2)
int max_m, max_m2, min_l, max_l, max_l2;
max_m = max_m2 = 0; max_l = max_l2 = 0;
min_l = x[0].s.l < x[1].s.l? x[0].s.l : x[1].s.l;
min_l = x[0].l_seq < x[1].l_seq? x[0].l_seq : x[1].l_seq;
for (l = 1; l < min_l; ++l) {
int m = 0, o = x[0].s.l - l;
int m = 0, o = x[0].l_seq - l;
uint8_t *s0o = &s[0][o], *s1 = s[1];
for (i = 0; i < l; ++i) // TODO: in principle, this can be done with SSE2. It is the bottleneck!
m += opt->mat[(s1[i]<<2) + s1[i] + s0o[i]]; // equivalent to s[1][i]*5 + s[0][o+i]
if (m > max_m) max_m2 = max_m, max_m = m, max_l2 = max_l, max_l = l;
else if (m > max_m2) max_m2 = m, max_l2 = l;
}
if (max_m < opt->T || max_l != x[0].s.l - (r.tb - r.qb)) return -6;
if (max_l2 < max_l && max_m2 >= opt->T && (double)(max_m2 + (max_l - max_l2) * opt->a) / max_m >= MAX_SCORE_RATIO)
return -7;
if (max_l2 > max_l && (double)max_m2 / max_m >= MAX_SCORE_RATIO) return -7;
if (max_m < opt->T || max_l != x[0].l_seq - (r.tb - r.qb)) { ret = -6; goto pem_ret; }
if (max_l2 < max_l && max_m2 >= opt->T && (double)(max_m2 + (max_l - max_l2) * opt->a) / max_m >= MAX_SCORE_RATIO) {
ret = -7; goto pem_ret;
}
if (max_l2 > max_l && (double)max_m2 / max_m >= MAX_SCORE_RATIO) { ret = -7; goto pem_ret; }
}

l = x[0].s.l - (r.tb - r.qb); // length to merge
l_seq = x[0].s.l + x[1].s.l - l;
l = x[0].l_seq - (r.tb - r.qb); // length to merge
l_seq = x[0].l_seq + x[1].l_seq - l;
seq = malloc(l_seq + 1);
qual = malloc(l_seq + 1);
memcpy(seq, s[0], x[0].s.l); memcpy(seq + x[0].s.l, &s[1][l], x[1].s.l - l);
memcpy(qual, q[0], x[0].s.l); memcpy(qual + x[0].s.l, &q[1][l], x[1].s.l - l);
memcpy(seq, s[0], x[0].l_seq); memcpy(seq + x[0].l_seq, &s[1][l], x[1].l_seq - l);
memcpy(qual, q[0], x[0].l_seq); memcpy(qual + x[0].l_seq, &q[1][l], x[1].l_seq - l);
for (i = 0, sum_q = 0; i < l; ++i) {
int k = x[0].s.l - l + i;
int k = x[0].l_seq - l + i;
if (s[0][k] == 4) { // ambiguous
seq[k] = s[1][i];
qual[k] = q[1][i];
Expand All @@ -118,51 +122,99 @@ int bwa_pemerge(const pem_opt_t *opt, mseq_t x[2], uint8_t **seq_, uint8_t **qua
}
if (sum_q>>1 > opt->q_thres) { // too many mismatches
free(seq); free(qual);
return -8;
ret = -8; goto pem_ret;
}

for (i = 0; i < l_seq; ++i) seq[i] = "ACGTN"[(int)seq[i]], qual[i] += 33;
seq[l_seq] = qual[l_seq] = 0;
*seq_ = seq, *qual_ = qual;
return l_seq;

free(x[1].name); free(x[1].seq); free(x[1].qual); free(x[1].comment);
memset(&x[1], 0, sizeof(bseq1_t));
free(x[0].seq); free(x[0].qual);
x[0].l_seq = l_seq; x[0].seq = (char*)seq; x[0].qual = (char*)qual;

pem_ret:
free(s[0]); free(s[1]); free(q[0]); free(q[1]);
return ret;
}

static inline void kstrcpy(kstring_t *dst, const kstring_t *src)
static inline void print_bseq(const bseq1_t *s, int rn)
{
dst->l = 0;
if (src->l == 0) return;
if (dst->m < src->l + 1) {
dst->m = src->l + 2;
kroundup32(dst->m);
dst->s = realloc(dst->s, dst->m);
}
dst->l = src->l;
memcpy(dst->s, src->s, src->l + 1);
putchar(s->qual? '@' : '>');
fputs(s->name, stdout);
if (rn == 1 || rn == 2) {
putchar('/'); putchar('0' + rn); putchar('\n');
} else puts(" merged");
puts(s->seq);
if (s->qual) {
puts("+"); puts(s->qual);
}
}

static inline void kseq2mseq(mseq_t *ms, const kseq_t *ks)
typedef struct {
int n, start;
bseq1_t *seqs;
int64_t cnt[MAX_ERR+1];
const pem_opt_t *opt;
} worker_t;

void *worker(void *data)
{
kstrcpy(&ms->n, &ks->name);
kstrcpy(&ms->s, &ks->seq);
kstrcpy(&ms->q, &ks->qual);
worker_t *w = (worker_t*)data;
int i;
for (i = w->start; i < w->n>>1; i += w->opt->n_threads)
++w->cnt[-bwa_pemerge(w->opt, &w->seqs[i<<1])];
return 0;
}

static inline void print_seq(const char *n, const char *s, const char *q)
static void process_seqs(const pem_opt_t *opt, int n_, bseq1_t *seqs, int64_t cnt[MAX_ERR+1])
{
putchar(q? '@' : '>');
puts(n); puts(s);
if (q) {
puts("+"); puts(q);
int i, j, n = n_>>1<<1;
worker_t *w;

w = calloc(opt->n_threads, sizeof(worker_t));
for (i = 0; i < opt->n_threads; ++i) {
worker_t *p = &w[i];
p->start = i; p->n = n;
p->opt = opt;
p->seqs = seqs;
}
if (opt->n_threads == 1) {
worker(w);
} else {
pthread_t *tid;
tid = (pthread_t*)calloc(opt->n_threads, sizeof(pthread_t));
for (i = 0; i < opt->n_threads; ++i) pthread_create(&tid[i], 0, worker, &w[i]);
for (i = 0; i < opt->n_threads; ++i) pthread_join(tid[i], 0);
free(tid);
}
for (i = 0; i < opt->n_threads; ++i) {
worker_t *p = &w[i];
for (j = 0; j <= MAX_ERR; ++j) cnt[j] += p->cnt[j];
}
free(w);
for (i = 0; i < n>>1; ++i) {
if (seqs[i<<1|1].l_seq != 0) {
if (opt->flag&2) {
print_bseq(&seqs[i<<1|0], 1);
print_bseq(&seqs[i<<1|1], 2);
}
} else if (opt->flag&1)
print_bseq(&seqs[i<<1|0], 0);
}
for (i = 0; i < n; ++i) {
bseq1_t *s = &seqs[i];
free(s->name); free(s->seq); free(s->qual); free(s->comment);
}
}

int main_pemerge(int argc, char *argv[])
{
int c, flag = 0, i;
int c, flag = 0, i, n;
int64_t cnt[MAX_ERR+1];
bseq1_t *bseq;
gzFile fp, fp2 = 0;
kseq_t *ks, *ks2 = 0;
mseq_t r[2];
pem_opt_t *opt;

opt = pem_opt_init();
Expand All @@ -172,6 +224,7 @@ int main_pemerge(int argc, char *argv[])
else if (c == 'Q') opt->q_thres = atoi(optarg);
}
if (flag == 0) flag = 3;
opt->flag = flag;

if (optind == argc) {
fprintf(stderr, "\n");
Expand All @@ -191,34 +244,10 @@ int main_pemerge(int argc, char *argv[])
ks2 = kseq_init(fp2);
}

memset(r, 0, sizeof(mseq_t)<<1);
memset(cnt, 0, 8 * (MAX_ERR+1));
while (kseq_read(ks) >= 0) {
uint8_t *seq, *qual;
int l_seq;
kseq2mseq(&r[0], ks);
if (ks2) {
if (kseq_read(ks2) < 0) break;
kseq2mseq(&r[1], ks2);
} else {
if (kseq_read(ks) < 0) break;
kseq2mseq(&r[1], ks);
}
l_seq = bwa_pemerge(opt, r, &seq, &qual);
if (l_seq > 0) {
++cnt[0];
if (flag & 1) {
if (r[0].n.l > 2 && (r[0].n.s[r[0].n.l-1] == '1' || r[0].n.s[r[0].n.l-1] == '2') && r[0].n.s[r[0].n.l-2] == '/')
r[0].n.s[r[0].n.l-2] = 0, r[0].n.l -= 2;
print_seq(r[0].n.s, (char*)seq, (char*)qual);
}
} else {
++cnt[-l_seq];
if (flag & 2) {
print_seq(r[0].n.s, r[0].s.s, r[0].q.l? r[0].q.s : 0);
print_seq(r[1].n.s, r[1].s.s, r[1].q.l? r[1].q.s : 0);
}
}
while ((bseq = bseq_read(opt->n_threads * opt->chunk_size, &n, ks, ks2)) != 0) {
process_seqs(opt, n, bseq, cnt);
free(bseq);
}

fprintf(stderr, "%12ld %s\n", (long)cnt[0], err_msg[0]);
Expand Down

0 comments on commit 1cadfa1

Please sign in to comment.