Commit

Created stage files for the pipeline
jpmikhail committed Dec 18, 2024
1 parent 819c11c commit b1f426d
Showing 9 changed files with 443 additions and 2 deletions.
8 changes: 8 additions & 0 deletions stages/00_update_biobricks_deps.py
@@ -0,0 +1,8 @@
import shutil
import subprocess

# Remove any stale biobricks state, then re-initialize and pull the ecotox brick
shutil.rmtree('.bb', ignore_errors=True)
subprocess.run('biobricks init && biobricks add ecotox', shell=True, check=True)
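
Since the next stage calls `bb.assets('ecotox')`, a quick follow-up check can fail fast if the brick did not install cleanly. A minimal sketch, assuming the `biobricks` Python package is configured for this checkout:

import biobricks as bb

# Resolve the ecotox brick and surface its annotations table path;
# raises if the brick is missing or the local config is absent
assets = bb.assets('ecotox')
print("ecotox annotations parquet:", assets.annotations_parquet)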
2 changes: 0 additions & 2 deletions stages/01-stage.R

This file was deleted.

157 changes: 157 additions & 0 deletions stages/01_process.py
@@ -0,0 +1,157 @@
import biobricks as bb
import pandas as pd
import pyarrow.parquet as pq
import json
import pathlib
import shutil
from tqdm import tqdm
from rdflib import Graph, Literal, Namespace, RDF, URIRef
import subprocess

tqdm.pandas()

# cachedir for intermediate ttl/hdt files; removed at the end of this script
cachedir = pathlib.Path('cache/process')
cachedir.mkdir(parents=True, exist_ok=True)

# outdir should be brick (hdt file only)
outdir = pathlib.Path('./brick')
outdir.mkdir(parents=True, exist_ok=True)

print('Resolving ecotox brick assets ...')
pa_brick = bb.assets('ecotox')
print("Done resolving assets")
print(pa_brick.annotations_parquet)
# pa_brick has a single table `annotations_parquet`
# use pyarrow to read the parquet file in chunks
rawpa = pq.ParquetFile(pa_brick.annotations_parquet)
n_row = rawpa.metadata.num_rows
print(f"Number of rows: {n_row}")

# get row0 and make it json for a pretty print
row_group0 = rawpa.read_row_group(0).to_pandas()
row0 = row_group0.iloc[0]
print(json.dumps(row0.apply(str).to_dict(), indent=4))



# Define namespaces
namespaces_sources = {
"rdf" : "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"ecotoxcompound" : "http://rdf.ncbi.nlm.nih.gov/ecotox/compound/",
"ecotoxsubstance" : "http://rdf.ncbi.nlm.nih.gov/ecotox/substance/",
"ecotoxannotation" : "http://rdf.ncbi.nlm.nih.gov/ecotox/annotation/",
"oa" : "http://www.w3.org/ns/oa#",
"dc" : "http://purl.org/dc/elements/1.1/",
}

namespaces = {key: Namespace(val) for key, val in namespaces_sources.items()}

''' VISUAL REPRESENTATION OF GRAPH COMPONENT
*markup is short for string_with_markup
+------------------+
+-------| annotation_iri |
| +------------------+
| | |
RDF.type | OA.hasBody
| | |
v | | +-------------------+
+-----------------+ | +->| body |
| OA.Annotation | | +-------------------+
+-----------------+ | | |
| RDF.value DC["format"]
| | |
| v v
| +-----------------+ +-----------------------+
| | Literal(markup) | | Literal("text/plain") |
| +-----------------+ +-----------------------+
|
OA.hasTarget / DC.subject
|
+---------+--o--------------+-------------+
| | | |
v | v |
+-------------------+ | +-------------------+ |
| compound_iri_1 | | ... | compound_iri_m | |
+-------------------+ | +-------------------+ |
v v
+-------------------+ +-------------------+
| substance_iri_1 | ... | substance_iri_n |
+-------------------+ +-------------------+
'''
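
# For one hypothetical row (ANID 1 targeting a single compound CID 2244),
# the loop below emits Turtle along these lines:
#
#   ecotoxannotation:ANID1 a oa:Annotation ;
#       oa:hasTarget ecotoxcompound:CID2244 ;
#       dc:subject ecotoxcompound:CID2244 ;
#       oa:hasBody <http://rdf.ncbi.nlm.nih.gov/ecotox/annotation/ANID1/body> .
#
#   <http://rdf.ncbi.nlm.nih.gov/ecotox/annotation/ANID1/body>
#       rdf:value "annotation text from StringWithMarkup" ;
#       dc:format "text/plain" .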

batch_size = 10000
n_batch = (n_row + batch_size - 1) // batch_size  # ceiling division
batch_num = -1
# loop through rawpa, creating a chemical for each row
for batch in tqdm(rawpa.iter_batches(batch_size), total = n_batch, desc = "Processing batches"):
batch_num += 1
batch_df = batch.to_pandas()
# Create a new RDF graph
g = Graph()
# Bind namespaces
for key, val in namespaces.items():
g.bind(key, val)

for index, row in batch_df.iterrows():
cid = row['EcotoxCID']
sid = row['EcotoxSID']
anid = row['ANID']

# Create URIs
annotation_iri = URIRef(namespaces["ecotoxannotation"] + f"ANID{anid}")
compound_iri = [URIRef(namespaces["ecotoxcompound"] + f"CID{c}") for c in cid]
        substance_iri = [URIRef(namespaces["ecotoxsubstance"] + f"SID{s}") for s in sid]

        # Parse the Data field as JSON; an annotation may carry multiple values
        data = json.loads(row['Data'])
        string_with_markup_list = [markup.get('String', '') for markup in data.get('Value', {}).get('StringWithMarkup', [])]

# add triples to the graph
g.add((annotation_iri, RDF.type, namespaces["oa"].Annotation))

# add the CID to the annotation, skip if there are no CIDs
for ci in compound_iri:
g.add((annotation_iri, namespaces["oa"].hasTarget, ci))
g.add((annotation_iri, namespaces["dc"].subject, ci))

# add SID to the annotation, skip if there are no SIDs
for si in substance_iri:
g.add((annotation_iri, namespaces["oa"].hasTarget, si))
g.add((annotation_iri, namespaces["dc"].subject, si))

body = URIRef(f"{annotation_iri}/body")
g.add((annotation_iri, namespaces["oa"].hasBody, body))
        # an annotation body may hold several string values
        for swm in string_with_markup_list:
            g.add((body, RDF.value, Literal(swm)))

g.add((body, namespaces["dc"]["format"], Literal("text/plain")))

# Serialize the graph to a string in Turtle format
turtle_file = str(cachedir / f"annotations_{batch_num}.ttl")
g.serialize(destination=turtle_file, format='turtle')

# Convert the Turtle file into an HDT file
hdt_file = str(cachedir / f"annotations_{batch_num}.hdt")
subprocess.run(["rdf2hdt.sh", "-rdftype", "turtle", turtle_file, hdt_file], check=True)


print("Combining HDT files ...")
hdt_combined = str(outdir / 'annotations.hdt')
subprocess.run(
[
"hdtCat.sh",
str(cachedir) + "/annotations_*.hdt",
hdt_combined
],
check=True
)
print(f"Done writing HDT file to {hdt_file}")

# delete cache directory
shutil.rmtree(pathlib.Path('cache'))
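
A variant that avoids shell globbing builds the argument list in Python instead. A sketch assuming, as the glob usage above implies, that hdtCat.sh accepts several input files followed by the output path:

import glob
import subprocess

# Collect the per-batch HDT files in a deterministic order
hdt_parts = sorted(glob.glob('cache/process/annotations_*.hdt'))
subprocess.run(["hdtCat.sh", *hdt_parts, 'brick/annotations.hdt'], check=True)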
183 changes: 183 additions & 0 deletions stages/01_process_parallel.py
@@ -0,0 +1,183 @@
import biobricks as bb
import pandas as pd
import pyarrow.parquet as pq
import json
import pathlib
import shutil
from tqdm import tqdm
from rdflib import Graph, Literal, Namespace, RDF, URIRef
import subprocess
from multiprocessing import Pool

def process_single_batch(args):
"""
Process a single batch of data into Turtle and HDT files.
"""
batch_num, batch, namespaces, cachedir = args

# Convert the Arrow table batch to a pandas DataFrame
batch_df = batch.to_pandas()

# Create a new RDF graph
g = Graph()

# Bind namespaces
for key, val in namespaces.items():
g.bind(key, val)

# Iterate over rows in the batch DataFrame
for index, row in batch_df.iterrows():
cid = row['EcotoxCID']
sid = row['EcotoxSID']
anid = row['ANID']

# Create URIs
annotation_iri = URIRef(namespaces["ecotoxannotation"] + f"ANID{anid}")
compound_iri = [URIRef(namespaces["ecotoxcompound"] + f"CID{c}") for c in cid]
        substance_iri = [URIRef(namespaces["ecotoxsubstance"] + f"SID{s}") for s in sid]

# Parse the data field as JSON
data = json.loads(row['Data'])
# annotation may have multiple values
string_with_markup_list = [markup.get('String', '') for markup in data.get('Value', {}).get('StringWithMarkup', [])]

# Add triples to the graph
g.add((annotation_iri, RDF.type, namespaces["oa"].Annotation))

# Add the CID to the annotation
for ci in compound_iri:
g.add((annotation_iri, namespaces["oa"].hasTarget, ci))
g.add((annotation_iri, namespaces["dc"].subject, ci))

# Add the SID to the annotation
for si in substance_iri:
g.add((annotation_iri, namespaces["oa"].hasTarget, si))
g.add((annotation_iri, namespaces["dc"].subject, si))

# Add body
body = URIRef(f"{annotation_iri}/body")
g.add((annotation_iri, namespaces["oa"].hasBody, body))
for swm in string_with_markup_list:
g.add((body, RDF.value, Literal(swm)))

g.add((body, namespaces["dc"]["format"], Literal("text/plain")))

# Serialize the graph to Turtle format
turtle_file = str(cachedir / f"annotations_{batch_num}.ttl")
g.serialize(destination=turtle_file, format='turtle')

# Convert the Turtle file into an HDT file
hdt_file = str(cachedir / f"annotations_{batch_num}.hdt")
subprocess.run(["rdf2hdt.sh", "-rdftype", "turtle", turtle_file, hdt_file], check=True)

return None

# Instead of listing all batches, use a generator function
def batch_generator(rawpa, batch_size, namespaces, cachedir):
for i, batch in enumerate(rawpa.iter_batches(batch_size), start=1):
yield (i, batch, namespaces, cachedir)

tqdm.pandas()

# cachedir for intermediate ttl/hdt files; removed at the end of this script
cachedir = pathlib.Path('cache/process')
cachedir.mkdir(parents=True, exist_ok=True)

# outdir should be brick (hdt file only)
outdir = pathlib.Path('./brick')
outdir.mkdir(parents=True, exist_ok=True)

print('Resolving ecotox brick assets ...')
pa_brick = bb.assets('ecotox')
print("Done resolving assets")
print(pa_brick.annotations_parquet)
# pa_brick has a single table `annotations_parquet`
# use pyarrow to read the parquet file in chunks
rawpa = pq.ParquetFile(pa_brick.annotations_parquet)
n_row = rawpa.metadata.num_rows
print(f"Number of rows: {n_row}")

# get row0 and make it json for a pretty print
row_group0 = rawpa.read_row_group(0).to_pandas()
row0 = row_group0.iloc[0]
print(json.dumps(row0.apply(str).to_dict(), indent=4))



# Define namespaces
namespaces_sources = {
"rdf" : "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
"ecotoxcompound" : "http://rdf.ncbi.nlm.nih.gov/ecotox/compound/",
"ecotoxsubstance" : "http://rdf.ncbi.nlm.nih.gov/ecotox/substance/",
"ecotoxannotation" : "http://rdf.ncbi.nlm.nih.gov/ecotox/annotation/",
"oa" : "http://www.w3.org/ns/oa#",
"dc" : "http://purl.org/dc/elements/1.1/",
}

namespaces = {key: Namespace(val) for key, val in namespaces_sources.items()}

''' VISUAL REPRESENTATION OF GRAPH COMPONENT
*markup is short for string_with_markup
+------------------+
+-------| annotation_iri |
| +------------------+
| | |
RDF.type | OA.hasBody
| | |
v | | +-------------------+
+-----------------+ | +->| body |
| OA.Annotation | | +-------------------+
+-----------------+ | | |
| RDF.value DC["format"]
| | |
| v v
| +-----------------+ +-----------------------+
| | Literal(markup) | | Literal("text/plain") |
| +-----------------+ +-----------------------+
|
OA.hasTarget / DC.subject
|
+---------+--o--------------+-------------+
| | | |
v | v |
+-------------------+ | +-------------------+ |
| compound_iri_1 | | ... | compound_iri_m | |
+-------------------+ | +-------------------+ |
v v
+-------------------+ +-------------------+
| substance_iri_1 | ... | substance_iri_n |
+-------------------+ +-------------------+
'''

batch_size = 10000
n_batch = (n_row + batch_size - 1) // batch_size  # ceiling division

# Use a Pool and imap to process batches in parallel; on spawn-based platforms
# this should live under an `if __name__ == "__main__":` guard (see the sketch
# after this file)
with Pool() as pool:
    # Pass the generator directly to pool.imap so batches are consumed lazily
    for _ in tqdm(
        pool.imap(
            process_single_batch,
            batch_generator(rawpa, batch_size, namespaces, cachedir)
        ),
        total=n_batch,
        desc="Processing batches"
    ):
        pass

print("Combining HDT files ...")
hdt_combined = str(outdir / 'annotations.hdt')
subprocess.run(
[
"hdtCat.sh",
str(cachedir) + "/annotations_*.hdt",
hdt_combined
],
check=True
)
print(f"Done writing HDT file to {hdt_file}")

# delete cache directory
shutil.rmtree(pathlib.Path('cache'))
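
The `with Pool()` block above runs at module level, which assumes a fork-based start method; under the spawn method (the default on macOS and Windows), every worker re-executes the module and would try to create its own Pool. A minimal, self-contained sketch of the portable pattern, with the hypothetical `square` standing in for `process_single_batch`:

from multiprocessing import Pool

def square(x):
    # stand-in for process_single_batch
    return x * x

def main():
    # The Pool is created only in the parent process
    with Pool() as pool:
        print(list(pool.imap(square, range(10))))

if __name__ == "__main__":
    # Without this guard, spawn-based start methods re-run module-level
    # code in every worker process
    main()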
42 changes: 42 additions & 0 deletions stages/02_test.py
@@ -0,0 +1,42 @@
import rdflib
import pathlib
from itertools import islice
from rdflib_hdt import HDTStore

outdir = pathlib.Path('cache/test')
outdir.mkdir(parents=True, exist_ok=True)

# Path to the HDT file
hdt_file = pathlib.Path('brick/annotations.hdt')

try:
# Load the HDT file into an RDFLib graph
store = HDTStore(hdt_file.as_posix())
graph = rdflib.Graph(store=store)

    # Count the number of triples (len() delegates to the HDT index)
    triple_count = len(graph)

    # Generate metadata
    metadata = {
        "triple_count": triple_count,
        "namespaces": list(graph.namespaces()),
        "sample_triples": list(islice(graph.triples((None, None, None)), 5))  # first 5 triples only
    }

# Write metadata to a file
metadata_file = outdir / "test.txt"
with open(metadata_file, "w") as f:
f.write(f"Triple Count: {metadata['triple_count']}\n")
f.write("Namespaces:\n")
for prefix, uri in metadata['namespaces']:
f.write(f" {prefix}: {uri}\n")
f.write("Sample Triples:\n")
for s, p, o in metadata['sample_triples']:
f.write(f" {s} {p} {o}\n")

print(f"Metadata written to {metadata_file}")

except Exception as e:
# Explicitly fail if the graph fails to load
print(f"Failed to parse the graph: {e}")
raise
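
Since rdflib-hdt plugs into rdflib's SPARQL engine, the same store can also be queried directly. A hedged sketch against the predicates the pipeline writes, assuming brick/annotations.hdt exists:

import rdflib
from rdflib_hdt import HDTStore

graph = rdflib.Graph(store=HDTStore('brick/annotations.hdt'))

# Spot-check a few annotation/target pairs written by stage 01
results = graph.query("""
    PREFIX oa: <http://www.w3.org/ns/oa#>
    SELECT ?annotation ?target WHERE {
        ?annotation a oa:Annotation ;
                    oa:hasTarget ?target .
    } LIMIT 5
""")
for annotation, target in results:
    print(annotation, target)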