Skip to content

Commit

Permalink
update content: add data pipeline project and tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
behnamyazdan committed Aug 14, 2024
1 parent 6a8b71c commit 115b23f
Show file tree
Hide file tree
Showing 10 changed files with 855 additions and 1 deletion.
2 changes: 1 addition & 1 deletion 03-GitAndGitHub/09-BranchingAndMerging.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ This hands-on exercise will demonstrate how to isolate changes in a dedicated br
6. **Stage and Commit Your Changes**:
- Stage the changes you’ve made using the `git add` command:
```sh
git add data_transformation.py
git add data_transform.py
```
- Commit the changes with a descriptive message:
```sh
Expand Down
558 changes: 558 additions & 0 deletions 08-BuildingDataPipelines/05-ProjectAndHandsOn.md

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions 08-BuildingDataPipelines/Examples/iot_pipeline/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Local demo stack for the IoT pipeline: PostgreSQL (staging), ClickHouse
# (warehouse), and Grafana (dashboards).
name: iot_pipeline

services:
  # PostgreSQL: staging database for raw sensor readings
  # (populated by src/data_extract.py, read by src/data_transform.py).
  postgres:
    image: postgres:13-alpine
    container_name: postgres
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: iot_data
    ports:
      - "5433:5432"  # host 5433 -> container 5432; the Python scripts connect to localhost:5433
    volumes:
      - postgres_data:/var/lib/postgresql/data

  # ClickHouse service (warehouse; loaded by src/data_load.py over HTTP port 8123)
  clickhouse-server:
    image: clickhouse/clickhouse-server:latest
    container_name: clickhouse-server
    ports:
      - "8123:8123" # HTTP port
      - "9000:9000" # Native client port
    volumes:
      - clickhouse-data:/var/lib/clickhouse
      # NOTE(review): mounting an (initially empty) named volume over
      # /etc/clickhouse-server may shadow the image's default config on
      # first start — confirm this is intended.
      - clickhouse-config:/etc/clickhouse-server
    environment:
      - CLICKHOUSE_DB=mydatabase

  # Optional: Grafana for dashboarding
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    depends_on:
      - clickhouse-server
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin # Default admin password for Grafana
      - GF_INSTALL_PLUGINS=grafana-clickhouse-datasource

volumes:
  clickhouse-data:
  clickhouse-config:
  postgres_data:
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- ClickHouse warehouse schema: pre-aggregated sensor averages.
-- These four tables receive the CSV aggregates produced by
-- src/data_transform.py and inserted by src/data_load.py.

-- Create the 'daily_temperature_avg' table: one row per calendar date.
CREATE TABLE IF NOT EXISTS daily_temperature_avg (
    date Date,
    avg_daily_temperature Float32
) ENGINE = MergeTree()
ORDER BY date;  -- MergeTree sorting key (also the implicit primary key)

-- Create the 'daily_humidity_avg' table: one row per calendar date.
CREATE TABLE IF NOT EXISTS daily_humidity_avg (
    date Date,
    avg_daily_humidity Float32
) ENGINE = MergeTree()
ORDER BY date;

-- Create the 'hourly_temperature_avg' table: one row per (date, hour).
CREATE TABLE IF NOT EXISTS hourly_temperature_avg (
    date Date,
    hour UInt8,  -- hour of day, 0-23
    avg_hourly_temperature Float32
) ENGINE = MergeTree()
ORDER BY (date, hour);

-- Create the 'hourly_humidity_avg' table: one row per (date, hour).
CREATE TABLE IF NOT EXISTS hourly_humidity_avg (
    date Date,
    hour UInt8,  -- hour of day, 0-23
    avg_hourly_humidity Float32
) ENGINE = MergeTree()
ORDER BY (date, hour);
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- PostgreSQL staging schema: raw per-reading sensor data appended by
-- src/data_extract.py and aggregated by src/data_transform.py.

-- Create the 'temperature_sensor_data' table.
-- The composite key makes a (timestamp, sensor_id) pair unique.
CREATE TABLE IF NOT EXISTS temperature_sensor_data (
    timestamp TIMESTAMPTZ NOT NULL,
    sensor_id INT NOT NULL,
    temperature DECIMAL(5, 2) NOT NULL,  -- two decimal places, matching the generator's rounding
    PRIMARY KEY (timestamp, sensor_id)
);

-- Create the 'humidity_sensor_data' table (same shape, humidity value column).
CREATE TABLE IF NOT EXISTS humidity_sensor_data (
    timestamp TIMESTAMPTZ NOT NULL,
    sensor_id INT NOT NULL,
    humidity DECIMAL(5, 2) NOT NULL,
    PRIMARY KEY (timestamp, sensor_id)
);
57 changes: 57 additions & 0 deletions 08-BuildingDataPipelines/Examples/iot_pipeline/src/data_extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import os
import pandas as pd
from sqlalchemy import create_engine
import shutil

# Database connection parameters
# NOTE(review): credentials are hard-coded for the local demo stack only;
# a real deployment should read them from the environment.
DB_USERNAME = 'postgres'
DB_PASSWORD = 'postgres'
DB_HOST = 'localhost'
DB_PORT = '5433'  # host port mapped to the container's 5432 in docker-compose.yml
DB_NAME = 'iot_data'

# Directory paths
DATA_DIR = '../data'  # incoming CSV drop folder (written by the generator script)
ARCHIVE_DIR = '../data/archive'  # successfully ingested files are moved here

# Create a PostgreSQL database connection (lazy: no connection until first use)
engine = create_engine(f'postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')


def process_files():
    """Ingest every CSV in DATA_DIR into its staging table, then archive it.

    Files whose names contain "temperature" or "humidity" are routed to the
    matching PostgreSQL staging table; any other CSV is reported and skipped.
    A file is moved to ARCHIVE_DIR only after a successful insert, so failed
    files stay in DATA_DIR and are retried on the next run.
    """
    # Ensure the archive directory exists before moving files into it.
    os.makedirs(ARCHIVE_DIR, exist_ok=True)

    # Loop through all CSV files in the data directory.
    for filename in os.listdir(DATA_DIR):
        if not filename.endswith(".csv"):
            continue
        file_path = os.path.join(DATA_DIR, filename)

        # Determine the target staging table from the filename.
        if "temperature" in filename:
            table_name = "temperature_sensor_data"
        elif "humidity" in filename:
            table_name = "humidity_sensor_data"
        else:
            # Fixed: the message previously printed the literal placeholder
            # "(unknown)" instead of the actual file name.
            print(f"Unknown sensor type in file: {filename}")
            continue

        # Read the CSV file into a DataFrame.
        df = pd.read_csv(file_path)

        # Insert the data into the appropriate table in PostgreSQL.
        try:
            df.to_sql(table_name, engine, if_exists='append', index=False)
            print(f"Inserted data from {filename} into {table_name} table.")
        except Exception as e:
            # Leave the file in place so it can be retried later.
            print(f"Error inserting data from {filename}: {e}")
            continue

        # Archive only after a successful insert.
        archive_path = os.path.join(ARCHIVE_DIR, filename)
        shutil.move(file_path, archive_path)
        print(f"Moved {filename} to archive.")


if __name__ == "__main__":
    process_files()
31 changes: 31 additions & 0 deletions 08-BuildingDataPipelines/Examples/iot_pipeline/src/data_load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pandas as pd
import clickhouse_connect

# Directory paths
RESULT_DIR = '../data/result/'  # transformed CSVs written by data_transform.py

# ClickHouse connection over the HTTP interface
# (port 8123 is exposed by docker-compose.yml).
client = clickhouse_connect.get_client(host='localhost', port=8123)


def load_data():
    """Load the transformed CSV aggregates into the ClickHouse warehouse.

    Reads the four result files produced by data_transform.py from
    RESULT_DIR, converts their 'date' column to ``datetime.date`` (required
    to map onto the ClickHouse ``Date`` columns), and inserts each frame
    into its matching warehouse table.
    """
    # Load transformed data from CSV files.
    temp_avg_data_hourly = pd.read_csv(f'{RESULT_DIR}hourly_temperature_avg.csv')
    temp_avg_data_daily = pd.read_csv(f'{RESULT_DIR}daily_temperature_avg.csv')
    humidity_avg_data_hourly = pd.read_csv(f'{RESULT_DIR}hourly_humidity_avg.csv')
    humidity_avg_data_daily = pd.read_csv(f'{RESULT_DIR}daily_humidity_avg.csv')

    # CSV dates parse as strings; normalize all frames in one pass.
    for frame in (temp_avg_data_hourly, temp_avg_data_daily,
                  humidity_avg_data_hourly, humidity_avg_data_daily):
        frame['date'] = pd.to_datetime(frame['date']).dt.date

    # Load data into the ClickHouse tables.
    client.insert_df('hourly_temperature_avg', temp_avg_data_hourly)
    client.insert_df('daily_temperature_avg', temp_avg_data_daily)
    client.insert_df('hourly_humidity_avg', humidity_avg_data_hourly)
    client.insert_df('daily_humidity_avg', humidity_avg_data_daily)


# Fixed: previously load_data() ran on plain import. Guarding keeps this
# consistent with data_extract.py and lets orchestrators import the module
# without triggering a warehouse load.
if __name__ == "__main__":
    load_data()
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import os
import pandas as pd
from sqlalchemy import create_engine

# Database connection parameters
# NOTE(review): credentials are hard-coded for the local demo stack only.
DB_USERNAME = 'postgres'
DB_PASSWORD = 'postgres'
DB_HOST = 'localhost'
# Fixed: was '5432'. docker-compose.yml maps the Postgres container's 5432 to
# host port 5433, and data_extract.py connects to the same database on 5433,
# so 5432 could not reach the demo database from the host.
DB_PORT = '5433'
DB_NAME = 'iot_data'

# Directory paths
RESULT_DIR = '../data/result/'  # transformed CSVs are written here for data_load.py

# Create a PostgreSQL database connection (lazy: no connection until first use)
engine = create_engine(f'postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')


def transform_data():
    """Aggregate raw sensor readings into daily and hourly averages.

    Reads the staging tables populated by data_extract.py, drops incomplete
    and duplicated rows, computes daily and hourly means for temperature and
    humidity, and writes the results both to CSV files in RESULT_DIR
    (consumed by data_load.py) and back into PostgreSQL summary tables.
    """
    # Ensure the result directory exists before writing CSVs into it.
    # (Fixed comment: this is the result directory, not the archive.)
    os.makedirs(RESULT_DIR, exist_ok=True)

    # Load data from staging tables.
    temp_data = pd.read_sql('SELECT * FROM temperature_sensor_data', engine)
    humidity_data = pd.read_sql('SELECT * FROM humidity_sensor_data', engine)

    # Data cleaning: remove incomplete and duplicated readings.
    temp_data.dropna(inplace=True)
    humidity_data.dropna(inplace=True)
    temp_data.drop_duplicates(inplace=True)
    humidity_data.drop_duplicates(inplace=True)

    # Derive grouping keys from the timestamp.
    # NOTE(review): assumes read_sql returned the TIMESTAMPTZ column with a
    # datetime dtype so the .dt accessor works — confirm with the driver.
    temp_data['date'] = temp_data['timestamp'].dt.date
    humidity_data['date'] = humidity_data['timestamp'].dt.date
    temp_data['hour'] = temp_data['timestamp'].dt.hour
    humidity_data['hour'] = humidity_data['timestamp'].dt.hour

    # Daily mean calculation.
    daily_temp_avg = temp_data.groupby('date')['temperature'].mean().reset_index()
    daily_humidity_avg = humidity_data.groupby('date')['humidity'].mean().reset_index()

    # Hourly mean calculation.
    hourly_temp_avg = temp_data.groupby(['date', 'hour'])['temperature'].mean().reset_index()
    hourly_humidity_avg = humidity_data.groupby(['date', 'hour'])['humidity'].mean().reset_index()

    # Round to two decimals, matching the staging columns' DECIMAL(5, 2).
    daily_temp_avg['temperature'] = daily_temp_avg['temperature'].round(2)
    daily_humidity_avg['humidity'] = daily_humidity_avg['humidity'].round(2)
    hourly_temp_avg['temperature'] = hourly_temp_avg['temperature'].round(2)
    hourly_humidity_avg['humidity'] = hourly_humidity_avg['humidity'].round(2)

    # Rename value columns to the warehouse schema's column names.
    daily_temp_avg.rename(columns={'temperature': 'avg_daily_temperature'}, inplace=True)
    daily_humidity_avg.rename(columns={'humidity': 'avg_daily_humidity'}, inplace=True)
    hourly_temp_avg.rename(columns={'temperature': 'avg_hourly_temperature'}, inplace=True)
    hourly_humidity_avg.rename(columns={'humidity': 'avg_hourly_humidity'}, inplace=True)

    # Save transformed data to CSV for the load step.
    daily_temp_avg.to_csv(f'{RESULT_DIR}daily_temperature_avg.csv', index=False)
    daily_humidity_avg.to_csv(f'{RESULT_DIR}daily_humidity_avg.csv', index=False)
    hourly_temp_avg.to_csv(f'{RESULT_DIR}hourly_temperature_avg.csv', index=False)
    hourly_humidity_avg.to_csv(f'{RESULT_DIR}hourly_humidity_avg.csv', index=False)

    # Also materialize the aggregates back into PostgreSQL (full replace).
    daily_temp_avg.to_sql('daily_temperature_avg', engine, if_exists='replace', index=False)
    daily_humidity_avg.to_sql('daily_humidity_avg', engine, if_exists='replace', index=False)
    hourly_temp_avg.to_sql('hourly_temperature_avg', engine, if_exists='replace', index=False)
    hourly_humidity_avg.to_sql('hourly_humidity_avg', engine, if_exists='replace', index=False)


# Fixed: previously transform_data() ran on plain import. Guarding keeps this
# consistent with data_extract.py and makes the module safe to import.
if __name__ == "__main__":
    transform_data()
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import random
import pandas as pd
from datetime import datetime, timedelta
import time


# Directory paths
DATA_DIR = '../data/'  # generated CSVs land here; data_extract.py reads the same folder


def generate_sensor_data(sensor_type, num_records):
    """Build a DataFrame of synthetic sensor readings.

    Parameters
    ----------
    sensor_type : str
        "temperature" or "humidity"; also used as the value column name.
        Unknown types yield an empty frame (columns only), as before.
    num_records : int
        Number of rows to generate.

    Returns
    -------
    pandas.DataFrame
        Columns: timestamp, sensor_id, <sensor_type>.
    """
    # Value range (low, high) per supported sensor type. A table avoids the
    # duplicated append branches of the original if/elif.
    value_ranges = {"temperature": (15.0, 30.0), "humidity": (30.0, 80.0)}
    data = []
    if sensor_type in value_ranges:
        low, high = value_ranges[sensor_type]
        for _ in range(num_records):
            # Random reading time within roughly the last 16.7 hours.
            timestamp = datetime.now() - timedelta(minutes=random.randint(1, 1000))
            sensor_id = random.randint(1, 10)
            value = round(random.uniform(low, high), 2)  # 2 dp, matches DECIMAL(5, 2)
            data.append([timestamp, sensor_id, value])
    # Fixed idiom: the column name was the redundant f-string f"{sensor_type}".
    return pd.DataFrame(data, columns=["timestamp", "sensor_id", sensor_type])


def save_data_to_file():
    """Continuously emit one pair of sensor CSV files into DATA_DIR per minute.

    Runs forever: each cycle generates 100 temperature and 100 humidity
    readings, writes them to timestamp-named CSV files (so successive files
    never collide), then sleeps for 60 seconds.
    """
    while True:
        # Generate one batch per sensor type.
        batches = {
            "temperature": generate_sensor_data("temperature", 100),
            "humidity": generate_sensor_data("humidity", 100),
        }

        # Stamp used in the output filenames for this cycle.
        stamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

        # Write each batch to its own CSV file.
        for sensor_type, frame in batches.items():
            frame.to_csv(f"{DATA_DIR}{sensor_type}_data_{stamp}.csv", index=False)

        print(f"Data saved at {stamp}")

        # Throttle: a fresh pair of files every minute.
        time.sleep(60)


if __name__ == "__main__":
    save_data_to_file()
Binary file added _assets/iot_project_diagram.webp
Binary file not shown.

0 comments on commit 115b23f

Please sign in to comment.