HW6 #1

Open · wants to merge 1 commit into base: main
26 changes: 26 additions & 0 deletions Dockerfile
@@ -0,0 +1,26 @@
FROM python:3.11-slim

# Install Poetry
RUN pip install poetry

# Set the working directory in the container
WORKDIR /usr/src/app

# Copy the pyproject.toml and poetry.lock files into the container
COPY pyproject.toml poetry.lock* /usr/src/app/

# Disable virtual environments created by poetry,
# as the docker container itself provides isolation
RUN poetry config virtualenvs.create false

# Install only the runtime dependencies; the application code is copied in a
# later layer, so the project root itself is not installed at this point
RUN poetry install --no-root --only main --no-interaction --no-ansi

# Copy the rest of your application's code
COPY . /usr/src/app

# Make port 8501 available to the world outside this container
EXPOSE 8501

# Run main.py when the container launches using Streamlit
CMD ["streamlit", "run", "main.py"]
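The DAG in dag.py expects this image to be available to the Docker daemon as streamlit_homework:latest. One way to build it from Python, sketched under the assumption that the docker SDK for Python is installed (building with the docker CLI works just as well):

import docker

# Build the image from the Dockerfile in the current directory and tag it
# with the name referenced by the DockerOperator in dag.py.
client = docker.from_env()
image, build_logs = client.images.build(path=".", tag="streamlit_homework:latest")
print(image.tags)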
9 changes: 9 additions & 0 deletions README.md
@@ -1,3 +1,12 @@
# The descriptions below are the criteria

* The project is a prototype of a system for analyzing and training an uplift model, integrated with a Streamlit dashboard and automated via Airflow using DockerOperator.
* Daily retraining of the model is automated with Airflow and DockerOperator. Airflow is deployed with Helm, and synchronization with Git is provided by gitsync.
* The model was implemented quickly and may not be optimal in terms of prediction quality or computational efficiency. This approach was chosen to demonstrate the core principles of uplift modeling and is not intended for production use without additional optimization and testing.
* The dataset for training the model is downloaded locally and is not modified within this project, except for the mechanism that randomly selects a subset of the data to simulate a dataset update.
* kind-cluster.yaml is taken without changes


# 2024-spring-ab-python-ads-HW-6

Implement a Streamlit dashboard with uplift model training and EDA on the X5 dataset. To simulate data updates, implement random selection of 80% of the train dataset. In the dashboard, provide a choice between the Solo Model and Two Model approaches and between the CatBoostClassifier and RandomForestClassifier classifiers (as in the seminar). Use DockerOperator to implement daily retraining of the model in Airflow.
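For reference, a condensed sketch of the two approaches on synthetic data (mirroring the sklift API used in main.py further down in this PR; the data and variable names here are purely illustrative):

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklift.models import SoloModel, TwoModels
from sklift.metrics import qini_auc_score

# Tiny synthetic dataset just to show the API shape
rng = np.random.default_rng(0)
X = pd.DataFrame(rng.normal(size=(500, 3)), columns=["f1", "f2", "f3"])
treatment = pd.Series(rng.integers(0, 2, size=500))
y = pd.Series(rng.integers(0, 2, size=500))

# Solo Model: a single classifier that receives the treatment flag as an extra feature
solo = SoloModel(RandomForestClassifier())
solo.fit(X, y, treatment)

# Two Model ("vanilla"): separate classifiers for the treatment and control groups
two = TwoModels(
    estimator_trmnt=RandomForestClassifier(),
    estimator_ctrl=RandomForestClassifier(),
    method="vanilla",
)
two.fit(X, y, treatment)

# Both approaches are compared by the Qini AUC of their uplift predictions
# (computed on the training data here, for brevity only)
print(qini_auc_score(y, solo.predict(X), treatment))
print(qini_auc_score(y, two.predict(X), treatment))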
400,163 changes: 400,163 additions & 0 deletions clients.csv

Large diffs are not rendered by default.

46 changes: 46 additions & 0 deletions dag.py
@@ -0,0 +1,46 @@
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
import pendulum
from airflow.providers.docker.operators.docker import DockerOperator

# Defining the DAG's start date
start_date: datetime = pendulum.today('Europe/Moscow').subtract(days=1)

with DAG(
    dag_id="uplift_model_retraining",
    default_args={
        "owner": "me",
    },
    schedule_interval="@daily",
    start_date=start_date,
    tags=["uplift_model", "streamlit", "docker"],
    catchup=False,
) as dag:
    """
    DAG for daily retraining of the uplift model using Docker containers.

    This DAG includes tasks for starting the process, training and evaluating the models using
    a Docker container, and then concluding the process. The `train_and_evaluate_models` task runs
    a Streamlit application inside a Docker container, which presumably trains and evaluates
    uplift models.
    """

    # Start of the DAG's tasks
    start: EmptyOperator = EmptyOperator(task_id="start")

    # Task for training and evaluating models inside a Docker container
    train_and_evaluate: DockerOperator = DockerOperator(
        task_id="train_and_evaluate_models",
        image="streamlit_homework:latest",  # Docker image to use
        docker_url='unix://var/run/docker.sock',  # URL to Docker daemon
        network_mode="bridge",  # Network mode for Docker container
        command="streamlit run main.py",  # Command to run in the container
        auto_remove=True,  # Automatically remove the container when the task is finished
    )

    # End of the DAG's tasks
    end: EmptyOperator = EmptyOperator(task_id="end")

    # Defining the task dependencies
    start >> train_and_evaluate >> end
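As a quick sanity check that the DAG file parses, one option (a sketch assuming Airflow is installed in the local environment and dag.py sits in the current directory; the folder path is illustrative):

from airflow.models import DagBag

# Parse only this folder's DAGs, skipping Airflow's bundled examples
dag_bag = DagBag(dag_folder=".", include_examples=False)
assert not dag_bag.import_errors, dag_bag.import_errors
assert dag_bag.get_dag("uplift_model_retraining") is not None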
130 changes: 130 additions & 0 deletions main.py
@@ -0,0 +1,130 @@
"""Main application for Uplift Modeling Dashboard.

This application integrates data processing, exploratory data analysis (EDA), and model training
and evaluation for uplift modeling using Streamlit for the user interface.
"""

import streamlit as st
from sklift.models import SoloModel, TwoModels
from sklift.metrics import qini_auc_score
from catboost import CatBoostClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from typing import List, Tuple, Union

# Load the datasets
uplift_train_df: pd.DataFrame = pd.read_csv('uplift_train.csv')
clients_df: pd.DataFrame = pd.read_csv('clients.csv')
Review comment on lines +19 to +20:

I/O operations should be called inside a function; otherwise they will run whenever the module is imported.
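A possible way to address this comment, sketched here for illustration (the loader function, its name, and the use of st.cache_data are suggestions, not part of the PR):

import pandas as pd
import streamlit as st
from typing import Tuple

@st.cache_data
def load_datasets(train_path: str = "uplift_train.csv",
                  clients_path: str = "clients.csv") -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Read the raw CSVs only when called, so nothing runs at import time."""
    return pd.read_csv(train_path), pd.read_csv(clients_path)

uplift_train_df, clients_df = load_datasets()  # called from the Streamlit entry point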


# Merge the training data with client information
train_data_merged: pd.DataFrame = pd.merge(uplift_train_df, clients_df, on='client_id', how='left')

# Implement random selection of 80% of the dataset for training to simulate data updates
train_data_sample: pd.DataFrame = train_data_merged.sample(frac=0.8)

# Frequency Encoding for categorical variables
for col in train_data_sample.select_dtypes(include=['object', 'category']).columns:
    if col not in ['client_id']:
        freq: pd.Series = train_data_sample.groupby(col).size() / len(train_data_sample)
        train_data_sample[col + '_freq'] = train_data_sample[col].map(freq)
        train_data_sample.drop(col, axis=1, inplace=True)

# Streamlit UI setup
st.title("Uplift Modeling Dashboard")

# Selection of modeling approaches and classifiers
model_approaches: List[str] = st.multiselect(
    "Choose one or more uplift modeling approaches:", ["Solo Model", "Two Model"], default=["Solo Model"]
)
classifier_choices: List[str] = st.multiselect(
    "Choose one or more classifiers:", ["CatBoostClassifier", "RandomForestClassifier"], default=["CatBoostClassifier"]
)

def get_classifier(classifier_name: str) -> Union[CatBoostClassifier, RandomForestClassifier]:
    """Fetches the classifier based on the classifier name.

    Args:
        classifier_name (str): The name of the classifier.

    Returns:
        Union[CatBoostClassifier, RandomForestClassifier]: An instance of the requested classifier.

    Raises:
        ValueError: If the classifier name is not recognized.
    """
    if classifier_name == "CatBoostClassifier":
        return CatBoostClassifier(verbose=0, thread_count=2)
    if classifier_name == "RandomForestClassifier":
        return RandomForestClassifier()
    raise ValueError(f"Unknown classifier: {classifier_name}")

def train_and_evaluate(
    model_approach: str,
    classifier_name: str,
    X_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_train: pd.Series,
    y_test: pd.Series,
    treatment_train: pd.Series,
    treatment_test: pd.Series
) -> float:
    """Trains and evaluates the uplift model based on the selected approach and classifier.

    Args:
        model_approach (str): The uplift modeling approach ("Solo Model" or "Two Model").
        classifier_name (str): The name of the classifier to use.
        X_train (pd.DataFrame): Training features.
        X_test (pd.DataFrame): Testing features.
        y_train (pd.Series): Training target variable.
        y_test (pd.Series): Testing target variable.
        treatment_train (pd.Series): Training treatment indicator.
        treatment_test (pd.Series): Testing treatment indicator.

    Returns:
        float: The Qini AUC score for the evaluated model.
    """
    if model_approach == "Two Model":
        estimator_trmnt = get_classifier(classifier_name)
        estimator_ctrl = get_classifier(classifier_name)
        model = TwoModels(estimator_trmnt=estimator_trmnt, estimator_ctrl=estimator_ctrl, method='vanilla')
    else:
        estimator = get_classifier(classifier_name)
        model = SoloModel(estimator)

    model.fit(X_train, y_train, treatment_train)
    uplift_pred = model.predict(X_test)
    qini_score = qini_auc_score(y_test, uplift_pred, treatment_test)
    return qini_score

# EDA: Visualizations for Treatment Flag and Target Variable distributions
st.header("Exploratory Data Analysis")
st.subheader("Distribution of Treatment Flag")
fig, ax = plt.subplots()
sns.countplot(x='treatment_flg', data=train_data_sample, ax=ax)
st.pyplot(fig)

st.subheader("Distribution of Target Variable")
fig, ax = plt.subplots()
sns.countplot(x='target', data=train_data_sample, ax=ax)
st.pyplot(fig)

# Model Training and Evaluation
st.header("Model Training and Evaluation")
if st.button("Train and Evaluate Models"):
    with st.spinner('Model training in progress...'):
        # Extract features and target variables
        X: pd.DataFrame = train_data_sample.drop(['client_id', 'target', 'treatment_flg'], axis=1)
        y: pd.Series = train_data_sample['target']
        treatment: pd.Series = train_data_sample['treatment_flg']
        X_train, X_test, y_train, y_test, treatment_train, treatment_test = train_test_split(X, y, treatment, test_size=0.3)

        results: List[Tuple[str, str, float]] = []
        for model_approach in model_approaches:
            for classifier_name in classifier_choices:
                qini_score = train_and_evaluate(model_approach, classifier_name, X_train, X_test, y_train, y_test, treatment_train, treatment_test)
                results.append((model_approach, classifier_name, qini_score))

    st.success("Model training completed.")
    # Display Qini AUC scores for comparison
    st.subheader("Model Comparison by Qini AUC Scores")
    for result in results:
        st.write(f"{result[0]} + {result[1]}: Qini AUC Score = {result[2]:.4f}")