diff --git a/taipy/core/data/_data_manager.py b/taipy/core/data/_data_manager.py
index 98cd5aa71c..a22f4734c4 100644
--- a/taipy/core/data/_data_manager.py
+++ b/taipy/core/data/_data_manager.py
@@ -188,5 +188,5 @@ def _clone(
         dn._owner_id = cls._get_owner_id(dn._scope, cycle_id, scenario_id)
         dn._parent_ids = set()
         cls._set(dn)
-        # dn._clone_data()
+        dn._clone_data()
         return dn
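Reviewer note: un-commenting `dn._clone_data()` means `_DataManager._clone` now duplicates the underlying data, not just the data node entity. The dispatch this relies on is spread across the files below; here is a minimal runnable sketch of its shape (the class bodies are simplified stand-ins, not the actual taipy classes):

    from typing import Optional

    class DataNode:
        id = "DATA_NODE_example"

        def _clone_data(self):
            raise NotImplementedError  # non-file nodes cannot clone their data yet

    class _FileDataNodeMixin:
        def _clone_data_file(self, id: str) -> Optional[str]:
            # Stand-in for the real file copy; returns the would-be clone path.
            return f"TAIPY_CLONED_{id}_data.csv"

    class CSVDataNode(_FileDataNodeMixin, DataNode):
        def _clone_data(self):
            return self._clone_data_file(self.id)

    assert CSVDataNode()._clone_data() == "TAIPY_CLONED_DATA_NODE_example_data.csv"
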
diff --git a/taipy/core/data/_file_datanode_mixin.py b/taipy/core/data/_file_datanode_mixin.py
index ff87146756..78f4ab4d0d 100644
--- a/taipy/core/data/_file_datanode_mixin.py
+++ b/taipy/core/data/_file_datanode_mixin.py
@@ -42,6 +42,7 @@ class _FileDataNodeMixin:
     _PATH_KEY = "path"
     _DEFAULT_PATH_KEY = "default_path"
     _IS_GENERATED_KEY = "is_generated"
+    __TAIPY_CLONED_PREFIX = "TAIPY_CLONED"
 
     __logger = _TaipyLogger._get_logger()
 
@@ -109,12 +110,14 @@ def _get_downloadable_path(self) -> str:
 
         return ""
 
-    def _upload(self,
-                path: str,
-                upload_checker: Optional[Callable[[str, Any], bool]] = None,
-                editor_id: Optional[str] = None,
-                comment: Optional[str] = None,
-                **kwargs: Any) -> ReasonCollection:
+    def _upload(
+        self,
+        path: str,
+        upload_checker: Optional[Callable[[str, Any], bool]] = None,
+        editor_id: Optional[str] = None,
+        comment: Optional[str] = None,
+        **kwargs: Any,
+    ) -> ReasonCollection:
         """Upload a file data to the data node.
 
         Arguments:
@@ -136,11 +139,15 @@ def _upload(self,
         from ._data_manager_factory import _DataManagerFactory
 
         reasons = ReasonCollection()
-        if (editor_id
-            and self.edit_in_progress  # type: ignore[attr-defined]
-            and self.editor_id != editor_id  # type: ignore[attr-defined]
-            and (not self.editor_expiration_date  # type: ignore[attr-defined]
-                 or self.editor_expiration_date > datetime.now())):  # type: ignore[attr-defined]
+        if (
+            editor_id
+            and self.edit_in_progress  # type: ignore[attr-defined]
+            and self.editor_id != editor_id  # type: ignore[attr-defined]
+            and (
+                not self.editor_expiration_date  # type: ignore[attr-defined]
+                or self.editor_expiration_date > datetime.now()
+            )
+        ):  # type: ignore[attr-defined]
             reasons._add_reason(self.id, DataNodeEditInProgress(self.id))  # type: ignore[attr-defined]
             return reasons
 
@@ -148,8 +155,7 @@ def _upload(self,
         try:
             upload_data = self._read_from_path(str(up_path))
         except Exception as err:
-            self.__logger.error(f"Error uploading `{up_path.name}` to data "
-                                f"node `{self.id}`:")  # type: ignore[attr-defined]
+            self.__logger.error(f"Error uploading `{up_path.name}` to data " f"node `{self.id}`:")  # type: ignore[attr-defined]
             self.__logger.error(f"Error: {err}")
             reasons._add_reason(self.id, UploadFileCanNotBeRead(up_path.name, self.id))  # type: ignore[attr-defined]
             return reasons
@@ -161,7 +167,8 @@ def _upload(self,
                 self.__logger.error(
                     f"Error with the upload checker `{upload_checker.__name__}` "
                     f"while checking `{up_path.name}` file for upload to the data "
-                    f"node `{self.id}`:")  # type: ignore[attr-defined]
+                    f"node `{self.id}`:"
+                )  # type: ignore[attr-defined]
                 self.__logger.error(f"Error: {err}")
                 can_upload = False
 
@@ -171,9 +178,12 @@ def _upload(self,
 
         shutil.copy(up_path, self.path)
 
-        self.track_edit(timestamp=datetime.now(),  # type: ignore[attr-defined]
-                        editor_id=editor_id,
-                        comment=comment, **kwargs)
+        self.track_edit(
+            timestamp=datetime.now(),  # type: ignore[attr-defined]
+            editor_id=editor_id,
+            comment=comment,
+            **kwargs,
+        )
         self.unlock_edit()  # type: ignore[attr-defined]
 
         _DataManagerFactory._build_manager()._set(self)  # type: ignore[arg-type]
@@ -212,3 +222,12 @@ def _migrate_path(self, storage_type, old_path) -> str:
         if os.path.exists(old_path):
             shutil.move(old_path, new_path)
         return new_path
+
+    def _clone_data_file(self, id: str) -> Optional[str]:
+        if os.path.exists(self.path):
+            file_path, file_name = os.path.split(self.path)
+            new_file_path = os.path.join(file_path, f"{self.__TAIPY_CLONED_PREFIX}_{id}_{file_name}")
+            shutil.copy(self.path, new_file_path)
+            return new_file_path
+        # TODO: update the file path?
+        return None
diff --git a/taipy/core/data/csv.py b/taipy/core/data/csv.py
index 083215bc4e..51fd9f986b 100644
--- a/taipy/core/data/csv.py
+++ b/taipy/core/data/csv.py
@@ -192,3 +192,6 @@ def _write(self, data: Any, columns: Optional[List[str]] = None):
             encoding=properties[self.__ENCODING_KEY],
             header=properties[self._HAS_HEADER_PROPERTY],
         )
+
+    def _clone_data(self):
+        return self._clone_data_file(self.id)
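Reviewer note: for clarity, the filename scheme `_clone_data_file` produces, replayed standalone (the path and id values here are made up for illustration):

    import os

    path = os.path.join("data", "sales.csv")  # hypothetical data node path
    clone_id = "CSV_DATA_NODE_clone1"         # hypothetical id of the cloned node
    file_path, file_name = os.path.split(path)
    new_file_path = os.path.join(file_path, f"TAIPY_CLONED_{clone_id}_{file_name}")
    print(new_file_path)  # data/TAIPY_CLONED_CSV_DATA_NODE_clone1_sales.csv

Note the clone lands in the same directory as the original, so repeated clones of the same node accumulate prefixed copies there.
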
""" if not scenario_config_id: - return 0xfffb + return 0xFFFB dn_config = Config.data_nodes.get(self._config_id, None) if not dn_config: self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` is not found.") - return 0xfffd + return 0xFFFD if not dn_config._ranks: self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` has no rank.") - return 0xfffe - return dn_config._ranks.get(scenario_config_id, 0xfffc) + return 0xFFFE + return dn_config._ranks.get(scenario_config_id, 0xFFFC) @abstractmethod def _read(self): @@ -676,6 +685,9 @@ def _get_last_modified_datetime(cls, path: Optional[str] = None) -> Optional[dat return last_modified_datetime + def _clone_data(self): + raise NotImplementedError + @staticmethod def _class_map(): def all_subclasses(cls): diff --git a/taipy/core/data/excel.py b/taipy/core/data/excel.py index 3e39c1160f..9150a38acd 100644 --- a/taipy/core/data/excel.py +++ b/taipy/core/data/excel.py @@ -339,3 +339,6 @@ def _write(self, data: Any): self._write_excel_with_single_sheet( data.to_excel, self._path, index=False, header=properties[self._HAS_HEADER_PROPERTY] or None ) + + def _clone_data(self): + return self._clone_data_file(self.id) diff --git a/taipy/core/data/json.py b/taipy/core/data/json.py index c18ab8d7b1..fec2ad196c 100644 --- a/taipy/core/data/json.py +++ b/taipy/core/data/json.py @@ -158,6 +158,9 @@ def _write(self, data: Any): with open(self._path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f: # type: ignore json.dump(data, f, indent=4, cls=self._encoder) + def _clone_data(self): + return self._clone_data_file(self.id) + class _DefaultJSONEncoder(json.JSONEncoder): def default(self, o): diff --git a/taipy/core/data/parquet.py b/taipy/core/data/parquet.py index 7c526b35d8..2f64d9ac11 100644 --- a/taipy/core/data/parquet.py +++ b/taipy/core/data/parquet.py @@ -249,3 +249,6 @@ def _append(self, data: Any): def _write(self, data: Any): self._write_with_kwargs(data) + + def _clone_data(self): + return self._clone_data_file(self.id) diff --git a/taipy/core/data/pickle.py b/taipy/core/data/pickle.py index b86e82d6c7..8e5de42563 100644 --- a/taipy/core/data/pickle.py +++ b/taipy/core/data/pickle.py @@ -108,3 +108,6 @@ def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any: def _write(self, data): with open(self._path, "wb") as pf: pickle.dump(data, pf) + + def _clone_data(self): + return self._clone_data_file(self.id) diff --git a/taipy/core/task/_task_manager.py b/taipy/core/task/_task_manager.py index 6566112ba1..722467f10e 100644 --- a/taipy/core/task/_task_manager.py +++ b/taipy/core/task/_task_manager.py @@ -57,6 +57,17 @@ def _set(cls, task: Task) -> None: cls.__save_data_nodes(task.output.values()) super()._set(task) + @classmethod + def _get_owner_id( + cls, scope, cycle_id, scenario_id + ) -> Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]: + if scope == Scope.SCENARIO: + return scenario_id + elif scope == Scope.CYCLE: + return cycle_id + else: + return None + @classmethod def _bulk_get_or_create( cls, @@ -79,13 +90,7 @@ def _bulk_get_or_create( ] task_config_data_nodes = [data_nodes[dn_config] for dn_config in task_dn_configs] scope = min(dn.scope for dn in task_config_data_nodes) if len(task_config_data_nodes) != 0 else Scope.GLOBAL - owner_id: Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]] - if scope == Scope.SCENARIO: - owner_id = scenario_id - elif scope == Scope.CYCLE: - owner_id = cycle_id - else: - owner_id = None + 
+            owner_id = cls._get_owner_id(scope, cycle_id, scenario_id)
 
             tasks_configs_and_owner_id.append((task_config, owner_id))
 
@@ -234,6 +239,7 @@ def _clone(cls, task: Task, cycle_id: Optional[CycleId] = None, scenario_id: Opt
         outputs = [data_manager._clone(o, cycle_id, scenario_id) for o in task.output.values()]
         task.id = task._new_id(task.config_id)
         task._parent_ids = set()
+        task._owner_id = cls._get_owner_id(task.scope, cycle_id, scenario_id)
         for dn in set(inputs + outputs):
             dn._parent_ids.update([task.id])
         cls._set(task)
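
Reviewer note: the new `_TaskManager._get_owner_id` centralizes the owner-resolution rule that `_bulk_get_or_create` previously inlined, and `_clone` now reuses it so cloned tasks get a correct owner. The rule, replayed as a self-contained sketch (the `Scope` enum below is a stand-in, not the taipy import):

    from enum import Enum
    from typing import Optional

    class Scope(Enum):  # stand-in for taipy's Scope enum
        GLOBAL = 1
        CYCLE = 2
        SCENARIO = 3

    def get_owner_id(scope: Scope, cycle_id: Optional[str], scenario_id: Optional[str]) -> Optional[str]:
        # Scenario-scoped entities are owned by the scenario, cycle-scoped ones
        # by the cycle; any other scope (e.g. GLOBAL) has no owner.
        if scope == Scope.SCENARIO:
            return scenario_id
        elif scope == Scope.CYCLE:
            return cycle_id
        return None

    assert get_owner_id(Scope.SCENARIO, "CYCLE_1", "SCENARIO_1") == "SCENARIO_1"
    assert get_owner_id(Scope.CYCLE, "CYCLE_1", "SCENARIO_1") == "CYCLE_1"
    assert get_owner_id(Scope.GLOBAL, "CYCLE_1", "SCENARIO_1") is None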