diff --git a/pysnippets/Scrubs/README.md b/pysnippets/Scrubs/README.md
new file mode 100644
index 0000000..c19d020
--- /dev/null
+++ b/pysnippets/Scrubs/README.md
@@ -0,0 +1,57 @@
+# Scrubs Directory - Python Code Snippets
+
+This directory contains a collection of Python scripts and modules that preprocess, clean, and transform data for analysis or machine learning tasks. Below is a description of each file and its purpose.
+
+## Files and Descriptions
+
+### `backup.py`
+- **Purpose**: Handles saving cleaned DataFrames to disk.
+- **Key Functions**:
+  - `backup_df(df, path_clean)`: Saves a DataFrame to a CSV file after fixing dates, imputing categoricals, and compressing numeric columns.
+
+### `clean.py`
+- **Purpose**: Provides functions to clean and preprocess DataFrames by removing unwanted columns based on various criteria.
+- **Key Functions**:
+  - `drop_unnamed(df)`: Removes columns whose names start with "unnamed".
+  - `drop_zv(df)`: Drops columns with zero variance.
+  - `drop_nzv(df, nzv_threshold)`: Drops near-zero-variance categorical columns.
+  - `drop_missings(df, NA_threshold)`: Drops columns whose share of missing values exceeds a threshold.
+  - `remove_zv_missings(df, NA_threshold, nzv_threshold)`: Combines the missing-value, zero-variance, and near-zero-variance drops into one call.
+
+### `clip.py`
+- **Purpose**: Manages categorical variables by reducing the number of levels in categorical features.
+- **Key Functions**:
+  - `clip_categorical(ser, MIN_LEVELS, MIN_FREQ, COVERAGE)`: Collapses rare levels of a categorical series into an "Other" label, based on frequency and coverage thresholds.
+
+### `compress.py`
+- **Purpose**: Compresses numeric data types and encodes categorical variables.
+- **Key Functions**:
+  - `compress_numeric(COL)`: Downcasts numeric columns to the smallest suitable dtype.
+  - `compress_categorical(COL)`: Label-encodes a categorical column (clipping it first if it has many levels) and persists the encoder lookup as JSON.
+
+### `dummies.py`
+- **Purpose**: Handles the creation of dummy variables for categorical data.
+- **Key Functions**:
+  - `make_dummies(ser, DROP_ONE)`: Creates dummy variables for a categorical series.
+  - `create_dummified_df(df, drop_one)`: Applies `make_dummies` to each categorical column in a DataFrame.
+
+### `pipeline.py`
+- **Purpose**: Orchestrates the entire data cleaning and preprocessing pipeline.
+- **Key Functions**:
+  - `engineer_features(df)`: Performs feature engineering on the DataFrame.
+  - `aggregate_df(df, df_y, cols_flags)`: Aggregates one consumer's contact history into a single feature row.
+  - `get_aggregated_df(df, aggfunc)`: Applies an aggregation function to the DataFrame grouped by a key.
+
+### `utils.py`
+- **Purpose**: Provides utility functions.
+- **Key Functions**:
+  - `time_my_func(my_func)`: Decorator that measures and prints the execution time of a function.
+
+### `__init__.py`
+- **Purpose**: Marks the directory as a Python package so its contents can be imported elsewhere.
+
+## Usage
+
+To use these scripts, ensure that your data meets the formats and types expected by each function; most functions take a pandas DataFrame as input. You can import these modules into your Python scripts or Jupyter notebooks to preprocess and clean your data.
+
+### Example Usage
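+
+The following is a minimal sketch of a typical workflow. The CSV paths and column layout are illustrative, not part of the package; the flat imports assume you run from inside this directory, matching how the modules import each other.
+
+```python
+import pandas as pd
+
+from clean import remove_zv_missings
+from dummies import create_dummified_df
+from backup import backup_df
+
+# Load raw data, drop sparse/constant columns, dummify categoricals, save.
+df = pd.read_csv("data/raw/my_data.csv")
+df = remove_zv_missings(df, NA_threshold=0.85, nzv_threshold=0.95)
+df = create_dummified_df(df, drop_one=True)
+backup_df(df, path_clean="data/interim/my_data_clean.csv")
+```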
diff --git a/pysnippets/Scrubs/__init__.py b/pysnippets/Scrubs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/pysnippets/Scrubs/backup.py b/pysnippets/Scrubs/backup.py
new file mode 100644
index 0000000..07644ac
--- /dev/null
+++ b/pysnippets/Scrubs/backup.py
@@ -0,0 +1,32 @@
+import pandas as pd
+
+from compress import compress_numeric
+from utils import time_my_func
+
+@time_my_func
+def backup_df(df, path_clean):
+    """
+    Writes a DataFrame to disk after
+    fixing dates,
+    imputing categoricals, and
+    compressing numerics.
+    """
+    print("Fixing dates...")
+    date_cols = [c for c in df.columns if 'date' in c.lower()]
+    if len(date_cols) >= 1:
+        for COL in date_cols:
+            df.loc[:, COL] = pd.to_datetime(df[COL])
+
+    print("Fixing categoricals...")
+    catg_cols = df.select_dtypes(include=object).columns.tolist()
+    if len(catg_cols) >= 1:
+        for COL in catg_cols:
+            df.loc[:, COL] = df[COL].fillna('_missing_')
+
+    print("Fixing numerics...")
+    df = df.apply(compress_numeric)
+
+    print("Saving cleaned file to {}".format(path_clean))
+    df.to_csv(path_clean)
+    return None
\ No newline at end of file
diff --git a/pysnippets/Scrubs/clean.py b/pysnippets/Scrubs/clean.py
new file mode 100644
index 0000000..5a4d60d
--- /dev/null
+++ b/pysnippets/Scrubs/clean.py
@@ -0,0 +1,128 @@
+import numpy as np
+
+from utils import time_my_func
+
+def drop_unnamed(df):
+    """
+    Drop columns whose names start with 'unnamed'
+    (typically index columns left over from CSV round-trips).
+    """
+    cols_unnamed = [x for x in df.columns if x.lower().startswith('unnamed')]
+
+    if len(cols_unnamed) >= 1:
+        df.drop(cols_unnamed, axis=1, inplace=True)
+    return df
+
+def drop_zv(df_):
+    """
+    Drop columns that have zero variance:
+    for categoricals, if nunique == 1;
+    for numerics, if std == 0.
+    """
+    cols_catg_zv = \
+        (df_
+         .select_dtypes(include='object')
+         .nunique()
+         .where(lambda i: i == 1)
+         .dropna()
+         .index
+         .tolist()
+        )
+
+    cols_numeric_zv = \
+        (df_
+         .select_dtypes(include=np.number)
+         .std()
+         .where(lambda i: i == 0)
+         .dropna()
+         .index
+         .tolist()
+        )
+
+    cols_zv = cols_catg_zv + cols_numeric_zv
+
+    if len(cols_zv) >= 1:
+        print("The following columns have zero variance and will be dropped \n{}".format(cols_zv))
+        df_.drop(cols_zv, axis=1, inplace=True)
+    else:
+        print("No columns with zero variance.")
+    return df_
+
+def drop_nzv(df_, nzv_threshold=0.95):
+    """
+    Drop categorical columns that have near-zero variance,
+    i.e., whose mode accounts for more than nzv_threshold of the values.
+    Such variables have very little predictive power.
+    """
+    cols_catg_nzv = \
+        (df_
+         .select_dtypes(include='object')
+         .apply(lambda c: c.value_counts(normalize=True).agg(['max', 'idxmax']))
+         .T
+         .query("max > {}".format(nzv_threshold))
+         .index
+         .tolist()
+        )
+
+    if len(cols_catg_nzv) >= 1:
+        print("The mode of these columns has a frequency higher than {}. Dropping these.\n{}"
+              .format(nzv_threshold, cols_catg_nzv))
+        df_.drop(cols_catg_nzv, axis=1, inplace=True)
+    else:
+        print("No categorical columns with near-zero variance found.")
+    return df_
+
+def drop_missings(df_, NA_threshold=0.8):
+    """
+    Drop columns whose share of missing values exceeds NA_threshold.
+    """
+    cols_missings = \
+        (df_
+         .isnull()
+         .mean()
+         .where(lambda i: i > NA_threshold)
+         .dropna()
+         .index
+         .tolist()
+        )
+
+    if len(cols_missings) >= 1:
+        print("The following columns have more than {:.2f}% missings and will be dropped...\n{}"
+              .format(NA_threshold * 100, cols_missings))
+        df_.drop(cols_missings, inplace=True, axis=1)
+    else:
+        print("No columns have more than {:.2f}% missings.".format(NA_threshold * 100))
+    return df_
+
+@time_my_func
+def remove_zv_missings(df, NA_threshold=0.85, nzv_threshold=0.95):
+    """
+    Clean the passed dataset by removing columns with
+    * more than NA_threshold share of missing values
+    * zero variance
+    * more than nzv_threshold frequency of the mode
+
+    Parameters
+    ----------
+    df: DataFrame
+        The input dataset
+
+    NA_threshold: float
+        Acceptable limit for missings
+
+    nzv_threshold: float
+        Acceptable limit for frequency of the mode
+
+    Returns
+    -------
+    Cleaned DataFrame
+    """
+    df_ = \
+        (df
+         .copy()
+         .pipe(drop_unnamed)
+         .pipe(drop_missings, NA_threshold=NA_threshold)
+         .pipe(drop_zv)
+         .pipe(drop_nzv, nzv_threshold=nzv_threshold)
+        )
+    return df_
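+
+if __name__ == "__main__":
+    # Minimal smoke test (illustrative addition, not part of the original pipeline):
+    # the mostly-missing and constant columns should be dropped, the rest kept.
+    import pandas as pd
+    demo = pd.DataFrame({
+        'a': [1, 2, 3, 4, 5],
+        'constant': ['x'] * 5,
+        'mostly_x': ['x', 'x', 'x', 'x', 'y'],
+        'sparse': [None, None, None, None, 1.0],
+    })
+    cleaned = remove_zv_missings(demo, NA_threshold=0.5, nzv_threshold=0.95)
+    print(cleaned.columns.tolist())  # expected: ['a', 'mostly_x']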
diff --git a/pysnippets/Scrubs/clip.py b/pysnippets/Scrubs/clip.py
new file mode 100644
index 0000000..9adfb81
--- /dev/null
+++ b/pysnippets/Scrubs/clip.py
@@ -0,0 +1,47 @@
+def clip_categorical(ser, MIN_LEVELS=5, MIN_FREQ=0.05, COVERAGE=0.95):
+    """
+    Manage categoricals with too many levels.
+    If the series has fewer than MIN_LEVELS levels, it is returned as-is.
+
+    Parameters
+    ----------
+    ser: pandas.Series
+        The input categorical series
+    MIN_LEVELS: int
+        Series with fewer than MIN_LEVELS levels are returned unchanged
+    MIN_FREQ: float
+        Levels with at least MIN_FREQ share of cases will survive
+    COVERAGE: float
+        Levels that together make up COVERAGE share of the data will survive
+
+    Returns
+    -------
+    A pandas.Series with
+        retained labels for the surviving levels
+        the label 'Other' for the rare levels
+    """
+    sr = ser.copy()
+    if sr.nunique() >= MIN_LEVELS:
+        KEEP_1 = \
+            (sr
+             .value_counts(normalize=True)
+             .where(lambda i: i >= MIN_FREQ)
+             .dropna()
+             .index
+             .tolist()
+            )
+
+        KEEP_2 = \
+            (sr
+             .value_counts(normalize=True)
+             .cumsum()
+             .where(lambda x: x <= COVERAGE)
+             .dropna()
+             .index
+             .tolist()
+            )
+
+        KEEP = set(KEEP_1).union(set(KEEP_2))
+
+        sr[~sr.isin(KEEP)] = 'Other'
+        sr = sr.map(lambda x: '_'.join(str(x).split()))
+        print("{} now has {} levels and {} % coverage".format(sr.name, sr.nunique(), 100 * COVERAGE))
+    else:
+        print("{} doesn't have more than {} levels. Returning as-is.".format(sr.name, MIN_LEVELS))
+    return sr
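+
+if __name__ == "__main__":
+    # Minimal demo (illustrative addition): a long-tailed categorical
+    # gets its rare levels collapsed into 'Other'.
+    import pandas as pd
+    demo = pd.Series(['a'] * 50 + ['b'] * 30 + ['c'] * 15 + ['d'] * 3 + ['e'] * 2,
+                     name='letters')
+    clipped = clip_categorical(demo, MIN_LEVELS=5, MIN_FREQ=0.05, COVERAGE=0.95)
+    print(clipped.value_counts())  # 'd' and 'e' are collapsed into 'Other'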
diff --git a/pysnippets/Scrubs/compress.py b/pysnippets/Scrubs/compress.py
new file mode 100644
index 0000000..9703843
--- /dev/null
+++ b/pysnippets/Scrubs/compress.py
@@ -0,0 +1,50 @@
+import json
+
+import pandas as pd
+from sklearn.preprocessing import LabelEncoder
+
+from clip import clip_categorical
+
+def compress_numeric(COL):
+    """
+    If the passed COL is numeric,
+    downcast it to the smallest suitable dtype;
+    else, return it as-is.
+
+    Parameters
+    ----------
+    COL: pandas.Series
+        The Series to shrink
+
+    Returns
+    -------
+    If numeric, a compressed Series; otherwise the input unchanged
+    """
+    if 'float' in str(COL.dtype):
+        print("Downcasting {} to float".format(COL.name))
+        result = pd.to_numeric(COL, downcast='float', errors='ignore')
+    elif 'int' in str(COL.dtype):
+        print("Downcasting {} to int".format(COL.name))
+        result = pd.to_numeric(COL, downcast='integer', errors='ignore')
+    else:
+        print("{} is not numeric. Returning as-is".format(COL.name))
+        result = COL
+    return result
+
+def compress_categorical(COL):
+    """
+    Label-encode a categorical variable,
+    clipping it first if it has more than 8 levels,
+    and persist the encoder lookup as JSON.
+    """
+    if COL.nunique() > 8:
+        print("{} has too many levels, clipping it.".format(COL.name))
+        COL_clipped = clip_categorical(COL, MIN_LEVELS=8)
+    else:
+        COL_clipped = COL.copy()
+
+    le = LabelEncoder()
+    COL_encoded = pd.Series(le.fit_transform(COL_clipped), index=COL.index, name=COL.name)
+    lookup = pd.Series(le.classes_).to_dict()
+
+    path_persist = "data/interim/{}_lookup.json".format(COL.name)
+    print("Persisting encoder at {}".format(path_persist))
+    with open(path_persist, 'w') as fp:
+        json.dump(lookup, fp)
+    return COL_encoded
\ No newline at end of file
diff --git a/pysnippets/Scrubs/dummies.py b/pysnippets/Scrubs/dummies.py
new file mode 100644
index 0000000..d679f25
--- /dev/null
+++ b/pysnippets/Scrubs/dummies.py
@@ -0,0 +1,60 @@
+import pandas as pd
+
+from utils import time_my_func
+
+def make_dummies(ser, DROP_ONE=True):
+    """
+    Create dummies for the levels of a (clipped) categorical,
+    dropping one level to avoid the dummy-variable trap.
+
+    Parameters
+    ----------
+    ser: pandas.Series
+        The input categorical series
+
+    Returns
+    -------
+    df_dum: pandas.DataFrame
+        Dummy variables, with one level dropped if DROP_ONE is True
+    """
+    if ser.nunique() > 10:
+        print("Categorical has too many levels, consider clipping")
+        df_dum = None
+    else:
+        PREFIX = 'flag_' + ser.name
+        df_dum = pd.get_dummies(ser, prefix=PREFIX)
+        if DROP_ONE:
+            other_col = [c for c in df_dum if 'Other' in c]
+            to_drop_ = other_col if other_col else df_dum.mean().idxmin()
+            print("Dropping {}".format(to_drop_))
+            df_dum.drop(to_drop_, axis=1, inplace=True)
+    return df_dum
+
+@time_my_func
+def create_dummified_df(df, drop_one=True):
+    """
+    For each (clipped) categorical column:
+    * create a dummy DataFrame,
+    * concat it to the input df,
+    * drop the original categorical.
+
+    Returns
+    -------
+    The passed df with flag_* columns replacing the categoricals
+    """
+    df_ = df.copy()
+
+    cols_dummies = \
+        (df_
+         .select_dtypes(include=object)
+         .columns
+         .tolist())
+    print("Creating dummies for \n{}".format(cols_dummies))
+
+    list_dummies_df = \
+        [make_dummies(df_[COL], DROP_ONE=drop_one) for COL in cols_dummies]
+
+    df_2 = \
+        pd.concat([
+            df_.drop(cols_dummies, axis=1),
+            pd.concat(list_dummies_df, axis=1)
+        ], axis=1)
+
+    return df_2
\ No newline at end of file
diff --git a/pysnippets/Scrubs/pipeline.py b/pysnippets/Scrubs/pipeline.py
new file mode 100644
index 0000000..dc81555
--- /dev/null
+++ b/pysnippets/Scrubs/pipeline.py
@@ -0,0 +1,215 @@
+import numpy as np
+import pandas as pd
+from pandas import DataFrame, Series
+
+from utils import time_my_func
+from obtain import load_file_to_db  # type: ignore
+from scrub import import_filter_df, get_target_df  # type: ignore
+from scrub import remove_zv_missings  # type: ignore
+from scrub import create_dummified_df  # type: ignore
+from scrub import clip_categorical  # type: ignore
+from backup import backup_df  # type: ignore
+
+path_raw = "data/raw/gravity_contact_20180406.csv"
+path_clean = "data/raw/clean_contact.csv"
+path_clean_db = "data/interim/clean.db"
+
+# Consumer keys that fail aggregation are collected here for inspection.
+errors_aggregate_df = []
+
+# --- Declare Helper Objects ---
+
+dict_replace_1 = {}
+
+def replace_spaces(i):
+    return "_".join([x.lower().strip() for x in i.split()])
+
+def get_x_from_y():
+    """
+    """
+    pass
+
+# --- Declare Data Processing Functions ---
+
+@time_my_func
+def engineer_features(df):
+    """
+    Parse CELL_DESCRIPTION into campaign brand, status, type, and content
+    features, and clean the CHANNEL column.
+    """
+    print("Scrubbing Cell Description")
+    num_items_cellDescr = df['CELL_DESCRIPTION'].map(lambda i: len(str(i).split("|")))
+    indexes_to_drop = \
+        (num_items_cellDescr
+         .where(lambda i: i != 9)
+         .dropna()
+         .index
+         .tolist())
+
+    df.drop(indexes_to_drop, inplace=True)
+
+    dict_replace_cellDescr = {
+        k: v for k, v in zip(
+            df['CELL_DESCRIPTION'].drop_duplicates().values,
+            df['CELL_DESCRIPTION'].drop_duplicates().map(lambda i: Series(i.split("|")).to_dict()).values
+        )
+    }
+
+    df_cellDescr = DataFrame(df['CELL_DESCRIPTION'].map(lambda i: dict_replace_cellDescr.get(i, None)).tolist())
+    df.drop('CELL_DESCRIPTION', axis=1, inplace=True)
+
+    cols_df_cellDescr = {
+        0: 'CAMPAIGN_BRAND_CDS',
+        1: 'CAMPAIGN_STATUS_CDS',
+        2: 'CAMPAIGN_TYPE_CDS',
+        3: 'CAMPAIGN_CONTENT_1_CDS',
+        4: 'CAMPAIGN_CONTENT_2_CDS',
+        5: 'CAMPAIGN_CONTENT_3_CDS'
+    }
+
+    df_cellDescr.drop(range(6, 9), axis=1, inplace=True)
+    df_cellDescr.rename(columns=cols_df_cellDescr, inplace=True)
+
+    print("Creating Campaign Brand")
+    if 'CAMPAIGN_BRAND' in df.columns:
+        df.drop('CAMPAIGN_BRAND', axis=1, inplace=True)
+
+    df.loc[:, 'CAMPAIGN_BRAND'] = df_cellDescr['CAMPAIGN_BRAND_CDS'].values
+
+    print("Creating Campaign Status")
+    df.loc[:, 'CAMPAIGN_STATUS'] = df_cellDescr['CAMPAIGN_STATUS_CDS'].values
+
+    print("Creating Campaign Type")
+    dict_replace_CampaignType = {
+        "00": "Welcome_Email",
+        "01": "Email_w_ItemRaffle",
+        "02": "Event_Email_wo_Item",
+        "03": "Event_Email_w_Item",
+        "04": "Email_w_Pack",
+        "05": "Email_w_eVoucher",
+        "06": "Email_wo_Incentive",
+        "07": "SMS_w_eVoucher",
+        "08": "SMS_Info",
+        "09": "SMS_w_REG_Code",
+        "10": "Postal_Mail",
+        "11": "Pack_Mail",
+        "12": "Unknown",
+        "13": "Postal_Mail_w_eVoucher",
+        "14": "Postal_Mail_w_item",
+        "15": "Postal_Mail_w_REG_Code",
+        "16": "Email_w_Everything"
+    }
+
+    df.loc[:, 'CAMPAIGN_TYPE'] = \
+        (df_cellDescr['CAMPAIGN_TYPE_CDS']
+         .fillna('_missing_')
+         .map(lambda i: str(i).zfill(2))
+         .replace(dict_replace_CampaignType)
+         .pipe(clip_categorical, COVERAGE=0.99)
+         .values
+        )
+
+    print("Creating Campaign Content")
+    dict_replace_campaign_content = {
+        'Other': 'Other',
+        'day_00': 'day_00',
+        'ipsos': 'ipsos',
+        'ipsos_panel': 'ipsos',
+        'iqos_national': 'iqos_national',
+        'mgm_march_transition': 'mgm',
+        'mgm_spring_last_march_push': 'mgm',
+        'ob01_better2018_care': 'ob01_betterCare',
+        'ob01_betterstories_2018_care': 'ob01_betterCare',
+        'personicx_main_accessoires': 'personicx',
+        'pr_amplification_newsarticle': 'pr_amplification',
+        'valentines_day_2018': 'valentines_day',
+        'valentines_day_white_mail_evoucher': 'valentines_day',
+        'valentinesday_pack_mail': 'valentines_day'
+    }
+
+    df.loc[:, 'CAMPAIGN_CONTENT'] = \
+        (df_cellDescr['CAMPAIGN_CONTENT_1_CDS']
+         .map(lambda i: i.strip().lower())
+         .pipe(clip_categorical, COVERAGE=0.88)
+         .replace(dict_replace_campaign_content)
+         .values
+        )
+
+    del df_cellDescr
+
+    print("Scrubbing Channel")
+    df.loc[:, 'CHANNEL'] = \
+        (df['CHANNEL']
+         .map(replace_spaces)
+         .pipe(clip_categorical)
+         .values
+        )
+
+    df.drop(['CONTACT_HISTORY_ID'], axis=1, inplace=True)
+    return df
+
+def aggregate_df(df, df_y, cols_flags):
+    """
+    Aggregate one consumer's contact history into a single feature row,
+    keeping only contacts made before the survey date.
+    """
+    ckey = df.CONSUMER_KEY.sample(1).iloc[0]
+    try:
+        _, conversion_measure, date_survey = df_y.query("CONSUMER_KEY == {}".format(ckey)).values[0]
+        dfrp = df.query("CONSUMER_KEY == {}".format(ckey))
+        dfrp = dfrp[dfrp.SELECTION_DATE <= date_survey]
+
+        s1 = dfrp[cols_flags].mean()
+        if len(s1) <= 1:
+            s1 = Series(0, index=cols_flags)
+
+        weekend_responses = \
+            (dfrp
+             .SELECTION_DATE
+             .dt.strftime("%a")
+             .value_counts()
+             .reindex(['Sat', 'Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri'])
+            )
+
+        s2 = Series({
+            'num_contacts': dfrp.shape[0],
+            'num_months_active_contacts': dfrp.SELECTION_DATE.dt.strftime("%b_%Y").nunique(),
+            'num_days_bw_lastContact_survey': (date_survey - dfrp.SELECTION_DATE.max()) / np.timedelta64(1, 'D'),
+            'perc_contacts_weekend': weekend_responses.loc[['Sat', 'Sun']].sum() / weekend_responses.sum(),
+            'y': conversion_measure
+        })
+        return pd.concat([s1, s2])
+    except Exception:
+        errors_aggregate_df.append(ckey)
+        return None
+
+@time_my_func
+def get_aggregated_df(df, aggfunc):
+    """
+    Apply aggfunc to df grouped by CONSUMER_KEY,
+    producing one feature row per consumer.
+    """
+    df_y = get_target_df()
+    cols_flags = [x for x in df.columns if x.startswith('flag_')]
+
+    df_aggregated = \
+        (df
+         .groupby("CONSUMER_KEY")
+         .apply(aggfunc, df_y=df_y, cols_flags=cols_flags)
+        )
+
+    return df_aggregated
+
+
+if __name__ == '__main__':
+    df_aggregated = (import_filter_df(path_raw)
+                     .pipe(remove_zv_missings)
+                     .pipe(engineer_features)
+                     .pipe(create_dummified_df)
+                     .pipe(get_aggregated_df, aggfunc=aggregate_df)
+                     )
+
+    backup_df(df=df_aggregated, path_clean=path_clean)
+
+    tbl_ = path_clean.split('/')[-1].replace('clean_', '').replace('.csv', '').strip()
+    load_file_to_db(path_to_file=path_clean,
+                    path_to_db=path_clean_db,
+                    table_name=tbl_,
+                    delim=',')
\ No newline at end of file
diff --git a/pysnippets/Scrubs/utils.py b/pysnippets/Scrubs/utils.py
new file mode 100644
index 0000000..6786690
--- /dev/null
+++ b/pysnippets/Scrubs/utils.py
@@ -0,0 +1,19 @@
+import time
+from functools import wraps
+
+def time_my_func(my_func):
+    """
+    Decorator that prints a function's execution time.
+    """
+    @wraps(my_func)
+    def timed(*args, **kwargs):
+        message_top = "\nStarting {}".format(my_func.__name__)
+        print(message_top)
+        print("-" * len(message_top))
+        t0 = time.time()
+
+        result = my_func(*args, **kwargs)
+
+        message_bot = "\nCompleted in {:.2f} minutes.".format((time.time() - t0) / 60)
+        print(message_bot)
+        print('-' * len(message_bot))
+        return result
+    return timed
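+
+if __name__ == "__main__":
+    # Minimal demo (illustrative addition): time a trivial function.
+    @time_my_func
+    def wait_briefly():
+        time.sleep(0.2)
+
+    wait_briefly()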