diff --git a/granddb/config.ini.example b/granddb/config.ini.example
index 580e5b9d..c1a3fedb 100644
--- a/granddb/config.ini.example
+++ b/granddb/config.ini.example
@@ -21,8 +21,9 @@ localdir = ["/home/fleg/incoming/"]
 ; If credentials are required to access the repository, they should be given in the [credential] section using the same name
 ; repository CCIN2P3 is already defined in the database (so it's not necessary to define it here), but credentials for it have
 ; to be supplied in the [credentials] section below
+; THESE DEFINITIONS OVERRIDE THE ONES FROM THE DATABASE
 [repositories]
-CCIN2P3 = ["ssh","cca.in2p3.fr",22,["/sps/grand/data/nancay/GRANDfiles"]]
+CC = ["ssh","cca.in2p3.fr",22,["/sps/grand/data/nancay/GRANDfiles"]]
 
 ; Credentials for repositories given as :
@@ -35,6 +36,7 @@ CCIN2P3 = ["ssh","cca.in2p3.fr",22,["/sps/grand/data/nancay/GRANDfiles"]]
 ; To run an ssh-agent just do : `eval $(ssh-agent)` and `ssh-add .ssh/id_rsa`
 [credentials]
 CCIN2P3 = ["john",""]
+CC = ["jim",""]
 SSHTUNNEL = ["joe",""]
 
 ; database to use (only one database can be defined)
diff --git a/granddb/granddatalib.py b/granddb/granddatalib.py
index 4cfde33e..83cd2c7f 100644
--- a/granddb/granddatalib.py
+++ b/granddb/granddatalib.py
@@ -128,6 +128,7 @@ def __init__(self, file=os.path.join(os.path.dirname(__file__), 'config.ini')):
                 self._repositories[repo["repository"]] = ds
 
         # Add remote repositories
+        # TODO: If repo exists from database just add path ?
         if configur.has_section('repositories'):
             for name in configur['repositories']:
                 repo = json.loads(configur.get('repositories', name))
@@ -181,10 +182,16 @@ def referer(self):
     def SearchFileInDB(self, filename):
         return self.database().SearchFile(filename)
 
+    def exists(self, file, repository=None, path=None):
+        if self.get(file, repository, path, grab=False) is None:
+            return False
+        else:
+            return True
+
     ## Get a file from the repositories.
     # If repo or path given, then directly search there.
     # If not, search first in localdirs and then in remote repositories. First match is returned.
-    def get(self, file, repository=None, path=None):
+    def get(self, file, repository=None, path=None, grab=True):
         res = None
         # Check if file is a simple name or full path name
         if (os.path.dirname(file) != ""):
@@ -198,17 +205,41 @@ def get(self, file, repository=None, path=None):
             rep = self.getrepo(repository)
             if not (rep is None):
                 logger.debug(f"search in repository {rep.name()} {path}")
-                res = rep.get(file, path)
+                res = rep.get(file, path, grab=grab)
         # if no repo specified, we search everywhere
         else:
             for name, rep in self.repositories().items():
                 logger.debug(f"search in repository {rep.name()} {path}")
-                res = rep.get(file, path)
+                res = rep.get(file, path, grab=grab)
+                logger.debug(f"res is {res}")
                 if not (res is None):
                     break
         return res
 
+    def get_dataset(self, directory, repository=None, path=None):
+        res = None
+        # Check if directory is a simple name or full path name
+        if (os.path.dirname(directory) != ""):
+            if (not (path is None) and (path != os.path.dirname(directory))):
+                logger.warning(f"path given in dataset ({os.path.dirname(directory)}) and in repository path ({path}) are different ! The path {os.path.dirname(directory)} from dataset will be used !")
+            path = os.path.dirname(os.path.normpath(directory))
+            directory = os.path.basename(os.path.normpath(directory))
+
+        # if repository is given we get directory directly from this repo
+        if not (repository is None):
+            rep = self.getrepo(repository)
+            if not (rep is None):
+                logger.debug(f"Search in repository {rep.name()} {path}")
+                res = rep.get_dataset(directory, path)
+        # if no repo specified, we search everywhere
+        else:
+            for name, rep in self.repositories().items():
+                logger.debug(f"Search in repository {rep.name()} {path}")
+                res = rep.get_dataset(directory, path)
+                logger.debug(f"res is {res}")
+                if not (res is None):
+                    break
+        return res
 
     def copy_to_incoming(self, pathfile):
         newname = self.incoming() + uniquename(pathfile)
@@ -228,24 +259,98 @@ def getrepo(self, repo):
                 break
         return res
 
+    ##Function to register a dataset (i.e. directory) into the database.
+    def register_dataset(self, directory, repository=None, targetdir=None):
+        import grand.dataio.root_trees
+        if repository is None:
+            repository = self.referer()
+        else:
+            repository = self.getrepo(repository)
 
-    ##Function to register a file into the database. Returns the path to the file in the repository where the file was registered.
-    def register_file(self,filename, repository=None, path=None):
+        if targetdir is None:
+            targetdir = directory
+
+        if repository is not None:
+            # For registering, the full path of the dataset must be provided
+            path = os.path.dirname(directory)
+            if len(path) == 0:
+                logger.error(f"For registering, dataset ({directory}) must be a full path ")
+            else:
+                # And the dir must already be present in the target repository.
+                # If so, we need to get it locally and use this local copy (to be able to read the files)
+                #localdir = self.get_dataset(directory, repository.name())
+                localdir = self.get_dataset(directory)
+                #TODO: Check that target dir exists
+                if localdir is not None:
+                    Tdir = grand.dataio.root_trees.DataDirectory(localdir)
+                    for f in Tdir.get_list_of_files():
+                        self.register_file(localfile=f, dataset=Tdir.dir_name, repository=repository.name(), targetdir=targetdir)
+                else:
+                    logger.error(f"Dataset {directory} was not found in repository {repository.name()} thus cannot be registered")
+        else:
+            logger.error(f"No repository found to register dataset {directory}")
+        return directory
+
+    ##Function to register a file into the database.
+    # The file MUST be present in the target repository and the full path must be given.
+    # Returns the path to the file in the repository where the file was registered.
+ def register_file(self, localfile, dataset=None, repository=None, targetdir=None): newfilename = None - file = self.get(filename,repository,path) - if file is not None: - # If filename in referer repository then keep it - #print(os.path.basename(filename)+" "+self.referer().name()+" "+os.path.dirname(filename)) - newfilename = self.get(os.path.basename(filename),self.referer().name()) - - if newfilename is None: - newfilename = self.referer().copy(file) + if targetdir is None or os.path.dirname(targetdir) == os.path.dirname(localfile): + targetdir = localfile + else: + # Target file is made of target dir + dataset name + filename + targetdir = targetdir + "/" + os.path.basename(dataset) + "/" + os.path.basename(localfile) + targetdir=os.path.normpath(targetdir) + # If repository not given then use the referer + if repository is None: + repository = self.referer() + else: + repository = self.getrepo(repository) + + if repository is not None: + # For registering the full path of the file must be provided + localpath = os.path.dirname(localfile) + + if len(localpath) == 0: + logger.error(f"For registering, local filename ({localfile}) must be a full path ") else: - newfilename = str(newfilename) + # And the file must be already present in the target repository and in the local directory + fileexists = self.get(targetdir, repository.name(), grab=False) + #TODO: Check file exists in + if fileexists : + #if fileexists is not None: + newfilename = localfile + self.database().register_file(localfile, newfilename, dataset, repository.id_repository, self.provider(), targetdir=targetdir) + else: + logger.error(f"File {targetdir} was not found in repository {repository.name()} thus cannot be registered") + newfilename = None + else: + logger.error(f"No repository found to register file {localfile}") + return newfilename - #print("newfilename = "+str(newfilename)) + def old_register_file(self,filename, dataset=None, repository=None, path=None): + newfilename = None - self.database().register_file(file, newfilename, self.referer().id_repository, self.provider()) + # For registering the full path of the file must be provided + path = os.path.dirname(filename) + if len(path) == 0: + logger.error(f"For registering, filename ({filename}) must be a full path ") + else: + file = self.get(filename,repository,path) + #file = filename + if file is not None: + # If filename in referer repository then keep it + newfilename = self.get(filename,self.referer().name(),path) + + if newfilename is None: + newfilename = self.referer().copy(file) + else: + newfilename = str(newfilename) + + #print("newfilename = "+str(newfilename)) + + self.database().register_file(file, newfilename, dataset, self.referer().id_repository, self.provider()) return newfilename @@ -365,6 +470,10 @@ def get(self, file, path=None): logger.warning(f"get method for protocol {self.protocol()} not implemented for repository {self.name()}") return None + def get_dataset(self, file, path=None): + logger.warning(f"get_dataset method for protocol {self.protocol()} not implemented for repository {self.name()}") + return None + def copy(self, pathfile): logger.warning(f"copy method for protocol {self.protocol()} not implemented for repository {self.name()}") return None @@ -376,7 +485,7 @@ def copy(self, pathfile): # @date Sept 2022 class DatasourceLocal(Datasource): ## Search for file in local directories and return the path to the first corresponding file found. 
- def get(self, file, path=None): + def get(self, file, path=None, grab=True): # TODO : Check that path is in self.paths(), if not then copy in incoming ? found_file = None # Path is given : we only search in that path @@ -393,7 +502,7 @@ def get(self, file, path=None): break if my_file is None: - logger.debug(f"file {file} not found in localdir {path}") + logger.debug(f"File {file} not found in localdir {path}") #my_file = Path(path + file) #if my_file.is_file(): @@ -403,7 +512,7 @@ def get(self, file, path=None): else: # No path given : we recursively search in all dirs and subdirs for path in self.paths(): - logger.debug(f"search in localdir {path} for file {file}") + logger.debug(f"Search in localdir {path} for file {file}") #my_file = Path(path + file) my_file = None @@ -418,19 +527,68 @@ def get(self, file, path=None): # found_file = path + file # break else: - logger.debug(f"file {file} not found in localdir {path}") + logger.debug(f"File {file} not found in localdir {path}") + + if not found_file is None: + logger.debug(f"File found in localdir {found_file}") + + return found_file + #return str(found_file) + + def get_dataset(self, file, path=None): + # TODO : Check that path is in self.paths(), if not then copy in incoming ? + found_file = None + # Path is given : we only search in that path + if not (path is None): + my_path = Path(path) + if not my_path.exists(): + logger.warning(f"path {path} not found (seems not exists) ! Check that it is mounted if you run in docker !") + + my_file = None + print(f'path {path} file {file} - {(Path(path))}') + liste = list(Path(path).rglob(file)) + print(f'list {liste}') + for my_file in liste: + if my_file.is_dir(): + found_file = my_file + break + + if my_file is None: + logger.debug(f"Dataset {file} not found in localdir {path}") + else: + # No path given : we recursively search in all dirs and subdirs + for path in self.paths(): + logger.debug(f"search in localdir {path} for dataset {file}") + + #my_file = Path(path + file) + my_file = None + liste = list(Path(path).rglob(file)) + for my_file in liste: + if my_file.is_dir(): + found_file = my_file + break + if not my_file is None and my_file.is_dir(): + break + else: + logger.debug(f"dataset {file} not found in localdir {path}") if not found_file is None: - logger.debug(f"file found in localdir {found_file}") + logger.debug(f"Dataset found in localdir {found_file}") return str(found_file) - def copy(self, pathfile): - newname = self.incoming() + uniquename(pathfile) - if os.path.join(os.path.dirname(pathfile), "") == self.incoming(): - os.rename(pathfile, newname) + def copy(self, pathfile, destfile = None): + if destfile is None: + newname = self.incoming() + uniquename(pathfile) + if os.path.join(os.path.dirname(pathfile), "") == self.incoming(): + os.rename(pathfile, newname) + else: + shutil.copy2(pathfile, newname) else: - shutil.copy2(pathfile, newname) + newname = destfile + if pathfile != newname: + shutil.copy2(pathfile, newname) + return newname @@ -469,20 +627,20 @@ def set_client(self, recurse=True): client = None return client - def get(self, file, path=None): + def get(self, file, path=None, grab=True): import getpass localfile = None client = self.set_client() if not(client is None): if not (path is None): logger.debug(f"search {file} in {path} @ {self.name()}") - localfile = self.get_file(client, path, file) + localfile = self.get_file(client, path, file, grab=grab) if (localfile is None): logger.debug(f"file {file} not found in {path} @ {self.name()}") else: for path in 
self.paths(): logger.debug(f"search {file} in {path}@ {self.name()}") - localfile = self.get_file(client, path, file) + localfile = self.get_file(client, path, file,grab=grab) if not (localfile is None): break else: @@ -493,11 +651,35 @@ def get(self, file, path=None): return localfile + + def get_dataset(self, file, path=None): + import getpass + localfile = None + client = self.set_client() + if not(client is None): + if not (path is None): + logger.debug(f"search {file} in {path} @ {self.name()}") + localfile = self.get_dir(client, path, file) + if (localfile is None): + logger.debug(f"Dataset {file} not found in {path} @ {self.name()}") + else: + for path in self.paths(): + logger.debug(f"search {file} in {path}@ {self.name()}") + localfile = self.get_dir(client, path, file) + if not (localfile is None): + break + else: + logger.debug(f"Dataset {file} not found in {path} @ {self.name()}") + else: + logger.debug(f"Search in repository {self.name()} is skipped") + + return localfile + ## Search for files in remote location accessed through ssh. # If file is found, it will be copied in the incoming local directory and the path to the local file is returned. # If file is not found, then None is returned. - def get_file(self, client, path, file): + def get_file(self, client, path, file, grab=True): localfile = None #stdin, stdout, stderr = client.exec_command('ls ' + path + file) #lines = list(map(lambda s: s.strip(), stdout.readlines())) @@ -508,20 +690,54 @@ def get_file(self, client, path, file): #if len(lines) == 1: logger.debug(f"file found in repository {self.name()} @ " + lines[0].strip('\n')) logger.debug(f"copy to {self.incoming()}{file}") - scpp = scp.SCPClient(client.get_transport()) - scpp.get(lines[0].strip('\n'), self.incoming() + file) - localfile = self.incoming() + file + if grab: + scpp = scp.SCPClient(client.get_transport()) + scpp.get(lines[0].strip('\n'), self.incoming() + file) + localfile = self.incoming() + file + else: + localfile = file return localfile - def copy(self, pathfile): - newname = self.incoming() + uniquename(pathfile) + def get_dir(self, client, path, dataset): + localfile = None + # Search directory on remote server + stdin, stdout, stderr = client.exec_command('find ' + path + " -type d -name " + dataset) + lines = sorted(list(map(lambda s: s.strip(), stdout.readlines())), key=len) + if len(lines) >= 1: + logger.debug(f"directory found in repository {self.name()} @ " + lines[0].strip('\n')) + # Create local directory if needed + if not os.path.exists(self.incoming() + dataset): + logger.debug(f"create local dir {self.incoming()}/{dataset}") + os.mkdir(self.incoming() + dataset) + # Search all files in dataset on remote server + stdin, stdout, stderr = client.exec_command('find ' + path + "/" + dataset + " -type f") + files = sorted(list(map(lambda s: s.strip(), stdout.readlines())), key=len) + scpp = scp.SCPClient(client.get_transport(), sanitize=lambda x: x) + # Get all files if not already present + for file in files: + filename = os.path.basename(file) + if not os.path.exists((self.incoming() + dataset + "/" + filename)): + logger.debug(f"copy {filename} to {self.incoming()}{dataset}") + scpp.get(file, self.incoming() + dataset ) + else: + logger.debug(f"File {filename} already exists in {self.incoming()}{dataset}") + #scpp.get(lines[0].strip('\n') + '/*', self.incoming() + file ) + localfile = self.incoming() + dataset + return localfile + + def copy(self, pathfile, destfile = None): + if destfile is None: + newname = self.incoming() + 
uniquename(pathfile) + else: + newname = destfile client = self.set_client() # search if original file exists remotely stdin, stdout, stderr = client.exec_command('ls ' + self.incoming() + os.path.basename(pathfile)) lines = list(map(lambda s: s.strip(), stdout.readlines())) if len(lines) == 1: - # original file exists... we rename it. + #original file exists... we rename it. client.exec_command('mv ' + self.incoming() + os.path.basename(pathfile) + ' ' + newname) + else: # search if dest files already there stdin, stdout, stderr = client.exec_command('ls ' + newname) @@ -542,28 +758,29 @@ class DatasourceHttp(Datasource): # If file is found, it will be copied in the incoming local directory and the path to the local file is returned. # If file is not found, then None is returned. # TODO: implement authentification - def get(self, file, path=None): + def get(self, file, path=None, grab=True): localfile = None if not (path is None): url = self._protocol + '://' + self.server() + '/' + path + '/' + file - localfile = self.get_file(url, file) + localfile = self.get_file(url, file,grab) else: for path in self.paths(): url = self._protocol + '://' + self.server() + '/' + path + '/' + file - localfile = self.get_file(url, file) + localfile = self.get_file(url, file, grab) if not (localfile is None): break return localfile - def get_file(self, url, file): + def get_file(self, url, file, grab=True): #import socket localfile = None try: #socket.setdefaulttimeout(10) + #TODO check grab and test url urllib.request.urlretrieve(url, self.incoming() + file) logger.debug(f"file found in repository {url}") localfile = self.incoming() + file diff --git a/granddb/granddblib.py b/granddb/granddblib.py index 45fb1050..3f0bfd29 100644 --- a/granddb/granddblib.py +++ b/granddb/granddblib.py @@ -22,22 +22,33 @@ def casttodb(value): + #print(f'{type(value)} - {value}') if isinstance(value, numpy.uint32): - value = int(value) - if isinstance(value, numpy.float32): - value = float(value) - if isinstance(value, numpy.ndarray): + val = int(value) + elif isinstance(value, numpy.float32): + val = float(value) + elif isinstance(value, numpy.ndarray): if value.size == 0: - value = None + val = None elif value.size == 1: - value = value.item() + val = value.item() else: - value = value.tolist() - if isinstance(value, grand.dataio.root_trees.StdVectorList): - value = [i for i in value] - if isinstance(value, str): - value = value.strip().strip('\t').strip('\n') - return value + val = value.tolist() + elif isinstance(value, grand.dataio.root_trees.StdVectorList): + val =[] + #postgres cannot store arrays of arrays... so we split (not sure if really correct)! + for i in value: + if isinstance(i,numpy.ndarray) or isinstance(i, grand.dataio.root_trees.StdVectorList): + val.append(casttodb(i)) + else: + val.append(i) + + #value = [i for i in value] + elif isinstance(value, str): + val = value.strip().strip('\t').strip('\n') + else: + val = value + return val ## @brief Class to handle the Grand database. 
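Note on the casttodb rework above: Postgres cannot bind numpy scalars, numpy arrays, or nested StdVectorList values directly, which is why the patched helper now converts values element by element before anything reaches SQLAlchemy. A minimal standalone sketch of that conversion logic in Python, using numpy only (the name casttodb_sketch and the broadened numpy.integer/numpy.floating checks are illustrative, not part of the patch; the real function treats grand.dataio.root_trees.StdVectorList the same way as nested arrays):

import numpy

def casttodb_sketch(value):
    # Unwrap numpy scalar types so the DB driver sees plain Python types.
    if isinstance(value, numpy.integer):
        return int(value)
    if isinstance(value, numpy.floating):
        return float(value)
    if isinstance(value, numpy.ndarray):
        if value.size == 0:
            return None
        if value.size == 1:
            return value.item()
        # Convert nested arrays one element at a time ("split" them),
        # since Postgres cannot store arrays of numpy objects.
        return [casttodb_sketch(v) for v in value]
    if isinstance(value, str):
        return value.strip().strip('\t').strip('\n')
    return value

print(casttodb_sketch(numpy.array([[1, 2], [3, 4]], dtype=numpy.uint32)))
# -> [[1, 2], [3, 4]] as plain Python ints, safe to bind as a Postgres array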
@@ -94,7 +105,8 @@ def __init__(self, host, port, dbname, user, passwd, sshserv="", sshport=22, cre Base = automap_base() Base.prepare(engine, reflect=True) - self.sqlalchemysession = Session(engine) + self.sqlalchemysession = Session(engine,autoflush=False) + #self.sqlalchemysession.no_autoflush = True inspection = inspect(engine) for table in inspection.get_table_names(): # for table in engine.table_names(): #this is obsolete @@ -306,21 +318,37 @@ def register_repository(self, name, protocol, port, server, path, description="" # Returns the id_file for the file and a boolean True if the file was not previously in the DB (i.e it's a new file) # and false if the file was already registered. This is usefull to know if the metadata of the file needs to be read # or not - def register_filename(self, filename, newfilename, id_repository, provider): + def register_filename(self, filename, newfilename, dataset, id_repository, provider, targetfile=None): import os register_file = False isnewfile = False idfile = None + id_dataset = None + if targetfile is None: + targetfile = newfilename + if dataset is not None: + id_dataset = self.get_or_create_key('dataset', 'dataset_name', os.path.basename(dataset)) + filt = {} + filt['id_dataset'] = str(casttodb(id_dataset)) + filt['id_repository'] = str(casttodb(id_repository)) + ret = self.sqlalchemysession.query(getattr(self._tables['dataset_location'], 'id_dataset')).filter_by( + **filt).all() + if len(ret) == 0: + container = self.tables()['dataset_location'](id_dataset=id_dataset, id_repository=id_repository, path=dataset, + description="") + self.sqlalchemysession.add(container) + self.sqlalchemysession.flush() + ## Check if file not already registered IN THIS REPO : IF YES, ABORT, IF NO REGISTER + #First see if file is registered elsewhere file_exist = self.sqlalchemysession.query(self.tables()['file']).filter_by( - filename=os.path.basename(newfilename)).first() + filename=os.path.basename(targetfile),id_dataset=id_dataset).first() if file_exist is not None: - # file_exist_here = self.sqlalchemysession.query(self.tables()['file_location']).filter_by( - # id_repository=id_repository).first() + #File exists somewhere... see if in the repository we want file_exist_here = self.sqlalchemysession.query(self.tables()['file_location']).filter_by( - id_repository=id_repository).first() + id_repository=id_repository, id_file=file_exist.id_file).first() if file_exist_here is None: - # file exists in different repo. We only need to register it in the current repo + # file exists but in a different repo. 
We only need to register it in the current repo register_file = True idfile = file_exist.id_file else: @@ -332,11 +360,11 @@ def register_filename(self, filename, newfilename, id_repository, provider): if register_file: id_provider = self.get_or_create_key('provider', 'provider', provider) if isnewfile: - # rfile = ROOT.TFile(str(filename)) rfile = rdb.RootFile(str(filename)) - rfile.dataset_name() + #rfile.dataset_name() # rfile.file().GetSize() - container = self.tables()['file'](filename=os.path.basename(newfilename), + container = self.tables()['file'](id_dataset=id_dataset, + filename=os.path.basename(targetfile), description='autodesc', original_name=os.path.basename(filename), id_provider=id_provider, @@ -346,9 +374,10 @@ def register_filename(self, filename, newfilename, id_repository, provider): self.sqlalchemysession.flush() idfile = container.id_file # container = self.tables()['file_location'](id_file=idfile, id_repository=id_repository, path=os.path.dirname(newfilename)) - container = self.tables()['file_location'](id_file=idfile, id_repository=id_repository, path=newfilename, + container = self.tables()['file_location'](id_file=idfile, id_repository=id_repository, path=targetfile, description="") self.sqlalchemysession.add(container) + logger.debug(f"File name {filename} registered") # self.sqlalchemysession.flush() return idfile, isnewfile @@ -485,8 +514,8 @@ def register_filecontent(self, file, idfile): # print('Execution time:', elapsed_time, 'seconds') logger.debug(f"execution time {elapsed_time} seconds") - def register_file(self, orgfilename, newfilename, id_repository, provider): - idfile, read_file = self.register_filename(orgfilename, newfilename, id_repository, provider) + def register_file(self, orgfilename, newfilename, dataset, id_repository, provider, targetdir=None): + idfile, read_file = self.register_filename(orgfilename, newfilename, dataset, id_repository, provider, targetdir) if read_file: # We read the localfile and not the remote one self.register_filecontent(orgfilename, idfile) @@ -494,3 +523,5 @@ def register_file(self, orgfilename, newfilename, id_repository, provider): else: logger.info(f"file {orgfilename} already registered.") self.sqlalchemysession.commit() + + diff --git a/granddb/rootdblib.py b/granddb/rootdblib.py index fe2da570..65946ded 100644 --- a/granddb/rootdblib.py +++ b/granddb/rootdblib.py @@ -1,6 +1,7 @@ import ROOT import grand.dataio.root_trees as groot import grand.manage_log as mlg +import os logger = mlg.get_logger_for_script(__name__) #mlg.create_output_for_logger("debug", log_stdout=True) @@ -136,21 +137,21 @@ class RootFile: 'hadronic_model': 'id_hadronic_model', 'low_energy_model': 'id_low_energy_model', 'cpu_time': 'cpu_time', - # 'long_pd_depth': 'long_pd_depth', - # 'long_pd_eminus': 'long_pd_eminus', - # 'long_pd_eplus': 'long_pd_eplus', - # 'long_pd_muminus': 'long_pd_muminus', - # 'long_pd_muplus': 'long_pd_muplus', - # 'long_pd_gamma': 'long_pd_gamma', - # 'long_pd_hadron': 'long_pd_hadron', - # 'long_gamma_elow': 'long_gamma_elow', - # 'long_e_elow': 'long_e_elow', - # 'long_e_edep': 'long_e_edep', - # 'long_mu_edep': 'long_mu_edep', - # 'long_mu_elow': 'long_mu_elow', - # 'long_hadron_edep': 'long_hadron_edep', - # 'long_hadron_elow': 'long_hadron_elow', - # 'long_neutrino': 'long_neutrino', + #'long_pd_depth': 'long_pd_depth', + #'long_pd_eminus': 'long_pd_eminus', + #'long_pd_eplus': 'long_pd_eplus', + #'long_pd_muminus': 'long_pd_muminus', + #'long_pd_muplus': 'long_pd_muplus', + #'long_pd_gamma': 
'long_pd_gamma',
+        #'long_pd_hadron': 'long_pd_hadron',
+        #'long_gamma_elow': 'long_gamma_elow',
+        #'long_e_elow': 'long_e_elow',
+        #'long_e_edep': 'long_e_edep',
+        #'long_mu_edep': 'long_mu_edep',
+        #'long_mu_elow': 'long_mu_elow',
+        #'long_hadron_edep': 'long_hadron_edep',
+        #'long_hadron_elow': 'long_hadron_elow',
+        #'long_neutrino': 'long_neutrino',
         'event_weight': 'event_weight'
     }
     tadcToDB = {
@@ -307,9 +308,11 @@ class RootFile:
     #TreeList is a dict with name of the trees as key and the class corresponding to it's type as value
     TreeList = {}
     file = None
+    filename = None
 
     ## We retreive the list of Ttrees in the file and store them as the corresponding class from root_files.py in the dict TreeList
     def __init__(self, f_name):
+        self.filename = f_name
         self.TreeList.clear()
         self.file = ROOT.TFile(f_name)
         for key in self.file.GetListOfKeys():
@@ -341,6 +344,8 @@ def copy_content_to(self, file):
     # [extra]-> given by user (metadata ?)
     # serial -> automatically incremented in case of new version (how to do that ?)
     def dataset_name(self):
+        name = os.path.basename(os.path.dirname(self.filename))
+        return name
         treename = 'trun'
         name = "noname"
         for run in self.TreeList[treename].get_list_of_runs():
diff --git a/scripts/archiving/config.properties.gaa b/scripts/archiving/config.properties.gaa
new file mode 100644
index 00000000..8e58b386
--- /dev/null
+++ b/scripts/archiving/config.properties.gaa
@@ -0,0 +1,4 @@
+aipTempDirectory=/sps/grand/prod_grand/tests/archivage/archs/gaa/
+configMetadataDescriptiveDC=dc_gaa.xml
+configDocumentation=GRAND_DMP_2024.pdf
+representationID_1=representation1
\ No newline at end of file
diff --git a/scripts/archiving/config.properties.gp13 b/scripts/archiving/config.properties.gp13
new file mode 100644
index 00000000..fa8ce9b0
--- /dev/null
+++ b/scripts/archiving/config.properties.gp13
@@ -0,0 +1,4 @@
+aipTempDirectory=/sps/grand/prod_grand/tests/archivage/archs/gp13/
+configMetadataDescriptiveDC=dc_gp13.xml
+configDocumentation=GRAND_DMP_2024.pdf
+representationID_1=representation1
diff --git a/scripts/archiving/create_archive.bash b/scripts/archiving/create_archive.bash
new file mode 100644
index 00000000..c93526ef
--- /dev/null
+++ b/scripts/archiving/create_archive.bash
@@ -0,0 +1,76 @@
+#!/bin/bash
+datadir="/sps/grand/data"
+archive_root_name="doi+10.25520+in2p3.archive.grand"
+irods_path='/grand/home/trirods/data/archives/'
+
+usage="$(basename "$0") [-d DATE] [-s SITE]
+Archive some Grand raw files into irods :
+  -s site (gaa, gp13)
+  -d YYYY-MM to be archived
+  "
+
+while getopts ":d:s:" option; do
+  case $option in
+    d)
+      if [[ ${OPTARG} =~ ^([0-9]{4})-([0][1-9]|[1][0-2]|[1-9])$ ]]; then
+        date=$(date --date="${BASH_REMATCH[1]}-${BASH_REMATCH[2]}-01" "+%Y_%m")
+        dir=$(date --date="${BASH_REMATCH[1]}-${BASH_REMATCH[2]}-01" "+%Y/%m")
+      else
+        echo "Date ${OPTARG} should be in format YYYY-MM"
+        exit 1
+      fi
+      ;;
+    s)
+      if [[ ${OPTARG} =~ gp13|gaa ]] ; then
+        site=${OPTARG}
+      else
+        echo "Site should be gp13 or gaa"
+        exit 1
+      fi
+      ;;
+    :)
+      printf "option -${OPTARG} needs an argument\n"
+      exit 1;;
+    ?) # Invalid option
+      printf "Error: Invalid option -${OPTARG}\n"
+      exit 1;;
+  esac
+done
+
+if [ ! "$date" ] || [ ! "$site" ]; then
"$site" ]; then + echo "arguments -d and -s must be provided" + echo "$usage" >&2; exit 1 +fi + +outfile="${archive_root_name}.${site}.${date}" +logfile=archs/${site}/${outfile}--$(date "+%Y_%m_%d_%H%M%S").log + +find $datadir/$site/raw/$dir/ -name "*.bin" >list_files_${site} +echo "List of files to archive :" >> ${logfile} +cat list_files_${site} >> ${logfile} + +java -jar createAIP.jar --configfile=config.properties.${site} --listobjects=list_files_${site} -i ${outfile} + +echo "Archive ready to tar" >> ${logfile} + +tar -cvf archs/${site}/${outfile}.tar archs/${site}/${outfile} + +echo "Archive tared" >> ${logfile} + +echo "Push archs/${site}/${outfile}.tar to irods" >> ${logfile} +# Put file into irods + sfile=archs/${site}/${outfile}.tar + ipath="${irods_path}${site}/raw" + ifile="${ipath}/${outfile}.tar" + echo "imkdir -p $ipath" >> ${logfile} + imkdir -p $ipath >> ${logfile} 2>&1 + echo "iput -f $sfile $ifile" >> ${logfile} + #iput -f $sfile $ifile >> ${logfile} 2>&1 + #iput_status=$? + #if [ "$iput_status" -ne 0 ]; then + # notify=1 + #fi + +rm -rf archs/${site}/${outfile} +rm $sfile +echo "Month archived.">> ${logfile} diff --git a/scripts/archiving/dc_gaa.xml b/scripts/archiving/dc_gaa.xml new file mode 100644 index 00000000..102adbea --- /dev/null +++ b/scripts/archiving/dc_gaa.xml @@ -0,0 +1,32 @@ + + + + doi+10.25520+in2p3.archive+grand+gaa + + + Grand Raw Files from GAA + + + + Grand Observatory + Grand Observatory + + +Grand Observatory +2024 +Grand raw files + + Grand Raw Data from GAA Observatory + + + radio astronomy cosmics rays neutrinos + + + 2024 + + + + Argentina + + + diff --git a/scripts/archiving/dc_gp13.xml b/scripts/archiving/dc_gp13.xml new file mode 100644 index 00000000..b4470580 --- /dev/null +++ b/scripts/archiving/dc_gp13.xml @@ -0,0 +1,32 @@ + + + + doi+10.25520+in2p3.archive+grand+gp13 + + + Grand Raw Files from GP13 + + + + Grand Observatory + Grand Observatory + + +Grand Observatory +2024 +Grand raw files + + Grand Raw Data from GP13 Observatory + + + radio astronomy cosmics rays neutrinos + + + 2024 + + + + China + + + diff --git a/scripts/transfers/bintoroot.bash b/scripts/transfers/bintoroot.bash index 6f7cae26..edb8b7b0 100644 --- a/scripts/transfers/bintoroot.bash +++ b/scripts/transfers/bintoroot.bash @@ -9,14 +9,16 @@ register_root='/pbs/home/p/prod_grand/softs/grand/granddb/register_file_in_db.py config_file='/pbs/home/p/prod_grand/softs/grand/scripts/transfers/config-prod.ini' sps_path='/sps/grand/' irods_path='/grand/home/trirods/' - +submit_base_name='' # Get tag and database file to use -while getopts ":d:g:" option; do +while getopts ":d:g:n:" option; do case $option in d) root_dest=${OPTARG};; g) gtot_options=${OPTARG};; + n) + submit_base_name=${OPTARG};; :) printf "option -${OPTARG} need an argument\n" exit 1;; @@ -28,52 +30,77 @@ done shift $(($OPTIND - 1)) -cd /pbs/home/p/prod_grand/softs/grand -source /pbs/throng/grand/soft/miniconda3/etc/profile.d/conda.sh -conda activate /sps/grand/software/conda/grandlib_2304 -source env/setup.sh +export PLATFORM=redhat-9-x86_64 +cd /pbs/home/p/prod_grand/softs/grand +source /pbs/throng/grand/soft/miniconda3/etc/profile.d/conda.sh + + +#Export some env to make irods works +export LOADEDMODULES=DataManagement/irods/4.3.1 +export TRIRODS_DATA_DIR=/grand/home/trirods/data +export BASH_ENV=/usr/share/Modules/init/bash +export LD_LIBRARY_PATH=/pbs/throng/grand/soft/lib/:/pbs/software/redhat-9-x86_64/irods/4.3.1/lib:/pbs/software/redhat-9-x86_64/irods/irods-externals/4.3.1/lib +export 
+export PATH=/pbs/throng/grand/soft/miniconda3/condabin:/pbs/throng/grand/soft/bin/:/pbs/throng/grand/bin/:/opt/software/rfio-hpss/prod/bin:/usr/share/Modules/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/opt/puppetlabs/bin:/opt/ccin2p3/bin:/pbs/software/redhat-9-x86_64/irods/utils:/pbs/software/redhat-9-x86_64/irods/4.3.1/bin:.
+export _LMFILES_=/pbs/software/modulefiles/redhat-9-x86_64/DataManagement/irods/4.3.1
+export IRODS_PLUGINS_HOME=/pbs/software/redhat-9-x86_64/irods/4.3.1/lib/plugins
+export MODULEPATH=/etc/scl/modulefiles:/pbs/software/modulefiles/redhat-9-x86_64:/etc/modulefiles
+conda activate /sps/grand/software/conda/grandlib_2409
+source env/setup.sh
 cd /pbs/home/p/prod_grand/scripts/transfers
 
 notify=0
 for file in "$@"
 do
-  echo "converting ${file} to GrandRoot"
-  filename=$(basename $file)
-  tmp=${filename#*_}
-  dateobs=${tmp:0:8}
-  dest="${root_dest}/${dateobs:0:4}/${dateobs:4:2}"
-  if [ ! -d $dest ];then
-    mkdir -p $dest >/dev/null 2>&1
-  fi
-  dirlogs=${root_dest}/../logs
-  logfile=${dirlogs}/bin2root-${filename%.*}
-  if [ ! -d $dirlogs ];then
-    mkdir -p $dirlogs >/dev/null 2>&1
-  fi
-  # Convert file
-  ${gtot_path} ${gtot_options} -i ${file} -o ${dest}/${filename%.*}.root >> ${logfile}
-  conv_status=$?
-  if [ "$conv_status" -ne 0 ]; then
-    notify=1
-  fi
-  echo $conv_status >> ${logfile}
-  # Put GrandRoot file into irods
-  sfile=${dest}/${filename%.*}.root
-  ifile=${sfile/$sps_path/$irods_path}
-  ipath=${ifile%/*}
-  echo "imkdir -p $ipath" >> ${logfile}
-  imkdir -p $ipath >> ${logfile} 2>&1
-  echo "iput -f $sfile $ifile" >> ${logfile}
-  iput -f $sfile $ifile >> ${logfile} 2>&1
-  iput_status=$?
-  if [ "$iput_status" -ne 0 ]; then
-    notify=1
+  if [ -f $file ]; then
+    echo "converting ${file} to GrandRoot"
+    filename=$(basename $file)
+    tmp=${filename#*_}
+    dateobs=${tmp:0:8}
+    dest="${root_dest}/${dateobs:0:4}/${dateobs:4:2}"
+    if [ ! -d $dest ];then
+      mkdir -p $dest >/dev/null 2>&1
+    fi
+    dirlogs=${root_dest}/../logs
+    logfile=${dirlogs}/${submit_base_name}-bin2root-${filename%.*}.log
+    if [ ! -d $dirlogs ];then
+      mkdir -p $dirlogs >/dev/null 2>&1
+    fi
+    #Determine if file is TR (so no conversion)
+    tr=$(basename ${file} | awk -F_ '{print $5}')
+    if [ "$tr" == "TR" ]; then
+      cp ${file} ${dest}/${filename%.*}.root
+      conv_status=0
+    else
+      # Convert file
+      ${gtot_path} ${gtot_options} -i ${file} -o ${dest}/${filename%.*}.root >> ${logfile}
+      conv_status=$?
+    fi
+    if [ "$conv_status" -ne 0 ]; then
+      notify=1
+    fi
+    echo $conv_status >> ${logfile}
+    # Put GrandRoot file into irods
+    sfile=${dest}/${filename%.*}.root
+    ifile=${sfile/$sps_path/$irods_path}
+    ipath=${ifile%/*}
+    echo "imkdir -p $ipath" >> ${logfile}
+    imkdir -p $ipath >> ${logfile} 2>&1
+    echo "iput -f $sfile $ifile" >> ${logfile}
+    iput -f $sfile $ifile >> ${logfile} 2>&1
+    iput_status=$?
+ if [ "$iput_status" -ne 0 ]; then + notify=1 + fi + # Register conversion result into the database + echo "Register convertion" >> ${logfile} + python3 ${register_convertion} -i ${filename} -o ${filename%.*}.root -s ${conv_status} -l ${logfile} >> ${logfile} 2>&1 + # Register root file into db + if [ $tr != "TR" ]; then + echo "register file in database" >> ${logfile} + python3 ${register_root} -c ${config_file} -r "CCIN2P3" ${dest}/${filename%.*}.root >> ${logfile} 2>&1 + fi fi - # Register conversion result into the database - python3 ${register_convertion} -i ${filename} -o ${filename%.*}.root -s ${conv_status} -l ${logfile} - # Register root file into db - python3 ${register_root} -c ${config_file} -r "CCIN2P3" ${dest}/${filename%.*}.root done if [ "$notify" -ne "0" ]; then diff --git a/scripts/transfers/ccscript.bash b/scripts/transfers/ccscript.bash old mode 100755 new mode 100644 index a5698029..e8e62b28 --- a/scripts/transfers/ccscript.bash +++ b/scripts/transfers/ccscript.bash @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/bash -l # Script triggered after transfering data from a GRAND observatory to CCIN2P3 (or to any site) # It will launch the jobs to convert binary files into GrandRoot and register the results of the transfers and convertions into the database # Fleg & Fred: 03/2024 @@ -9,6 +9,7 @@ bin2root='/pbs/home/p/prod_grand/scripts/transfers/bintoroot.bash' register_transfers='/pbs/home/p/prod_grand/scripts/transfers/register_transfer.bash' refresh_mat_script='/pbs/home/p/prod_grand/scripts/transfers/refresh_mat_views.bash' update_web_script='/sps/grand/prod_grand/monitoring_page/launch_webmonitoring_update.bash' +tar_logs_script='/pbs/home/p/prod_grand/scripts/transfers/tar_logs.bash' # gtot options for convertion -g1 for gp13 -f2 for gaa gtot_option="-g1" @@ -18,22 +19,10 @@ nbfiles=3 #time required to run bin2root on one file bin2rootduration=15 -# Notification options +# Notification options q mail_user='fleg@lpnhe.in2p3.fr' mail_type='FAIL,TIME_LIMIT,INVALID_DEPEND' -#Export some env to make irods works -export LD_LIBRARY_PATH=/pbs/throng/grand/soft/lib/:/pbs/software/centos-7-x86_64/oracle/12.2.0/instantclient/lib::/pbs/software/centos-7-x86_64/irods/4.3.1/lib:/pbs/software/centos-7-x86_64/irods/irods-externals/4.3.1/lib -export PATH=/pbs/throng/grand/soft/miniconda3/condabin:/pbs/throng/grand/soft/bin/:/pbs/throng/grand/bin/:/opt/bin:/opt/software/rfio-hpss/prod/bin:/pbs/software/centos-7-x86_64/oracle/12.2.0/instantclient/bin:/pbs/software/centos-7-x86_64/fs4/prod/bin:/usr/lib64/qt-3.3/bin:/usr/share/Modules/bin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/opt/puppetlabs/bin:/pbs/software/centos-7-x86_64/suptools/prod/bin:/opt/ccin2p3/bin:/pbs/software/centos-7-x86_64/irods/utils:/pbs/software/centos-7-x86_64/irods/4.3.1/bin:. 
-export _LMFILES_=/pbs/software/centos-7-x86_64/modules/modulefiles/DataManagement/irods/4.3.1 -export IRODS_PLUGINS_HOME=/pbs/software/centos-7-x86_64/irods/4.3.1/lib/plugins -export MODULEPATH=/pbs/software/centos-7-x86_64/modules/modulefiles:/etc/modulefiles -export LOADEDMODULES=DataManagement/irods/4.3.1 -export __MODULES_SHARE_PATH=/pbs/software/centos-7-x86_64/irods/utils:2:/pbs/software/centos-7-x86_64/irods/4.3.1/bin:2 -export TRIRODS_DATA_DIR=/grand/home/trirods/data -export BASH_ENV=/usr/share/Modules/init/bash - - # manage call from remote restricted ssh command (extract opt parameters) # default args fullscriptpath=${BASH_SOURCE[0]} @@ -75,6 +64,7 @@ case $site in gtot_option="-g1";; esac +export PLATFORM=redhat-9-x86_64 #test dbfile exists and tag is set if [ -z "$tag" ] || [ -z "$db" ];then @@ -88,7 +78,7 @@ fi # Determine root_dir from database path root_dest=${db%/logs*}/GrandRoot/ submit_dir=$(dirname "${db}") -submit_base_name=s${tag} +submit_base_name=${site}_${tag} crap_dir=${db%/logs*}/raw/crap if [ ! -d $root_dest ];then @@ -104,7 +94,7 @@ fi outfile="${submit_dir}/${submit_base_name}-register-transfer.bash" echo "#!/bin/bash" > $outfile echo "$register_transfers -d $db -t $tag" >> $outfile -jregid=$(sbatch -t 0-00:10 -n 1 -J ${submit_base_name}-register-transfer -o ${submit_dir}/slurm-${submit_base_name}-register-transfer --mem 1G ${outfile} --mail-user=${mail_user} --mail-type=${mail_type}) +jregid=$(sbatch -t 0-00:10 -n 1 -J ${submit_base_name}-register-transfer -o ${submit_dir}/${submit_base_name}-register-transfer.log --mem 1G --constraint el9 --mail-user=${mail_user} --mail-type=${mail_type} ${outfile} ) jregid=$(echo $jregid |awk '{print $NF}') # List files to be converted and group them by bunchs of nbfiles @@ -136,10 +126,10 @@ do outfile="${submit_dir}/${submit_base_name}-${j}.bash" logfile="${submit_dir}/${submit_base_name}-${j}.log" echo "#!/bin/bash" > $outfile - echo "$bin2root -g '$gtot_option' -d $root_dest ${listoffiles[$j]}" >> $outfile + echo "$bin2root -g '$gtot_option' -n $submit_base_name -d $root_dest ${listoffiles[$j]}" >> $outfile #submit script echo "submit $outfile" - jid=$(sbatch --dependency=afterany:${jregid} -t 0-${jobtime} -n 1 -J ${submit_base_name}-${j} -o ${submit_dir}/slurm-${submit_base_name}-${j} --mem 2G ${outfile} --mail-user=${mail_user} --mail-type=${mail_type}) + jid=$(sbatch --dependency=afterany:${jregid} -t 0-${jobtime} -n 1 -J ${submit_base_name}-${j} -o ${submit_dir}/${submit_base_name}-${j}.log --mem 2G --constraint el9 --mail-user=${mail_user} --mail-type=${mail_type} ${outfile} ) jid=$(echo $jid |awk '{print $NF}') convjobs=$convjobs":"$jid done @@ -149,7 +139,8 @@ if [ "$convjobs" = "" ]; then else dep="--dependency=afterany${convjobs}" #finally refresh the materialized views in the database and the update of monitoring - sbatch ${dep} -t 0-00:10 -n 1 -J refresh_mat -o ${submit_dir}/slurm-refresh_mat --mem 1G ${refresh_mat_script} --mail-user=${mail_user} --mail-type=${mail_type} - sbatch ${dep} -t 0-01:00 -n 1 -J update_webmonitoring -o ${submit_dir}/slurm-update_webmonitoring --mem 12G ${update_web_script} -mail-user=${mail_user} --mail-type=${mail_type} + sbatch ${dep} -t 0-00:10 -n 1 -J refresh_mat_${tag} -o ${submit_dir}/refresh_mat_${tag}.log --mem 1G --constraint el9 --mail-user=${mail_user} --mail-type=${mail_type} ${refresh_mat_script} + sbatch ${dep} -t 0-01:00 -n 1 -J update_webmonitoring_${tag} -o ${submit_dir}/update_webmonitoring_${tag}.log --mem 12G --constraint el9 --mail-user=${mail_user} 
--mail-type=${mail_type} ${update_web_script}
+  sbatch -t 0-00:15 -n 1 -J tar_logs_${tag} -o ${submit_dir}/tar_logs_${tag}.log --mem 1G --mail-user=${mail_user} --mail-type=${mail_type} --wrap="${tar_logs_script} -s ${site,,} -d 2"
 fi
 
diff --git a/scripts/transfers/refresh_mat_views.bash b/scripts/transfers/refresh_mat_views.bash
old mode 100644
new mode 100755
index d0044f3c..f570d07f
--- a/scripts/transfers/refresh_mat_views.bash
+++ b/scripts/transfers/refresh_mat_views.bash
@@ -1,7 +1,8 @@
-#!/bin/bash
+#!/bin/bash -l
 cd /pbs/home/p/prod_grand/softs/grand
+export PLATFORM=redhat-9-x86_64
 source /pbs/throng/grand/soft/miniconda3/etc/profile.d/conda.sh
-conda activate /sps/grand/software/conda/grandlib_2304
+conda activate /sps/grand/software/conda/grandlib_2409
 source env/setup.sh
-cd /pbs/home/p/prod_grand/scripts/transfers
+#cd /pbs/home/p/prod_grand/scripts/transfers
 python3 /pbs/home/p/prod_grand/softs/grand/granddb/refresh_mat_views.py
\ No newline at end of file
diff --git a/scripts/transfers/register_transfer.bash b/scripts/transfers/register_transfer.bash
index 5e4f9d39..39987a45 100644
--- a/scripts/transfers/register_transfer.bash
+++ b/scripts/transfers/register_transfer.bash
@@ -19,10 +19,16 @@ while getopts ":d:t:" option; do
   esac
 done
 
+uname -r |grep el9 >/dev/null
+el9=$?
 cd /pbs/home/p/prod_grand/softs/grand
 source /pbs/throng/grand/soft/miniconda3/etc/profile.d/conda.sh
-conda activate /sps/grand/software/conda/grandlib_2304
+if [ "$el9" -ne 0 ]; then
+  conda activate /sps/grand/software/conda/grandlib_2304
+else
+  conda activate /sps/grand/software/conda/grandlib_2409
+fi
 source env/setup.sh
 cd /pbs/home/p/prod_grand/scripts/transfers
diff --git a/scripts/transfers/tar_logs.bash b/scripts/transfers/tar_logs.bash
new file mode 100755
index 00000000..7e1112d0
--- /dev/null
+++ b/scripts/transfers/tar_logs.bash
@@ -0,0 +1,28 @@
+#!/bin/bash
+# Script to tar all logs older than -d months for site -s.
+data_path='/sps/grand/data'
+
+while getopts ":d:s:" option; do
+  case $option in
+    d)
+      monthbefore=${OPTARG};;
+    s)
+      site=${OPTARG};;
+    :)
+      printf "option -${OPTARG} needs an argument\n"
+      exit 1;;
+    ?) # Invalid option
+      printf "Error: Invalid option -${OPTARG}\n"
+      exit 1;;
+  esac
+done
+
+if [ -z "$site" ] || [ -z "$monthbefore" ];then
+  printf "Missing option -s or -d\n"
+  exit 1
+fi
+
+monthstart="$(date -d "$(date +%Y-%m-01) - ${monthbefore} month")"
+monthend="$(date -d "$(date +%Y-%m-01) - $((${monthbefore}-1)) month")"
+datetag="$(date -d "$(date +%Y-%m-01) - ${monthbefore} month" +%Y-%m)"
+find ${data_path}/${site}/logs/ -type f -newermt "${monthstart}" -and -not -newermt "${monthend}" -and -not -name '*.tgz' -and -not -name '*.tar' -and -not -name '*.gz' |xargs tar --remove-files -uvf ${data_path}/${site}/logs/logs_${datetag}.tar
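Note: tar_logs.bash builds its archive window with GNU date relative arithmetic: the window runs from the first day of the month `monthbefore` months ago up to, but not including, the first day of the following month. A small Python equivalent for sanity-checking that logic (stdlib only; the function name month_window and the sample date are illustrative, not part of the patch):

import datetime

def month_window(monthbefore, today=None):
    # First day of the current month, stepped back `monthbefore` months.
    first = (today or datetime.date.today()).replace(day=1)
    y, m = first.year, first.month - monthbefore
    while m <= 0:
        y, m = y - 1, m + 12
    start = datetime.date(y, m, 1)
    # The window excludes everything from the next month onward.
    y2, m2 = (y, m + 1) if m < 12 else (y + 1, 1)
    end = datetime.date(y2, m2, 1)
    return start, end, start.strftime("%Y-%m")

print(month_window(2, today=datetime.date(2024, 5, 17)))
# -> (datetime.date(2024, 3, 1), datetime.date(2024, 4, 1), '2024-03'),
# matching find -newermt "${monthstart}" -and -not -newermt "${monthend}"
# and the logs_YYYY-MM.tar datetag in the script above.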