Skip to content

Commit

Permalink
add kmer scan for chromvar-kmer
Browse files Browse the repository at this point in the history
  • Loading branch information
ruochiz committed Apr 7, 2024
1 parent 2457c22 commit da4196a
Showing 1 changed file with 80 additions and 0 deletions.
80 changes: 80 additions & 0 deletions scprinter/motifs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from functools import partial
from itertools import product
from pathlib import Path

import MOODS
Expand All @@ -20,6 +21,7 @@
import pandas as pd
from Bio import motifs
from pyfaidx import Fasta
from scipy.sparse import SparseEfficiencyWarning, csr_matrix, diags, hstack, vstack
from tqdm.auto import tqdm, trange
from typing_extensions import Literal

Expand Down Expand Up @@ -311,6 +313,84 @@ def _parse_scan_results(moods_scan_res, motifs, bed_coord, tfs):
return all_hits


def global_scan_kmer(seq, id):
global kmer2id
for k in kmer2id:
length = len(k)
break
res = np.zeros((len(kmer2id)))
for i in range(len(seq) - length):
kmer = seq[i : i + length]
if kmer in kmer2id:
res[kmer2id[kmer]] += 1
return res, id


class Kmers:
def __init__(
self,
ref_path_fa: str | Path,
k: int = 6,
# gap: int = 0,
reverse: bool = False,
nCores: int = 32,
):
self.k = k
# self.gap = gap
self.n_jobs = nCores
self.genome_seq = Fasta(ref_path_fa)
self.kmers = self._generate_kmers()
self.reverse = reverse

def _generate_kmers(self):
# Generate all possible k-mers of length k using ACTG
nucleotides = ["A", "C", "G", "T"]
return ["".join(kmer) for kmer in product(nucleotides, repeat=self.k)]

def chromvar_scan(self, adata, verbose: bool = True):
peaks = regionparser(list(adata.var_names))
peaks = np.array(peaks)
global kmer2id
kmer2id = {}
for i, kmer in enumerate(self.kmers):
kmer2id[kmer] = i
pool = ProcessPoolExecutor(max_workers=self.n_jobs)
p_list = []
bar = trange(len(peaks) if not self.reverse else int(len(peaks) * 2), disable=not verbose)
for peak_idx, peak in enumerate(peaks):
# peak = peaks_iter[peak_idx]
chr = peak[0]
start = int(peak[1])
end = int(peak[2])
seq = self.genome_seq[chr][start:end].seq.upper()
p_list.append(
pool.submit(
global_scan_kmer,
seq,
peak_idx,
)
)
if self.reverse:
seq = self.genome_seq[chr][start:end].reverse.complement.seq.upper()
p_list.append(
pool.submit(
global_scan_kmer,
seq,
peak_idx,
)
)
res = np.zeros((len(peaks), len(self.kmers)))
for p in as_completed(p_list):
bar.update(1)
v, id = p.result()
res[id] += v
pool.shutdown(wait=True)

res = pd.DataFrame(res, index=adata.var_names, columns=self.kmers)
adata.varm["motif_match"] = res.to_numpy()
adata.uns["motif_name"] = res.columns


class Motifs:
"""
A class for motif matching based on MOODS
Expand Down

0 comments on commit da4196a

Please sign in to comment.