Skip to content

Commit

Permalink
Merge pull request #101 from Subaru-PFS/u/monodera/errmsg_transfer-20240912
Browse files Browse the repository at this point in the history

Add status check and output for transfer cli command
  • Loading branch information
monodera authored Sep 13, 2024
2 parents 22742b7 + 750830f commit a032818
Showing 1 changed file with 58 additions and 4 deletions.
62 changes: 58 additions & 4 deletions src/targetdb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
from astropy.table import Table
from loguru import logger
Expand Down Expand Up @@ -1175,10 +1176,14 @@ def parse_allocation_file(input_file, output_dir=Path("."), outfile_prefix=None)


def transfer_data_from_uploader(df, config, local_dir=Path("."), force=False):
status = []
n_transfer = []
for upload_id in df["upload_id"]:
# datadirs = glob.glob(local_dir / f"????????-??????-{upload_id}")
datadirs = list(local_dir.cwd().glob(f"????????-??????-{upload_id}"))
# datadirs = list(local_dir.cwd().glob(f"????????-??????-{upload_id}"))
datadirs = list(local_dir.glob(f"????????-??????-{upload_id}"))
skip_transfer = False
# logger.info(f"{local_dir=} {local_dir.cwd()} {len(datadirs)=}")
if len(datadirs) == 1:
skip_transfer = True if not force else False
if skip_transfer:
Expand All @@ -1190,8 +1195,12 @@ def transfer_data_from_uploader(df, config, local_dir=Path("."), force=False):
f"Data directory, {datadirs[0]}, is found locally, but force transfer"
)
elif len(datadirs) > 1:
logger.error(f"Multiple data directories are found locally: {datadirs}")
raise ValueError("Multiple data directories are found locally")
logger.error(
f"Multiple data directories are found in the destination directory: {datadirs}."
)
raise ValueError(
f"Multiple data directories are found in the destination directory for {upload_id}: {datadirs}"
)
else:
logger.info(
f"Data directory for upload_id: {upload_id} is not found locally. Try transfer"
Expand Down Expand Up @@ -1243,10 +1252,55 @@ def transfer_data_from_uploader(df, config, local_dir=Path("."), force=False):

# Execute the rsync command
try:
subprocess.run(rsync_command, shell=use_shell, check=True)
proc = subprocess.run(
rsync_command,
shell=use_shell,
check=True,
stdout=subprocess.PIPE,
encoding="utf-8",
)
str_uploaded_dirs = [
line
for line in proc.stdout.splitlines()
if upload_id in line
if line.endswith("/")
]
logger.info(f"Transferred directories: {str_uploaded_dirs}")

n_dirs = len(str_uploaded_dirs)
n_transfer.append(n_dirs)

if n_dirs == 1:
status.append("success")
else:
status.append("WARNING")
except subprocess.CalledProcessError as e:
logger.error(f"Failed to transfer data for upload_id: {upload_id}")
logger.error(e)
# raise Exception(f"Failed to transfer data for upload_id: {upload_id}")
# raise
n_transfer.append(0)
status.append("FAILED")
else:
status.append("skipped")
n_transfer.append(0)

custom_status_dict = {"success": 0, "WARNING": 1, "FAILED": 3}
df_status = pd.DataFrame(
{"upload_id": df["upload_id"], "status": status, "n_transfer": n_transfer}
)
df_status.sort_values(by=["status"], key=lambda x: x.map(custom_status_dict))
df_status_out = df_status.sort_values(
by=["status"], key=lambda x: x.map(custom_status_dict)
)
logger.info(f"Transfer status: \n{df_status_out.to_string(index=False)}")

if np.all(df_status["status"] == "success"):
logger.info("All data transfer is successful.")
else:
logger.error(
"There are some issues with data transfer. Please check the status."
)


def insert_targets_from_uploader(
Expand Down

0 comments on commit a032818

Please sign in to comment.