Skip to content

Commit

Permalink
Release/1.3.1 (#361)
Browse files Browse the repository at this point in the history
* feat(MLOP-2236): add NTZ (#360)

* feat: NTZ and new tests

Loading branch information
ralphrass authored Jun 14, 2024
1 parent 99662f6 commit 14ff019
Show file tree
Hide file tree
Showing 95 changed files with 487 additions and 162 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ jobs:
Pipeline:
if: github.ref == 'refs/heads/master'

runs-on: ubuntu-22.04
container: quintoandar/python-3-7-java
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each

## [Unreleased]

## [1.3.1](https://github.com/quintoandar/butterfree/releases/tag/1.3.1)
* Timestamp NTZ available ([#360](https://github.com/quintoandar/butterfree/pull/360))

## [1.3.0](https://github.com/quintoandar/butterfree/releases/tag/1.3.0)
* Bump versions ([#355](https://github.com/quintoandar/butterfree/pull/355))
* Sphinx version ([#356](https://github.com/quintoandar/butterfree/pull/356))
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ VERSION := $(shell grep __version__ setup.py | head -1 | cut -d \" -f2 | cut -d
.PHONY: environment
## create virtual environment for butterfree
environment:
@pyenv install -s 3.7.13
@pyenv virtualenv 3.7.13 butterfree
@pyenv install -s 3.9.19
@pyenv virtualenv 3.9.19 butterfree
@pyenv local butterfree
@PYTHONPATH=. python -m pip install --upgrade pip

Expand Down
2 changes: 1 addition & 1 deletion butterfree/_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from butterfree._cli import migrate

app = typer.Typer()
app = typer.Typer(no_args_is_help=True)
app.add_typer(migrate.app, name="migrate")

if __name__ == "__main__":
Expand Down
6 changes: 4 additions & 2 deletions butterfree/_cli/migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
from butterfree.migrations.database_migration import ALLOWED_DATABASE
from butterfree.pipelines import FeatureSetPipeline

app = typer.Typer(help="Apply the automatic migrations in a database.")
app = typer.Typer(
help="Apply the automatic migrations in a database.", no_args_is_help=True
)

logger = __logger("migrate", True)

Expand Down Expand Up @@ -89,7 +91,7 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
instances.add(value)

logger.info("Creating instances...")
return set(value() for value in instances)
return set(value() for value in instances) # type: ignore


PATH = typer.Argument(
Expand Down
1 change: 1 addition & 0 deletions butterfree/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Holds connection clients."""

from butterfree.clients.abstract_client import AbstractClient
from butterfree.clients.cassandra_client import CassandraClient
from butterfree.clients.spark_client import SparkClient
Expand Down
5 changes: 3 additions & 2 deletions butterfree/clients/abstract_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Abstract class for database clients."""

from abc import ABC, abstractmethod
from typing import Any
from typing import Any, Optional


class AbstractClient(ABC):
Expand All @@ -25,7 +26,7 @@ def sql(self, query: str) -> Any:
pass

@abstractmethod
def get_schema(self, table: str, database: str = None) -> Any:
def get_schema(self, table: str, database: Optional[str] = None) -> Any:
"""Returns desired table schema.
Attributes:
Expand Down
5 changes: 4 additions & 1 deletion butterfree/clients/cassandra_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""CassandraClient entity."""

from ssl import CERT_REQUIRED, PROTOCOL_TLSv1
from typing import Dict, List, Optional

Expand Down Expand Up @@ -102,7 +103,9 @@ def sql(self, query: str) -> ResponseFuture:
"""
return self.conn.execute(query)

def get_schema(self, table: str, database: str = None) -> List[Dict[str, str]]:
def get_schema(
self, table: str, database: Optional[str] = None
) -> List[Dict[str, str]]:
"""Returns desired table schema.
Attributes:
Expand Down
25 changes: 16 additions & 9 deletions butterfree/clients/spark_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def read(

return df_reader.format(format).load(path=path, **options) # type: ignore

def read_table(self, table: str, database: str = None) -> DataFrame:
def read_table(self, table: str, database: Optional[str] = None) -> DataFrame:
"""Use the SparkSession.read interface to read a metastore table.
Args:
Expand Down Expand Up @@ -179,9 +179,9 @@ def write_table(
database: Optional[str],
table_name: str,
path: str,
format_: str = None,
mode: str = None,
partition_by: List[str] = None,
format_: Optional[str] = None,
mode: Optional[str] = None,
partition_by: Optional[List[str]] = None,
**options: Any,
) -> None:
"""Receive a spark DataFrame and write it as a table in metastore.
Expand Down Expand Up @@ -231,7 +231,10 @@ def create_temporary_view(dataframe: DataFrame, name: str) -> Any:
return dataframe.writeStream.format("memory").queryName(name).start()

def add_table_partitions(
self, partitions: List[Dict[str, Any]], table: str, database: str = None
self,
partitions: List[Dict[str, Any]],
table: str,
database: Optional[str] = None,
) -> None:
"""Add partitions to an existing table.
Expand Down Expand Up @@ -259,9 +262,11 @@ def add_table_partitions(
key_values_expr = [
", ".join(
[
"{} = {}".format(k, v)
if not isinstance(v, str)
else "{} = '{}'".format(k, v)
(
"{} = {}".format(k, v)
if not isinstance(v, str)
else "{} = '{}'".format(k, v)
)
for k, v in partition.items()
]
)
Expand Down Expand Up @@ -314,7 +319,9 @@ def _convert_schema(self, schema: DataFrame) -> List[Dict[str, str]]:

return converted_schema

def get_schema(self, table: str, database: str = None) -> List[Dict[str, str]]:
def get_schema(
self, table: str, database: Optional[str] = None
) -> List[Dict[str, str]]:
"""Returns desired table schema.
Attributes:
Expand Down
25 changes: 13 additions & 12 deletions butterfree/configs/db/cassandra_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Holds configurations to read and write with Spark to Cassandra DB."""

from typing import Any, Dict, List, Optional

from butterfree.configs import environment
Expand Down Expand Up @@ -32,18 +33,18 @@ class CassandraConfig(AbstractWriteConfig):

def __init__(
self,
username: str = None,
password: str = None,
host: str = None,
keyspace: str = None,
mode: str = None,
format_: str = None,
stream_processing_time: str = None,
stream_output_mode: str = None,
stream_checkpoint_path: str = None,
read_consistency_level: str = None,
write_consistency_level: str = None,
local_dc: str = None,
username: Optional[str] = None,
password: Optional[str] = None,
host: Optional[str] = None,
keyspace: Optional[str] = None,
mode: Optional[str] = None,
format_: Optional[str] = None,
stream_processing_time: Optional[str] = None,
stream_output_mode: Optional[str] = None,
stream_checkpoint_path: Optional[str] = None,
read_consistency_level: Optional[str] = None,
write_consistency_level: Optional[str] = None,
local_dc: Optional[str] = None,
):
self.username = username
self.password = password
Expand Down
17 changes: 9 additions & 8 deletions butterfree/configs/db/kafka_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Holds configurations to read and write with Spark to Kafka."""

from typing import Any, Dict, List, Optional

from butterfree.configs import environment
Expand All @@ -25,13 +26,13 @@ class KafkaConfig(AbstractWriteConfig):

def __init__(
self,
kafka_topic: str = None,
kafka_connection_string: str = None,
mode: str = None,
format_: str = None,
stream_processing_time: str = None,
stream_output_mode: str = None,
stream_checkpoint_path: str = None,
kafka_topic: Optional[str] = None,
kafka_connection_string: Optional[str] = None,
mode: Optional[str] = None,
format_: Optional[str] = None,
stream_processing_time: Optional[str] = None,
stream_output_mode: Optional[str] = None,
stream_checkpoint_path: Optional[str] = None,
):
self.kafka_topic = kafka_topic
self.kafka_connection_string = kafka_connection_string
Expand Down Expand Up @@ -147,4 +148,4 @@ def translate(self, schema: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
Kafka schema.
"""
pass
return [{}]
8 changes: 4 additions & 4 deletions butterfree/configs/db/metastore_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class MetastoreConfig(AbstractWriteConfig):

def __init__(
self,
path: str = None,
mode: str = None,
format_: str = None,
file_system: str = None,
path: Optional[str] = None,
mode: Optional[str] = None,
format_: Optional[str] = None,
file_system: Optional[str] = None,
):
self.path = path
self.mode = mode
Expand Down
5 changes: 4 additions & 1 deletion butterfree/configs/environment.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Holds functions for managing the running environment."""

import os
from typing import Optional

Expand Down Expand Up @@ -34,7 +35,9 @@ def __init__(self, variable_name: str):
)


def get_variable(variable_name: str, default_value: str = None) -> Optional[str]:
def get_variable(
variable_name: str, default_value: Optional[str] = None
) -> Optional[str]:
"""Gets an environment variable.
The variable comes from it's explicitly declared value in the running
Expand Down
1 change: 1 addition & 0 deletions butterfree/constants/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Holds constant attributes that are common for Butterfree."""

from butterfree.constants.data_type import DataType

__all__ = ["DataType"]
2 changes: 2 additions & 0 deletions butterfree/constants/data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
IntegerType,
LongType,
StringType,
TimestampNTZType,
TimestampType,
)
from typing_extensions import final
Expand All @@ -21,6 +22,7 @@
class DataType(Enum):
"""Holds constants for data types within Butterfree."""

TIMESTAMP_NTZ = (TimestampNTZType(), "timestamp", "TIMESTAMP_NTZ")
TIMESTAMP = (TimestampType(), "timestamp", "TIMESTAMP")
BINARY = (BinaryType(), "boolean", "BINARY")
BOOLEAN = (BooleanType(), "boolean", "BOOLEAN")
Expand Down
1 change: 1 addition & 0 deletions butterfree/constants/migrations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Migrations' Constants."""

from butterfree.constants import columns

PARTITION_BY = [
Expand Down
1 change: 1 addition & 0 deletions butterfree/dataframe_service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Dataframe optimization components regarding Butterfree."""

from butterfree.dataframe_service.incremental_strategy import IncrementalStrategy
from butterfree.dataframe_service.partitioning import extract_partition_values
from butterfree.dataframe_service.repartition import repartition_df, repartition_sort_df
Expand Down
17 changes: 13 additions & 4 deletions butterfree/dataframe_service/incremental_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

from typing import Optional

from pyspark.sql import DataFrame


Expand All @@ -18,7 +20,7 @@ class IncrementalStrategy:
filter can properly work with the defined upper and lower bounds.
"""

def __init__(self, column: str = None):
def __init__(self, column: Optional[str] = None):
self.column = column

def from_milliseconds(self, column_name: str) -> IncrementalStrategy:
Expand All @@ -32,7 +34,9 @@ def from_milliseconds(self, column_name: str) -> IncrementalStrategy:
"""
return IncrementalStrategy(column=f"from_unixtime({column_name}/ 1000.0)")

def from_string(self, column_name: str, mask: str = None) -> IncrementalStrategy:
def from_string(
self, column_name: str, mask: Optional[str] = None
) -> IncrementalStrategy:
"""Create a column expression from ts column defined as a simple string.
Args:
Expand Down Expand Up @@ -66,7 +70,9 @@ def from_year_month_day_partitions(
f"'-', string({day_column}))"
)

def get_expression(self, start_date: str = None, end_date: str = None) -> str:
def get_expression(
self, start_date: Optional[str] = None, end_date: Optional[str] = None
) -> str:
"""Get the incremental filter expression using the defined dates.
Both arguments can be set to defined a specific date interval, but it's
Expand Down Expand Up @@ -95,7 +101,10 @@ def get_expression(self, start_date: str = None, end_date: str = None) -> str:
return f"date({self.column}) <= date('{end_date}')"

def filter_with_incremental_strategy(
self, dataframe: DataFrame, start_date: str = None, end_date: str = None
self,
dataframe: DataFrame,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> DataFrame:
"""Filters the dataframe according to the date boundaries.
Expand Down
13 changes: 7 additions & 6 deletions butterfree/dataframe_service/repartition.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module where there are repartition methods."""
from typing import List

from typing import List, Optional

from pyspark.sql.dataframe import DataFrame

Expand All @@ -10,7 +11,7 @@


def _num_partitions_definition(
num_processors: int = None, num_partitions: int = None
num_processors: Optional[int] = None, num_partitions: Optional[int] = None
) -> int:
num_partitions = (
num_processors * PARTITION_PROCESSOR_RATIO
Expand All @@ -24,8 +25,8 @@ def _num_partitions_definition(
def repartition_df(
dataframe: DataFrame,
partition_by: List[str],
num_partitions: int = None,
num_processors: int = None,
num_partitions: Optional[int] = None,
num_processors: Optional[int] = None,
) -> DataFrame:
"""Partition the DataFrame.
Expand All @@ -47,8 +48,8 @@ def repartition_sort_df(
dataframe: DataFrame,
partition_by: List[str],
order_by: List[str],
num_processors: int = None,
num_partitions: int = None,
num_processors: Optional[int] = None,
num_partitions: Optional[int] = None,
) -> DataFrame:
"""Partition and Sort the DataFrame.
Expand Down
1 change: 1 addition & 0 deletions butterfree/extract/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""The Source Component of a Feature Set."""

from butterfree.extract.source import Source

__all__ = ["Source"]
1 change: 1 addition & 0 deletions butterfree/extract/pre_processing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Pre Processing Components regarding Readers."""

from butterfree.extract.pre_processing.explode_json_column_transform import (
explode_json_column,
)
Expand Down
Loading

0 comments on commit 14ff019

Please sign in to comment.