added cloning data files
Toan Quach committed Jan 13, 2025
1 parent beee00d commit f79d591
Showing 9 changed files with 97 additions and 45 deletions.
2 changes: 1 addition & 1 deletion taipy/core/data/_data_manager.py
@@ -188,5 +188,5 @@ def _clone(
dn._owner_id = cls._get_owner_id(dn._scope, cycle_id, scenario_id)
dn._parent_ids = set()
cls._set(dn)
- # dn._clone_data()
+ dn._clone_data()
return dn
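
With the call re-enabled, cloning a data node now also duplicates its backing file right after the clone is persisted. A rough sketch of the resulting flow, using simplified stand-ins for the node and manager (names here are illustrative, not Taipy's actual internals):

import shutil
from typing import Optional, Set


class FileBackedNode:
    def __init__(self, id: str, path: str) -> None:
        self.id = id
        self.path = path
        self._parent_ids: Set[str] = set()

    def _clone_data(self) -> Optional[str]:
        # Duplicate the underlying file so the clone and the original
        # no longer share storage (see _clone_data_file below).
        new_path = f"{self.path}.{self.id}.copy"
        shutil.copy(self.path, new_path)
        return new_path


def clone_node(dn: FileBackedNode, new_id: str) -> FileBackedNode:
    cloned = FileBackedNode(new_id, dn.path)  # fresh entity with a new id
    cloned._parent_ids = set()                # lineage reset, as in the diff
    # ... cls._set(dn) persists the clone here in the real manager ...
    cloned._clone_data()                      # then copy the data file itself
    return cloned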
53 changes: 36 additions & 17 deletions taipy/core/data/_file_datanode_mixin.py
@@ -42,6 +42,7 @@ class _FileDataNodeMixin:
_PATH_KEY = "path"
_DEFAULT_PATH_KEY = "default_path"
_IS_GENERATED_KEY = "is_generated"
+ __TAIPY_CLONED_PREFIX = "TAIPY_CLONED"

__logger = _TaipyLogger._get_logger()

@@ -109,12 +110,14 @@ def _get_downloadable_path(self) -> str:

return ""

- def _upload(self,
- path: str,
- upload_checker: Optional[Callable[[str, Any], bool]] = None,
- editor_id: Optional[str] = None,
- comment: Optional[str] = None,
- **kwargs: Any) -> ReasonCollection:
+ def _upload(
+ self,
+ path: str,
+ upload_checker: Optional[Callable[[str, Any], bool]] = None,
+ editor_id: Optional[str] = None,
+ comment: Optional[str] = None,
+ **kwargs: Any,
+ ) -> ReasonCollection:
"""Upload a file data to the data node.
Arguments:
@@ -136,20 +139,23 @@ def _upload(self,
from ._data_manager_factory import _DataManagerFactory

reasons = ReasonCollection()
- if (editor_id
- and self.edit_in_progress # type: ignore[attr-defined]
- and self.editor_id != editor_id # type: ignore[attr-defined]
- and (not self.editor_expiration_date # type: ignore[attr-defined]
- or self.editor_expiration_date > datetime.now())): # type: ignore[attr-defined]
+ if (
+ editor_id
+ and self.edit_in_progress # type: ignore[attr-defined]
+ and self.editor_id != editor_id # type: ignore[attr-defined]
+ and (
+ not self.editor_expiration_date # type: ignore[attr-defined]
+ or self.editor_expiration_date > datetime.now()
+ )
+ ): # type: ignore[attr-defined]
reasons._add_reason(self.id, DataNodeEditInProgress(self.id)) # type: ignore[attr-defined]
return reasons

up_path = pathlib.Path(path)
try:
upload_data = self._read_from_path(str(up_path))
except Exception as err:
self.__logger.error(f"Error uploading `{up_path.name}` to data "
f"node `{self.id}`:") # type: ignore[attr-defined]
self.__logger.error(f"Error uploading `{up_path.name}` to data " f"node `{self.id}`:") # type: ignore[attr-defined]
self.__logger.error(f"Error: {err}")
reasons._add_reason(self.id, UploadFileCanNotBeRead(up_path.name, self.id)) # type: ignore[attr-defined]
return reasons
@@ -161,7 +167,8 @@ def _upload(self,
self.__logger.error(
f"Error with the upload checker `{upload_checker.__name__}` "
f"while checking `{up_path.name}` file for upload to the data "
f"node `{self.id}`:") # type: ignore[attr-defined]
f"node `{self.id}`:"
) # type: ignore[attr-defined]
self.__logger.error(f"Error: {err}")
can_upload = False

@@ -171,9 +178,12 @@ def _upload(self,

shutil.copy(up_path, self.path)

- self.track_edit(timestamp=datetime.now(), # type: ignore[attr-defined]
- editor_id=editor_id,
- comment=comment, **kwargs)
+ self.track_edit(
+ timestamp=datetime.now(), # type: ignore[attr-defined]
+ editor_id=editor_id,
+ comment=comment,
+ **kwargs,
+ )
self.unlock_edit() # type: ignore[attr-defined]

_DataManagerFactory._build_manager()._set(self) # type: ignore[arg-type]
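
For context on the block reformatted above: `upload_checker` receives the uploaded file's name and its parsed content, and returning False aborts the copy into the node's path. A hypothetical checker for a CSV node (the function, node, and file names below are invented for illustration):

import pandas as pd


def has_expected_columns(file_name: str, data: pd.DataFrame) -> bool:
    # Reject the upload unless the columns downstream tasks rely on exist.
    return {"date", "value"}.issubset(data.columns)


# Assuming `sales_node` is an existing CSV data node:
# reasons = sales_node._upload("new_sales.csv", upload_checker=has_expected_columns)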
@@ -212,3 +222,12 @@ def _migrate_path(self, storage_type, old_path) -> str:
if os.path.exists(old_path):
shutil.move(old_path, new_path)
return new_path

+ def _clone_data_file(self, id: str) -> Optional[str]:
+ if os.path.exists(self.path):
+ file_path, file_name = os.path.split(self.path)
+ new_file_path = os.path.join(file_path, f"TAIPY_CLONE_{id}_{file_name}")
+ shutil.copy(self.path, new_file_path)
+ return new_file_path
+ # TODO: update file path?????
+ return None
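
The new `_clone_data_file` helper copies the node's file in place, prefixing the copy's name with the literal `TAIPY_CLONE_` and the new node's id (note the class constant above spells `TAIPY_CLONED`, and the TODO records that the node's own path is not yet repointed). A standalone approximation of the naming scheme, with a made-up path and id:

import os


def cloned_file_path(path: str, id: str) -> str:
    # Mirrors the os.path.split / os.path.join logic in _clone_data_file.
    file_path, file_name = os.path.split(path)
    return os.path.join(file_path, f"TAIPY_CLONE_{id}_{file_name}")


print(cloned_file_path("/data/sales.csv", "DATANODE_sales_1234"))
# -> /data/TAIPY_CLONE_DATANODE_sales_1234_sales.csv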
3 changes: 3 additions & 0 deletions taipy/core/data/csv.py
@@ -192,3 +192,6 @@ def _write(self, data: Any, columns: Optional[List[str]] = None):
encoding=properties[self.__ENCODING_KEY],
header=properties[self._HAS_HEADER_PROPERTY],
)

+ def _clone_data(self):
+ return self._clone_data_file(self.id)
52 changes: 32 additions & 20 deletions taipy/core/data/data_node.py
@@ -433,22 +433,27 @@ def append(self, data, editor_id: Optional[str] = None, comment: Optional[str] = None, **kwargs: Any):
corresponding to this write.
"""
from ._data_manager_factory import _DataManagerFactory
- if (editor_id
+
+ if (
+ editor_id
and self.edit_in_progress
and self.editor_id != editor_id
- and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())):
+ and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
+ ):
raise DataNodeIsBeingEdited(self.id, self.editor_id)
self._append(data)
self.track_edit(editor_id=editor_id, comment=comment, **kwargs)
self.unlock_edit()
_DataManagerFactory._build_manager()._set(self)

- def write(self,
- data,
- job_id: Optional[JobId] = None,
- editor_id: Optional[str] = None,
- comment: Optional[str] = None,
- **kwargs: Any):
+ def write(
+ self,
+ data,
+ job_id: Optional[JobId] = None,
+ editor_id: Optional[str] = None,
+ comment: Optional[str] = None,
+ **kwargs: Any,
+ ):
"""Write some data to this data node.
Once the data is written, the data node is unlocked and the edit is tracked.
@@ -461,10 +466,12 @@ def write(self,
**kwargs (Any): Extra information to attach to the edit document
corresponding to this write.
"""
- if (editor_id
+ if (
+ editor_id
and self.edit_in_progress
and self.editor_id != editor_id
- and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())):
+ and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
+ ):
raise DataNodeIsBeingEdited(self.id, self.editor_id)
self._write(data)
self.track_edit(job_id=job_id, editor_id=editor_id, comment=comment, **kwargs)
@@ -473,12 +480,14 @@ def write(self,

_DataManagerFactory._build_manager()._set(self)
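
Both `append` and `write` share the guard reformatted here: a caller passing an `editor_id` is rejected while a different editor holds a live lock. A self-contained restatement of that predicate (the function name is ours, not Taipy's):

from datetime import datetime, timedelta
from typing import Optional


def edit_blocked(
    requesting_editor: Optional[str],
    edit_in_progress: bool,
    lock_holder: Optional[str],
    lock_expiration: Optional[datetime],
) -> bool:
    # Mirrors the check in write()/append(): block only when someone else
    # holds a lock that has no expiration date or has not yet expired.
    return bool(
        requesting_editor
        and edit_in_progress
        and lock_holder != requesting_editor
        and (not lock_expiration or lock_expiration > datetime.now())
    )


assert edit_blocked("bob", True, "alice", datetime.now() + timedelta(minutes=5))
assert not edit_blocked("alice", True, "alice", None)
assert not edit_blocked("bob", True, "alice", datetime.now() - timedelta(minutes=5))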

- def track_edit(self,
- job_id: Optional[str] = None,
- editor_id: Optional[str] = None,
- timestamp: Optional[datetime] = None,
- comment: Optional[str] = None,
- **options: Any):
+ def track_edit(
+ self,
+ job_id: Optional[str] = None,
+ editor_id: Optional[str] = None,
+ timestamp: Optional[datetime] = None,
+ comment: Optional[str] = None,
+ **options: Any,
+ ):
"""Creates and adds a new entry in the edits attribute without writing the data.
Arguments:
@@ -627,15 +636,15 @@ def _get_rank(self, scenario_config_id: str) -> int:
If the data node config is not part of the scenario config, 0xfffc is returned as an infinite rank.
"""
if not scenario_config_id:
- return 0xfffb
+ return 0xFFFB
dn_config = Config.data_nodes.get(self._config_id, None)
if not dn_config:
self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` is not found.")
- return 0xfffd
+ return 0xFFFD
if not dn_config._ranks:
self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` has no rank.")
- return 0xfffe
- return dn_config._ranks.get(scenario_config_id, 0xfffc)
+ return 0xFFFE
+ return dn_config._ranks.get(scenario_config_id, 0xFFFC)

@abstractmethod
def _read(self):
@@ -676,6 +685,9 @@ def _get_last_modified_datetime(cls, path: Optional[str] = None) -> Optional[datetime]:

return last_modified_datetime

+ def _clone_data(self):
+ raise NotImplementedError

@staticmethod
def _class_map():
def all_subclasses(cls):
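Taken together, the base class and the mixin form a simple hook: `DataNode._clone_data` raises NotImplementedError, and each file-based node overrides it by delegating to the mixin's `_clone_data_file` (csv above, plus excel, json, parquet, and pickle below, all add the same one-liner). A condensed sketch of the hierarchy, not the real class layout:

import os
import shutil
from typing import Optional


class DataNode:
    def __init__(self, id: str, path: str = "") -> None:
        self.id = id
        self.path = path

    def _clone_data(self):
        # Non-file nodes (in-memory, SQL, ...) have no copy strategy yet.
        raise NotImplementedError


class _FileDataNodeMixin:
    path: str

    def _clone_data_file(self, id: str) -> Optional[str]:
        if os.path.exists(self.path):
            file_path, file_name = os.path.split(self.path)
            new_file_path = os.path.join(file_path, f"TAIPY_CLONE_{id}_{file_name}")
            shutil.copy(self.path, new_file_path)
            return new_file_path
        return None


class CSVDataNode(_FileDataNodeMixin, DataNode):
    def _clone_data(self):
        # Same delegation as the other file-backed nodes in this commit.
        return self._clone_data_file(self.id)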
3 changes: 3 additions & 0 deletions taipy/core/data/excel.py
@@ -339,3 +339,6 @@ def _write(self, data: Any):
self._write_excel_with_single_sheet(
data.to_excel, self._path, index=False, header=properties[self._HAS_HEADER_PROPERTY] or None
)

+ def _clone_data(self):
+ return self._clone_data_file(self.id)
3 changes: 3 additions & 0 deletions taipy/core/data/json.py
@@ -158,6 +158,9 @@ def _write(self, data: Any):
with open(self._path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f: # type: ignore
json.dump(data, f, indent=4, cls=self._encoder)

+ def _clone_data(self):
+ return self._clone_data_file(self.id)


class _DefaultJSONEncoder(json.JSONEncoder):
def default(self, o):
3 changes: 3 additions & 0 deletions taipy/core/data/parquet.py
@@ -249,3 +249,6 @@ def _append(self, data: Any):

def _write(self, data: Any):
self._write_with_kwargs(data)

+ def _clone_data(self):
+ return self._clone_data_file(self.id)
3 changes: 3 additions & 0 deletions taipy/core/data/pickle.py
@@ -108,3 +108,6 @@ def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
def _write(self, data):
with open(self._path, "wb") as pf:
pickle.dump(data, pf)

+ def _clone_data(self):
+ return self._clone_data_file(self.id)
20 changes: 13 additions & 7 deletions taipy/core/task/_task_manager.py
@@ -57,6 +57,17 @@ def _set(cls, task: Task) -> None:
cls.__save_data_nodes(task.output.values())
super()._set(task)

+ @classmethod
+ def _get_owner_id(
+ cls, scope, cycle_id, scenario_id
+ ) -> Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]:
+ if scope == Scope.SCENARIO:
+ return scenario_id
+ elif scope == Scope.CYCLE:
+ return cycle_id
+ else:
+ return None

@classmethod
def _bulk_get_or_create(
cls,
@@ -79,13 +90,7 @@ def _bulk_get_or_create(
]
task_config_data_nodes = [data_nodes[dn_config] for dn_config in task_dn_configs]
scope = min(dn.scope for dn in task_config_data_nodes) if len(task_config_data_nodes) != 0 else Scope.GLOBAL
- owner_id: Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]
- if scope == Scope.SCENARIO:
- owner_id = scenario_id
- elif scope == Scope.CYCLE:
- owner_id = cycle_id
- else:
- owner_id = None
+ owner_id = cls._get_owner_id(scope, cycle_id, scenario_id)

tasks_configs_and_owner_id.append((task_config, owner_id))

@@ -234,6 +239,7 @@ def _clone(cls, task: Task, cycle_id: Optional[CycleId] = None, scenario_id: Optional[ScenarioId] = None):
outputs = [data_manager._clone(o, cycle_id, scenario_id) for o in task.output.values()]
task.id = task._new_id(task.config_id)
task._parent_ids = set()
+ task._owner_id = cls._get_owner_id(task.scope, cycle_id, scenario_id)
for dn in set(inputs + outputs):
dn._parent_ids.update([task.id])
cls._set(task)
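Hoisting `_get_owner_id` out of `_bulk_get_or_create` removes the duplicated if/elif chain and lets `_clone` assign the cloned task's owner the same way. A self-contained sketch of the dispatch (Scope reduced to the members used here, ids as plain strings):

from enum import Enum
from typing import Optional


class Scope(Enum):
    GLOBAL = 0
    CYCLE = 1
    SCENARIO = 2


def get_owner_id(scope: Scope, cycle_id: Optional[str], scenario_id: Optional[str]) -> Optional[str]:
    # Scenario-scoped entities belong to their scenario, cycle-scoped ones
    # to their cycle, and global entities to no owner at all.
    if scope == Scope.SCENARIO:
        return scenario_id
    if scope == Scope.CYCLE:
        return cycle_id
    return None


assert get_owner_id(Scope.SCENARIO, "CYCLE_x", "SCENARIO_y") == "SCENARIO_y"
assert get_owner_id(Scope.CYCLE, "CYCLE_x", "SCENARIO_y") == "CYCLE_x"
assert get_owner_id(Scope.GLOBAL, "CYCLE_x", "SCENARIO_y") is None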
