Skip to content

Commit

Permalink
[Fix Bug] Improve the performance of the importer.
Browse files Browse the repository at this point in the history
  • Loading branch information
yjcyxky committed Mar 13, 2024
1 parent 496fb4f commit 4fba06b
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 109 deletions.
33 changes: 22 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,39 +64,50 @@ docker-compose up -d

`Step 3`: Init Database, Check and Import Data

- Init database

```bash
# Init database
export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli initdb

# Check and import data, -t is the table name, -f is the data file path, -D is the delete flag
# Import entity data
## Check and import data, -t is the table name, -f is the data file path, -D is the delete flag
## Import entity data
export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/entity.tsv -t entity -D

# Import relation data
## Import relation data
export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/relation.tsv -t relation -D --dataset drkg

# Generate metadata for entity
## Generate metadata for entity
export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/entity.tsv -t entity_metadata -D

# Generate metadata for relation
## Generate metadata for relation
export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/relation_types.tsv -t relation_metadata -D

# Import entity2d data
## Import entity2d data
export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/entity2d.tsv -t entity2d -D

# Import entity embeddings
## Import entity embeddings
export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/entity_embeddings.tsv -t entity_embedding -D

# Import relation embeddings
## Import relation embeddings
export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && biomedgps-cli importdb -f /data/relation_embeddings.tsv -t relation_embedding -D
```

# Import entity data to graph database
- Init Graph Database

```bash
## Import entity data to graph database
export NEO4J_URL=neo4j://neo4j:password@localhost:7687 && biomedgps-cli importgraph -f /data/entities.tsv -t entity -b 1000

# Import relation data to graph database
## Import relation data to graph database
export NEO4J_URL=neo4j://neo4j:password@localhost:7687 && biomedgps-cli importgraph -f /data/relations.tsv -t relation -b 1000
```

- Make several cache tables for performance

```bash
export DATABASE_URL=postgres://postgres:password@localhost:5432/test_biomedgps && export NEO4J_URL=neo4j://neo4j:password@localhost:7687 && biomedgps-cli cachetable --table knowledge-score -T biomedgps
```

`Step 4`: Launch the platform, see more details on usage [here](#usage).

```bash
Expand Down
85 changes: 76 additions & 9 deletions src/bin/biomedgps-cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ extern crate log;
use biomedgps::model::init_db::create_kg_score_table;
use biomedgps::model::kge::{init_kge_models, DEFAULT_MODEL_NAME};
use biomedgps::model::{
init_db::{create_score_table, kg_score_table2graphdb},
init_db::{create_score_table, get_kg_score_table_name, kg_score_table2graphdb},
util::read_annotation_file,
};
use biomedgps::{
build_index, connect_graph_db, import_data, import_graph_data, import_kge, init_logger,
run_migrations,
};
use log::*;
use regex::Regex;
use sqlx::Row;
use std::path::PathBuf;
use std::sync::Arc;
use structopt::StructOpt;
Expand All @@ -32,14 +34,16 @@ struct Opt {
enum SubCommands {
#[structopt(name = "initdb")]
InitDB(InitDbArguments),
#[structopt(name = "inittable")]
InitTable(InitTableArguments),
#[structopt(name = "cachetable")]
CacheTable(CacheTableArguments),
#[structopt(name = "importdb")]
ImportDB(ImportDBArguments),
#[structopt(name = "importgraph")]
ImportGraph(ImportGraphArguments),
#[structopt(name = "importkge")]
ImportKGE(ImportKGEArguments),
#[structopt(name = "cleandb")]
CleanDB(CleanDBArguments),
}

/// Init database.
Expand All @@ -51,6 +55,19 @@ pub struct InitDbArguments {
database_url: Option<String>,
}

/// Clean database
#[derive(StructOpt, PartialEq, Debug)]
#[structopt(setting=structopt::clap::AppSettings::ColoredHelp, name="BioMedGPS - cleandb", author="Jingcheng Yang <[email protected]>")]
pub struct CleanDBArguments {
/// Database url, such as postgres://postgres:postgres@localhost:5432/rnmpdb or neo4j://<username>:<password>@localhost:7687, if not set, use the value of environment variable DATABASE_URL or NEO4J_URL.
#[structopt(name = "database_url", short = "d", long = "database-url")]
database_url: Option<String>,

/// Which table to clean. e.g. entity, relation, entity_metadata, relation_metadata, knowledge_curation, subgraph, entity2d, compound-disease-symptom, knowledge-score, embedding, graph etc.
#[structopt(name = "table", short = "t", long = "table")]
table: String,
}

/// Import data files into database.
#[derive(StructOpt, PartialEq, Debug)]
#[structopt(setting=structopt::clap::AppSettings::ColoredHelp, name="BioMedGPS - importdb", author="Jingcheng Yang <[email protected]>")]
Expand Down Expand Up @@ -108,14 +125,18 @@ pub struct ImportDBArguments {
show_all_errors: bool,
}

/// Init tables for performance. You must run this command after the importdb command.
/// Cache tables for performance. You must run this command after the importdb command.
#[derive(StructOpt, PartialEq, Debug)]
#[structopt(setting=structopt::clap::AppSettings::ColoredHelp, name="BioMedGPS - inittable", author="Jingcheng Yang <[email protected]>")]
pub struct InitTableArguments {
#[structopt(setting=structopt::clap::AppSettings::ColoredHelp, name="BioMedGPS - cachetable", author="Jingcheng Yang <[email protected]>")]
pub struct CacheTableArguments {
/// [Required] Database url, such as postgres://postgres:postgres@localhost:5432/rnmpdb, if not set, use the value of environment variable DATABASE_URL.
#[structopt(name = "database_url", short = "d", long = "database-url")]
database_url: Option<String>,

/// [Optional] Database host, such as postgres-ml:5432. Only needed when you run your application in a docker container and the database is in another container.
#[structopt(name = "db_host", short = "D", long = "db-host")]
db_host: Option<String>,

/// [Optional] Database url, such as neo4j://<username>:<password>@localhost:7687, if not set, use the value of environment variable NEO4J_URL.
#[structopt(name = "neo4j_url", short = "n", long = "neo4j-url")]
neo4j_url: Option<String>,
Expand All @@ -136,6 +157,15 @@ pub struct InitTableArguments {
default_value = DEFAULT_MODEL_NAME
)]
table_prefix: String,

/// [Optional] The batch size for caching table.
#[structopt(
name = "batch_size",
short = "b",
long = "batch-size",
default_value = "10000"
)]
batch_size: usize,
}

/// Import data files into a graph database.
Expand Down Expand Up @@ -290,7 +320,7 @@ async fn main() {
Err(e) => error!("Init database failed: {}", e),
}
}
SubCommands::InitTable(arguments) => {
SubCommands::CacheTable(arguments) => {
let database_url = arguments.database_url;

let database_url = if database_url.is_none() {
Expand Down Expand Up @@ -390,9 +420,43 @@ async fn main() {
error!("{}", "NEO4J_URL is not set, skip to import kg score table to graph database.");
std::process::exit(0);
} else {
let table_prefix = &arguments.table_prefix;
let table_name = get_kg_score_table_name(table_prefix);
let total = match sqlx::query(&format!(
"SELECT count(*) FROM {}",
table_name
))
.fetch_one(&pool)
.await
{
Ok(row) => row.get::<i64, _>("count"),
Err(e) => {
error!(
"Failed to get the total number of the records in the score table: {}",
e
);
std::process::exit(1);
}
};
// Use the regex to replace the database host and port.
let re = Regex::new(r"(.*//.*?@)[^/]*(/.*)").unwrap();
let database_url = if arguments.db_host.is_none() {
database_url
} else {
let caps = re.captures(&database_url).unwrap();
let db_host = arguments.db_host.unwrap();
format!("{}{}{}", &caps[1], db_host, &caps[2])
};
let graph = Arc::new(connect_graph_db(&neo4j_url).await);
match kg_score_table2graphdb(&pool, &graph, Some(&arguments.table_prefix))
.await
match kg_score_table2graphdb(
&database_url,
&graph,
Some(table_prefix),
total as usize,
arguments.batch_size,
false,
)
.await
{
Ok(_) => {
info!("Import kg score table to graph database successfully.")
Expand Down Expand Up @@ -572,5 +636,8 @@ async fn main() {
)
.await
}
SubCommands::CleanDB(arguments) => {
info!("To be implemented.")
}
}
}
19 changes: 11 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ use std::os::unix::fs::PermissionsExt;
use std::vec;

use crate::model::core::{
CheckData, Entity, Entity2D, KnowledgeCuration, Relation, RelationMetadata,
Subgraph,
CheckData, Entity, Entity2D, KnowledgeCuration, Relation, RelationMetadata, Subgraph,
};
use crate::model::graph::Node;
use crate::model::graph::{Edge, Node};
use crate::model::kge::{EntityEmbedding, LegacyRelationEmbedding, RelationEmbedding};
use crate::model::util::{
drop_records, drop_table, get_delimiter, import_file_in_loop, show_errors,
Expand Down Expand Up @@ -269,15 +268,15 @@ pub async fn prepare_relation_queries(
let query_string = if check_exist {
format!(
"MATCH (e1:{} {{idx: $source_idx}})
MATCH (e2:{} {{idx: $target_idx}})
MERGE (e1)-[r:`{}` {{resource: $resource, key_sentence: $key_sentence, pmids: $pmids, dataset: $dataset}}]->(e2)",
MATCH (e2:{} {{idx: $target_idx}})
MERGE (e1)-[r:`{}` {{resource: $resource, key_sentence: $key_sentence, pmids: $pmids, dataset: $dataset, idx: $relation_idx}}]->(e2)",
record.source_type, record.target_type, label
)
} else {
format!(
"MATCH (e1:{} {{idx: $source_idx}})
MATCH (e2:{} {{idx: $target_idx}})
CREATE (e1)-[r:`{}` {{resource: $resource, key_sentence: $key_sentence, pmids: $pmids, dataset: $dataset}}]->(e2)",
MATCH (e2:{} {{idx: $target_idx}})
CREATE (e1)-[r:`{}` {{resource: $resource, key_sentence: $key_sentence, pmids: $pmids, dataset: $dataset, idx: $relation_idx}}]->(e2)",
record.source_type, record.target_type, label
)
};
Expand All @@ -294,7 +293,11 @@ pub async fn prepare_relation_queries(
.param("pmids", pmids)
.param("resource", record.resource)
.param("key_sentence", key_sentence)
.param("dataset", dataset);
.param("dataset", dataset)
.param(
"relation_idx",
Edge::format_id(&record.source_id, &label, &record.target_id),
);

queries.push(query);
}
Expand Down
24 changes: 12 additions & 12 deletions src/model/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,10 @@ pub struct Edge {
}

impl Edge {
pub fn format_id(source_id: &str, relation_type: &str, target_id: &str) -> String {
format!("{}-{}-{}", source_id, relation_type, target_id)
}

/// Create a new edge.
pub fn new(
relation_type: &str,
Expand All @@ -527,7 +531,7 @@ impl Edge {
target_type: &str,
distance: Option<f64>,
) -> Self {
let relid = format!("{}-{}-{}", source_id, relation_type, target_id);
let relid = Edge::format_id(source_id, relation_type, target_id);

Edge {
relid: relid.clone(),
Expand All @@ -554,10 +558,7 @@ impl Edge {
/// Create a new edge from an EdgeData struct.
pub fn from_edge_data(edge: &EdgeData) -> Self {
Edge {
relid: format!(
"{}-{}-{}",
edge.source_id, edge.relation_type, edge.target_id
),
relid: Edge::format_id(&edge.source_id, &edge.relation_type, &edge.target_id),
source: Node::format_id(&edge.source_type, &edge.source_id),
category: "edge".to_string(),
target: Node::format_id(&edge.target_type, &edge.target_id),
Expand All @@ -569,10 +570,7 @@ impl Edge {

/// It will convert the [`Relation`](struct.Relation.html) struct to the [`Edge`](struct.Edge.html) struct.
pub fn from_relation(relation: &Relation) -> Self {
let relid = format!(
"{}-{}-{}",
relation.source_id, relation.relation_type, relation.target_id
);
let relid = Edge::format_id(&relation.source_id, &relation.relation_type, &relation.target_id);
Edge {
relid: relid.clone(),
source: Node::format_id(&relation.source_type, &relation.source_id),
Expand All @@ -585,10 +583,12 @@ impl Edge {
}

pub fn from_curated_knowledge(knowledge: &KnowledgeCuration) -> Self {
let relid = format!(
"{}-{}-{}",
knowledge.source_id, knowledge.relation_type, knowledge.target_id
let relid = Edge::format_id(
&knowledge.source_id,
&knowledge.relation_type,
&knowledge.target_id,
);

Edge {
relid: relid.clone(),
source: Node::format_id(&knowledge.source_type, &knowledge.source_id),
Expand Down
Loading

0 comments on commit 4fba06b

Please sign in to comment.