Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
JHawk0224 committed Dec 12, 2023
1 parent 7a304b7 commit 412dee7
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 4 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# Comanche Franz

## An attempt at a Kafka client + cluster manager in Rust

#### [NOTE] Much work was doing using VSCode's Live Share feature, so the commit history is not an accurate representation of the work done.

## a rookie attempt at a Kafka client + cluster manager in Rust
Welcome to our final project! Here is a list of the key feature in this project, and the structure of our Kafka service.

First of all, the service is split into the following parts:
- Broker Leader
- Broker
- Producer
- Consumer

The broker leader is responsible for coordinating all of the other brokers, and it also assigns the partitions and sends this info out.

Here is a diagram demonstrating what parts talk to what:
..AD HERE..
50 changes: 47 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,11 @@ mod tests {

use super::*;

fn run_kafka(lead_addr: ServerId) {
fn run_kafka(lead_addr: ServerId, broker_addr_start: ServerId) {
let partition_count: usize = 3;
let broker_count: usize = 3;
let broker_ids = (0..broker_count)
.map(|i| (8080 + i) as u16)
.map(|i| broker_addr_start + i as u16)
.collect::<Vec<_>>();
let broker_ids_clone = broker_ids.clone();
tokio::spawn(async move {
Expand All @@ -172,11 +172,55 @@ mod tests {
async fn test_producer() {
// WOULD BE SPUN UP IN BACKGROUND FOR RUNNING KAFKA, NOT BY CLIENT
let lead_addr: ServerId = 8000;
run_kafka(lead_addr);
run_kafka(lead_addr, 8080);

// How a client would make a producer
let mut producer = Producer::new(lead_addr).await;
producer.add_topic("Best foods".to_string()).await.unwrap();
producer.send_message("Best foods".to_string(), "pizza is a good food".to_string()).await.unwrap();
}

#[tokio::test]
async fn test_multiple_topics() {
// WOULD BE SPUN UP IN BACKGROUND FOR RUNNING KAFKA, NOT BY CLIENT
let lead_addr: ServerId = 8000;
run_kafka(lead_addr, 8080);

// How a client would make a producer
let mut producer = Producer::new(lead_addr).await;
producer.add_topic("Best foods".to_string()).await.unwrap();
producer.add_topic("Best drinks".to_string()).await.unwrap();
producer.send_message("Best foods".to_string(), "pizza is a good food".to_string()).await.unwrap();
producer.send_message("Best drinks".to_string(), "water is the only good drink".to_string()).await.unwrap();
producer.send_message("Best foods".to_string(), "chicken is a good food".to_string()).await.unwrap();
producer.send_message("Best drinks".to_string(), "milk is a good drink".to_string()).await.unwrap();
}

#[tokio::test]
async fn test_producer_consumer() {
// WOULD BE SPUN UP IN BACKGROUND FOR RUNNING KAFKA, NOT BY CLIENT
let lead_addr: ServerId = 8000;
run_kafka(lead_addr, 8080);

// How a client would make a producer
let mut producer = Producer::new(lead_addr).await;
producer.add_topic("Best foods".to_string()).await.unwrap();


let mut consumer = consumer::Consumer::new(8001, lead_addr);
let res = consumer.poll().await.unwrap();
let message = format!("{:?}", res);
assert_eq!(message, "None"); // fix this

producer.send_message("Best foods".to_string(), "pizza is a good food".to_string()).await.unwrap();

let res = consumer.poll().await.unwrap();
let message = format!("{:?}", res);
assert_eq!(message, "None"); // fix this

consumer.subscribe("Best foods".to_string()).await.unwrap();
let res = consumer.poll().await.unwrap();
let message = format!("{:?}", res);
assert_eq!(message, "Some(\"pizza is a good food\")"); // fix this
}
}

0 comments on commit 412dee7

Please sign in to comment.