Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pagination #4

Merged
merged 4 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "orb_fdw"
version = "0.0.2"
version = "0.0.3"
edition = "2021"

[lib]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ This is a simple open-source data wrapper that bridges the gap between your Post

### Pre-requisistes

- have the v0.0.2 of `orb_fdw` extension enabled in your instance
- have the v0.0.3 of `orb_fdw` extension enabled in your instance

Create the foreign data wrapper:

Expand Down
2 changes: 1 addition & 1 deletion Trunk.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "Postgres Foreign Data Wrapper for withorb.com Backend API"
homepage = "https://github.com/tembo-io/orb_fdw"
documentation = "https://github.com/tembo-io/orb_fdw"
categories = ["connectors"]
version = "0.0.2"
version = "0.0.3"

[build]
postgres_version = "15"
Expand Down
Empty file added sql/orb_fdw--0.0.2--0.03.sql
Empty file.
89 changes: 57 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ fn body_to_rows(
}

#[wrappers_fdw(
version = "0.0.2",
author = "Jay",
website = "https://github.com/",
version = "0.0.3",
author = "Jay Kothari",
website = "https://github.com/orb_fdw",
error_type = "OrbFdwError"
)]
pub(crate) struct OrbFdw {
Expand All @@ -175,24 +175,36 @@ impl OrbFdw {
// TODO: will have to incorporate offset at some point
const PAGE_SIZE: usize = 500;

fn build_url(&self, obj: &str, _offset: usize) -> String {
fn build_url(&self, obj: &str, cursor: Option<String>) -> String {
let base_url = Self::DEFAULT_BASE_URL.to_owned();
let cursor_param = if let Some(ref cur) = cursor {
format!("&cursor={}", cur)
} else {
String::new()
};

match obj {
"customers" => {
let ret = format!("{}/customers?limit={}", base_url, Self::PAGE_SIZE);
ret
}
"subscriptions" => {
let ret = format!("{}/subscriptions?limit={}", base_url, Self::PAGE_SIZE);
ret
}
"invoices" => {
let ret = format! {"{}/invoices?limit={}", base_url, Self::PAGE_SIZE};
ret
}
"customers" => format!(
"{}/customers?limit={}{}",
base_url,
Self::PAGE_SIZE,
cursor_param
),
"subscriptions" => format!(
"{}/subscriptions?limit={}{}",
base_url,
Self::PAGE_SIZE,
cursor_param
),
"invoices" => format!(
"{}/invoices?limit={}{}",
base_url,
Self::PAGE_SIZE,
cursor_param
),
_ => {
warning!("unsupported object: {:#?}", obj);
return "".to_string();
"".to_string()
}
}
}
Expand Down Expand Up @@ -251,23 +263,36 @@ impl ForeignDataWrapper for OrbFdw {

if let Some(client) = &self.client {
let mut result = Vec::new();
let mut cursor: Option<String> = None;

let url = self.build_url(&obj, 0);

let body = self
.rt
.block_on(client.get(&url).send())
.and_then(|resp| {
resp.error_for_status()
.and_then(|resp| self.rt.block_on(resp.text()))
.map_err(reqwest_middleware::Error::from)
})
.unwrap();

let json: JsonValue = serde_json::from_str(&body).unwrap();
let mut rows = resp_to_rows(&obj, &json, columns);
result.append(&mut rows);
loop {
let url = self.build_url(&obj, cursor.clone()); // Ensure build_url handles None as initial cursor
let body = self
.rt
.block_on(client.get(&url).send())
.and_then(|resp| {
resp.error_for_status()
.and_then(|resp| self.rt.block_on(resp.text()))
.map_err(reqwest_middleware::Error::from)
})
.unwrap();

let json: serde_json::Value = serde_json::from_str(&body).unwrap();
let rows = resp_to_rows(&obj, &json, columns); // Assuming this function exists and works as intended
if rows.is_empty() {
break;
}
result.append(&mut rows.clone());
cursor = json
.get("pagination_metadata")
.and_then(|pm| pm.get("next_cursor"))
.and_then(|nc| nc.as_str())
.map(String::from);
// Break if there is no next cursor
if cursor.is_none() {
break;
}
}
self.scan_result = Some(result);
}
}
Expand Down
Loading