Use Zarr library directly as a parallel write sink. #60

Open · wants to merge 5 commits into base: master
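
For readers skimming the diff below: instead of writing each worker's counts to a temporary per-chunk Zarr store and concatenating the chunks afterwards with xarray, the new code pre-allocates chunk-aligned arrays in the output store and lets every worker process write its row slice into them directly. The following is a minimal sketch of that pattern, not the ALLCools code; the store name, dimension names, and sizes are made up, and it assumes a zarr release where `Group.empty`, attribute assignment, and `consolidate_metadata` behave as in zarr 2.x.

```python
# Hypothetical sketch of the "parallel write sink" pattern used in this PR.
from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np
import zarr

STORE = "example_output.zarr"  # hypothetical output store
N_OBS, N_REGION, CHUNK = 1_000, 500, 100


def count_chunk(start):
    """Compute one block of counts and write it straight into the shared store."""
    block = np.random.randint(0, 10, size=(CHUNK, N_REGION), dtype="uint32")  # stand-in for real counting
    arr = zarr.open_group(STORE, mode="r+")["region_da"]  # reopen the store inside the worker
    arr[start : start + CHUNK, :] = block  # chunk-aligned slice, so writers never touch the same chunk
    return start


if __name__ == "__main__":
    # Pre-allocate a chunk-aligned array; each worker then owns whole chunks.
    root = zarr.open_group(STORE, mode="w")
    da = root.empty(name="region_da", shape=(N_OBS, N_REGION), chunks=(CHUNK, N_REGION), dtype="uint32")
    da.attrs["_ARRAY_DIMENSIONS"] = ["cell", "region"]  # dimension hint so xarray can open the store

    # Fan the row chunks out to worker processes that write in place.
    with ProcessPoolExecutor(4) as exe:
        futures = [exe.submit(count_chunk, s) for s in range(0, N_OBS, CHUNK)]
        for f in as_completed(futures):
            print(f"chunk starting at {f.result()} written")

    # Consolidate metadata so consumers need a single metadata read.
    zarr.consolidate_metadata(STORE)
```
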
150 changes: 88 additions & 62 deletions ALLCools/count_matrix/dataset.py
@@ -1,15 +1,20 @@
import pathlib
import subprocess
import tempfile
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import lru_cache
from shutil import rmtree

import numpy as np
import pandas as pd
import pybedtools
import pysam
import xarray as xr
import zarr
import zarr.convenience
import zarr.creation
import zarr.storage
from numcodecs import Blosc
from scipy import stats

from ALLCools.utilities import parse_chrom_size, parse_mc_pattern
@@ -68,11 +73,9 @@ def summary(self):
return mc_type_data


def _determine_datasets(regions, quantifiers, chrom_size_path, tmp_dir):
def _determine_datasets(regions, quantifiers, chrom_size_path):
"""Determine datasets for each region."""
tmp_dir = pathlib.Path(tmp_dir).absolute()
tmp_dir.mkdir(exist_ok=True, parents=True)

tmpdir = tempfile.mkdtemp()
chrom_sizes = parse_chrom_size(chrom_size_path)
datasets = {}
for pair in regions:
@@ -122,7 +125,7 @@ def _id(i, c=chrom):

except ValueError:
raise ValueError(f"Can not understand region specification {region_path}")
region_path = f"{tmp_dir}/{name}.regions.csv"
region_path = f"{tmpdir}/{name}.regions.csv"
region_bed_df.to_csv(region_path)
datasets[name] = {"regions": region_path, "quant": []}

@@ -152,7 +155,7 @@ def _id(i, c=chrom):
if quant_type not in ALLOW_QUANT_TYPES:
raise ValueError(f"QUANT_TYPE need to be in {ALLOW_QUANT_TYPES}, got {quant_type} in {quantifier}.")
datasets[name]["quant"].append(_Quant(mc_types=mc_types, quant_type=quant_type, kwargs=kwargs))
return datasets
return datasets, tmpdir


def _count_single_region_set(allc_table, region_config, obs_dim, region_dim):
@@ -207,16 +210,13 @@ def _calculate_pv(data, reverse_value, obs_dim, var_dim, cutoff=0.9):
return pv


def _count_single_zarr(
allc_table, region_config, obs_dim, region_dim, output_path, obs_dim_dtype, count_dtype="uint32"
):
def _count_single_zarr(allc_table, region_config, obs_dim, region_dim, chunk_start, regiongroup, count_dtype="uint32"):
"""Process single region set and its quantifiers."""
# count all ALLC files and the mC types needed by the quantifiers of this region_dim
count_ds = _count_single_region_set(
allc_table=allc_table, region_config=region_config, obs_dim=obs_dim, region_dim=region_dim
)

total_ds = {}
# deal with count quantifiers
count_mc_types = []
for quant in region_config["quant"]:
@@ -227,8 +227,9 @@ def _count_single_zarr(
count_da = count_ds.sel(mc_type=count_mc_types)[f"{region_dim}_da"]
max_int = np.iinfo(count_dtype).max
count_da = xr.where(count_da > max_int, max_int, count_da)
total_ds[f"{region_dim}_da"] = count_da.astype(count_dtype)

regiongroup[f"{region_dim}_da"][chunk_start : chunk_start + allc_table.index.size, :, :, :] = count_da.astype(
count_dtype
).data
# deal with hypo-score, hyper-score quantifiers
for quant in region_config["quant"]:
if quant.quant_type == "hypo-score":
@@ -240,7 +241,9 @@ def _count_single_zarr(allc_table, region_config, obs_dim, region_dim, chunk_start, regiongroup, count_dtype="uint32"):
var_dim=region_dim,
**quant.kwargs,
)
total_ds[f"{region_dim}_da_{mc_type}-hypo-score"] = data
regiongroup[f"{region_dim}_da_{mc_type}-hypo-score"][
chunk_start : chunk_start + allc_table.index.size, :
] = data.data
elif quant.quant_type == "hyper-score":
for mc_type in quant.mc_types:
data = _calculate_pv(
@@ -250,11 +253,11 @@ def _count_single_zarr(allc_table, region_config, obs_dim, region_dim, chunk_start, regiongroup, count_dtype="uint32"):
var_dim=region_dim,
**quant.kwargs,
)
total_ds[f"{region_dim}_da_{mc_type}-hyper-score"] = data
total_ds = xr.Dataset(total_ds)
total_ds.coords[obs_dim] = total_ds.coords[obs_dim].astype(obs_dim_dtype)
total_ds.to_zarr(output_path, mode="w")
return output_path
regiongroup[f"{region_dim}_da_{mc_type}-hyper-score"][
chunk_start : chunk_start + allc_table.index.size, :
] = data.data

return True


@doc_params(
@@ -302,7 +305,6 @@ def generate_dataset(

# determine index length and str dtype
max_length = allc_table.index.map(lambda idx: len(idx)).max()
obs_dim_dtype = f"<U{max_length}"

# determine parallel chunk size
n_sample = allc_table.size
@@ -311,68 +313,91 @@

# prepare regions and determine quantifiers
pathlib.Path(output_path).mkdir(exist_ok=True)
tmp_dir = f"{output_path}_tmp"
datasets = _determine_datasets(regions, quantifiers, chrom_size_path, tmp_dir)

z = zarr.storage.LocalStore(root=output_path)
root = zarr.open_group(store=z, mode="w")
datasets, tmpdir = _determine_datasets(regions, quantifiers, chrom_size_path)
# copy chrom_size_path to output_path
subprocess.run(["cp", "-f", chrom_size_path, f"{output_path}/chrom_sizes.txt"], check=True)

chunk_records = defaultdict(dict)
for region_dim, region_config in datasets.items():
regiongroup = root.create_group(region_dim)
# save region coords to the ds
bed = pd.read_csv(f"{tmpdir}/{region_dim}.regions.csv", index_col=0)
bed.columns = [f"{region_dim}_chrom", f"{region_dim}_start", f"{region_dim}_end"]
bed.index.name = region_dim
region_size = bed.index.size
dsobs = regiongroup.create_array(
name=obs_dim, data=allc_table.index.values, chunks=(chunk_size), dtype=f"<U{max_length}"
)
dsobs.update_attributes({"_ARRAY_DIMENSIONS": obs_dim})
# append region bed to the saved ds
ds = xr.Dataset()
for col, data in bed.items():
ds.coords[col] = data
ds.coords[region_dim] = bed.index.values
# change object dtype to string
for k in ds.coords.keys():
if ds.coords[k].dtype == "O":
ds.coords[k] = ds.coords[k].astype(str)
ds.to_zarr(f"{output_path}/{region_dim}", mode="w")
count_mc_types = []
for quant in region_config["quant"]:
if quant.quant_type == "count":
count_mc_types += quant.mc_types
count_mc_types = list(set(count_mc_types))
if len(count_mc_types) > 0:
DA = regiongroup.empty(
name=f"{region_dim}_da",
shape=(n_sample, region_size, len(count_mc_types), 2),
chunks=(chunk_size, region_size, len(count_mc_types), 2),
dtype="uint32",
)
DA.update_attributes({"_ARRAY_DIMENSIONS": (obs_dim, region_dim, "mc_type", "count_type")})
count = regiongroup.create_array(name="count_type", data=(["mc", "cov"]), dtype="<U3")
count.update_attributes({"_ARRAY_DIMENSIONS": "count_type"})
mc = regiongroup.create_array(name="mc_type", data=count_mc_types, dtype="<U3")
mc.update_attributes({"_ARRAY_DIMENSIONS": "mc_type"})
# deal with hypo-score, hyper-score quantifiers
for quant in region_config["quant"]:
if quant.quant_type == "hypo-score":
for mc_type in quant.mc_types:
hypo = regiongroup.empty(
name=f"{region_dim}_da_{mc_type}-hypo-score",
shape=(allc_table.size, region_size),
chunks=(chunk_size, region_size),
dtype="float16",
)
hypo.update_attributes({"_ARRAY_DIMENSIONS": (obs_dim, region_dim)})
elif quant.quant_type == "hyper-score":
for mc_type in quant.mc_types:
hyper = regiongroup.empty(
name=f"{region_dim}_da_{mc_type}-hyper-score",
shape=(allc_table.size, region_size),
chunks=(chunk_size, region_size),
dtype="float16",
)
hyper.update_attributes({"_ARRAY_DIMENSIONS": (obs_dim, region_dim)})
Blosc.use_threads = False
with ProcessPoolExecutor(cpu) as exe:
futures = {}
# parallel on allc chunks and region_sets levels
for i, chunk_start in enumerate(range(0, n_sample, chunk_size)):
allc_chunk = allc_table[chunk_start : chunk_start + chunk_size]
for region_dim, region_config in datasets.items():
chunk_path = f"{tmp_dir}/chunk_{region_dim}_{chunk_start}.zarr"
f = exe.submit(
_count_single_zarr,
allc_table=allc_chunk,
region_config=region_config,
obs_dim=obs_dim,
region_dim=region_dim,
output_path=chunk_path,
obs_dim_dtype=obs_dim_dtype,
chunk_start=chunk_start,
regiongroup=regiongroup,
)
futures[f] = (region_dim, i)

for f in as_completed(futures):
region_dim, i = futures[f]
chunk_path = f.result()
print(f"Chunk {i} of {region_dim} returned")
chunk_records[region_dim][i] = chunk_path

for region_dim, chunks in chunk_records.items():
# write chunk in order
chunk_paths = pd.Series(chunks).sort_index().tolist()
for i, chunk_path in enumerate(chunk_paths):
ds = xr.open_zarr(chunk_path).load()
# dump chunk to final place
if i == 0:
# first chunk
ds.to_zarr(f"{output_path}/{region_dim}", mode="w")
else:
# append
ds.to_zarr(f"{output_path}/{region_dim}", append_dim=obs_dim)
rmtree(chunk_path)

# save region coords to the ds
bed = pd.read_csv(f"{tmp_dir}/{region_dim}.regions.csv", index_col=0)
bed.columns = [f"{region_dim}_chrom", f"{region_dim}_start", f"{region_dim}_end"]
bed.index.name = region_dim
# append region bed to the saved ds
ds = xr.Dataset()
for col, data in bed.items():
ds.coords[col] = data
# change object dtype to string
for k in ds.coords.keys():
if ds.coords[k].dtype == "O":
ds.coords[k] = ds.coords[k].astype(str)
ds.to_zarr(f"{output_path}/{region_dim}", mode="a")

# delete tmp
rmtree(tmp_dir)

Blosc.use_threads = None
from ..mcds.utilities import update_dataset_config

update_dataset_config(
@@ -383,4 +408,5 @@ def generate_dataset(
"ds_sample_dim": {region_dim: obs_dim for region_dim in datasets.keys()},
},
)
zarr.convenience.consolidate_metadata(z)
return output_path
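
Because every array written above carries an `_ARRAY_DIMENSIONS` attribute and the store metadata is consolidated at the end of `generate_dataset`, each region group should open directly with xarray. A hedged usage sketch follows; the store path, group name (`chrom100k`), and mC type (`CGN`) are hypothetical and depend on the `regions` and `quantifiers` arguments actually passed.

```python
import xarray as xr

# Hypothetical example: open one region group of a generated dataset.
ds = xr.open_zarr("my_dataset.mcds", group="chrom100k")
print(ds["chrom100k_da"])                 # counts with dims (obs_dim, chrom100k, mc_type, count_type)
print(ds["chrom100k_da_CGN-hypo-score"])  # float16 hypo-methylation scores with dims (obs_dim, chrom100k)
```
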
20 changes: 10 additions & 10 deletions docs/CONDUCT.md
@@ -8,19 +8,19 @@ In the interest of fostering an open and welcoming environment, we as contributo

Examples of behavior that contributes to creating a positive environment include:

- Using welcoming and inclusive language
- Being respectful of differing viewpoints and experiences
- Gracefully accepting constructive criticism
- Focusing on what is best for the community
- Showing empathy towards other community members
- Using welcoming and inclusive language
- Being respectful of differing viewpoints and experiences
- Gracefully accepting constructive criticism
- Focusing on what is best for the community
- Showing empathy towards other community members

Examples of unacceptable behavior by participants include:

- The use of sexualized language or imagery and unwelcome sexual attention or advances
- Trolling, insulting/derogatory comments, and personal or political attacks
- Public or private harassment
- Publishing others' private information, such as a physical or electronic address, without explicit permission
- Other conduct which could reasonably be considered inappropriate in a professional setting
- The use of sexualized language or imagery and unwelcome sexual attention or advances
- Trolling, insulting/derogatory comments, and personal or political attacks
- Public or private harassment
- Publishing others' private information, such as a physical or electronic address, without explicit permission
- Other conduct which could reasonably be considered inappropriate in a professional setting

## Our Responsibilities

14 changes: 7 additions & 7 deletions docs/CONTRIBUTING.md
@@ -9,9 +9,9 @@ Report bugs using GitHub issues.

If you are reporting a bug, please include:

- Your operating system name and version.
- Any details about your local setup that might be helpful in troubleshooting.
- Detailed steps to reproduce the bug.
- Your operating system name and version.
- Any details about your local setup that might be helpful in troubleshooting.
- Detailed steps to reproduce the bug.

## Fix Bugs

@@ -35,10 +35,10 @@ The best way to send feedback is to file an issue on GitHub.

If you are proposing a feature:

- Explain in detail how it would work.
- Keep the scope as narrow as possible, to make it easier to implement.
- Remember that this is a volunteer-driven project, and that contributions
are welcome :)
- Explain in detail how it would work.
- Keep the scope as narrow as possible, to make it easier to implement.
- Remember that this is a volunteer-driven project, and that contributions
are welcome :)

## Get Started

12 changes: 6 additions & 6 deletions docs/README.md
@@ -8,10 +8,10 @@ ALLCools documentation

If you'd like to develop on and build the ALLCools book, you should:

- Clone this repository and run
- Run `pip install -r requirements.txt` (it is recommended you do this within a virtual environment)
- (Recommended) Remove the existing `ALLCools/_build/` directory
- Run `jupyter-book build ALLCools/`
- Clone this repository and run
- Run `pip install -r requirements.txt` (it is recommended you do this within a virtual environment)
- (Recommended) Remove the existing `ALLCools/_build/` directory
- Run `jupyter-book build ALLCools/`

A fully-rendered HTML version of the book will be built in `ALLCools/_build/html/`.

@@ -21,8 +21,8 @@ The html version of the book is hosted on the `gh-pages` branch of this repo. A

If you wish to disable this automation, you may remove the GitHub actions workflow and build the book manually by:

- Navigating to your local build; and running,
- `ghp-import -n -p -f ALLCools/_build/html`
- Navigating to your local build; and running,
- `ghp-import -n -p -f ALLCools/_build/html`

This will automatically push your build to the `gh-pages` branch. More information on this hosting process can be found [here](https://jupyterbook.org/publish/gh-pages.html#manually-host-your-book-with-github-pages).

12 changes: 6 additions & 6 deletions docs/allcools/cell_level/basic/intro_basic_clustering.md
@@ -14,19 +14,19 @@ The dataset we used for 100Kb clustering documentation comes from the hippocampu

#### Download Input Files

- Cell metadata: ADD DOWNLOAD URL
- single-cell ALLC files: ADD DOWNLOAD URL
- MCDS files: ADD DOWNLOAD URL
- Cell metadata: ADD DOWNLOAD URL
- single-cell ALLC files: ADD DOWNLOAD URL
- MCDS files: ADD DOWNLOAD URL

### For 5Kb bins clustering

The dataset we used for 5Kb clustering documentation comes from human PBMC (ADD REFERENCE).

#### Download Input Files

- Cell metadata: ADD DOWNLOAD URL
- single-cell ALLC files: ADD DOWNLOAD URL
- MCDS files: ADD DOWNLOAD URL
- Cell metadata: ADD DOWNLOAD URL
- single-cell ALLC files: ADD DOWNLOAD URL
- MCDS files: ADD DOWNLOAD URL

## Prepare your own datasets

16 changes: 8 additions & 8 deletions docs/allcools/intro.md
@@ -21,10 +21,10 @@ ALLCools documentation organization.

## Authors

- Hanqing Liu, developer, initial conception
- Jingtian Zhou, developer, 5kb clustering algorithms
- Wei Tian
- Jiaying Xu
- Hanqing Liu, developer, initial conception
- Jingtian Zhou, developer, 5kb clustering algorithms
- Wei Tian
- Jiaying Xu

## Support

@@ -37,10 +37,10 @@
Click on this to create a page specific issue.
```

- The source code is on [github](https://github.com/lhqing/ALLCools);
- For releases and changelog, please check out the [github releases page](https://github.com/lhqing/ALLCools/releases);
- For bugs and feature requests, please use the [issue tracker](https://github.com/lhqing/ALLCools/issues).
- For page-specific issues, please use the "open issue" button on the top-right toggle.
- The source code is on [github](https://github.com/lhqing/ALLCools);
- For releases and changelog, please check out the [github releases page](https://github.com/lhqing/ALLCools/releases);
- For bugs and feature requests, please use the [issue tracker](https://github.com/lhqing/ALLCools/issues).
- For page-specific issues, please use the "open issue" button on the top-right toggle.

## Citing ALLCools
