Skip to content

Commit

Permalink
replicate: implement server side data model
Browse files Browse the repository at this point in the history
  • Loading branch information
TheButlah committed Jun 23, 2024
1 parent 49e55be commit a75cd51
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions crates/replicate/client/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@
use eyre::{bail, Result, WrapErr};
use futures::{SinkExt, StreamExt};
use replicate_common::data_model::{
entity::EntityId, DataModel, LocalChanges, RemoteChanges, State,
use replicate_common::{
data_model::{entity::EntityId, DataModel, LocalChanges, RemoteChanges, State},
ClientId,
};
use url::Url;

Expand Down Expand Up @@ -130,7 +131,7 @@ impl Instance {

// Do handshake before anything else
let local_namespace = {
rpc.send(Sb::HandshakeRequest)
rpc.send(Sb::HandshakeRequest(ClientId::random()))
.await
.wrap_err("failed to send handshake request")?;
let Some(msg) = rpc.next().await else {
Expand Down
2 changes: 1 addition & 1 deletion crates/replicate/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "Shared code between replicate-client and replicate-server"
publish = false

[dependencies]
bytes.workspace = true
bytes = { workspace = true, features = ["serde"] }
futures.workspace = true
pin-project.workspace = true
rand.workspace = true
Expand Down
6 changes: 0 additions & 6 deletions crates/replicate/common/src/data_model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ use self::entity::{EntityId, Index, Namespace};

pub type EntityMap<T> = HashMap<EntityId, T>;

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum SpawnedBy {
Local,
Remote,
}

/// The state of an entity.
pub type State = bytes::Bytes;

Expand Down
38 changes: 34 additions & 4 deletions crates/replicate/common/src/messages/instance.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,45 @@
use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};

use crate::data_model::entity::{EntityId, Index, Namespace};
use crate::{
data_model::{
entity::{Index, Namespace},
State,
},
ClientId,
};

#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub enum Serverbound {
HandshakeRequest,
SpawnEntities { idxs: Vec<Index> },
HandshakeRequest(ClientId),
SpawnEntities {
// Namespace is implied, clients always spawn on their own namespace
states: HashMap<Index, State>,
},
UpdateEntities {
namespace: Namespace,
states: HashMap<Index, State>,
},
DespawnEntities {
namespace: Namespace,
entities: HashSet<Index>,
},
}

#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub enum Clientbound {
HandshakeResponse(Namespace),
SpawnEntities { ids: Vec<EntityId> },
SpawnEntities {
namespace: Namespace,
states: HashMap<Index, State>,
},
UpdateEntities {
namespace: Namespace,
states: HashMap<Index, State>,
},
DespawnEntities {
namespace: Namespace,
states: HashSet<Index>,
},
}
3 changes: 2 additions & 1 deletion crates/replicate/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ publish = false

[dependencies]
base64.workspace = true
bytes.workspace = true
bytes = { workspace = true, features = ["serde"] }
clap.workspace = true
color-eyre.workspace = true
dashmap = "5.5.3"
Expand All @@ -20,6 +20,7 @@ futures.workspace = true
rand.workspace = true
replicate-common.path = "../common"
serde.workspace = true
thiserror.workspace = true
tokio-serde = { workspace = true, features = ["json"] }
tokio-util = { workspace = true, features = ["codec"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
Expand Down
6 changes: 3 additions & 3 deletions crates/replicate/server/src/instance/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use dashmap::DashMap;
use replicate_common::InstanceId;
use tracing::debug;

use super::Instance;
use super::DataModel;

#[derive(Default, Debug)]
pub struct InstanceManager {
instances: DashMap<InstanceId, Instance>,
instances: DashMap<InstanceId, DataModel>,
}

impl InstanceManager {
pub fn instance_create(&self) -> InstanceId {
let instance = Instance::default();
let instance = DataModel::default();
// TODO: seed random numbers for determinism?
let id = InstanceId::random();
self.instances.insert(id, instance);
Expand Down
192 changes: 164 additions & 28 deletions crates/replicate/server/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,93 @@ pub mod manager;

use std::fmt::Debug;

use eyre::{bail, ensure, Result, WrapErr as _};
use dashmap::DashMap;
use eyre::{bail, Result, WrapErr as _};
use futures::{SinkExt as _, StreamExt as _};
use replicate_common::{
data_model::entity::Namespace,
data_model::{
entity::{EntityId, Namespace},
State,
},
messages::instance::{Clientbound as Cb, Serverbound as Sb},
ClientId,
};
use tracing::{info, instrument};
use tracing::{info, instrument, trace};
use wtransport::endpoint::SessionRequest;

pub const URL_PREFIX: &str = "instances";

type Framed = replicate_common::Framed<wtransport::stream::BiStream, Sb, Cb>;

#[derive(thiserror::Error, Debug, Eq, PartialEq)]
#[error("rejected client's attempt to claim authority")]
pub struct DeniedAuthorityErr;

/// Data stored for each entity
#[derive(Debug, Eq, PartialEq)]
struct EntityData {
current_state: State,
authority: ClientId,
}

#[derive(Debug, Default)]
pub struct Instance {}
pub struct DataModel {
data: DashMap<EntityId, EntityData>,
}

impl DataModel {
fn spawn(&self, entity: EntityId, state: State, authority: ClientId) {
let insert_result = self.data.insert(
entity,
EntityData {
current_state: state,
authority,
},
);
if let Some(already_exists) = insert_result {
if already_exists.authority != authority {
panic!("should have been impossible, we should only spawn on validated namespaces");
}
// The spawn event is out of date, so we ignore it
trace!("stale spawn message");
}
}

fn update(
&self,
entity: EntityId,
state: State,
authority: ClientId,
) -> Result<(), DeniedAuthorityErr> {
let insert_result = self.data.insert(
entity,
EntityData {
current_state: state,
authority,
},
);
if insert_result.is_none() {
trace!("missed a spawn message, spawning entity and updating state anyway.")
}

Ok(())
}

fn despawn(
&self,
entity: EntityId,
_authority: ClientId,
) -> Result<(), DeniedAuthorityErr> {
let remove_result = self.data.remove(&entity);
if remove_result.is_none() {
trace!("tried to despawn a nonexistent entity, ignoring");
}
Ok(())
}
}

#[derive(derive_more::Deref, derive_more::DerefMut)]
struct Rng(pub Box<dyn rand::RngCore + Send>);
struct Rng(pub Box<dyn rand::RngCore + Send + Sync>);

impl Debug for Rng {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -37,24 +106,27 @@ impl Debug for Rng {
#[derive(Debug)]
pub struct InstanceCtx {
rng: Rng,
dm: DataModel,
}

/// Information specific to a particular client, looked up via
pub struct ClientCtx {
id: ClientId,
namespace: Namespace,
}

impl InstanceCtx {
pub fn new(rng: impl rand::Rng + Sized + Send + 'static) -> Self {
pub fn new(rng: impl rand::Rng + Sized + Send + Sync + 'static) -> Self {
Self {
rng: Rng(Box::new(rng)),
dm: DataModel::default(),
}
}
}

fn _assert_send() {
fn helper(_: impl Send) {}
helper(InstanceCtx::new(rand::rngs::OsRng));
}

#[instrument(skip_all, name = "instance")]
pub async fn handle_connection(
mut ctx: InstanceCtx,
mut instance_ctx: InstanceCtx,
session_request: SessionRequest,
) -> Result<()> {
debug_assert!(session_request
Expand All @@ -72,31 +144,95 @@ pub async fn handle_connection(
let mut framed = Framed::new(bi);

// Do handshake before anything else
{
let client_ctx = {
let Some(msg) = framed.next().await else {
bail!("Client disconnected before completing handshake");
};
let msg = msg.wrap_err("error while receiving handshake message")?;
ensure!(
msg == Sb::HandshakeRequest,
"invalid message during handshake"
);
let Sb::HandshakeRequest(client_id) =
msg.wrap_err("error while receiving handshake message")?
else {
bail!("invalid message during handshake")
};
let namespace = Namespace(instance_ctx.rng.next_u64());
framed
.send(Cb::HandshakeResponse(Namespace(ctx.rng.next_u64())))
.send(Cb::HandshakeResponse(namespace))
.await
.wrap_err("failed to send handshake response")?;
}

while let Some(request) = framed.next().await {
let request: Sb = request.wrap_err("error while receiving message")?;
match request {
Sb::HandshakeRequest => {
bail!("already did handshake, another handshake is unexpected")
ClientCtx {
id: client_id,
namespace,
}
};

let reliable_fut = async {
while let Some(request) = framed.next().await {
let request: Sb = request.wrap_err("error while receiving message")?;
handle_state_update(&instance_ctx, &client_ctx, request)?;
}
info!("Client disconnected");
Ok::<(), eyre::Report>(())
};

let unreliable_fut = async { Ok::<(), eyre::Report>(()) };

let _: ((), ()) = tokio::try_join!(reliable_fut, unreliable_fut)?;

Ok(())
}

fn handle_state_update(
instance_ctx: &InstanceCtx,
client_ctx: &ClientCtx,
message: Sb,
) -> Result<()> {
match message {
Sb::HandshakeRequest(_) => {
bail!("already did handshake, another handshake is unexpected");
}
Sb::SpawnEntities { states } => {
for (idx, state) in states {
instance_ctx.dm.spawn(
EntityId {
namespace: client_ctx.namespace,
idx,
},
state,
client_ctx.id,
)
}
}
Sb::UpdateEntities { namespace, states } => {
for (idx, state) in states {
// TODO: Decide how to handle authority problems.
let _ = instance_ctx.dm.update(
EntityId { namespace, idx },
state,
client_ctx.id,
);
}
}
Sb::DespawnEntities {
namespace,
entities,
} => {
for idx in entities {
// TODO: Decide how to handle authority problems.
let _ = instance_ctx
.dm
.despawn(EntityId { namespace, idx }, client_ctx.id);
}
_ => todo!(),
}
}

info!("Client disconnected");
Ok(())
}

#[cfg(test)]
mod test {
use super::*;

fn _assert_instance_ctx_send() {
fn helper(_: impl Send) {}
helper(InstanceCtx::new(rand::rngs::OsRng));
}
}

0 comments on commit a75cd51

Please sign in to comment.