Skip to content

Commit

Permalink
sample slave in worker1, 20 timers faster (but seems some space need …
Browse files Browse the repository at this point in the history
…to free, accessed wrong address when file is big)
  • Loading branch information
yanlifeng committed Mar 4, 2024
1 parent 98acd58 commit d57be25
Show file tree
Hide file tree
Showing 47 changed files with 11,650 additions and 42 deletions.
20 changes: 15 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CFLAGS= -g -Wall -Wno-unused-function -O2 -I ~/online/ylf/init_swlu/swlu/include
#WRAP_MALLOC=-DUSE_MALLOC_WRAPPERS
AR= ar
DFLAGS= -DHAVE_PTHREAD $(WRAP_MALLOC)

LOBJS= utils.o kthread.o kstring.o ksw.o bwt.o bntseq.o bwa.o bwamem.o bwamem_pair.o bwamem_extra.o malloc_wrap.o \
QSufSort.o bwt_gen.o rope.o rle.o is.o bwtindex.o
AOBJS= bwashm.o bwase.o bwaseqio.o bwtgap.o bwtaln.o bamlite.o \
Expand All @@ -13,22 +14,31 @@ AOBJS= bwashm.o bwase.o bwaseqio.o bwtgap.o bwtaln.o bamlite.o \
bwtsw2_chain.o fastmap.o bwtsw2_pair.o
PROG= bwa
INCLUDES=
LIBS= ~/online/ylf/init_swlu/swlu/lib/libswlu_mpi.a -lm -lz -lpthread
LIBS= ~/online/ylf/init_swlu/swlu/lib/libswlu_mpi.a -lm -lz -lpthread -lm_slave
SUBDIRS= .


SLAVE_DIR = slave
SLAVE_SOURCES = $(wildcard $(SLAVE_DIR)/*.c)
SLAVE_OBJECTS = $(SLAVE_SOURCES:.c=.o)

ifeq ($(shell uname -s),Linux)
LIBS += -lrt
endif

.SUFFIXES:.c .o .cc

.c.o:
$(CC) -c $(CFLAGS) $(DFLAGS) $(INCLUDES) $(CPPFLAGS) $< -o $@
$(CC) -mhost -c $(CFLAGS) $(DFLAGS) $(INCLUDES) $(CPPFLAGS) $< -o $@

$(SLAVE_DIR)/%.o: $(SLAVE_DIR)/%.c
$(CC) -mslave -c $(CFLAGS) $< -o $@


all:$(PROG)

bwa:libbwa.a $(AOBJS) main.o
$(CC) $(CFLAGS) $(LDFLAGS) $(AOBJS) main.o -o $@ -L. -lbwa $(LIBS)
bwa:libbwa.a $(AOBJS) main.o $(SLAVE_OBJECTS)
$(CC) $(CFLAGS) $(LDFLAGS) $(AOBJS) main.o $(SLAVE_OBJECTS) -o $@ -L. -lbwa $(LIBS)

bwamem-lite:libbwa.a example.o
$(CC) $(CFLAGS) $(LDFLAGS) example.o -o $@ -L. -lbwa $(LIBS)
Expand All @@ -37,7 +47,7 @@ libbwa.a:$(LOBJS)
$(AR) -csru $@ $(LOBJS)

clean:
rm -f gmon.out *.o a.out $(PROG) *~ *.a
rm -f gmon.out *.o a.out $(PROG) *~ *.a slave/*.o

depend:
( LC_ALL=C ; export LC_ALL; makedepend -Y -- $(CFLAGS) $(DFLAGS) $(CPPFLAGS) -- *.c )
Expand Down
256 changes: 236 additions & 20 deletions bwamem.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
# include "malloc_wrap.h"
#endif


#include<athread.h>

/* Theory on probability and scoring *ungapped* alignment
*
* s'(a,b) = log[P(b|a)/P(b)] = log[4P(b|a)], assuming uniform base distribution
Expand Down Expand Up @@ -74,11 +77,32 @@ static const bntseq_t *global_bns = 0; // for debugging only
extern double t_work1;
extern double t_work2;

extern double t_work2_1;
extern double t_work2_2;
extern double t_work2_3;
extern double t_work1_3;
extern double t_work1_3_1;


extern long long s_reg_sum;
extern long long c_px2;
extern long long s_px2;



#define bwt_sa_slave
//extern void SLAVE_FUN(bwt_sa_s());
extern void SLAVE_FUN(worker1_s());

typedef struct{
long nn;
void* data;
} Para_worker1_s;


typedef struct{
bwt_t *bwt;
bwtint_t *rbegs;
int loop_size;
bwtint_t* k_ids;
} Para_bwa_sa;



Expand Down Expand Up @@ -210,7 +234,10 @@ typedef struct {

typedef struct {
int n, m, first, rid;
uint32_t w:29, kept:2, is_alt:1;
//uint32_t w:29, kept:2, is_alt:1;
uint32_t w;
uint32_t kept;
uint32_t is_alt;
float frac_rep;
int64_t pos;
mem_seed_t *seeds;
Expand Down Expand Up @@ -298,15 +325,151 @@ mem_chain_v mem_chain(const mem_opt_t *opt, const bwt_t *bwt, const bntseq_t *bn
tree = kb_init(chn, KB_DEFAULT_SIZE);

aux = buf? (smem_aux_t*)buf : smem_aux_init();
int max_s_size = opt->max_occ;
mem_collect_intv(opt, bwt, len, seq, aux);
for (i = 0, b = e = l_rep = 0; i < aux->mem.n; ++i) { // compute frac_rep
bwtintv_t *p = &aux->mem.a[i];
if(p->x[2] > max_s_size) max_s_size = p->x[2];
int sb = (p->info>>32), se = (uint32_t)p->info;
if (p->x[2] <= opt->max_occ) continue;
if (sb > e) l_rep += e - b, b = sb, e = se;
else e = e > se? e : se;
}
l_rep += e - b;

//double t0 = GetTime();

//#ifdef bwt_sa_slave
//
// int loop_size = 0;
// for (i = 0; i < aux->mem.n; ++i) {
// bwtintv_t *p = &aux->mem.a[i];
// int step = p->x[2] > opt->max_occ? p->x[2] / opt->max_occ : 1;
// int size = (p->x[2] + step - 1) / step;
// if(size > opt->max_occ) size = opt->max_occ;
// loop_size += size;
// }
// //int64_t rbegs[aux->mem.n][max_s_size];
// int64_t rbegs[loop_size];
// int i_ids[loop_size];
// bwtint_t k_ids[loop_size];
// int id = 0;
// for(i = 0; i < aux->mem.n; i++) {
// bwtintv_t *p = &aux->mem.a[i];
// int step, count, slen = (uint32_t)p->info - (p->info>>32); // seed length
// int64_t k;
// step = p->x[2] > opt->max_occ? p->x[2] / opt->max_occ : 1;
// for (k = count = 0; k < p->x[2] && count < opt->max_occ; k += step, ++count) {
// i_ids[id] = i;
// k_ids[id++] = p->x[0] + k;
// }
// }
// if(id != loop_size) {
// fprintf(stderr, "GGG\n");
// exit(0);
// }
//
// c_px2++;
// s_px2 += loop_size;
//
//
// Para_bwa_sa para_bwa_sa;
// para_bwa_sa.bwt = bwt;
// para_bwa_sa.rbegs = rbegs;
// para_bwa_sa.k_ids = k_ids;
// para_bwa_sa.loop_size = loop_size;
//
// double t1 = GetTime();
// athread_spawn64_arg(bwt_sa_s, &para_bwa_sa);
// athread_join64_arg();
// t_work1_3_1 += GetTime() - t1;
//
// //double t1 = GetTime();
// //for(i = 0; i < loop_size; i++) {
// // rbegs[i] = bwt_sa(bwt, k_ids[i]);
// //}
// //t_work1_3_1 += GetTime() - t1;
//
//
// id = 0;
// for (i = 0; i < aux->mem.n; ++i) {
// bwtintv_t *p = &aux->mem.a[i];
// int step, count, slen = (uint32_t)p->info - (p->info>>32); // seed length
// int64_t k;
// // if (slen < opt->min_seed_len) continue; // ignore if too short or too repetitive
// step = p->x[2] > opt->max_occ? p->x[2] / opt->max_occ : 1;
// for (k = count = 0; k < p->x[2] && count < opt->max_occ; k += step, ++count) {
// mem_chain_t tmp, *lower, *upper;
// mem_seed_t s;
// int rid, to_add = 0;
// s.rbeg = tmp.pos = rbegs[id++];
// //s.rbeg = tmp.pos = bwt_sa(bwt, k_ids[id++]);
// s.qbeg = p->info>>32;
// s.score= s.len = slen;
// rid = bns_intv2rid(bns, s.rbeg, s.rbeg + s.len);
// if (rid < 0) continue; // bridging multiple reference sequences or the forward-reverse boundary; TODO: split the seed; don't discard it!!!
// if (kb_size(tree)) {
// kb_intervalp(chn, tree, &tmp, &lower, &upper); // find the closest chain
// if (!lower || !test_and_merge(opt, l_pac, lower, &s, rid)) to_add = 1;
// } else to_add = 1;
// if (to_add) { // add the seed as a new chain
// tmp.n = 1; tmp.m = 4;
// tmp.seeds = calloc(tmp.m, sizeof(mem_seed_t));
// tmp.seeds[0] = s;
// tmp.rid = rid;
// tmp.is_alt = !!bns->anns[rid].is_alt;
// kb_putp(chn, tree, &tmp);
// }
// }
// }
//
// //for (i = 0; i < aux->mem.n; ++i) {
// // bwtintv_t *p = &aux->mem.a[i];
// // int step, count, slen = (uint32_t)p->info - (p->info>>32); // seed length
// // int64_t k;
// // // if (slen < opt->min_seed_len) continue; // ignore if too short or too repetitive
// // step = p->x[2] > opt->max_occ? p->x[2] / opt->max_occ : 1;
//
// //
// // int loop_size = (p->x[2] + step - 1) / step;
// // if (loop_size > opt->max_occ) loop_size = opt->max_occ;
//
// // Para_bwa_sa para_bwa_sa;
// // para_bwa_sa.bwt = bwt;
// // para_bwa_sa.rbegs = rbegs;
// // para_bwa_sa.loop_size = loop_size;
// // para_bwa_sa.step = step;
// // para_bwa_sa.x0 = p->x[0];
// //
// // __real_athread_spawn((void *aslave_bwt_sa_s, &para_bwa_sa, 1);
// // athread_join();
// // for (k = count = 0; k < p->x[2] && count < opt->max_occ; k += step, ++count) {
// // mem_chain_t tmp, *lower, *upper;
// // mem_seed_t s;
// // int rid, to_add = 0;
// // s.rbeg = tmp.pos = rbegs[count];
// // s.qbeg = p->info>>32;
// // s.score= s.len = slen;
// // rid = bns_intv2rid(bns, s.rbeg, s.rbeg + s.len);
// // if (rid < 0) continue; // bridging multiple reference sequences or the forward-reverse boundary; TODO: split the seed; don't discard it!!!
// // if (kb_size(tree)) {
// // kb_intervalp(chn, tree, &tmp, &lower, &upper); // find the closest chain
// // if (!lower || !test_and_merge(opt, l_pac, lower, &s, rid)) to_add = 1;
// // } else to_add = 1;
// // if (to_add) { // add the seed as a new chain
// // tmp.n = 1; tmp.m = 4;
// // tmp.seeds = calloc(tmp.m, sizeof(mem_seed_t));
// // tmp.seeds[0] = s;
// // tmp.rid = rid;
// // tmp.is_alt = !!bns->anns[rid].is_alt;
// // kb_putp(chn, tree, &tmp);
// // }
// // }
//
// //}
//
//#else

for (i = 0; i < aux->mem.n; ++i) {
bwtintv_t *p = &aux->mem.a[i];
int step, count, slen = (uint32_t)p->info - (p->info>>32); // seed length
Expand Down Expand Up @@ -336,6 +499,9 @@ mem_chain_v mem_chain(const mem_opt_t *opt, const bwt_t *bwt, const bntseq_t *bn
}
}
}

//#endif
//t_work1_3 += GetTime() - t0;
if (buf == 0) smem_aux_destroy(aux);

kv_resize(mem_chain_t, chain, kb_size(tree));
Expand Down Expand Up @@ -550,7 +716,9 @@ static void mem_mark_primary_se_core(const mem_opt_t *opt, int n, mem_alnreg_t *
}
}
}
if (k == z->n) kv_push(int, *z, i);
if (k == z->n) {
kv_push(int, *z, i);
}
else a[i].secondary = z->a[k];
}
}
Expand Down Expand Up @@ -1230,24 +1398,17 @@ static void worker2(void *data, int i, int tid)
extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2]);
extern void mem_reg2ovlp(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a);
worker_t *w = (worker_t*)data;
double t0;
if (!(w->opt->flag&MEM_F_PE)) {
if (bwa_verbose >= 4) printf("=====> Finalizing read '%s' <=====\n", w->seqs[i].name);
t0 = GetTime();
mem_mark_primary_se(w->opt, w->regs[i].n, w->regs[i].a, w->n_processed + i);
t_work2_1 += GetTime() - t0;
if (w->opt->flag & MEM_F_PRIMARY5) mem_reorder_primary5(w->opt->T, &w->regs[i]);
t0 = GetTime();
mem_reg2sam(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0);
t_work2_2 += GetTime() - t0;
free(w->regs[i].a);
} else {
if (bwa_verbose >= 4) printf("=====> Finalizing read pair '%s' <=====\n", w->seqs[i<<1|0].name);
t0 = GetTime();
mem_alnreg_v a = w->regs[i<<1];
s_reg_sum += a.n + a.m;
if(a.n > 0) s_reg_sum += a.n + a.a[0].rid;
mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed>>1) + i, &w->seqs[i<<1], &w->regs[i<<1]);
t_work2_3 += GetTime() - t0;
free(w->regs[i<<1|0].a); free(w->regs[i<<1|1].a);
}
}
Expand All @@ -1264,25 +1425,80 @@ void mem_process_seqs(const mem_opt_t *opt, const bwt_t *bwt, const bntseq_t *bn
ctime = cputime(); rtime = realtime();
global_bns = bns;
w.regs = malloc(n * sizeof(mem_alnreg_v));
for(i = 0; i < n; i++) {
w.regs[i].a = malloc(sizeof(mem_alnreg_t) * 1024);
w.regs[i].m = 1024;
w.regs[i].n = 0;
}
w.opt = opt; w.bwt = bwt; w.bns = bns; w.pac = pac;
w.seqs = seqs; w.n_processed = n_processed;
w.pes = &pes[0];
w.aux = malloc(opt->n_threads * sizeof(smem_aux_t));
for (i = 0; i < opt->n_threads; ++i)
w.aux = malloc(64 * sizeof(smem_aux_t));
for (i = 0; i < 64; ++i) {
w.aux[i] = smem_aux_init();
//kt_for(opt->n_threads, worker1, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // find mapping positions
//kv_resize(bwtintv_t, w.aux[i]->mem, 1024);
//kv_resize(bwtintv_t, w.aux[i]->mem1, 1024);
//kv_resize(bwtintv_t, *w.aux[i]->tmpv[0], 1024);
//kv_resize(bwtintv_t, *w.aux[i]->tmpv[1], 1024);
}
//kt_for(opt->n_threads, worker1, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // find mapping positions
double t0 = GetTime();
#ifdef use_swlu
swlu_prof_start();
kt_for_single(opt->n_threads, worker1, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // find mapping positions
#endif
#ifdef bwt_sa_slave
athread_init();
//athread_enter64_arg();
#endif
long nn = (opt->flag&MEM_F_PE)? n>>1 : n;
c_px2++;
s_px2 += nn;
Para_worker1_s para;
para.nn = nn;
para.data = (void*)&w;

fprintf(stderr, "nn %d\n", nn);
//athread_spawn64_arg(worker1_s, &para);
//athread_join64_arg();
__real_athread_spawn((void*)slave_worker1_s, &para, 1);
athread_join();

//for(long ii = 0; ii < nn; ii++) {
// worker1(&w, ii, i % 64);
//}

//kt_for_single(opt->n_threads, worker1, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // find mapping positions
#ifdef bwt_sa_slave
//athread_leave64_arg();
athread_halt();
#endif
#ifdef use_swlu
swlu_prof_stop();
#endif
t_work1 += GetTime() - t0;
for (i = 0; i < opt->n_threads; ++i)
for (i = 0; i < 64; ++i)
smem_aux_destroy(w.aux[i]);
free(w.aux);
FILE *file = fopen("regs_info.txt", "w");
if (file == NULL) {
perror("Error opening file");
return EXIT_FAILURE;
}
//for(long ii = 0; ii < nn; ii++) {
// fprintf(file, "regs %d size: %d\n", ii, w.regs[ii].n);
// for(int j = 0; j < w.regs[ii].n; j++) {
// fprintf(file, "[%lld %lld] ", w.regs[ii].a[j].rb, w.regs[ii].a[j].re);
// }
// fprintf(file, "\n");
//}

if (opt->flag&MEM_F_PE) { // infer insert sizes if not provided
if (pes0) memcpy(pes, pes0, 4 * sizeof(mem_pestat_t)); // if pes0 != NULL, set the insert-size distribution as pes0
else mem_pestat(opt, bns->l_pac, n, w.regs, pes); // otherwise, infer the insert size distribution from data
fprintf(file, "111\n");
if (pes0) { fprintf(file, "222\n"); memcpy(pes, pes0, 4 * sizeof(mem_pestat_t));}
else { fprintf(file, "333\n");mem_pestat(opt, bns->l_pac, n, w.regs, pes);}
}
fclose(file);

//kt_for(opt->n_threads, worker2, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // generate alignment
t0 = GetTime();
kt_for_single(opt->n_threads, worker2, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // generate alignment
Expand Down
Loading

0 comments on commit d57be25

Please sign in to comment.