Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2024 05 26 0 #22

Merged
merged 4 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 133 additions & 127 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 10 additions & 7 deletions rapiddb/src/db/mmav_db/mmav.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::db::mmav_db::mmav_unit::MMAVUnit;
use crate::errors::MMAVError;
use crate::errors::Error;

/// Memory Mapped Append-only Vector
///
Expand Down Expand Up @@ -148,12 +148,15 @@ impl MMAV {
return unit_map[&index].len();
}

let unit = MMAVUnit::new(&format!("{id}/{index}"), size, data_start_index);
let result = unit.len();
let mut result: usize = Default::default();

unit_map.insert(index, unit);
let _ = MMAVUnit::new(&format!("{id}/{index}"), size, data_start_index)
.map(|unit| {
result = unit.len();
unit_map.insert(index, unit);
});

result
return result;
}

/// Load unit that contains `index`
Expand Down Expand Up @@ -231,11 +234,11 @@ impl MMAV {
pub fn push(&mut self, value: &[u8]) {
self.unit_map.get_mut(&self.index).unwrap().push(value).unwrap_or_else(
|error| match error {
MMAVError::ArrayFull => {
Error::ArrayFull => {
self.expand();
self.push(value);
}
MMAVError::FileFull => {
Error::FileFull => {
self.expand();
self.push(value);
}
Expand Down
8 changes: 6 additions & 2 deletions rapiddb/src/db/mmav_db/mmav_async_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl MMAVAsyncDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&file_name)
.unwrap_or_else(|error| {
if error.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -126,6 +127,7 @@ impl MMAVAsyncDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(file_name)
.unwrap()
});
Expand Down Expand Up @@ -155,7 +157,7 @@ impl Default for MMAVAsyncDatabase {
#[async_trait::async_trait]
impl IAsyncDatabase for MMAVAsyncDatabase {
async fn contains(&self, id: &str) -> bool {
self.sensors.get(id).is_some()
self.sensors.contains_key(id)
}

async fn get(&mut self, id: &str, rec_id: usize) -> Vec<u8> {
Expand Down Expand Up @@ -215,6 +217,7 @@ impl IAsyncDatabase for MMAVAsyncDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&file_name)
.unwrap_or_else(|error| {
if error.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -234,6 +237,7 @@ impl IAsyncDatabase for MMAVAsyncDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(file_name)
.unwrap()
});
Expand All @@ -243,7 +247,7 @@ impl IAsyncDatabase for MMAVAsyncDatabase {
}

async fn get_aggregates(&self, id: &str) -> Vec<u8> {
if self.aggregates.get(id).is_none() {
if !self.aggregates.contains_key(id) {
return Default::default();
}

Expand Down
8 changes: 6 additions & 2 deletions rapiddb/src/db/mmav_db/mmav_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl MMAVDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&file_name)
.unwrap_or_else(|error| {
if error.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -126,6 +127,7 @@ impl MMAVDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(file_name)
.unwrap()
});
Expand Down Expand Up @@ -154,7 +156,7 @@ impl Default for MMAVDatabase {
}
impl IDatabase for MMAVDatabase {
fn contains(&self, id: &str) -> bool {
self.sensors.get(id).is_some()
self.sensors.contains_key(id)
}

fn get(&mut self, id: &str, rec_id: usize) -> Vec<u8> {
Expand Down Expand Up @@ -214,6 +216,7 @@ impl IDatabase for MMAVDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&file_name)
.unwrap_or_else(|error| {
if error.kind() == std::io::ErrorKind::NotFound {
Expand All @@ -233,6 +236,7 @@ impl IDatabase for MMAVDatabase {
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(file_name)
.unwrap()
});
Expand All @@ -242,7 +246,7 @@ impl IDatabase for MMAVDatabase {
}

fn get_aggregates(&self, id: &str) -> Vec<u8> {
if self.aggregates.get(id).is_none() {
if !self.aggregates.contains_key(id) {
return Default::default();
}

Expand Down
96 changes: 37 additions & 59 deletions rapiddb/src/db/mmav_db/mmav_unit.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::errors::MMAVError;
use crate::errors::Error;

/// Memory Mapped Append-only Vector Unit
///
Expand All @@ -24,7 +24,6 @@ impl MMAVUnit {
/// Memory Mapped Append-only Vector Unit Constructor
///
/// ## Default params:
///
/// `size` = 4000000
///
/// `data_start_index` = 80008
Expand All @@ -37,59 +36,53 @@ impl MMAVUnit {
/// unit.push(data).unwrap_or_default();
/// assert_eq!(unit.last(), data);
/// ```
pub fn new(file_name: &str, size: usize, data_start_index: usize) -> Self {
let file_did_exist = std::path::Path::new(file_name).exists();
pub fn new(
file_name: &str,
size: usize,
data_start_index: usize,
) -> Result<Self, Error> {
let file_path = std::path::Path::new(file_name);
let file_exists = file_path.exists();

if (!file_exists) {
file_path.parent().map(std::fs::create_dir_all);
}

let file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(file_name)
.unwrap_or_else(|error| {
if error.kind() == std::io::ErrorKind::NotFound {
std::fs::create_dir(file_name.split('/').collect::<Vec<_>>()[0])
.unwrap_or_default();
}

std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(file_name)
.unwrap()
});

file.set_len(size as u64).unwrap_or_default();

let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file).unwrap() };
.truncate(false)
.open(file_name)?;

file.set_len(size as u64)?;

let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file)? };
mmap.advise(memmap2::Advice::Random).unwrap_or_default();

let mut seek = data_start_index;
if file_did_exist {
seek = u32::from_ne_bytes(mmap[0..4].try_into().unwrap()) as usize;
if (file_exists) {
seek = u32::from_ne_bytes(mmap[0..4].try_into()?) as usize;

if seek > mmap.len() {
panic!(
"seek_index must be between {data_start_index} and {}",
mmap.len()
);
return Err(Error::IndexOutOfRange);
}
}

let mut seek_index = 8;
if file_did_exist {
seek_index = u32::from_ne_bytes(mmap[4..8].try_into().unwrap()) as usize;
if (file_exists) {
seek_index = u32::from_ne_bytes(mmap[4..8].try_into()?) as usize;

if seek_index > data_start_index {
panic!("seek_index must be between 8 and {data_start_index}");
return Err(Error::IndexOutOfRange);
}
}

if mmap[seek] == 0 {
if (mmap[seek] == 0) {
mmap[seek] = 0;
}

Self { seek, seek_index, mmap, data_start_index }
return Ok(Self { seek, seek_index, mmap, data_start_index });
}

/// Set seek to `len`
Expand All @@ -116,12 +109,6 @@ impl MMAVUnit {

/// Push `value` to vector
///
/// ## Errors
/// ```ignore
/// MMAVError::ArrayFull
/// MMAVError::FileFull
/// ```
///
/// ## Examples
/// ```ignore
/// let mut unit = MMAVUnit::new("test-0/0", 4000000, 80008);
Expand All @@ -130,13 +117,13 @@ impl MMAVUnit {
/// unit.push(data).unwrap_or_default();
/// assert_eq!(unit.last(), data);
/// ```
pub fn push(&mut self, value: &[u8]) -> Result<(), MMAVError> {
pub fn push(&mut self, value: &[u8]) -> Result<(), Error> {
if self.len() > 9999 {
return Err(MMAVError::ArrayFull);
return Err(Error::ArrayFull);
}

if self.seek + value.len() > self.mmap.len() {
return Err(MMAVError::FileFull);
return Err(Error::FileFull);
}

self.mmap[self.seek..self.seek + value.len()].clone_from_slice(value);
Expand All @@ -147,13 +134,6 @@ impl MMAVUnit {

/// Get `index` from vector
///
/// ## Errors
/// ```ignore
/// MMAVError::ArrayEmpty
/// MMAVError::IndexOutOfRange
/// MMAVError::IndexOutOfBounds
/// ```
///
/// ## Examples
/// ```ignore
/// let mut unit = MMAVUnit::new("test-0/0", 4000000, 80008);
Expand All @@ -162,32 +142,30 @@ impl MMAVUnit {
/// unit.push(data).unwrap_or_default();
/// assert_eq!(unit.get(0), data);
/// ```
pub fn get(&self, index: usize) -> Result<Vec<u8>, MMAVError> {
pub fn get(&self, index: usize) -> Result<Vec<u8>, Error> {
if self.seek_index == 8 {
return Err(MMAVError::ArrayEmpty);
return Err(Error::ArrayEmpty);
}

if index > 9999 {
return Err(MMAVError::IndexOutOfRange);
return Err(Error::IndexOutOfRange);
}

if index > self.len() - 1 {
return Err(MMAVError::IndexOutOfBounds);
return Err(Error::IndexOutOfBounds);
}

let i = 8 * index + 8;

let start =
u32::from_ne_bytes(self.mmap[i..i + 4].try_into().unwrap()) as usize;
let end =
u32::from_ne_bytes(self.mmap[i + 4..i + 8].try_into().unwrap()) as usize;
let start = u32::from_ne_bytes(self.mmap[i..i + 4].try_into()?) as usize;
let end = u32::from_ne_bytes(self.mmap[i + 4..i + 8].try_into()?) as usize;

if start < self.data_start_index || start > self.mmap.len() {
return Err(MMAVError::IndexOutOfRange);
return Err(Error::IndexOutOfRange);
}

if end < self.data_start_index || end > self.mmap.len() {
return Err(MMAVError::IndexOutOfRange);
return Err(Error::IndexOutOfRange);
}

Ok((self.mmap[start..end]).to_vec())
Expand Down
32 changes: 0 additions & 32 deletions rapiddb/src/errors/mmav_error.rs

This file was deleted.

Loading
Loading