Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
JHawk0224 committed Dec 12, 2023
2 parents 412dee7 + 6a48fd4 commit 04af3ad
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 46 deletions.
17 changes: 16 additions & 1 deletion src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,22 @@ impl Broker {
}
});

warp::serve(producer_sends_message.or(consumer_requests_message))
let consumer_get_offset = warp::get()
.and(warp::path!(String / "offset"))
.map({
let partitions = self.partitions.clone();
move |partition_id: String| {
eprintln!("Broker received consumer request for offset");
let partition_id = PartitionId::from_str(&partition_id);
let partitions = partitions.lock().unwrap();
let partition = partitions.get(&partition_id).unwrap();
let offset = partition.get_offset();
eprintln!("Broker received consumer request for offset");
warp::reply::json(&offset)
}
});

warp::serve(producer_sends_message.or(consumer_requests_message).or(consumer_get_offset))
.run(([127, 0, 0, 1], self.addr))
.await;
}
Expand Down
7 changes: 2 additions & 5 deletions src/broker/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,21 @@ impl Partition {
}

pub fn read(&mut self, offset: usize) -> String {
let res = if offset <= self.fileoffset {
let res = if offset < self.fileoffset {
let mut file = std::fs::OpenOptions::new()
.read(true)
.open(&self.filename)
.unwrap();
let mut buffer = String::new();
file.read_to_string(&mut buffer).unwrap();
buffer[offset..].to_string()
buffer[offset + 1..].to_string()
} else {
String::new()
};

res + &self.buffer
}

// TODO: technically, we need to call this by a consumer
// and keep track of the offset for each consumer group for each partition/topic
#[allow(dead_code)]
pub fn get_offset(&self) -> usize {
self.fileoffset + self.buffer.len()
}
Expand Down
15 changes: 8 additions & 7 deletions src/broker_lead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use warp::Filter;

use crate::{
consumer_group::ConsumerGroup,
listeners::{ConsumerAddGroup, ProducerAddsTopic},
listeners::{ConsumerAddGroup, ProducerAddsTopic, ConsumerSubscribes},
ConsumerGroupId, PartitionId, PartitionInfo, ServerId, Topic,
};

Expand Down Expand Up @@ -140,12 +140,13 @@ impl BrokerLead {
});

let consumer_subscribe = warp::post()
.and(warp::path!(ConsumerGroupId / "topics"))
.and(warp::path!("groups" / ConsumerGroupId / "topics"))
.and(warp::body::json())
.map({
let topic_to_partitions = self.topic_to_partitions.clone();
let consumer_group_id_to_groups = self.consumer_group_id_to_groups.clone();
move |consumer_group_id: ConsumerGroupId, topic: Topic| {
move |consumer_group_id: ConsumerGroupId, body: ConsumerSubscribes| {
let topic = body.topic;
eprintln!("BrokerLead received consumer subscribe: {:?}", topic);
let topic_to_partitions = topic_to_partitions.lock().unwrap();
let mut consumer_group_id_to_groups =
Expand All @@ -162,7 +163,7 @@ impl BrokerLead {
});

let consumer_unsubscribe = warp::delete()
.and(warp::path!(ConsumerGroupId / "topics" / Topic))
.and(warp::path!("groups" / ConsumerGroupId / "topics" / Topic))
.map({
let topic_to_partitions = self.topic_to_partitions.clone();
let consumer_group_id_to_groups = self.consumer_group_id_to_groups.clone();
Expand All @@ -183,7 +184,7 @@ impl BrokerLead {
});

let consumer_add_group = warp::post()
.and(warp::path!(ConsumerGroupId / "consumers"))
.and(warp::path!("groups" / ConsumerGroupId / "consumers"))
.and(warp::body::json())
.map({
let topic_to_partitions = self.topic_to_partitions.clone();
Expand All @@ -203,7 +204,7 @@ impl BrokerLead {
});

let consumer_remove_group = warp::delete()
.and(warp::path!(ConsumerGroupId / "consumers" / ServerId))
.and(warp::path!("groups" / ConsumerGroupId / "consumers" / ServerId))
.map({
let topic_to_partitions = self.topic_to_partitions.clone();
let consumer_group_id_to_groups = self.consumer_group_id_to_groups.clone();
Expand All @@ -221,7 +222,7 @@ impl BrokerLead {
});

let consumer_check_group = warp::get()
.and(warp::path!(ConsumerGroupId / "consumers" / ServerId))
.and(warp::path!("groups" / ConsumerGroupId / "consumers" / ServerId))
.map({
let consumer_group_id_to_groups = self.consumer_group_id_to_groups.clone();
move |consumer_group_id: ConsumerGroupId, server_id: ServerId| {
Expand Down
66 changes: 44 additions & 22 deletions src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

use crate::{
listeners::{ConsumerAddGroup, ConsumerSubscribes},
ConsumerGroupId, ConsumerInformation, PartitionInfo, ServerId, Topic, Value,
listeners::{ConsumerAddGroup, ConsumerSubscribes, ConsumerRequestsMessage},
ConsumerGroupId, ConsumerInformation, ServerId, Topic, Value, PartitionInfoWithOffset, PartitionInfo,
};

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ConsumerRequestsMessage {
pub offset: usize,
pub size: usize,
}

pub struct Consumer {
addr: ServerId,
broker_leader_addr: ServerId,
partitions: Vec<PartitionInfo>,
partitions: Vec<PartitionInfoWithOffset>,
consumer_group_id: Option<ConsumerGroupId>,
}

Expand All @@ -37,7 +31,7 @@ impl Consumer {
let message = ConsumerSubscribes { topic };
reqwest::Client::new()
.post(format!(
"http://127.0.0.1:{}/{}/topics",
"http://127.0.0.1:{}/groups/{}/topics",
self.broker_leader_addr,
self.consumer_group_id.as_ref().unwrap()
))
Expand All @@ -56,7 +50,7 @@ impl Consumer {

reqwest::Client::new()
.delete(format!(
"http://127.0.0.1:{}/{}/topics/{}",
"http://127.0.0.1:{}/groups/{}/topics/{}",
self.broker_leader_addr,
self.consumer_group_id.as_ref().unwrap(),
topic
Expand All @@ -82,7 +76,7 @@ impl Consumer {

reqwest::Client::new()
.post(format!(
"http://127.0.0.1:{}/{}/consumers",
"http://127.0.0.1:{}/groups/{}/consumers",
self.broker_leader_addr, consumer_group_id
))
.json(&message)
Expand All @@ -101,7 +95,7 @@ impl Consumer {

reqwest::Client::new()
.delete(format!(
"http://127.0.0.1:{}/{}/consumers/{}",
"http://127.0.0.1:{}/groups/{}/consumers/{}",
self.broker_leader_addr,
self.consumer_group_id.as_ref().unwrap(),
self.addr
Expand All @@ -113,6 +107,15 @@ impl Consumer {
Ok(())
}

pub async fn get_offset(&self, partition: &PartitionInfo) -> Result<usize, reqwest::Error> {
let res = reqwest::Client::new()
.get(format!("http://127.0.0.1:{}/{}/offset", partition.server_id(), partition.partition_id()))
.send()
.await?;

Ok(res.json::<usize>().await?)
}

pub async fn poll(&mut self) -> Result<Vec<Value>, reqwest::Error> {
if self.consumer_group_id.is_none() {
eprintln!("Not in consumer group.");
Expand All @@ -122,7 +125,7 @@ impl Consumer {
// first check if any changes to partitions
let res = reqwest::Client::new()
.get(format!(
"http://127.0.0.1:{}/{}/consumers/{}",
"http://127.0.0.1:{}/groups/{}/consumers/{}",
self.broker_leader_addr,
self.consumer_group_id.as_ref().unwrap(),
self.addr
Expand All @@ -133,24 +136,43 @@ impl Consumer {
let consumer_info = res.json::<ConsumerInformation>().await?;
if consumer_info.has_received_change {
eprintln!("Received change: {:?}", consumer_info);
self.partitions = consumer_info.partition_infos;
// get current offsets mapping from partition id to offset
let mut partition_id_to_offset = HashMap::new();
for partition_info_with_offset in self.partitions.iter() {
let id = partition_info_with_offset.partition_info.partition_id();
let offset = partition_info_with_offset.offset();
partition_id_to_offset.insert(id, offset);
}

let partition_infos = consumer_info.partition_infos;
let mut new_partitions = Vec::new();
for partition_info in partition_infos {
// check if we already have it
if let Some(offset) = partition_id_to_offset.get(&partition_info.partition_id()) {
new_partitions.push(PartitionInfoWithOffset::new(partition_info.clone(), *offset));
} else {
// if not, get the offset
let offset = self.get_offset(&partition_info).await?;
new_partitions.push(PartitionInfoWithOffset::new(partition_info, offset));
}
}
self.partitions = new_partitions;
}

let mut all_values = Vec::new();
for partition_info in self.partitions.iter() {
let msg: ConsumerRequestsMessage = ConsumerRequestsMessage { offset: 0, size: 1 };
for partition_info_with_offset in self.partitions.iter() {
let msg: ConsumerRequestsMessage = ConsumerRequestsMessage { offset: partition_info_with_offset.offset() };
let partition_info = &partition_info_with_offset.partition_info;
let res = reqwest::Client::new()
.get(format!(
"http://127.0.0.1:{}/{}/messages",
partition_info.server_id(),
partition_info.partition_id(),
partition_info.partition_id()
))
.json(&msg)
.send()
.await?;

let values = res.json::<Vec<Value>>().await?;
all_values.extend(values);
all_values.push(res.json::<Value>().await?);
}

Ok(all_values)
Expand Down
6 changes: 6 additions & 0 deletions src/consumer_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,17 @@ impl ConsumerGroup {

pub fn subscribe(&mut self, topic: &Topic, map: &TopicToPartitionInfo) {
self.topics.insert(topic.clone());

eprintln!("after add, topics subscribed to: {:?}", self.topics);

self.reorganize_partitions(map);
}

pub fn unsubscribe(&mut self, topic: &Topic, map: &TopicToPartitionInfo) {
self.topics.remove(topic);

eprintln!("after remove, topics subscribed to: {:?}", self.topics);

self.reorganize_partitions(map);
}

Expand Down
20 changes: 19 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ mod listeners {
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ConsumerRequestsMessage {
pub offset: usize,
pub size: usize,
}

/****************** FOR THE BROKER LEADER LISTENERS ******/
Expand Down Expand Up @@ -83,6 +82,25 @@ impl std::fmt::Display for PartitionId {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionInfoWithOffset {
pub partition_info: PartitionInfo,
offset: usize,
}

impl PartitionInfoWithOffset {
pub fn new(partition_info: PartitionInfo, offset: usize) -> PartitionInfoWithOffset {
PartitionInfoWithOffset {
partition_info,
offset,
}
}

pub fn offset(&self) -> usize {
self.offset
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionInfo {
partition_id: PartitionId,
Expand Down
21 changes: 11 additions & 10 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn main() {
match service {
Service::Broker => {
// TODO: REMOVE THESE TEMPORARY VALUES AFTERWARDS
let addr: ServerId = 8080;
let addr: ServerId = 8001;
// let addr: ServerId = read("Enter server addr: ");

Broker::new(addr).listen().await;
Expand All @@ -60,14 +60,15 @@ async fn main() {
// let addr: ServerId = read("Enter server addr: ");
// let partition_count: usize = read("Enter number of partitions: ");

let mut broker_count: usize = read("Enter number of brokers: ");
while broker_count < 1 {
eprintln!("Invalid number of brokers.");
broker_count = read("Enter number of brokers: ");
}
let broker_ids = (0..broker_count)
.map(|_| read("Enter broker addr: "))
.collect();
let broker_ids = vec![8001];
// let mut broker_count: usize = read("Enter number of brokers: ");
// while broker_count < 1 {
// eprintln!("Invalid number of brokers.");
// broker_count = read("Enter number of brokers: ");
// }
// let broker_ids = (0..broker_count)
// .map(|_| read("Enter broker addr: "))
// .collect();
BrokerLead::new(addr, broker_ids, partition_count)
.listen()
.await;
Expand Down Expand Up @@ -110,7 +111,7 @@ async fn main() {

let mut consumer = consumer::Consumer::new(addr, broker_leader_addr);
loop {
let action: usize = read("Enter action (0: subscribe, 1: unsubscribe, 2: join consumer group, 3: leave consumer group): ");
let action: usize = read("Enter action (0: subscribe, 1: unsubscribe, 2: join consumer group, 3: leave consumer group, 4: poll): ");
match action {
0 => {
let topic: String = read("Enter topic: ");
Expand Down

0 comments on commit 04af3ad

Please sign in to comment.