Tiledb unified ingest #9

Merged
merged 16 commits into from
Nov 18, 2024
56,511 changes: 56,511 additions & 0 deletions data/ensembl_gene_annotations.txt

Large diffs are not rendered by default.

153 changes: 58 additions & 95 deletions gwasstudio/cli/export.py
@@ -1,139 +1,102 @@
import click
import cloup
import tiledbvcf
from gwasstudio import logger
from gwasstudio.methods.genome_windows import create_genome_windows
from gwasstudio.methods.locus_breaker import locus_breaker
from gwasstudio.methods.extract_snp import extract_snp


#from methods.locus_breaker import locus_breaker
import tiledb
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
from scipy import stats
help_doc = """
Exports data from a TileDB-VCF dataset.
Exports data from a TileDB dataset.
"""


@cloup.command("export", no_args_is_help=True, help=help_doc)
@cloup.option_group(
"TileDBVCF options",
cloup.option("--uri", default="None", help="TileDB-VCF dataset URI"),
cloup.option("--output-path", default="output", help="The name of the output file"),
cloup.option("--genome-version", help="Genome version to be used (either hg19 or hg38)", default="hg19"),
cloup.option("--columns", default="CHROMOSOME,POSITION,ALLELES,BETA,SE,SAMPLES,LP", help="List of columns to keep, provided as a single string comma separated"),
cloup.option("--chromosome", help="Chromosomes list to use during processing. This can be a list of chromosomes separated by comma (Example: 1,2,3,4)", default="1"),
cloup.option("--window-size", default=50000000, help="Window size used by tiledbvcf for later queries"),
cloup.option("--sample-partitions", help="how many partitions to divide the sample list with for computation (default is 1)", default=1),
cloup.option("--region-partitions", help="how many partitions to divide the region list with for computation (default is 20)", default=20)
"TileDB mandatory options",
cloup.option("--uri", default="None", help="TileDB dataset URI"),
cloup.option("--output_path", default="None", help="The chromosome used for the analysis"),
cloup.option("--chromosome", default="None", help="The chromosome used for the analysis"),
cloup.option("--trait_id_file", default="None", help="The chromosome used for the analysis"),
)
@cloup.option_group(
"Options for Locusbreaker",
cloup.option("--locusbreaker", default=False, is_flag=True, help="Option to run locusbreaker"),
cloup.option("--samples", default="None", help="A path of a txt file containing 1 sample name per line"),
cloup.option("--pvalue-sig", default=5.0, help="P-value threshold to use for filtering the data"),
cloup.option("--pvalue-limit", default=5.0, help="P-value threshold for loci borders"),
cloup.option("--hole-size", default=250000, help="Minimum pair-base distance between SNPs in different loci (default: 250000)")
)
@cloup.option_group(
"Options for filtering using a list of SNPs ids",
"Options for filtering using a genomic regions or a list of SNPs ids",
cloup.option("--snp-list", default="None", help="A txt file with a column containing the SNP ids")
)
@click.pass_context
def export(
ctx,
columns,
locusbreaker,
pvalue_limit,
uri,
trait_id_file,
chromosome,
output_path,
pvalue_sig,
pvalue_limit,
hole_size,
snp_list,
window_size,
output_path,
genome_version,
chromosome,
sample_partitions,
region_partitions,
samples,
uri
locusbreaker

):
cfg = ctx.obj["cfg"]
print(cfg)
ds = tiledbvcf.Dataset(uri, mode="r", tiledb_config=cfg)
logger.info("TileDBVCF dataset loaded")
samples_list = []
if samples != "None":
samples_file = open(samples, "r").readlines()
samples_list = [s.split("\n")[0] for s in samples_file]

tiledb_unified = tiledb.open(uri, mode="r")
logger.info("TileDB dataset loaded")
trait_id_list = open(trait_id_file, "r").read().rstrip().split("\n")
# Create a mapping of the user selected columns into TileDB
columns_attribute_mapping = {
"CHROMOSOME": "contig",
"POSITION": "pos_start",
"SNP": "id",
"ALLELES": "alleles",
"BETA": "fmt_ES",
"SE": "fmt_SE",
"LP": "fmt_LP",
"SAMPLES": "sample_name",
}
column_list_select = [columns_attribute_mapping[a] for a in columns.split(",")]
column_list_select.append("pos_end")

# Create bed regions for all the genome
bed_regions_all = create_genome_windows(style="NCBI", window=window_size, genome=genome_version)

# Filter bed regions by chromosome if selected
if chromosome:
bed_regions = [r for r in bed_regions_all if any(r.startswith(f"{chr_num}:") for chr_num in chromosome.split(","))]

else:
bed_regions = bed_regions_all
logger.info("bed regions created")

# If locus_breaker is selected, run locus_breaker
if locusbreaker:
print("running locus breaker")
dask_df = ds.map_dask(
lambda tiledb_data: locus_breaker(
tiledb_data,
pvalue_limit=pvalue_limit,
pvalue_sig=pvalue_sig,
hole_size=hole_size,
map_attributes=columns_attribute_mapping
),
attrs=column_list_select,
regions=bed_regions,
samples=samples_list,
#sample_partitions=sample_partitions,
#region_partitions=region_partitions
)
#dask_df = ds.map_dask(
# lambda tiledb_data: locus_breaker(
# tiledb_data,
# pvalue_limit=pvalue_limit,
# pvalue_sig=pvalue_sig,
# hole_size=hole_size,
# chromosome,
# trait_id_list
# ),
# attrs=columns_attribute_mapping.keys()
#
logger.info(f"Saving locus-breaker output in {output_path}")
dask_df.to_csv(output_path)
return

# If snp_list is selected, run extract_snp
if snp_list != "None":
extract_snp(
tiledb_data=ds,
snp_file_list=snp_list,
column_list_select=column_list_select,
samples_list=samples_list,
sample_partitions=sample_partitions,
output_path=output_path,
map_attributes=columns_attribute_mapping,
)
SNP_list = pd.read_csv(snp_list)
position_list = SNP_list["position"].to_list()
chromosome_list = SNP_list["chromosome"].to_list()
subset_SNPs = tiledb_unified.query(dims=['chromosome', 'position', 'trait_id'], attrs=['SNP', 'beta', 'se', 'freq', 'alt']).df[chromosome_list, position_list, trait_id_list]
# Wald chi-square p-value from beta and its standard error
subset_SNPs["p-value"] = 1 - stats.chi2.cdf((subset_SNPs["beta"] / subset_SNPs["se"]) ** 2, df=1)
subset_SNPs = subset_SNPs.merge(SNP_list, on="position")
subset_SNPs.to_csv(output_path)
# subset_SNPs.to_parquet(output_path, engine="pyarrow", compression="snappy", schema=None)
logger.info(f"Saved filtered summary statistics by SNPs in {output_path}")
return

#tiledb_data_snp.to_parquet(output_path, engine="pyarrow", compression="snappy", schema = None)

# If neither locus_breaker nor snp_list is selected, filter the data by regions and samples
else:
filtered_ddf = ds.read_dask(
attrs=column_list_select,
regions=bed_regions,
region_partitions=region_partitions,
samples=samples_list,
sample_partitions=sample_partitions
)
if pvalue_sig:
subset_SNPs = tiledb_unified.query(return_arrow=True, dims=['chromosome', 'position', 'trait_id'], attrs=['SNP', 'beta', 'se', 'freq', 'alt']).df[chromosome, :, trait_id_list]
z_scores = pc.divide(subset_SNPs['beta'], subset_SNPs['se'])

# Calculate chi-square statistics (z_scores squared)
chi_square_stats = pc.multiply(z_scores, z_scores)

# Calculate p-values using scipy for efficiency
p_values = [1 - stats.chi2.cdf(stat, df=1) for stat in chi_square_stats.to_numpy()]

# Add p_values as a new column
subset_SNPs = subset_SNPs.append_column('p_value', pa.array(p_values))

# Filter the Arrow table based on the p-value threshold
filtered_data = subset_SNPs.filter(pc.less(subset_SNPs['p_value'], 0.05))

# Convert back to a table or DataFrame if needed
filtered_data_df = filtered_data.to_pandas()
filtered_data_df.to_parquet(output_path, engine="pyarrow", compression="snappy",schema=None)
logger.info(f"Saving filtered GWAS by regions and samples in {output_path}")
filtered_ddf.to_parquet(output_path, engine="pyarrow", compression="snappy",schema=None)
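For reference, a minimal standalone sketch of the p-value computation used in this export step: the Wald z-score beta/se is squared and compared against a chi-square distribution with one degree of freedom. The DataFrame and its values below are illustrative, not taken from the dataset; the survival function stats.chi2.sf is used here instead of 1 - cdf because it is numerically safer for very small p-values.

import pandas as pd
from scipy import stats

# Hypothetical summary statistics
df = pd.DataFrame({"beta": [0.12, -0.05, 0.30], "se": [0.02, 0.04, 0.25]})

# Wald chi-square statistic: (beta / se)^2 with 1 degree of freedom
chi2_stat = (df["beta"] / df["se"]) ** 2

# Two-sided p-value; equivalent to 1 - chi2.cdf but more precise in the far tail
df["p_value"] = stats.chi2.sf(chi2_stat, df=1)

print(df)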

1 change: 0 additions & 1 deletion gwasstudio/cli/info.py
@@ -1,5 +1,4 @@
import cloup

from gwasstudio import __appname__, __version__, config_dir, log_file

help_doc = """
75 changes: 44 additions & 31 deletions gwasstudio/cli/ingest.py
@@ -1,55 +1,68 @@
import click
import cloup
import pathlib
import tiledbvcf
import os
import logging
import dask
import numpy as np
import pandas as pd
import tiledb
from utils import process_and_ingest
from utils import create_tiledb_schema

help_doc = """
Ingest GWAS-VCF data in a TileDB-VCF dataset.
Ingest data into a TileDB-unified dataset.
"""


@cloup.command("ingest", no_args_is_help=True, help=help_doc)
@cloup.option_group(
"Ingestion options",
cloup.option(
"--gwas-vcf-path",
"--input-path",
default=None,
help="Path to a folder where all the vcf.gz files to ingest are stored",
),
cloup.option(
"-a",
"--attrs",
help="List of extra attributes to add, provided as a single string comma separated",
),
cloup.option(
"-to",
"--tiledb-out-path",
"-u",
"--uri",
default=None,
help="s3 path for the tiledb dataset",
help="S3 path for the tiledb dataset.",
),
cloup.option(
"-r",
"--restart",
default=False,
help="Restart the ingestion from a previous run.",
)
)
@cloup.option_group(
"TileDB configurations",
cloup.option("-b", "--mem-budget-mb", default=20480, help="The memory budget in MiB when ingesting the data"),
cloup.option("-tr", "--threads", default=16, help="The number fo threads used for ingestion"),
cloup.option("--batch-size", default=5, help="The number of files to ingest in parallel."),
)
@click.pass_context
def ingest(ctx, gwas_vcf_path, attrs, tiledb_out_path, mem_budget_mb, threads):
if ctx.obj["DISTRIBUTE"]:
pass
else:
_attrs = [a.strip() for a in attrs.split(",")]
def ingest(ctx, input_path, checksum_path, attrs, uri, mem_budget_mb, threads, batch_size, restart):
# Parse the checksum file to map file names to their hashes (used as trait ids)
checksum = pd.read_csv(input_path + "checksum.txt", sep="\t", header=None)
checksum.columns = ["hash", "filename"]
checksum_dict = pd.Series(checksum.hash.values, index=checksum.filename).to_dict()

# Getting the file list and iterate through it using Dask
cfg = ctx.obj["cfg"]
# vfs = tiledb.VFS(config=cfg)
# if (vfs.is_dir(tiledb_path_out)):
# print(f"Deleting existing array '{tiledb_path_out}'")
# vfs.remove_dir(tiledb_path_out)
# print("Done.")
ds = tiledbvcf.Dataset(tiledb_out_path, mode="w", tiledb_config=cfg)
ds.create_dataset(extra_attrs=[_attrs])
p = pathlib.Path(gwas_vcf_path)
ds.ingest_samples(
total_memory_budget_mb=mem_budget_mb,
threads=threads,
sample_uris=[str(file) for file in list(p.glob("*.gz"))],
)
if restart:
test_tiledb = tiledb.open(uri, "r")
arrow_table = test_tiledb.query(return_arrow=True, dims=['trait_id'], attrs=[]).df[1, 1:10000000, :]
unique_arrow = (np.unique(arrow_table))
checksum_dict = pd.Series(checksum.filename.values,index=checksum.hash).to_dict()
file_list = []
checksum_dict_keys = checksum_dict.keys()
for record in checksum_dict_keys:
if record not in unique_arrow:
file_list.append(checksum_dict[record])

# Process files in batches
else:
create_tiledb_schema(uri, cfg)
file_list = os.listdir(input_path)

for i in range(0, len(file_list), batch_size):
batch_files = file_list[i:i + batch_size]
tasks = [dask.delayed(process_and_ingest)(input_path + file, uri, checksum_dict, dict_type, cfg) for file in batch_files]
# Submit tasks and wait for completion
dask.compute(*tasks)
logging.info(f"Batch {i // batch_size + 1} completed.", flush=True)

File renamed without changes.
26 changes: 26 additions & 0 deletions gwasstudio/functions/create_tiledb_schema.py
@@ -0,0 +1,26 @@
import tiledb
import numpy as np
import pandas as pd
dict_type = {"chrom":np.uint8, "pos":np.uint32, "trait_id":np.dtype('S64'), "beta":np.float32, "se":np.float32, "freq":np.float32, "alt":np.dtype('S5'), "SNP":np.dtype('S20')}
# Define the TileDB array schema with SNP, gene, and population dimensions
def create_tiledb_schema(uri,cfg):
chrom_domain = (1, 22)
Member review comment: This should be 24

pos_domain = (1, 3000000000)
dom = tiledb.Domain(
tiledb.Dim(name="chrom", domain = chrom_domain, dtype=np.uint8, var=False),
tiledb.Dim(name="pos", domain = pos_domain, dtype=np.uint32, var=False),
tiledb.Dim(name="trait_id", dtype=np.dtype('S64'), var=True)
)
schema = tiledb.ArraySchema(
domain=dom,
sparse=True,
allows_duplicates=True,
attrs=[
tiledb.Attr(name="beta", dtype=np.float32, var=False,filters=tiledb.FilterList([tiledb.ZstdFilter(level=5)]) ),
tiledb.Attr(name="se", dtype=np.float32, var=False,filters=tiledb.FilterList([tiledb.ZstdFilter(level=5)])),
tiledb.Attr(name="freq", dtype=np.float32, var=False,filters=tiledb.FilterList([tiledb.ZstdFilter(level=5)])),
tiledb.Attr(name="alt", dtype=np.dtype('S5'), var=True,filters=tiledb.FilterList([tiledb.ZstdFilter(level=5)])),
tiledb.Attr(name="SNP", dtype=np.dtype('S20'), var=True,filters=tiledb.FilterList([tiledb.ZstdFilter(level=5)]))
]
)
tiledb.Array.create(uri, schema, ctx=cfg)
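Following the review comment above, a minimal sketch of the corrected chromosome dimension: since process_and_ingest recodes X as 23 and Y as 24, the chrom domain needs to cover 1-24 (still well within np.uint8). This is a suggestion sketch, not the merged code.

import numpy as np
import tiledb

# Chromosomes 1-22 plus X (recoded as 23) and Y (recoded as 24)
chrom_domain = (1, 24)
chrom_dim = tiledb.Dim(name="chrom", domain=chrom_domain, dtype=np.uint8, var=False)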
29 changes: 29 additions & 0 deletions gwasstudio/functions/process_and_ingest.py
@@ -0,0 +1,29 @@
import pandas as pd
import tiledb
from utils import compute_sha256
def process_and_ingest(file_path, uri, checksum_dict, dict_type, renaming_columns, attributes_columns, cfg):
# Read file with Dask
df = pd.read_csv(
file_path,
compression="gzip",
sep="\t",
usecols = attributes_columns
#usecols=["Chrom", "Pos", "Name", "effectAllele", "Beta", "SE", "ImpMAF"]
)
sha256 = compute_sha256(file_path)
# Rename columns and modify 'chrom' field
df = df.rename(columns = renaming_columns)
df["chrom"] = df["chrom"].str.replace('chr', '')
df["chrom"] = df["chrom"].str.replace('X', '23')
df["chrom"] = df["chrom"].str.replace('Y', '24')
# Use the file's sha256 checksum as the trait_id
file_name = file_path.split('/')[-1]
df["trait_id"] = sha256

# Store the processed data in TileDB
tiledb.from_pandas(
uri=uri,
dataframe=df,
index_dims=["chrom", "pos", "trait_id"],
mode="append",
column_types=dict_type,
ctx=cfg,
)
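compute_sha256 is imported from utils, but its implementation is not part of this diff. A plausible sketch, assuming it hashes the raw file bytes, could look like the following; the chunked read is an assumption made so large files are not loaded into memory at once.

import hashlib

def compute_sha256(file_path, chunk_size=1 << 20):
    # Hash the file contents in fixed-size chunks and return the hex digest
    h = hashlib.sha256()
    with open(file_path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()

The resulting hex digest is what ends up as trait_id in the array, which is what lets the checksum file map hashes back to file names in the restart logic of ingest.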