Skip to content
This repository has been archived by the owner on Feb 24, 2022. It is now read-only.

Commit

Permalink
Merge pull request #4526 from dciangot/fixes
Browse files Browse the repository at this point in the history
connection exception fixes and other minor
  • Loading branch information
mmascher authored Feb 21, 2017
2 parents 292c152 + 3d96c74 commit b79c4c7
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 62 deletions.
21 changes: 14 additions & 7 deletions src/python/AsyncStageOut/PublisherDaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,26 @@ def __init__(self, config):
#Need a better way to test this without turning off this next line
BaseDaemon.__init__(self, config, 'DBSPublisher')

server = CouchServer(dburl=self.config.couch_instance,
ckey=self.config.opsProxy,
cert=self.config.opsProxy)
self.logger.debug('Connected to CouchDB')
try:
config_server = CouchServer(dburl=self.config.config_couch_instance)
self.config_db = config_server.connectDatabase(self.config.config_database)
self.logger.debug('Connected to config CouchDB')
except:
self.logger.exception('Failed when contacting local couch')
raise
# Set up a factory for loading plugins
self.factory = WMFactory(self.config.schedAlgoDir,
namespace=self.config.schedAlgoDir)
self.pool = Pool(processes=self.config.publication_pool_size)

if self.config.isOracle:
try:
self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)
else:
self.db = server.connectDatabase(self.config.files_database)
self.logger.debug('Contacting OracleDB:' + self.config.oracleDB)
except:
self.logger.exception('Failed when contacting Oracle')
raise

def algorithm(self, parameters=None):
"""
Expand Down Expand Up @@ -122,6 +127,7 @@ def active_users(self, db):
except Exception as ex:
self.logger.error("Failed to acquire publications \
from oracleDB: %s" %ex)
return []

fileDoc = dict()
fileDoc['asoworker'] = self.config.asoworker
Expand All @@ -137,6 +143,7 @@ def active_users(self, db):
except Exception as ex:
self.logger.error("Failed to acquire publications \
from oracleDB: %s" %ex)
return []

self.logger.debug("%s acquired puclications retrieved" % len(result))
#TODO: join query for publisher (same as submitter)
Expand Down
11 changes: 4 additions & 7 deletions src/python/AsyncStageOut/PublisherWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,15 @@ def __init__(self, user, config):
# self.cache_area = self.config.cache_area
if os.getenv("TEST_ASO"):
self.db = None
elif not self.config.isOracle:
server = CouchServer(dburl=self.config.couch_instance,
ckey=self.config.opsProxy,
cert=self.config.opsProxy)
self.db = server.connectDatabase(self.config.files_database)
else:
try:
self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)
self.oracleDB_user = HTTPRequests(self.config.oracleDB,
self.userProxy,
self.userProxy)
except:
self.logger.exception('Failed to contact Oracle')
self.phedexApi = PhEDEx(responseType='json')
self.max_files_per_block = max(1, self.config.max_files_per_block)
self.block_publication_timeout = self.config.block_closure_timeout
Expand Down Expand Up @@ -501,7 +498,7 @@ def __call__(self):
self.logger.debug("Updating last publication type for: %s " % workflow)
data['workflow'] = workflow
data['subresource'] = 'updatepublicationtime'
result = self.oracleDB_user.get(self.config.oracleFileTrans.replace('filetransfers','task'),
result = self.oracleDB_user.post(self.config.oracleFileTrans.replace('filetransfers','task'),
data=encodeRequest(data))
self.logger.debug("%s last publication type update: %s " % (workflow,str(result)))
self.logger.info("Publications for user %s (group: %s, role: %s) completed." % (self.user,
Expand Down
56 changes: 37 additions & 19 deletions src/python/AsyncStageOut/ReporterWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,13 @@ def __init__(self, user, config):
self.logger.error('Did not get valid proxy. Setting proxy to ops proxy')
self.userProxy = config.opsProxy

if self.config.isOracle:
try:
self.oracleDB = HTTPRequests(self.config.oracleDB,
config.opsProxy,
config.opsProxy)
except Exception:
self.logger.exception()
raise
else:
server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy)
self.db = server.connectDatabase(self.config.files_database)
try:
self.oracleDB = HTTPRequests(self.config.oracleDB,
config.opsProxy,
config.opsProxy)
except Exception:
self.logger.exception('failed to connect to Oracle')
raise

# Set up a factory for loading plugins
self.factory = WMFactory(self.config.pluginDir, namespace = self.config.pluginDir)
Expand Down Expand Up @@ -228,7 +224,10 @@ def __call__(self):
self.logger.debug('failed indexes %s' % failed_indexes)
for i in failed_indexes:
failed_lfns.append(json_data['LFNs'][i])
failure_reason.append(json_data['failure_reason'][i])
try:
failure_reason.append(json_data['failure_reason'][i])
except:
failure_reason.append('Unable to find failure reason')
self.logger.debug('Marking failed %s %s' %(failed_lfns, failure_reason))
updated_failed_lfns = self.mark_failed(failed_lfns, failure_reason)

Expand All @@ -247,8 +246,12 @@ def __call__(self):
self.logger.exception('Either no files to mark or failed to update state')

# Remove the json file
self.logger.debug('Removing %s' % input_file)
os.unlink( input_file )
if len(updated_good_lfns) == len(good_lfns) and len(updated_failed_lfns) == len(failed_lfns):
try:
self.logger.debug('Removing %s' % input_file)
os.unlink( input_file )
except:
self.logger.exception('Failed to remove '+ input_file)

else:
self.logger.info('Empty file %s' % input_file)
Expand Down Expand Up @@ -356,9 +359,12 @@ def mark_good(self, files):
self.logger.debug("site not found... gathering info from phedex")
self.site_tfc_map[document["source"]] = self.get_tfc_rules(document["source"])
pfn = self.apply_tfc_to_lfn( '%s:%s' %(document["source"], lfn))
self.logger.debug("File has to be removed now from source site: %s" %pfn)
self.remove_files(self.userProxy, pfn)
self.logger.debug("Transferred file removed from source")
try:
self.logger.debug("File has to be removed now from source site: %s" %pfn)
self.remove_files(self.userProxy, pfn)
self.logger.debug("Transferred file removed from source")
except:
self.logger.exception('Error removing file from source')
return updated_lfn

def remove_files(self, userProxy, pfn):
Expand Down Expand Up @@ -452,12 +458,24 @@ def mark_failed(self, files=[], failures_reasons=[], force_fail=False):
if force_fail or document['transfer_retry_count'] + 1 > self.max_retry:
data['list_of_transfer_state'] = 'FAILED'
data['list_of_retry_value'] = 0
try:
self.logger.debug("File has to be removed now from source site: %s" %pfn)
self.remove_files(self.userProxy, pfn)
self.logger.debug("Transferred file removed from source")
except:
self.logger.exception('Error removing file from source')
else:
data['list_of_transfer_state'] = 'RETRY'
fatal_error = self.determine_fatal_error(failures_reasons[files.index(lfn)])
if fatal_error:
data['list_of_transfer_state'] = 'FAILED'

try:
self.logger.debug("File has to be removed now from source site: %s" %pfn)
self.remove_files(self.userProxy, pfn)
self.logger.debug("Transferred file removed from source")
except:
self.logger.exception('Error removing file from source')

data['list_of_failure_reason'] = failures_reasons[files.index(lfn)]
data['list_of_retry_value'] = 0

Expand All @@ -469,7 +487,7 @@ def mark_failed(self, files=[], failures_reasons=[], force_fail=False):
self.logger.debug("Marked failed %s" % lfn)
except Exception as ex:
self.logger.error("Error updating document status: %s" %ex)
continue
return {}
else:
try:
document = self.db.document( docId )
Expand Down
11 changes: 7 additions & 4 deletions src/python/AsyncStageOut/RetryManagerDaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ def __init__(self, config):
BaseDaemon.__init__(self, config, 'RetryManager')

if self.config.isOracle:
self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)
try:
self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)
except:
self.logger.exception('Failed to connect to Oracle')
else:
try:
server = CouchServer(dburl=self.config.couch_instance,
Expand Down Expand Up @@ -111,9 +114,9 @@ def algorithm(self, parameters=None):
try:
results = self.oracleDB.post(self.config.oracleFileTrans,
data=encodeRequest(fileDoc))
logging.info("Retried files in cooloff: %s" % str(results))
except Exception:
self.logger.exception("Failed to get retry transfers in oracleDB: %s")
logging.info("Retried files in cooloff: %s" % str(results))
else:
self.doRetries()

Expand Down
26 changes: 15 additions & 11 deletions src/python/AsyncStageOut/TransferDaemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,20 @@ def __init__(self, config):
if not e.errno == errno.EEXIST:
self.logger.exception('Unknown error in mkdir' % e.errno)
raise
try:
config_server = CouchServer(dburl=self.config.config_couch_instance)
self.config_db = config_server.connectDatabase(self.config.config_database)
except:
self.logger.exception('Failed when contacting local couch')
raise

config_server = CouchServer(dburl=self.config.config_couch_instance)
self.config_db = config_server.connectDatabase(self.config.config_database)
if self.config.isOracle:
try:
self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)
else:
server = CouchServer(dburl=self.config.couch_instance,
ckey=self.config.opsProxy,
cert=self.config.opsProxy)
self.db = server.connectDatabase(self.config.files_database)
self.logger.debug('Connected to CouchDB')
except:
self.logger.exception('Failed when contacting Oracle')
raise
self.pool = Pool(processes=self.config.pool_size)
self.factory = WMFactory(self.config.schedAlgoDir,
namespace=self.config.schedAlgoDir)
Expand All @@ -121,13 +122,15 @@ def __init__(self, config):
'cert':self.config.opsProxy})
except Exception as e:
self.logger.exception('PhEDEx exception: %s' % e)
raise
# TODO: decode xml
try:
self.phedex2 = PhEDEx(responseType='json',
dict={'key':self.config.opsProxy,
'cert':self.config.opsProxy})
except Exception as e:
self.logger.exception('PhEDEx exception: %s' % e)
raise

self.logger.debug(type((self.phedex2.getNodeMap())['phedex']['node']))
for site in [x['name'] for x in self.phedex2.getNodeMap()['phedex']['node']]:
Expand Down Expand Up @@ -162,7 +165,7 @@ def algorithm(self, parameters=None):
u[i] = ''

self.logger.debug('current_running %s' % current_running)
self.logger.debug('BBBBBB: %s %s %s' % (u, current_running, (u not in current_running)))
self.logger.debug('Testing current running: %s %s %s' % (u, current_running, (u not in current_running)))
if u not in current_running:
self.logger.debug('processing %s' % u)
current_running.append(u)
Expand All @@ -189,6 +192,7 @@ def oracleSiteUser(self, db):
except Exception as ex:
self.logger.error("Failed to acquire transfers \
from oracleDB: %s" % ex)
return []

self.logger.debug(oracleOutputMapping(result))
# TODO: translate result into list((user,group,role),...)
Expand All @@ -200,7 +204,7 @@ def oracleSiteUser(self, db):
self.logger.info('Users to process: %s' % str(users))
except:
self.logger.exception('User data malformed. ')
else:
else:
self.logger.info('No new user to acquire')
return []

Expand Down
24 changes: 10 additions & 14 deletions src/python/AsyncStageOut/TransferWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,9 @@ def __init__(self, user, tfc_map, config):
self.factory = WMFactory(self.config.pluginDir, namespace=self.config.pluginDir)
self.commandTimeout = 1200
try:
if self.config.isOracle:
self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)
else:
server = CouchServer(dburl=self.config.couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy)
self.db = server.connectDatabase(self.config.files_database)
self.oracleDB = HTTPRequests(self.config.oracleDB,
self.config.opsProxy,
self.config.opsProxy)
config_server = CouchServer(dburl=self.config.config_couch_instance, ckey=self.config.opsProxy, cert=self.config.opsProxy)
self.config_db = config_server.connectDatabase(self.config.config_database)
self.fts_server_for_transfer = getFTServer("T1_UK_RAL", 'getRunningFTSserver', self.config_db, self.logger)
Expand Down Expand Up @@ -604,11 +600,11 @@ def mark_acquired(self, files=[]):
docbyId = self.oracleDB.get(self.config.oracleFileTrans.replace('filetransfers','fileusertransfers'),
data=encodeRequest({'subresource': 'getById', 'id': docId}))
document = oracleOutputMapping(docbyId, None)[0]
dash_rep = (document['jobid'], document['job_retry_count'], document['taskname'])
lfn_in_transfer.append(lfn)
except Exception as ex:
self.logger.error("Error during dashboard report update: %s" %ex)

lfn_in_transfer.append(lfn)
dash_rep = (document['jobid'], document['job_retry_count'], document['taskname'])
return [],()

try:
fileDoc = dict()
Expand All @@ -624,7 +620,7 @@ def mark_acquired(self, files=[]):
self.logger.debug("Marked acquired %s of %s" % (fileDoc, result))
except Exception as ex:
self.logger.error("Error during status update: %s" %ex)

return [],()
# TODO: no need to mark good, right? The postjob should update the status in case of direct stageout, I think
return lfn_in_transfer, dash_rep
else:
Expand Down Expand Up @@ -734,8 +730,8 @@ def mark_failed(self, files=[], force_fail=False, submission_error=False):
'fileusertransfers'),
data=encodeRequest({'subresource': 'getById', 'id': docId}))
except Exception as ex:
self.logger.error("Error updating failed docs: %s" %ex)
continue
self.logger.error("Error getting failed docs: %s" %ex)
return []
document = oracleOutputMapping(docbyId, None)[0]
self.logger.debug("Document: %s" % document)

Expand Down Expand Up @@ -769,7 +765,7 @@ def mark_failed(self, files=[], force_fail=False, submission_error=False):
msg += str(ex)
msg += str(traceback.format_exc())
self.logger.error(msg)
continue
return []

else:
docId = getHashLfn(temp_lfn)
Expand Down

0 comments on commit b79c4c7

Please sign in to comment.