From ff45676376323d7b60c344f42b0aea09053a48ab Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Fri, 19 Aug 2022 17:07:46 +0200 Subject: [PATCH 1/4] feat (MySQL): add profiling utility --- .../environment_variable_configuration.rst | 3 + src/DIRAC/Core/Utilities/MySQL.py | 180 ++++++++++++++++-- 2 files changed, 163 insertions(+), 20 deletions(-) diff --git a/docs/source/AdministratorGuide/ServerInstallations/environment_variable_configuration.rst b/docs/source/AdministratorGuide/ServerInstallations/environment_variable_configuration.rst index 144f543acb5..1e17e11e874 100644 --- a/docs/source/AdministratorGuide/ServerInstallations/environment_variable_configuration.rst +++ b/docs/source/AdministratorGuide/ServerInstallations/environment_variable_configuration.rst @@ -50,6 +50,9 @@ DIRAC_M2CRYPTO_SSL_CIPHERS DIRAC_M2CRYPTO_SSL_METHODS If set, overwrites the default SSL methods accepted. It should be a colon separated list. See :py:mod:`DIRAC.Core.DISET` +DIRAC_MYSQL_OPTIMIZER_TRACES_PATH + If set, it should point to an existing directory, where MySQL Optimizer traces will be stored. See :py:func:`DIRAC.Core.Utilities.MySQL.captureOptimizerTraces` + DIRAC_NO_CFG If set to anything, cfg files on the command line must be passed to the command using the --cfg option. diff --git a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py index 08a82e4a44c..47b770dfbf6 100755 --- a/src/DIRAC/Core/Utilities/MySQL.py +++ b/src/DIRAC/Core/Utilities/MySQL.py @@ -147,6 +147,9 @@ """ import collections +import functools +import json +import os import time import threading import MySQLdb @@ -198,6 +201,127 @@ def _quotedList(fieldList=None): return ", ".join(quotedFields) +def captureOptimizerTraces(meth): + """If enabled, this will dump the optimizer trace for each query performed. + Obviously, it has a performance cost... 

    In order to enable the tracing, the environment variable ``DIRAC_MYSQL_OPTIMIZER_TRACES_PATH``
    should be set and point to an existing directory where the files will be stored.

    It makes sense to enable it when preparing the migration to a newer major version
    of mysql: you run your integration tests (or whatever scenario you prepared) with the old version,
    then the same tests with the new version, and compare the output files.

    The files produced are called "optimizer_trace_<timestamp>_<hash>.json"
    The hash is here to minimize the risk of concurrence for the same file.
    The timestamp is to maintain the execution order. For easier comparison between two executions,
    you can rename the files with a sequence number.

    .. code-block:: bash

        cd ${DIRAC_MYSQL_OPTIMIZER_TRACES_PATH}
        c=0; for i in $(ls); do newFn=$(echo $i | sed -E "s/_trace_[0-9]+.[0-9]+_(.*)/_trace_${c}_\1/g"); mv $i $newFn; c=$(( c + 1 )); done

    This tool is then useful to compare the files https://github.com/cosmicanant/recursive-diff

    Note that this method is far from pretty:

    * error handling is not really done. Of course, I could add a lot of safety and try/catch and what not,
      but if you are using this, it means you reaaaally want to profile something. And in that case, you want things
      to go smoothly. And if they don't, you want to see it, and you want it to crash.
    * it messes a bit with the connection pool to be able to capture the traces

    All the docs related to the optimizer tracing are available here https://dev.mysql.com/doc/internals/en/optimizer-tracing.html

    The generated file contains one of the following:

    * ``{"EmptyTrace": arguments}``: some methods like "show tables" do not generate a trace
    * A list of dictionaries, one per trace for the specific call:

      * ``{ "Query": <query>, "Trace" : <trace> }`` if all is fine
      * ``{"Error": <error>}`` in case something goes wrong. 
See below in the code
    for the description of errors


    """

    optimizerTracingFolder = os.environ.get("DIRAC_MYSQL_OPTIMIZER_TRACES_PATH")

    @functools.wraps(meth)
    def innerMethod(self, *args, **kwargs):

        # First, get a connection to the DB, and enable the tracing
        connection = self.__connectionPool.get(self.__dbName)
        connection.cursor().execute('SET optimizer_trace="enabled=on";')
        # We also set some options that worked for my use case.
        # you may need to tune these parameters if you have huge traces
        # or more recursive calls.
        # I doubt it though....

        connection.cursor().execute(
            "SET optimizer_trace_offset=0, optimizer_trace_limit=20, optimizer_trace_max_mem_size=131072;"
        )

        # Because we can only trace on a per-session basis, give the same connection object
        # to the actual method
        kwargs["conn"] = connection

        # Execute the method
        res = meth(self, *args, **kwargs)

        # Turn off the tracing
        connection.cursor().execute('SET optimizer_trace="enabled=off";')

        # Get all the traces
        cursor = connection.cursor()
        if cursor.execute("SELECT * FROM INFORMATION_SCHEMA.OPTIMIZER_TRACE;"):
            queryTraces = cursor.fetchall()
        else:
            queryTraces = ()

        # Generate a filename stored in DIRAC_MYSQL_OPTIMIZER_TRACES_PATH
        methHash = hash(f"{args},{kwargs}")
        # optimizer_trace_<timestamp>_<hash>.json
        jsonFn = os.path.join(optimizerTracingFolder, f"optimizer_trace_{time.time()}_{methHash}.json")

        with open(jsonFn, "wt") as f:
            # Some calls do not generate a trace, like "show tables"
            if not queryTraces:
                json.dump({"EmptyTrace": args}, f)

            jsonTraces = []

            for trace_query, trace_analysis, trace_missingBytes, trace_privilegeError in queryTraces:
                # if trace_privilegeError is True, it's a permission error
                # https://dev.mysql.com/doc/internals/en/privilege-checking.html
                # It may particularly happen with stored procedures. 
+ # Although it is not really a good practice, for profiling purposes, + # I would go with the no brainer solution: GRANT ALL ON .* TO 'Dirac'@'%'; + if trace_privilegeError: + # f.write(f"ERROR: {args}") + jsonTraces.append({"Error": f"PrivilegeError: {args}"}) + continue + + # The memory is not large enough to store all the traces, so it is truncated + # https://dev.mysql.com/doc/internals/en/tracing-memory-usage.html + if trace_missingBytes: + jsonTraces.append({"Error": f"MissingBytes {trace_missingBytes}"}) + continue + + jsonTraces.append( + {"Query": trace_query, "Trace": json.loads(trace_analysis) if trace_analysis else None} + ) + + json.dump(jsonTraces, f) + + return res + + if optimizerTracingFolder: + return innerMethod + else: + return meth + + class ConnectionPool: """ Management of connections per thread @@ -583,6 +707,7 @@ def _connect(self): self._connected = True return S_OK() + @captureOptimizerTraces def _query(self, cmd, conn=None, debug=True): """ execute MySQL query command @@ -596,10 +721,13 @@ def _query(self, cmd, conn=None, debug=True): self.log.debug("_query: %s" % self._safeCmd(cmd)) - retDict = self._getConnection() - if not retDict["OK"]: - return retDict - connection = retDict["Value"] + if conn: + connection = conn + else: + retDict = self._getConnection() + if not retDict["OK"]: + return retDict + connection = retDict["Value"] try: cursor = connection.cursor() @@ -627,6 +755,7 @@ def _query(self, cmd, conn=None, debug=True): return retDict + @captureOptimizerTraces def _update(self, cmd, conn=None, debug=True): """execute MySQL update command @@ -637,11 +766,13 @@ def _update(self, cmd, conn=None, debug=True): """ self.log.debug("_update: %s" % self._safeCmd(cmd)) - - retDict = self._getConnection() - if not retDict["OK"]: - return retDict - connection = retDict["Value"] + if conn: + connection = conn + else: + retDict = self._getConnection() + if not retDict["OK"]: + return retDict + connection = retDict["Value"] try: cursor = 
connection.cursor() @@ -1475,14 +1606,18 @@ def insertFields(self, tableName, inFields=None, inValues=None, conn=None, inDic # self.log.debug('insertFields:', 'inserting %s into table %s' # % (inFieldString, table)) - return self._update(f"INSERT INTO {table} {inFieldString} VALUES {inValueString}", conn) + return self._update(f"INSERT INTO {table} {inFieldString} VALUES {inValueString}", conn=conn) - def executeStoredProcedure(self, packageName, parameters, outputIds): - conDict = self._getConnection() - if not conDict["OK"]: - return conDict + @captureOptimizerTraces + def executeStoredProcedure(self, packageName, parameters, outputIds, conn=None): + if conn: + connection = conn + else: + conDict = self._getConnection() + if not conDict["OK"]: + return conDict - connection = conDict["Value"] + connection = conDict["Value"] cursor = connection.cursor() try: cursor.callproc(packageName, parameters) @@ -1503,12 +1638,17 @@ def executeStoredProcedure(self, packageName, parameters, outputIds): return retDict # For the procedures that execute a select without storing the result - def executeStoredProcedureWithCursor(self, packageName, parameters): - conDict = self._getConnection() - if not conDict["OK"]: - return conDict + @captureOptimizerTraces + def executeStoredProcedureWithCursor(self, packageName, parameters, conn=None): + if conn: + connection = conn + else: + conDict = self._getConnection() + if not conDict["OK"]: + return conDict + + connection = conDict["Value"] - connection = conDict["Value"] cursor = connection.cursor() try: # execStr = "call %s(%s);" % ( packageName, ",".join( map( str, parameters ) ) ) From a73ba156d75e8ea400f6668f47aa15e69de2bc34 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Fri, 19 Aug 2022 15:42:29 +0200 Subject: [PATCH 2/4] feat (MySQL): Make keyword parameters mandatory for standard calls --- src/DIRAC/Core/Utilities/MySQL.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git 
a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py
index 47b770dfbf6..0f92977ad44 100755
--- a/src/DIRAC/Core/Utilities/MySQL.py
+++ b/src/DIRAC/Core/Utilities/MySQL.py
@@ -708,7 +708,7 @@ def _connect(self):
         return S_OK()
 
     @captureOptimizerTraces
-    def _query(self, cmd, conn=None, debug=True):
+    def _query(self, cmd, *, conn=None, debug=True):
         """
         execute MySQL query command
 
@@ -756,7 +756,7 @@ def _query(self, cmd, conn=None, debug=True):
         return retDict
 
     @captureOptimizerTraces
-    def _update(self, cmd, conn=None, debug=True):
+    def _update(self, cmd, *, conn=None, debug=True):
         """execute MySQL update command
 
         :param debug: print or not the errors
@@ -1096,7 +1096,7 @@ def countEntries(
             return S_ERROR(DErrno.EMYSQL, x)
 
         cmd = f"SELECT COUNT(*) FROM {table} {cond}"
-        res = self._query(cmd, connection)
+        res = self._query(cmd, conn=connection)
 
         if not res["OK"]:
             return res
@@ -1139,7 +1139,7 @@ def getCounters(
             return S_ERROR(DErrno.EMYSQL, x)
 
         cmd = f"SELECT {attrNames}, COUNT(*) FROM {table} {cond} GROUP BY {attrNames} ORDER BY {attrNames}"
-        res = self._query(cmd, connection)
+        res = self._query(cmd, conn=connection)
 
         if not res["OK"]:
             return res
@@ -1188,7 +1188,7 @@ def getDistinctAttributeValues(
             return S_ERROR(DErrno.EMYSQL, exc)
 
         cmd = f"SELECT DISTINCT( {attributeName} ) FROM {table} {cond} ORDER BY {attributeName}"
-        res = self._query(cmd, connection)
+        res = self._query(cmd, conn=connection)
         if not res["OK"]:
             return res
         attr_list = [x[0] for x in res["Value"]]
@@ -1421,7 +1421,7 @@ def getFields(
         except Exception as x:
             return S_ERROR(DErrno.EMYSQL, x)
 
-        return self._query(f"SELECT {quotedOutFields} FROM {table} {condition}", conn)
+        return self._query(f"SELECT {quotedOutFields} FROM {table} {condition}", conn=conn)
 
     #############################################################################
     def deleteEntries(
@@ -1465,7 +1465,7 @@ def deleteEntries(
         except Exception as x:
             return S_ERROR(DErrno.EMYSQL, x)
 
-        return self._update(f"DELETE FROM 
{table} {condition}", conn) + return self._update(f"DELETE FROM {table} {condition}", conn=conn) ############################################################################# def updateFields( @@ -1551,7 +1551,7 @@ def updateFields( [f"{_quotedList([updateFields[k]])} = {updateValues[k]}" for k in range(len(updateFields))] ) - return self._update(f"UPDATE {table} SET {updateString} {condition}", conn) + return self._update(f"UPDATE {table} SET {updateString} {condition}", conn=conn) ############################################################################# def insertFields(self, tableName, inFields=None, inValues=None, conn=None, inDict=None): @@ -1609,7 +1609,7 @@ def insertFields(self, tableName, inFields=None, inValues=None, conn=None, inDic return self._update(f"INSERT INTO {table} {inFieldString} VALUES {inValueString}", conn=conn) @captureOptimizerTraces - def executeStoredProcedure(self, packageName, parameters, outputIds, conn=None): + def executeStoredProcedure(self, packageName, parameters, outputIds, *, conn=None): if conn: connection = conn else: @@ -1639,7 +1639,7 @@ def executeStoredProcedure(self, packageName, parameters, outputIds, conn=None): # For the procedures that execute a select without storing the result @captureOptimizerTraces - def executeStoredProcedureWithCursor(self, packageName, parameters, conn=None): + def executeStoredProcedureWithCursor(self, packageName, parameters, *, conn=None): if conn: connection = conn else: From 14531d1977fb9ee2a6e7d803b8b5ce50150b4ef5 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Fri, 19 Aug 2022 17:08:44 +0200 Subject: [PATCH 3/4] fix: Call MySQL method with named parameters --- src/DIRAC/AccountingSystem/DB/AccountingDB.py | 4 +- .../DatasetManager/DatasetManager.py | 14 +-- .../DirectoryManager/DirectoryClosure.py | 2 +- .../DirectoryManager/DirectoryLevelTree.py | 26 +++--- .../DirectoryManager/DirectoryTreeBase.py | 22 ++--- .../FileManager/FileManager.py | 46 +++++----- 
.../FileManager/FileManagerBase.py | 44 ++++----- .../FileManager/FileManagerFlat.py | 22 ++--- .../FileManager/FileManagerPs.py | 2 +- .../SEManager/SEManagerDB.py | 4 +- src/DIRAC/ProductionSystem/DB/ProductionDB.py | 24 ++--- .../DB/StorageManagementDB.py | 90 +++++++++---------- .../DB/TransformationDB.py | 84 ++++++++--------- 13 files changed, 192 insertions(+), 192 deletions(-) diff --git a/src/DIRAC/AccountingSystem/DB/AccountingDB.py b/src/DIRAC/AccountingSystem/DB/AccountingDB.py index 8cd0efea6d9..7f18b257d17 100644 --- a/src/DIRAC/AccountingSystem/DB/AccountingDB.py +++ b/src/DIRAC/AccountingSystem/DB/AccountingDB.py @@ -510,10 +510,10 @@ def __addKeyValue(self, typeName, keyName, keyValue): return retVal connection = retVal["Value"] self.log.info(f"Value {keyValue} for key {keyName} didn't exist, inserting") - retVal = self.insertFields(keyTable, ["id", "value"], [0, keyValue], connection) + retVal = self.insertFields(keyTable, ["id", "value"], [0, keyValue], conn=connection) if not retVal["OK"] and retVal["Message"].find("Duplicate key") == -1: return retVal - result = self.__getIdForKeyValue(typeName, keyName, keyValue, connection) + result = self.__getIdForKeyValue(typeName, keyName, keyValue, conn=connection) if not result["OK"]: return result keyCache[keyValue] = result["Value"] diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DatasetManager/DatasetManager.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DatasetManager/DatasetManager.py index 85f5b7dbb1e..77ede27369c 100644 --- a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DatasetManager/DatasetManager.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DatasetManager/DatasetManager.py @@ -168,13 +168,13 @@ def _findDatasets(self, datasets, connection=False): resultDict = {"Successful": {}, "Failed": {}} if fullNames: - result = self.__findFullPathDatasets(fullNames, connection) + result = self.__findFullPathDatasets(fullNames, 
connection=connection) if not result["OK"]: return result resultDict = result["Value"] if shortNames: - result = self.__findNoPathDatasets(shortNames, connection) + result = self.__findNoPathDatasets(shortNames, connection=connection) if not result["OK"]: return result resultDict["Successful"].update(result["Value"]["Successful"]) @@ -209,7 +209,7 @@ def __findFullPathDatasets(self, datasets, connection): wheres.append("( DirID=%d AND DatasetName IN (%s) )" % (dirID, stringListToString(dsNames))) req = "SELECT DatasetName,DirID,DatasetID FROM FC_MetaDatasets WHERE %s" % " OR ".join(wheres) - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: return result for dsName, dirID, dsID in result["Value"]: @@ -230,7 +230,7 @@ def __findNoPathDatasets(self, nodirDatasets, connection): dsIDs = {} req = "SELECT COUNT(DatasetName),DatasetName,DatasetID FROM FC_MetaDatasets WHERE DatasetName in " req += "( %s ) GROUP BY DatasetName,DatasetID" % stringListToString(nodirDatasets) - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: return result for dsCount, dsName, dsID in result["Value"]: @@ -243,7 +243,7 @@ def __findNoPathDatasets(self, nodirDatasets, connection): req = "SELECT DatasetName,DatasetID,DirID FROM FC_MetaDatasets WHERE DatasetID in (%s)" % ",".join( dsIDs.values() ) - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: return result for dsName, dsID, dirID in result["Value"]: @@ -259,7 +259,7 @@ def addDatasetAnnotation(self, datasets, credDict): """Add annotation to the given dataset""" connection = self._getConnection() successful = {} - result = self._findDatasets(list(datasets), connection) + result = self._findDatasets(list(datasets), connection=connection) if not result["OK"]: return result failed = result["Value"]["Failed"] @@ -270,7 +270,7 @@ def addDatasetAnnotation(self, datasets, 
credDict): annotation, datasetDict[dataset]["DatasetID"], ) - result = self.db._update(req, connection) + result = self.db._update(req, conn=connection) if not result["OK"]: failed[dataset] = "Failed to add annotation" else: diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryClosure.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryClosure.py index 577d311a7ea..a1202f502eb 100644 --- a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryClosure.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryClosure.py @@ -197,7 +197,7 @@ def getPathIDsByID(self, dirID): def getChildren(self, path, connection=False): """Get child directory IDs for the given directory""" if isinstance(path, str): - result = self.findDir(path, connection) + result = self.findDir(path, connection=connection) if not result["OK"]: return result if not result["Value"]: diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryLevelTree.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryLevelTree.py index b087bc1d3a9..adc2445b581 100644 --- a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryLevelTree.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryLevelTree.py @@ -31,7 +31,7 @@ def findDir(self, path, connection=False): return dpath dpath = dpath["Value"] req = "SELECT DirID,Level from FC_DirectoryLevelTree WHERE DirName=%s" % dpath - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: return result @@ -52,7 +52,7 @@ def findDirs(self, paths, connection=False): dpathList.append(dpath["Value"]) dpaths = ",".join(dpathList) req = "SELECT DirName,DirID from FC_DirectoryLevelTree WHERE DirName in (%s)" % dpaths - result = self.db._query(req, connection) + 
result = self.db._query(req, conn=connection) if not result["OK"]: return result dirDict = {} @@ -82,7 +82,7 @@ def __getNumericPath(self, dirID, connection=False): """Get the enumerated path of the given directory""" epathString = ",".join(["LPATH%d" % (i + 1) for i in range(MAX_LEVELS)]) req = "SELECT LEVEL,%s FROM FC_DirectoryLevelTree WHERE DirID=%d" % (epathString, dirID) - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: return result if not result["Value"]: @@ -332,7 +332,7 @@ def getPathIDsByID(self, dirID): def getChildren(self, path, connection=False): """Get child directory IDs for the given directory""" if isinstance(path, str): - result = self.findDir(path, connection) + result = self.findDir(path, connection=connection) if not result["OK"]: return result if not result["Value"]: @@ -341,7 +341,7 @@ def getChildren(self, path, connection=False): else: dirID = path req = "SELECT DirID FROM FC_DirectoryLevelTree WHERE Parent=%d" % dirID - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: return result if not result["Value"]: @@ -506,12 +506,12 @@ def recoverOrphanDirectories(self, credDict): continue connection = self._getConnection() - result = self.db._query("LOCK TABLES FC_DirectoryLevelTree WRITE", connection) + result = self.db._query("LOCK TABLES FC_DirectoryLevelTree WRITE", conn=connection) if not result["OK"]: - resUnlock = self.db._query("UNLOCK TABLES", connection) + resUnlock = self.db._query("UNLOCK TABLES", conn=connection) return result - result = self.__rebuildLevelIndexes(parentID, connection) - resUnlock = self.db._query("UNLOCK TABLES", connection) + result = self.__rebuildLevelIndexes(parentID, connection=connection) + resUnlock = self.db._query("UNLOCK TABLES", conn=connection) return S_OK() @@ -525,13 +525,13 @@ def _getConnection(self, connection=False): def __rebuildLevelIndexes(self, parentID, connection=False): 
"""Rebuild level indexes for all the subdirectories""" - result = self.__getNumericPath(parentID, connection) + result = self.__getNumericPath(parentID, connection=connection) if not result["OK"]: return result parentIndexList = result["Value"] parentLevel = result["Level"] - result = self.getChildren(parentID, connection) + result = self.getChildren(parentID, connection=connection) if not result["OK"]: return result subIDList = result["Value"] @@ -543,9 +543,9 @@ def __rebuildLevelIndexes(self, parentID, connection=False): lpaths = ["LPATH%d=%d" % (i + 1, indexList[i]) for i in range(parentLevel + 1)] lpathString = "SET " + ",".join(lpaths) req = f"UPDATE FC_DirectoryLevelTree {lpathString} WHERE DirID={dirID}" - result = self.db._update(req, connection) + result = self.db._update(req, conn=connection) if not result["OK"]: return result - result = self.__rebuildLevelIndexes(dirID, connection) + result = self.__rebuildLevelIndexes(dirID, connection=connection) return S_OK() diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryTreeBase.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryTreeBase.py index 1259c8f5b9d..a26a739d149 100755 --- a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryTreeBase.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/DirectoryManager/DirectoryTreeBase.py @@ -776,7 +776,7 @@ def _getDirectoryLogicalSizeFromUsage(self, lfns, recursiveSum=True, connection= dirID = result["Value"] req = "SELECT SESize, SEFiles FROM FC_DirectoryUsage WHERE SEID=0 AND DirID=%d" % dirID - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: failed[path] = result["Message"] elif not result["Value"]: @@ -835,7 +835,7 @@ def _getDirectoryLogicalSize(self, lfns, recursiveSum=True, connection=None): ) reqDir = dirString.replace("SELECT DirID FROM", "SELECT count(*) FROM") - result = 
self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: failed[path] = result["Message"] elif not result["Value"]: @@ -845,7 +845,7 @@ def _getDirectoryLogicalSize(self, lfns, recursiveSum=True, connection=None): "LogicalSize": int(result["Value"][0][0]), "LogicalFiles": int(result["Value"][0][1]), } - result = self.db._query(reqDir, connection) + result = self.db._query(reqDir, conn=connection) if result["OK"] and result["Value"]: successful[path]["LogicalDirectories"] = result["Value"][0][0] - 1 else: @@ -879,7 +879,7 @@ def _getDirectoryPhysicalSizeFromUsage(self, lfns, recursiveSum=True, connection req = "SELECT S.SEID, S.SEName, D.SESize, D.SEFiles FROM FC_DirectoryUsage as D, FC_StorageElements as S" req += " WHERE S.SEID=D.SEID AND D.DirID=%d" % dirID - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: failed[path] = result["Message"] elif not result["Value"]: @@ -932,7 +932,7 @@ def _getDirectoryPhysicalSizeFromUsage_old(self, lfns, connection): req += " JOIN (%s) AS F" % subDirString req += " WHERE S.SEID=D.SEID AND D.DirID=F.DirID" - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: failed[path] = result["Message"] elif not result["Value"]: @@ -993,7 +993,7 @@ def _getDirectoryPhysicalSize(self, lfns, recursiveSum=True, connection=None): req += "WHERE R.SEID=S.SEID AND F.FileID=R.FileID AND F.DirID=T.DirID " req += "GROUP BY S.SEID" - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: failed[path] = result["Message"] elif not result["Value"]: @@ -1161,7 +1161,7 @@ def getDirectoryCounters(self, connection=False): conn = self._getConnection(connection) resultDict = {} req = "SELECT COUNT(*) from FC_DirectoryInfo" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res 
resultDict["Directories"] = res["Value"][0][0] @@ -1170,26 +1170,26 @@ def getDirectoryCounters(self, connection=False): req = f"SELECT COUNT(DirID) FROM {treeTable} WHERE Parent NOT IN ( SELECT DirID from {treeTable} )" req += " AND DirID <> 1" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res resultDict["Orphan Directories"] = res["Value"][0][0] req = f"SELECT COUNT(DirID) FROM {treeTable} WHERE DirID NOT IN ( SELECT Parent from {treeTable} )" req += " AND DirID NOT IN ( SELECT DirID from FC_Files ) " - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res resultDict["Empty Directories"] = res["Value"][0][0] req = "SELECT COUNT(DirID) FROM %s WHERE DirID NOT IN ( SELECT DirID FROM FC_DirectoryInfo )" % treeTable - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res resultDict["DirTree w/o DirInfo"] = res["Value"][0][0] req = "SELECT COUNT(DirID) FROM FC_DirectoryInfo WHERE DirID NOT IN ( SELECT DirID FROM %s )" % treeTable - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res resultDict["DirInfo w/o DirTree"] = res["Value"][0][0] diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManager.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManager.py index da07e100a9f..03554feceaf 100755 --- a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManager.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManager.py @@ -90,7 +90,7 @@ def _findFileIDs(self, lfns, connection=False): wheres.append("( DirID=%d AND FileName IN (%s) )" % (dirID, stringListToString(fileNames))) req = "SELECT FileName,DirID,FileID FROM FC_Files WHERE %s" % " OR ".join(wheres) - result = self.db._query(req, connection) + result = 
self.db._query(req, conn=connection) if not result["OK"]: return result for fileName, dirID, fileID in result["Value"]: @@ -122,7 +122,7 @@ def _getDirectoryFiles(self, dirID, fileNames, metadata_input, allStatus=False, req = f"{req} AND Status IN ({intListToString(statusIDs)})" if fileNames: req = f"{req} AND FileName IN ({stringListToString(fileNames)})" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res fileNameIDs = res["Value"] @@ -178,7 +178,7 @@ def _getDirectoryFiles(self, dirID, fileNames, metadata_input, allStatus=False, intListToString(metadata), intListToString(filesDict.keys()), ) - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res for tuple_ in res["Value"]: @@ -212,7 +212,7 @@ def _getFileMetadataByID(self, fileIDs, connection=False): """Get standard file metadata for a list of files specified by FileID""" stringIDs = ",".join(["%s" % id_ for id_ in fileIDs]) req = "SELECT FileID,Size,UID,GID,Status FROM FC_Files WHERE FileID in ( %s )" % stringIDs - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: return result resultDict = {} @@ -225,7 +225,7 @@ def _getFileMetadataByID(self, fileIDs, connection=False): } req = "SELECT FileID,GUID,CreationDate from FC_FileInfo WHERE FileID in ( %s )" % stringIDs - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: return result for fileID, guid, date in result["Value"]: @@ -269,7 +269,7 @@ def _insertFiles(self, lfns, uid, gid, connection=False): directorySESizeDict[dirID][0]["Files"] += 1 req = "INSERT INTO FC_Files (DirID,Size,UID,GID,Status,FileName) VALUES %s" % (",".join(insertTuples)) - res = self.db._update(req, connection) + res = self.db._update(req, conn=connection) if not res["OK"]: return res # Get the fileIDs for the inserted files @@ -322,7 +322,7 @@ def 
_getFileIDFromGUID(self, guid, connection=False): if not isinstance(guid, (list, tuple)): guid = [guid] req = "SELECT FileID,GUID FROM FC_FileInfo WHERE GUID IN (%s)" % stringListToString(guid) - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res guidDict = {} @@ -340,7 +340,7 @@ def getLFNForGUID(self, guids, connection=False): guids = [guids] req = "SELECT f.FileID, f.FileName, fi.GUID, f.DirID FROM FC_FileInfo fi" req += " JOIN FC_Files f on fi.FileID = f.FileID WHERE GUID IN (%s)" % stringListToString(guids) - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res @@ -400,7 +400,7 @@ def __deleteFiles(self, fileIDs, connection=False): failed = [] for table in ["FC_Files", "FC_FileInfo"]: req = f"DELETE FROM {table} WHERE FileID in ({fileIDString})" - res = self.db._update(req, connection) + res = self.db._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to remove files from table %s" % table, res["Message"]) failed.append(table) @@ -457,7 +457,7 @@ def _insertReplicas(self, lfns, master=False, connection=False): req = "INSERT INTO FC_Replicas (FileID,SEID,Status) VALUES %s" % ( ",".join(["(%d,%d,%d)" % (tuple_[0], tuple_[1], statusID) for tuple_ in insertTuples]) ) - res = self.db._update(req, connection) + res = self.db._update(req, conn=connection) if not res["OK"]: return res res = self._getRepIDsForReplica(insertTuples, connection=connection) @@ -491,7 +491,7 @@ def _insertReplicas(self, lfns, master=False, connection=False): req = "INSERT INTO FC_ReplicaInfo (RepID,RepType,CreationDate,ModificationDate,PFN) VALUES %s" % ( ",".join(insertReplicas) ) - res = self.db._update(req, connection) + res = self.db._update(req, conn=connection) if not res["OK"]: for lfn in lfns.keys(): failed[lfn] = res["Message"] @@ -509,7 +509,7 @@ def _getRepIDsForReplica(self, replicaTuples, connection=False): for fileID, seID in replicaTuples: 
queryTuples.append("(%d,%d)" % (fileID, seID)) req = "SELECT RepID,FileID,SEID FROM FC_Replicas WHERE (FileID,SEID) IN (%s)" % intListToString(queryTuples) - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res replicaDict = {} @@ -555,7 +555,7 @@ def _deleteReplicas(self, lfns, connection=False): directorySESizeDict[dirID].setdefault(seID, {"Files": 0, "Size": 0}) directorySESizeDict[dirID][seID]["Size"] += fileDict["Size"] directorySESizeDict[dirID][seID]["Files"] += 1 - res = self._getRepIDsForReplica(toRemove, connection) + res = self._getRepIDsForReplica(toRemove, connection=connection) if not res["OK"]: for lfn in lfnFileIDDict.keys(): failed[lfn] = res["Message"] @@ -585,7 +585,7 @@ def __deleteReplicas(self, repIDs, connection=False): failed = [] for table in ["FC_Replicas", "FC_ReplicaInfo"]: req = f"DELETE FROM {table} WHERE RepID in ({repIDString})" - res = self.db._update(req, connection) + res = self.db._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to remove replicas from table %s" % table, res["Message"]) failed.append(table) @@ -616,7 +616,7 @@ def _setReplicaStatus(self, fileID, se, status, connection=False): return res repID = res["Value"] req = "UPDATE FC_Replicas SET Status=%d WHERE RepID=%d" % (statusID, repID) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) def _setReplicaHost(self, fileID, se, newSE, connection=False): connection = self._getConnection(connection) @@ -631,7 +631,7 @@ def _setReplicaHost(self, fileID, se, newSE, connection=False): return res repID = res["Value"] req = "UPDATE FC_Replicas SET SEID=%d WHERE RepID = %d;" % (newSE, repID) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) def _setReplicaParameter(self, fileID, se, paramName, paramValue, connection=False): connection = self._getConnection(connection) @@ -646,7 +646,7 @@ def _setReplicaParameter(self, fileID, 
se, paramName, paramValue, connection=Fal paramValue, repID, ) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) def _setFileParameter(self, fileID, paramName, paramValue, connection=False): connection = self._getConnection(connection) @@ -665,7 +665,7 @@ def _setFileParameter(self, fileID, paramName, paramValue, connection=False): else: tmpreq = "UPDATE FC_Files %%s WHERE FileID IN (%s)" % fileIDString req = tmpreq % "SET %s='%s'" % (paramName, paramValue) - result = self.db._update(req, connection) + result = self.db._update(req, conn=connection) if not result["OK"]: return result if "select" in fileIDString.lower(): @@ -685,7 +685,7 @@ def _setFileParameter(self, fileID, paramName, paramValue, connection=False): paramValue, fileIDString, ) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) def __getRepIDForReplica(self, fileID, seID, connection=False): connection = self._getConnection(connection) @@ -727,7 +727,7 @@ def _getFileReplicas(self, fileIDs, fields_input=["PFN"], allStatus=False, conne intListToString(fields), intListToString(fileIDDict.keys()), ) - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res for tuple_ in res["Value"]: @@ -777,7 +777,7 @@ def __getFileIDReplicas(self, fileIDs, allStatus=False, connection=False): if result["OK"]: statusIDs.append(result["Value"]) req += " AND Status in (%s)" % (intListToString(statusIDs)) - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res fileIDDict = {} @@ -817,7 +817,7 @@ def _getDirectoryReplicas(self, dirID, allStatus=False, connection=False): if fileStatusIDs: req += " AND FF.Status in (%s)" % intListToString(fileStatusIDs) - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) return result def repairFileTables(self, connection=False): @@ -825,7 +825,7 @@ def 
repairFileTables(self, connection=False): req = "SELECT F1.FileID, F2.FileID from FC_Files as F1 LEFT JOIN FC_FileInfo as F2 " req += "ON F1.FileID=F2.FileID WHERE F2.FileID IS NULL" - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: return result diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerBase.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerBase.py index 89b3fc5467a..673335ff57a 100755 --- a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerBase.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerBase.py @@ -36,39 +36,39 @@ def getFileCounters(self, connection=False): resultDict = {} req = "SELECT COUNT(*) FROM FC_Files;" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res resultDict["Files"] = res["Value"][0][0] req = "SELECT COUNT(FileID) FROM FC_Files WHERE FileID NOT IN ( SELECT FileID FROM FC_Replicas )" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res resultDict["Files w/o Replicas"] = res["Value"][0][0] req = "SELECT COUNT(RepID) FROM FC_Replicas WHERE FileID NOT IN ( SELECT FileID FROM FC_Files )" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res resultDict["Replicas w/o Files"] = res["Value"][0][0] treeTable = self.db.dtree.getTreeTable() req = "SELECT COUNT(FileID) FROM FC_Files WHERE DirID NOT IN ( SELECT DirID FROM %s)" % treeTable - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res resultDict["Orphan Files"] = res["Value"][0][0] req = "SELECT COUNT(FileID) FROM FC_Files WHERE FileID NOT IN ( SELECT FileID FROM FC_FileInfo)" - res = self.db._query(req, connection) + res = self.db._query(req, 
conn=connection) if not res["OK"]: resultDict["Files w/o FileInfo"] = 0 else: resultDict["Files w/o FileInfo"] = res["Value"][0][0] req = "SELECT COUNT(FileID) FROM FC_FileInfo WHERE FileID NOT IN ( SELECT FileID FROM FC_Files)" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: resultDict["FileInfo w/o Files"] = 0 else: @@ -81,7 +81,7 @@ def getReplicaCounters(self, connection=False): connection = self._getConnection(connection) req = "SELECT COUNT(*) FROM FC_Replicas;" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res return S_OK({"Replicas": res["Value"][0][0]}) @@ -446,7 +446,7 @@ def _insertFileAncestors(self, fileID, ancestorDict, connection=False): req = "INSERT INTO FC_FileAncestors (FileID, AncestorID, AncestorDepth) VALUES %s" % intListToString( ancestorTuples ) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) def _getFileAncestors(self, fileIDs, depths=[], connection=False): connection = self._getConnection(connection) @@ -455,7 +455,7 @@ def _getFileAncestors(self, fileIDs, depths=[], connection=False): ) if depths: req = f"{req} AND AncestorDepth IN ({intListToString(depths)});" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res fileIDAncestors = {} @@ -473,7 +473,7 @@ def _getFileDescendents(self, fileIDs, depths, connection=False): ) if depths: req = f"{req} AND AncestorDepth IN ({intListToString(depths)});" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res fileIDAncestors = {} @@ -501,7 +501,7 @@ def addFileAncestors(self, lfns, connection=False): for lfn in result["Value"]["Successful"]: lfns[lfn]["FileID"] = result["Value"]["Successful"][lfn]["FileID"] - result = self._populateFileAncestors(lfns, connection) + result = self._populateFileAncestors(lfns, 
connection=connection) if not result["OK"]: return result failed.update(result["Value"]["Failed"]) @@ -528,9 +528,9 @@ def _getFileRelatives(self, lfns, depths, relation, connection=False): inputIDs = list(inputIDDict) if relation == "ancestor": - result = self._getFileAncestors(inputIDs, depths, connection) + result = self._getFileAncestors(inputIDs, depths, connection=connection) else: - result = self._getFileDescendents(inputIDs, depths, connection) + result = self._getFileDescendents(inputIDs, depths, connection=connection) if not result["OK"]: return result @@ -557,10 +557,10 @@ def _getFileRelatives(self, lfns, depths, relation, connection=False): return S_OK({"Successful": successful, "Failed": failed}) def getFileAncestors(self, lfns, depths, connection=False): - return self._getFileRelatives(lfns, depths, "ancestor", connection) + return self._getFileRelatives(lfns, depths, "ancestor", connection=connection) def getFileDescendents(self, lfns, depths, connection=False): - return self._getFileRelatives(lfns, depths, "descendent", connection) + return self._getFileRelatives(lfns, depths, "descendent", connection=connection) def _getExistingMetadata(self, lfns, connection=False): connection = self._getConnection(connection) @@ -868,7 +868,7 @@ def exists(self, lfns, connection=False): if guidList: # A dict { guid: lfn to which it is supposed to be associated } guidToGivenLfn = dict(zip(guidList, lfns)) - res = self.getLFNForGUID(guidList, connection) + res = self.getLFNForGUID(guidList, connection=connection) if not res["OK"]: return res guidLfns = res["Value"]["Successful"] @@ -999,7 +999,7 @@ def getReplicas(self, lfns, allStatus, connection=False): for lfn, fileID in res["Value"]["Successful"].items(): fileIDLFNs[fileID] = lfn - result = self.__getReplicasForIDs(fileIDLFNs, allStatus, connection) + result = self.__getReplicasForIDs(fileIDLFNs, allStatus, connection=connection) if not result["OK"]: return result replicas = result["Value"] @@ -1017,7 +1017,7 
@@ def getReplicasByMetadata(self, metaDict, path, allStatus, credDict, connection= return result idLfnDict = result["Value"] - result = self.__getReplicasForIDs(idLfnDict, allStatus, connection) + result = self.__getReplicasForIDs(idLfnDict, allStatus, connection=connection) if not result["OK"]: return result replicas = result["Value"] @@ -1059,13 +1059,13 @@ def getReplicaStatus(self, lfns, connection=False): def _getStatusInt(self, status, connection=False): connection = self._getConnection(connection) req = "SELECT StatusID FROM FC_Statuses WHERE Status = '%s';" % status - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res if res["Value"]: return S_OK(res["Value"][0][0]) req = "INSERT INTO FC_Statuses (Status) VALUES ('%s');" % status - res = self.db._update(req, connection) + res = self.db._update(req, conn=connection) if not res["OK"]: return res return S_OK(res["lastRowId"]) @@ -1075,7 +1075,7 @@ def _getIntStatus(self, statusID, connection=False): return S_OK(self.statusDict[statusID]) connection = self._getConnection(connection) req = "SELECT StatusID,Status FROM FC_Statuses" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res if res["Value"]: @@ -1155,7 +1155,7 @@ def getDirectoryReplicas(self, dirID, path, allStatus=False, connection=False): If False, take the visibleFileStatus and visibleReplicaStatus values from the configuration """ connection = self._getConnection(connection) - result = self._getDirectoryReplicas(dirID, allStatus, connection) + result = self._getDirectoryReplicas(dirID, allStatus, connection=connection) if not result["OK"]: return result diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerFlat.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerFlat.py index a3901552a44..3a3f981f587 100755 --- 
a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerFlat.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerFlat.py @@ -54,7 +54,7 @@ def _getDirectoryFiles(self, dirID, fileNames, metadata, allStatus=False, connec req = f"{req} AND Status IN ({intListToString(statusIDs)})" if fileNames: req = f"{req} AND FileName IN ({stringListToString(fileNames)})" - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res files = {} @@ -96,7 +96,7 @@ def _insertFiles(self, lfns, uid, gid, connection=False): ) fields = "DirID,Size,UID,GID,Status,FileName,GUID,Checksum,ChecksumType,CreationDate,ModificationDate,Mode" req = "INSERT INTO FC_Files ({}) VALUES {}".format(fields, ",".join(insertTuples)) - res = self.db._update(req, connection) + res = self.db._update(req, conn=connection) if not res["OK"]: return res # Get the fileIDs for the inserted files @@ -118,7 +118,7 @@ def _getFileIDFromGUID(self, guid, connection=False): if not isinstance(guid, (list, tuple)): guid = [guid] req = "SELECT FileID,GUID FROM FC_Files WHERE GUID IN (%s)" % stringListToString(guid) - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not res["OK"]: return res guidDict = {} @@ -146,14 +146,14 @@ def __deleteFileReplicas(self, fileIDs, connection=False): if not fileIDs: return S_OK() req = "DELETE FROM FC_Replicas WHERE FileID in (%s)" % (intListToString(fileIDs)) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) def __deleteFiles(self, fileIDs, connection=False): connection = self._getConnection(connection) if not fileIDs: return S_OK() req = "DELETE FROM FC_Files WHERE FileID in (%s)" % (intListToString(fileIDs)) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) ###################################################### # @@ -209,7 +209,7 @@ def _insertReplicas(self, lfns, 
master=False, connection=False): if insertTuples: fields = "FileID,SEID,Status,RepType,CreationDate,ModificationDate,PFN" req = "INSERT INTO FC_Replicas ({}) VALUES {}".format(fields, ",".join(insertTuples.values())) - res = self.db._update(req, connection) + res = self.db._update(req, conn=connection) if not res["OK"]: self.__deleteReplicas(deleteTuples, connection=connection) for lfn in insertTuples.keys(): @@ -231,7 +231,7 @@ def __existsReplica(self, fileID, seID, connection=False): return res seID = res["Value"] req = "SELECT FileID FROM FC_Replicas WHERE FileID=%d AND SEID=%d" % (fileID, seID) - result = self.db._query(req, connection) + result = self.db._query(req, conn=connection) if not result["OK"]: return result if not result["Value"]: @@ -289,7 +289,7 @@ def __deleteReplicas(self, replicaTuples, connection=False): seID = res["Value"] deleteTuples.append("(%d,%d)" % (fileID, seID)) req = "DELETE FROM FC_Replicas WHERE (FileID,SEID) IN (%s)" % intListToString(deleteTuples) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) ###################################################### # @@ -326,7 +326,7 @@ def _setReplicaParameter(self, fileID, seID, paramName, paramValue, connection=F fileID, seID, ) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) def _setFileParameter(self, fileID, paramName, paramValue, connection=False): connection = self._getConnection(connection) @@ -337,7 +337,7 @@ def _setFileParameter(self, fileID, paramName, paramValue, connection=False): paramValue, intListToString(fileID), ) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) ###################################################### # @@ -352,7 +352,7 @@ def _getFileReplicas(self, fileIDs, fields=["PFN"], connection=False): intListToString(fields), intListToString(fileIDs), ) - res = self.db._query(req, connection) + res = self.db._query(req, conn=connection) if not 
res["OK"]: return res replicas = {} diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerPs.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerPs.py index 5f78826fd37..ae5e328c423 100755 --- a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerPs.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/FileManager/FileManagerPs.py @@ -761,7 +761,7 @@ def _setFileParameter(self, fileID, paramName, paramValue, connection=False): paramValue, intListToString(fileID), ) - return self.db._update(req, connection) + return self.db._update(req, conn=connection) return S_OK() diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/SEManager/SEManagerDB.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/SEManager/SEManagerDB.py index a2f2bb8b2a5..c17200c12b8 100644 --- a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/SEManager/SEManagerDB.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/SEManager/SEManagerDB.py @@ -58,7 +58,7 @@ def __addSE(self, seName, connection=False): self.lock.release() return S_OK(seid) connection = self.db._getConnection() - res = self.db.insertFields("FC_StorageElements", ["SEName"], [seName], connection) + res = self.db.insertFields("FC_StorageElements", ["SEName"], [seName], conn=connection) if not res["OK"]: gLogger.debug(f"SEManager AddSE lock released. Used {time.time() - waitTime:.3f} seconds. {seName}") self.lock.release() @@ -85,7 +85,7 @@ def __removeSE(self, seName, connection=False): gLogger.debug(f"SEManager RemoveSE lock created. Waited {waitTime - startTime:.3f} seconds. {seName}") seid = self.db.seNames.get(seName, "Missing") req = "DELETE FROM FC_StorageElements WHERE SEName='%s'" % seName - res = self.db._update(req, connection) + res = self.db._update(req, conn=connection) if not res["OK"]: gLogger.debug(f"SEManager RemoveSE lock released. 
Used {time.time() - waitTime:.3f} seconds. {seName}") self.lock.release() diff --git a/src/DIRAC/ProductionSystem/DB/ProductionDB.py b/src/DIRAC/ProductionSystem/DB/ProductionDB.py index cc96cb5e80a..5a4ae53e1b6 100644 --- a/src/DIRAC/ProductionSystem/DB/ProductionDB.py +++ b/src/DIRAC/ProductionSystem/DB/ProductionDB.py @@ -100,7 +100,7 @@ def addProduction(self, prodName, prodDescription, authorDN, authorGroup, connec % (prodName, prodDescription, authorDN, authorGroup) ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: self.lock.release() return res @@ -138,7 +138,7 @@ def getProductions( intListToString(self.PRODPARAMS), self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset), ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res @@ -208,7 +208,7 @@ def getProductionStep(self, stepID, connection=False): """ connection = self.__getConnection(connection) req = f"SELECT {intListToString(self.PRODSTEPSPARAMS)} FROM ProductionSteps WHERE StepID = {str(stepID)}" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res if not res["Value"]: @@ -267,7 +267,7 @@ def addProductionStep( ) ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: self.lock.release() return res @@ -310,7 +310,7 @@ def getProductionTransformations( intListToString(self.TRANSPARAMS), self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset), ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res @@ -334,7 +334,7 @@ def __setProductionStatus(self, prodID, status, connection=False): :param str status: the Production status """ req = "UPDATE Productions SET Status='%s', LastUpdate=UTC_TIMESTAMP() WHERE ProductionID=%d" % (status, prodID) - return self._update(req, connection) 
+ return self._update(req, conn=connection) # This is to be replaced by startProduction, stopProduction etc. def setProductionStatus(self, prodName, status, connection=False): @@ -422,7 +422,7 @@ def __deleteProduction(self, prodID, connection=False): :param int prodID: ProductionID """ req = "DELETE FROM Productions WHERE ProductionID=%d;" % prodID - return self._update(req, connection) + return self._update(req, conn=connection) def __deleteProductionTransformations(self, prodID, connection=False): """Remove all the transformations of the specified production from the TS and from the PS @@ -438,12 +438,12 @@ def __deleteProductionTransformations(self, prodID, connection=False): # Remove transformations from the PS req = "DELETE FROM ProductionTransformationLinks WHERE ProductionID = %d;" % prodID - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to delete production transformation links from the PS", res["Message"]) req = "DELETE FROM ProductionTransformations WHERE ProductionID = %d;" % prodID - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to delete production transformations from the PS", res["Message"]) @@ -585,7 +585,7 @@ def __addTransformationLinks(self, prodID, transIDs, parentTransIDs=None, connec gLogger.notice(req) req = req.rstrip(",") - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: return res @@ -603,7 +603,7 @@ def __addTransformations(self, prodID, transIDs, connection=False): req = "%s (%d,%d,UTC_TIMESTAMP(),UTC_TIMESTAMP())," % (req, prodID, transID) gLogger.notice(req) req = req.rstrip(",") - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: return res @@ -621,7 +621,7 @@ def _getProductionID(self, prodName, connection=False): if not isinstance(prodName, str): return S_ERROR("Production should be ID or 
name") cmd = "SELECT ProductionID from Productions WHERE ProductionName='%s';" % prodName - res = self._query(cmd, connection) + res = self._query(cmd, conn=connection) if not res["OK"]: gLogger.error("Failed to obtain production ID for production", "{}: {}".format(prodName, res["Message"])) return res diff --git a/src/DIRAC/StorageManagementSystem/DB/StorageManagementDB.py b/src/DIRAC/StorageManagementSystem/DB/StorageManagementDB.py index 6e91e187e16..28c8ae716bc 100644 --- a/src/DIRAC/StorageManagementSystem/DB/StorageManagementDB.py +++ b/src/DIRAC/StorageManagementSystem/DB/StorageManagementDB.py @@ -112,7 +112,7 @@ def __updateTaskStatus(self, taskIDs, newTaskStatus, force=False, connection=Fal intListToString(toUpdate), newTaskStatus, ) - resSelect = self._query(reqSelect, connection) + resSelect = self._query(reqSelect, conn=connection) if not resSelect["OK"]: gLogger.error( "{}.{}_DB: problem retrieving record:".format(self._caller(), "__updateTaskStatus"), @@ -124,7 +124,7 @@ def __updateTaskStatus(self, taskIDs, newTaskStatus, force=False, connection=Fal intListToString(toUpdate), newTaskStatus, ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: return res @@ -135,7 +135,7 @@ def __updateTaskStatus(self, taskIDs, newTaskStatus, force=False, connection=Fal if len(taskIDs) > 0: reqSelect1 = "SELECT * FROM Tasks WHERE TaskID IN (%s);" % intListToString(taskIDs) - resSelect1 = self._query(reqSelect1, connection) + resSelect1 = self._query(reqSelect1, conn=connection) if not resSelect1["OK"]: gLogger.warn( "%s.%s_DB: problem retrieving records: %s. 
%s" @@ -177,7 +177,7 @@ def _checkTaskUpdate(self, taskIDs, newTaskState, connection=False): stringListToString(oldTaskState), intListToString(taskIDs), ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res toUpdate = [row[0] for row in res["Value"]] @@ -200,7 +200,7 @@ def updateReplicaStatus(self, replicaIDs, newReplicaStatus, connection=False): intListToString(toUpdate), newReplicaStatus, ) - resSelect = self._query(reqSelect, connection) + resSelect = self._query(reqSelect, conn=connection) if not resSelect["OK"]: gLogger.error( "{}.{}_DB: problem retrieving record:".format(self._caller(), "updateReplicaStatus"), @@ -211,7 +211,7 @@ def updateReplicaStatus(self, replicaIDs, newReplicaStatus, connection=False): "UPDATE CacheReplicas SET Status='%s',LastUpdate=UTC_TIMESTAMP() WHERE ReplicaID IN (%s) AND Status != '%s';" % (newReplicaStatus, intListToString(toUpdate), newReplicaStatus) ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: return res @@ -223,7 +223,7 @@ def updateReplicaStatus(self, replicaIDs, newReplicaStatus, connection=False): ) if len(replicaIDs) > 0: reqSelect1 = "SELECT * FROM CacheReplicas WHERE ReplicaID IN (%s);" % intListToString(replicaIDs) - resSelect1 = self._query(reqSelect1, connection) + resSelect1 = self._query(reqSelect1, conn=connection) if not resSelect1["OK"]: gLogger.warn( "%s.%s_DB: problem retrieving records: %s. 
%s" @@ -249,7 +249,7 @@ def _updateTasksForReplica(self, replicaIDs, connection=False): "SELECT T.TaskID,T.Status FROM Tasks AS T, TaskReplicas AS R WHERE R.ReplicaID IN " "( %s ) AND R.TaskID = T.TaskID GROUP BY T.TaskID, T.Status;" ) % intListToString(replicaIDs) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res @@ -258,7 +258,7 @@ def _updateTasksForReplica(self, replicaIDs, connection=False): "SELECT DISTINCT(C.Status) FROM TaskReplicas AS R, CacheReplicas AS C WHERE R.TaskID=%s AND R.ReplicaID = C.ReplicaID;" % taskId ) - subres = self._query(subreq, connection) + subres = self._query(subreq, conn=connection) if not subres["OK"]: return subres @@ -326,7 +326,7 @@ def _checkReplicaUpdate(self, replicaIDs, newReplicaState, connection=False): stringListToString(oldReplicaState), intListToString(replicaIDs), ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res toUpdate = [row[0] for row in res["Value"]] @@ -351,7 +351,7 @@ def updateStageRequestStatus(self, replicaIDs, newStageStatus, connection=False) intListToString(toUpdate), newStageStatus, ) - resSelect = self._query(reqSelect, connection) + resSelect = self._query(reqSelect, conn=connection) if not resSelect["OK"]: gLogger.warn( "%s.%s_DB: problem retrieving record: %s. 
%s" @@ -362,7 +362,7 @@ def updateStageRequestStatus(self, replicaIDs, newStageStatus, connection=False) "UPDATE CacheReplicas SET Status='%s',LastUpdate=UTC_TIMESTAMP() WHERE ReplicaID IN (%s) AND Status != '%s';" % (newStageStatus, intListToString(toUpdate), newStageStatus) ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: return res @@ -374,7 +374,7 @@ def updateStageRequestStatus(self, replicaIDs, newStageStatus, connection=False) ) reqSelect1 = "SELECT * FROM CacheReplicas WHERE ReplicaID IN (%s);" % intListToString(replicaIDs) - resSelect1 = self._query(reqSelect1, connection) + resSelect1 = self._query(reqSelect1, conn=connection) if not resSelect1["OK"]: gLogger.warn( "%s.%s_DB: problem retrieving records: %s. %s" @@ -411,7 +411,7 @@ def _checkStageUpdate(self, replicaIDs, newStageState, connection=False): oldStageState, intListToString(replicaIDs), ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res toUpdate = [row[0] for row in res["Value"]] @@ -446,7 +446,7 @@ def getTaskInfo(self, taskID, connection=False): "SELECT TaskID,Status,Source,SubmitTime,CompleteTime,CallBackMethod,SourceTaskID from Tasks WHERE TaskID IN (%s);" % intListToString(taskID) ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.getTaskInfo: Failed to get task information.", res["Message"]) return res @@ -497,7 +497,7 @@ def getTaskSummary(self, jobID, connection=False): "SELECT R.LFN,R.SE,R.PFN,R.Size,R.Status,R.LastUpdate,R.Reason FROM CacheReplicas AS R, TaskReplicas AS TR WHERE TR.TaskID in (%s) AND TR.ReplicaID=R.ReplicaID;" % intListToString(taskID) ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.getTaskSummary: Failed to get Replica summary for task.", res["Message"]) return res @@ -537,7 +537,7 @@ def 
getTasks( return res condDict["TaskID"] = res["Value"] req = f"{req} {self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit)}" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res # Cast to list to be able to serialize @@ -578,7 +578,7 @@ def getCacheReplicas( # BUG: limit is ignored unless there is a nonempty condition dictionary OR # older OR newer is nonemtpy req = f"{req} {self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit)}" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res # Cast to list to be able to serialize @@ -617,7 +617,7 @@ def getStageRequests( else: condDict["ReplicaID"] = [-1] req = f"{req} {self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit)}" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res # Cast to list to be able to serialize @@ -634,7 +634,7 @@ def _getTaskReplicaIDs(self, taskIDs, connection=False): if not taskIDs: return S_OK([]) req = "SELECT DISTINCT(ReplicaID) FROM TaskReplicas WHERE TaskID IN (%s);" % intListToString(taskIDs) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res replicaIDs = [row[0] for row in res["Value"]] @@ -644,7 +644,7 @@ def _getReplicaIDTasks(self, replicaIDs, connection=False): if not replicaIDs: return S_OK([]) req = "SELECT DISTINCT(TaskID) FROM TaskReplicas WHERE ReplicaID IN (%s);" % intListToString(replicaIDs) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res taskIDs = [row[0] for row in res["Value"]] @@ -731,14 +731,14 @@ def _createTask(self, source, callbackMethod, sourceTaskID, connection=False): "INSERT INTO Tasks (Source,SubmitTime,CallBackMethod,SourceTaskID) VALUES ('%s',UTC_TIMESTAMP(),'%s','%s');" % (source, callbackMethod, sourceTaskID) ) - res 
= self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB._createTask: Failed to create task.", res["Message"]) return res # gLogger.info( "%s_DB:%s" % ('_createTask',req)) taskID = res["lastRowId"] reqSelect = "SELECT * FROM Tasks WHERE TaskID = %s;" % (taskID) - resSelect = self._query(reqSelect, connection) + resSelect = self._query(reqSelect, conn=connection) if not resSelect["OK"]: gLogger.info( "%s.%s_DB: problem retrieving record: %s. %s" @@ -759,7 +759,7 @@ def _getExistingReplicas(self, storageElement, lfns, connection=False): storageElement, stringListToString(lfns), ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB._getExistingReplicas: Failed to get existing replicas.", res["Message"]) return res @@ -775,7 +775,7 @@ def _insertReplicaInformation(self, lfn, storageElement, rType, connection=False "INSERT INTO CacheReplicas (Type,SE,LFN,PFN,Size,FileChecksum,GUID,SubmitTime,LastUpdate) VALUES ('%s','%s','%s','',0,'','',UTC_TIMESTAMP(),UTC_TIMESTAMP());" % (rType, storageElement, lfn) ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("_insertReplicaInformation: Failed to insert to CacheReplicas table.", res["Message"]) return res @@ -783,7 +783,7 @@ def _insertReplicaInformation(self, lfn, storageElement, rType, connection=False replicaID = res["lastRowId"] reqSelect = "SELECT * FROM CacheReplicas WHERE ReplicaID = %s;" % (replicaID) - resSelect = self._query(reqSelect, connection) + resSelect = self._query(reqSelect, conn=connection) if not resSelect["OK"]: gLogger.warn( "%s.%s_DB: problem retrieving record: %s. 
%s" @@ -805,7 +805,7 @@ def _insertTaskReplicaInformation(self, taskID, replicaIDs, connection=False): replicaString = f"({taskID},{replicaID})," req = f"{req} {replicaString}" req = req.rstrip(",") - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error( "StorageManagementDB._insertTaskReplicaInformation: Failed to insert to TaskReplicas table.", @@ -829,7 +829,7 @@ def _insertTaskReplicaInformation(self, taskID, replicaIDs, connection=False): def getStagedReplicas(self, connection=False): connection = self.__getConnection(connection) req = "SELECT TR.TaskID, R.Status, COUNT(*) from TaskReplicas as TR, CacheReplicas as R where TR.ReplicaID=R.ReplicaID GROUP BY TR.TaskID,R.Status;" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.getStagedReplicas: Failed to get eligible TaskReplicas", res["Message"]) return res @@ -844,7 +844,7 @@ def getStagedReplicas(self, connection=False): def getWaitingReplicas(self, connection=False): connection = self.__getConnection(connection) req = "SELECT TR.TaskID, R.Status, COUNT(*) from TaskReplicas as TR, CacheReplicas as R where TR.ReplicaID=R.ReplicaID GROUP BY TR.TaskID,R.Status;" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.getWaitingReplicas: Failed to get eligible TaskReplicas", res["Message"]) return res @@ -864,7 +864,7 @@ def getWaitingReplicas(self, connection=False): def getOfflineReplicas(self, connection=False): connection = self.__getConnection(connection) req = "SELECT TR.TaskID, R.Status, COUNT(*) from TaskReplicas as TR, CacheReplicas as R where TR.ReplicaID=R.ReplicaID GROUP BY TR.TaskID,R.Status;" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.getOfflineReplicas: Failed to get eligible TaskReplicas", 
res["Message"]) return res @@ -1147,7 +1147,7 @@ def wakeupOldRequests(self, replicaIDs, retryInterval, connection=False): "UPDATE CacheReplicas SET Status='New',LastUpdate = UTC_TIMESTAMP(), Reason = 'wakeupOldRequests' WHERE ReplicaID in (%s);" % intListToString(old_replicaIDs) ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error( "StorageManagementDB.wakeupOldRequests: Failed to roll CacheReplicas back to Status=New.", @@ -1156,7 +1156,7 @@ def wakeupOldRequests(self, replicaIDs, retryInterval, connection=False): return res req = "DELETE FROM StageRequests WHERE ReplicaID in (%s);" % intListToString(old_replicaIDs) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.wakeupOldRequests. Problem removing entries from StageRequests.") return res @@ -1260,7 +1260,7 @@ def killTasksBySourceTaskID(self, sourceTaskIDs, connection=False): if replicaIDs: req = "DELETE FROM StageRequests WHERE ReplicaID IN (%s);" % intListToString(replicaIDs) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error( "%s.%s_DB: problem removing records: %s. %s" @@ -1268,7 +1268,7 @@ def killTasksBySourceTaskID(self, sourceTaskIDs, connection=False): ) req = "DELETE FROM CacheReplicas WHERE ReplicaID in (%s) AND Links=1;" % intListToString(replicaIDs) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error( "%s.%s_DB: problem removing records: %s. %s" @@ -1276,13 +1276,13 @@ def killTasksBySourceTaskID(self, sourceTaskIDs, connection=False): ) # Finally, remove the Task and TaskReplicas entries. 
- res = self.removeTasks(taskIDs, connection) + res = self.removeTasks(taskIDs, connection=connection) return res def removeStageRequests(self, replicaIDs, connection=False): connection = self.__getConnection(connection) req = "DELETE FROM StageRequests WHERE ReplicaID in (%s);" % intListToString(replicaIDs) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.removeStageRequests. Problem removing entries from StageRequests.") return res @@ -1292,7 +1292,7 @@ def removeTasks(self, taskIDs, connection=False): """This will delete the entries from the TaskReplicas for the provided taskIDs.""" connection = self.__getConnection(connection) req = "DELETE FROM TaskReplicas WHERE TaskID IN (%s);" % intListToString(taskIDs) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.removeTasks. Problem removing entries from TaskReplicas.") return res @@ -1309,7 +1309,7 @@ def removeTasks(self, taskIDs, connection=False): gLogger.verbose("{}.{}_DB: to_delete Tasks = {}".format(self._caller(), "removeTasks", record)) req = "DELETE FROM Tasks WHERE TaskID in (%s);" % intListToString(taskIDs) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.removeTasks. Problem removing entries from Tasks.") gLogger.verbose("{}.{}_DB: deleted Tasks".format(self._caller(), "removeTasks")) @@ -1324,7 +1324,7 @@ def setOldTasksAsFailed(self, daysOld, connection=False): req = "UPDATE Tasks SET Status='Failed' WHERE DATE_ADD(SubmitTime, INTERVAL %s DAY ) < UTC_TIMESTAMP();" % ( daysOld ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.setOldTasksAsFailed. 
Problem setting old Tasks to Failed.") return res @@ -1336,7 +1336,7 @@ def getCacheReplicasSummary(self, connection=False): """ connection = self.__getConnection(connection) req = "SELECT DISTINCT(Status),SE,COUNT(*),sum(size)/(1024*1024*1024) FROM CacheReplicas GROUP BY Status,SE;" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.getCacheReplicasSummary failed.") return res @@ -1357,7 +1357,7 @@ def removeUnlinkedReplicas(self, connection=False): # First, check if there is a StageRequest and PinExpiryTime has arrived req = "select SR.ReplicaID from CacheReplicas CR,StageRequests SR WHERE CR.Links = 0 and CR.ReplicaID=SR.ReplicaID group by SR.ReplicaID HAVING max(SR.PinExpiryTime) < UTC_TIMESTAMP();" # req = "SELECT ReplicaID from CacheReplicas WHERE Links = 0;" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: gLogger.error( "StorageManagementDB.removeUnlinkedReplicas. Problem selecting entries from CacheReplicas where Links = 0." @@ -1369,7 +1369,7 @@ def removeUnlinkedReplicas(self, connection=False): # as they were not staged successfully (for various reasons), even though # a staging request had been submitted req = "SELECT ReplicaID FROM CacheReplicas WHERE Links = 0 AND Status = 'Failed';" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: gLogger.error( "StorageManagementDB.removeUnlinkedReplicas. Problem selecting entries from CacheReplicas where Links = 0 AND Status=Failed." @@ -1395,7 +1395,7 @@ def removeUnlinkedReplicas(self, connection=False): ) req = "DELETE FROM StageRequests WHERE ReplicaID IN (%s);" % intListToString(replicaIDs) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("StorageManagementDB.removeUnlinkedReplicas. 
Problem deleting from StageRequests.") return res @@ -1408,7 +1408,7 @@ def removeUnlinkedReplicas(self, connection=False): # Second look for CacheReplicas for which there is no entry in StageRequests req = "SELECT ReplicaID FROM CacheReplicas WHERE Links = 0 AND ReplicaID NOT IN ( SELECT DISTINCT( ReplicaID ) FROM StageRequests )" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: gLogger.error( "StorageManagementDB.removeUnlinkedReplicas. Problem selecting entries from CacheReplicas where Links = 0." @@ -1434,7 +1434,7 @@ def removeUnlinkedReplicas(self, connection=False): ) req = "DELETE FROM CacheReplicas WHERE ReplicaID IN (%s) AND Links= 0;" % intListToString(replicaIDs) - res = self._update(req, connection) + res = self._update(req, conn=connection) if res["OK"]: gLogger.verbose("{}.{}_DB: deleted CacheReplicas".format(self._caller(), "removeUnlinkedReplicas")) gLogger.debug( diff --git a/src/DIRAC/TransformationSystem/DB/TransformationDB.py b/src/DIRAC/TransformationSystem/DB/TransformationDB.py index 065b31511ad..1f5599281c2 100755 --- a/src/DIRAC/TransformationSystem/DB/TransformationDB.py +++ b/src/DIRAC/TransformationSystem/DB/TransformationDB.py @@ -187,7 +187,7 @@ def addTransformation( eventsPerTask, ) ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: self.lock.release() return res @@ -292,7 +292,7 @@ def getTransformations( intListToString(columns), self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset), ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res if condDict is None: @@ -406,13 +406,13 @@ def __updateTransformationParameter(self, transID, paramName, paramValue, connec paramValue, transID, ) - return self._update(req, connection) + return self._update(req, conn=connection) req = "UPDATE Transformations SET %s='%s', LastUpdate=UTC_TIMESTAMP() WHERE 
TransformationID=%d" % ( paramName, paramValue, transID, ) - return self._update(req, connection) + return self._update(req, conn=connection) def _getTransformationID(self, transName, connection=False): """Method returns ID of transformation with the name=""" @@ -423,7 +423,7 @@ def _getTransformationID(self, transName, connection=False): if not isinstance(transName, str): return S_ERROR("Transformation should be ID or name") cmd = "SELECT TransformationID from Transformations WHERE TransformationName='%s';" % transName - res = self._query(cmd, connection) + res = self._query(cmd, conn=connection) if not res["OK"]: gLogger.error( "Failed to obtain transformation ID for transformation", "{}: {}".format(transName, res["Message"]) @@ -436,7 +436,7 @@ def _getTransformationID(self, transName, connection=False): def __deleteTransformation(self, transID, connection=False): req = "DELETE FROM Transformations WHERE TransformationID=%d;" % transID - return self._update(req, connection) + return self._update(req, conn=connection) def __updateFilterQueries(self, connection=False): """Get filters for all defined input streams in all the transformations.""" @@ -516,7 +516,7 @@ def deleteTransformationParameter(self, transName, paramName, author="", connect def __addAdditionalTransformationParameter(self, transID, paramName, paramValue, connection=False): req = "DELETE FROM AdditionalParameters WHERE TransformationID=%d AND ParameterName='%s'" % (transID, paramName) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: return res res = self._escapeString(paramValue) @@ -533,14 +533,14 @@ def __addAdditionalTransformationParameter(self, transID, paramName, paramValue, paramValue, paramType, ) - return self._update(req, connection) + return self._update(req, conn=connection) def __getAdditionalParameters(self, transID, connection=False): req = "SELECT %s FROM AdditionalParameters WHERE TransformationID = %d" % ( ", 
".join(self.ADDITIONALPARAMETERS), transID, ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res paramDict = {} @@ -557,7 +557,7 @@ def __deleteTransformationParameters(self, transID, parameters=None, connection= req = "DELETE FROM AdditionalParameters WHERE TransformationID=%d" % transID if parameters: req = f"{req} AND ParameterName IN ({stringListToString(parameters)});" - return self._update(req, connection) + return self._update(req, conn=connection) ########################################################################### # @@ -630,7 +630,7 @@ def getTransformationFiles( req, self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset), ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res @@ -700,7 +700,7 @@ def setFileStatusForTransformation(self, transID, fileStatusDict=None, connectio else: req += " ON DUPLICATE KEY UPDATE Status=VALUES(Status),LastUpdate=VALUES(LastUpdate)" - result = self._update(req, connection) + result = self._update(req, conn=connection) if not result["OK"]: return result return S_OK() @@ -741,7 +741,7 @@ def getTransformationFilesCount(self, transName, field, selection=None, connecti def __addFilesToTransformation(self, transID, fileIDs, connection=False): req = "SELECT FileID from TransformationFiles" req = req + " WHERE TransformationID = %d AND FileID IN (%s);" % (transID, intListToString(fileIDs)) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res for tupleIn in res["Value"]: @@ -752,7 +752,7 @@ def __addFilesToTransformation(self, transID, fileIDs, connection=False): for fileID in fileIDs: req = "%s (%d,%d,UTC_TIMESTAMP(),UTC_TIMESTAMP())," % (req, transID, fileID) req = req.rstrip(",") - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: return res return S_OK(fileIDs) @@ -794,7 
+794,7 @@ def __insertExistingTransformationFiles(self, transID, fileTuplesList, connectio continue req = req.rstrip(",") - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: return res @@ -809,28 +809,28 @@ def __assignTransformationFile(self, transID, taskID, se, fileIDs, connection=Fa transID, intListToString(fileIDs), ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to assign file to task", res["Message"]) fileTuples = [] for fileID in fileIDs: fileTuples.append("(%d,%d,%d)" % (transID, fileID, taskID)) req = "INSERT INTO TransformationFileTasks (TransformationID,FileID,TaskID) VALUES %s" % ",".join(fileTuples) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to assign file to task", res["Message"]) return res def __setTransformationFileStatus(self, fileIDs, status, connection=False): req = f"UPDATE TransformationFiles SET Status = '{status}' WHERE FileID IN ({intListToString(fileIDs)});" - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to update file status", res["Message"]) return res def __setTransformationFileUsedSE(self, fileIDs, usedSE, connection=False): req = f"UPDATE TransformationFiles SET UsedSE = '{usedSE}' WHERE FileID IN ({intListToString(fileIDs)});" - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to update file usedSE", res["Message"]) return res @@ -841,7 +841,7 @@ def __resetTransformationFile(self, transID, taskID, connection=False): WHERE TransformationID = %d AND TaskID=%d;" % (transID, taskID) ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to reset transformation file", res["Message"]) return res @@ -866,7 +866,7 @@ def 
__deleteTransformationFiles(self, transID, connection=False): WHERE TransformationID = %d;" % transID ) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to delete transformation files", res["Message"]) return res @@ -881,12 +881,12 @@ def __deleteTransformationFileTask(self, transID, taskID, connection=False): from the TransformationFileTasks table for transformation with TransformationID and TaskID """ req = "DELETE FROM TransformationFileTasks WHERE TransformationID=%d AND TaskID=%d" % (transID, taskID) - return self._update(req, connection) + return self._update(req, conn=connection) def __deleteTransformationFileTasks(self, transID, connection=False): """Remove all associations between files, tasks and a transformation""" req = "DELETE FROM TransformationFileTasks WHERE TransformationID = %d;" % transID - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: gLogger.error("Failed to delete transformation files/task history", res["Message"]) return res @@ -913,7 +913,7 @@ def getTransformationTasks( intListToString(self.TASKSPARAMS), self.buildCondition(condDict, older, newer, timeStamp, orderAttribute, limit, offset=offset), ) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res if condDict is None: @@ -1087,22 +1087,22 @@ def getTransformationTaskStats(self, transName="", connection=False): def __setTaskParameterValue(self, transID, taskID, paramName, paramValue, connection=False): req = f"UPDATE TransformationTasks SET {paramName}='{paramValue}', LastUpdateTime=UTC_TIMESTAMP()" req = req + " WHERE TransformationID=%d AND TaskID=%d;" % (transID, taskID) - return self._update(req, connection) + return self._update(req, conn=connection) def __deleteTransformationTasks(self, transID, connection=False): """Delete all the tasks from the TransformationTasks table for transformation with 
TransformationID""" req = "DELETE FROM TransformationTasks WHERE TransformationID=%d" % transID - return self._update(req, connection) + return self._update(req, conn=connection) def __deleteTransformationTask(self, transID, taskID, connection=False): """Delete the task from the TransformationTasks table for transformation with TransformationID""" req = "DELETE FROM TransformationTasks WHERE TransformationID=%d AND TaskID=%d" % (transID, taskID) - return self._update(req, connection) + return self._update(req, conn=connection) def __deleteTransformationMetaQueries(self, transID, connection=False): """Delete all the meta queries from the TransformationMetaQueries table for transformation with TransformationID""" req = "DELETE FROM TransformationMetaQueries WHERE TransformationID=%d" % transID - return self._update(req, connection) + return self._update(req, conn=connection) #################################################################### # @@ -1173,7 +1173,7 @@ def deleteTransformationMetaQuery(self, transName, queryType, author="", connect return S_ERROR("Failed to parse the transformation query type") queryType = res["Value"] req = "DELETE FROM TransformationMetaQueries WHERE TransformationID=%d AND QueryType=%s;" % (transID, queryType) - res = self._update(req, connection) + res = self._update(req, conn=connection) if not res["OK"]: return res if res["Value"]: @@ -1202,7 +1202,7 @@ def getTransformationMetaQuery(self, transName, queryType, connection=False): queryType = res["Value"] req = "SELECT MetaDataName,MetaDataValue,MetaDataType FROM TransformationMetaQueries" req = req + " WHERE TransformationID=%d AND QueryType=%s;" % (transID, queryType) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res queryDict = {} @@ -1254,7 +1254,7 @@ def __insertTaskInputs(self, transID, taskID, lfns, connection=False): vector = str.join(";", lfns) fields = ["TransformationID", "TaskID", "InputVector"] values = 
[transID, taskID, vector] - res = self.insertFields("TaskInputs", fields, values, connection) + res = self.insertFields("TaskInputs", fields, values, conn=connection) if not res["OK"]: gLogger.error("Failed to add input vector to task %d" % taskID) return res @@ -1264,7 +1264,7 @@ def __deleteTransformationTaskInputs(self, transID, taskID=0, connection=False): req = "DELETE FROM TaskInputs WHERE TransformationID=%d" % transID if taskID: req = "%s AND TaskID=%d" % (req, int(taskID)) - return self._update(req, connection) + return self._update(req, conn=connection) ########################################################################### # @@ -1284,7 +1284,7 @@ def __updateTransformationLogging(self, transName, message, authorDN, connection transID = res["Value"]["TransformationID"] req = "INSERT INTO TransformationLog (TransformationID,Message,Author,MessageDate)" req = req + f" VALUES ({transID},'{message}','{authorDN}',UTC_TIMESTAMP());" - return self._update(req, connection) + return self._update(req, conn=connection) def getTransformationLogging(self, transName, connection=False): """Get logging info from the TransformationLog table""" @@ -1311,7 +1311,7 @@ def getTransformationLogging(self, transName, connection=False): def __deleteTransformationLog(self, transID, connection=False): """Remove the entries in the transformation log for a transformation""" req = "DELETE FROM TransformationLog WHERE TransformationID=%d;" % transID - return self._update(req, connection) + return self._update(req, conn=connection) ########################################################################### # @@ -1320,7 +1320,7 @@ def __deleteTransformationLog(self, transID, connection=False): def __getAllFileIDs(self, connection=False): """Get all the fileIDs for the supplied list of lfns""" req = "SELECT LFN,FileID FROM DataFiles;" - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res fids = {} @@ -1335,7 +1335,7 @@ def 
__getFileIDsForLfns(self, lfns, connection=False): warning: if the file is not present, we'll see no errors """ req = "SELECT LFN,FileID FROM DataFiles WHERE LFN in (%s);" % (stringListToString(lfns)) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res lfns = dict(res["Value"]) @@ -1346,7 +1346,7 @@ def __getFileIDsForLfns(self, lfns, connection=False): def __getLfnsForFileIDs(self, fileIDs, connection=False): """Get lfns for the given list of fileIDs""" req = "SELECT LFN,FileID FROM DataFiles WHERE FileID in (%s);" % stringListToString(fileIDs) - res = self._query(req, connection) + res = self._query(req, conn=connection) if not res["OK"]: return res fids = dict(res["Value"]) @@ -1363,7 +1363,7 @@ def __addDataFiles(self, lfns, connection=False): lfnFileIDs = res["Value"][1] for lfn in set(lfns) - set(lfnFileIDs): req = "INSERT INTO DataFiles (LFN,Status) VALUES ('%s','New');" % lfn - res = self._update(req, connection) + res = self._update(req, conn=connection) # If the LFN is duplicate we get an error and ignore it if res["OK"]: lfnFileIDs[lfn] = res["lastRowId"] @@ -1372,7 +1372,7 @@ def __addDataFiles(self, lfns, connection=False): def __setDataFileStatus(self, fileIDs, status, connection=False): """Set the status of the supplied files""" req = f"UPDATE DataFiles SET Status = '{status}' WHERE FileID IN ({intListToString(fileIDs)});" - return self._update(req, connection) + return self._update(req, conn=connection) ########################################################################### # @@ -1417,7 +1417,7 @@ def addTaskForTransformation(self, transID, lfns=None, se="Unknown", connection= req = "INSERT INTO TransformationTasks(TransformationID, ExternalStatus, ExternalID, TargetSE," req = req + " CreationTime, LastUpdateTime)" req = req + " VALUES (%s,'%s','%d','%s', UTC_TIMESTAMP(), UTC_TIMESTAMP());" % (transID, "Created", 0, se) - res = self._update(req, connection) + res = self._update(req, 
conn=connection) if not res["OK"]: self.lock.release() gLogger.error("Failed to publish task for transformation", res["Message"]) @@ -1426,7 +1426,7 @@ def addTaskForTransformation(self, transID, lfns=None, se="Unknown", connection= # TaskID is computed by a trigger, which sets the local variable @last (per connection) # @last is the last insert TaskID. With multi-row inserts, will be the first new TaskID inserted. # The trigger TaskID_Generator must be present with the InnoDB schema (defined in TransformationDB.sql) - res = self._query("SELECT @last;", connection) + res = self._query("SELECT @last;", conn=connection) self.lock.release() if not res["OK"]: @@ -1536,7 +1536,7 @@ def __checkUpdate(self, table, param, paramValue, selectDict=None, connection=Fa req = f"UPDATE {table} SET {param} = '{paramValue}'" if selectDict: req = f"{req} {self.buildCondition(selectDict)}" - return self._update(req, connection) + return self._update(req, conn=connection) def __getConnection(self, connection): if connection: From 90906e4e6dbd764e2c9196ff9bb1da95a2fa6a76 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Fri, 19 Aug 2022 22:54:06 +0200 Subject: [PATCH 4/4] fix (DFC): wrong use of connection pool --- src/DIRAC/Core/Utilities/MySQL.py | 2 +- .../FileCatalogComponents/SEManager/SEManagerDB.py | 13 +++++++++++-- src/DIRAC/DataManagementSystem/DB/FileCatalogDB.py | 2 +- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/DIRAC/Core/Utilities/MySQL.py b/src/DIRAC/Core/Utilities/MySQL.py index 0f92977ad44..310ab68dad7 100755 --- a/src/DIRAC/Core/Utilities/MySQL.py +++ b/src/DIRAC/Core/Utilities/MySQL.py @@ -1096,7 +1096,7 @@ def countEntries( return S_ERROR(DErrno.EMYSQL, x) cmd = f"SELECT COUNT(*) FROM {table} {cond}" - res = self._query(cmd, cconn=onnection) + res = self._query(cmd, conn=connection) if not res["OK"]: return res diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/SEManager/SEManagerDB.py 
b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/SEManager/SEManagerDB.py index c17200c12b8..69e3f9dfcf8 100644 --- a/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/SEManager/SEManagerDB.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogComponents/SEManager/SEManagerDB.py @@ -47,6 +47,15 @@ def _refreshSEs(self, connection=False): self.lock.release() return S_OK() + def _getConnection(self, connection): + if connection: + return connection + res = self.db._getConnection() + if res["OK"]: + return res["Value"] + gLogger.warn("Failed to get MySQL connection", res["Message"]) + return connection + def __addSE(self, seName, connection=False): startTime = time.time() self.lock.acquire() @@ -57,7 +66,7 @@ def __addSE(self, seName, connection=False): gLogger.debug(f"SEManager AddSE lock released. Used {time.time() - waitTime:.3f} seconds. {seName}") self.lock.release() return S_OK(seid) - connection = self.db._getConnection() + connection = self._getConnection(connection) res = self.db.insertFields("FC_StorageElements", ["SEName"], [seName], conn=connection) if not res["OK"]: gLogger.debug(f"SEManager AddSE lock released. Used {time.time() - waitTime:.3f} seconds. 
{seName}") @@ -78,7 +87,7 @@ def __addSE(self, seName, connection=False): return S_OK(seid) def __removeSE(self, seName, connection=False): - connection = self.db._getConnection() + connection = self._getConnection(connection) startTime = time.time() self.lock.acquire() waitTime = time.time() diff --git a/src/DIRAC/DataManagementSystem/DB/FileCatalogDB.py b/src/DIRAC/DataManagementSystem/DB/FileCatalogDB.py index 4ed8ae1c187..4fc41918f24 100755 --- a/src/DIRAC/DataManagementSystem/DB/FileCatalogDB.py +++ b/src/DIRAC/DataManagementSystem/DB/FileCatalogDB.py @@ -445,7 +445,7 @@ def setFileStatus(self, lfns, credDict): if not res["Value"]["Successful"]: return S_OK({"Successful": {}, "Failed": failed}) - res = self.fileManager.setFileStatus(res["Value"]["Successful"], credDict) + res = self.fileManager.setFileStatus(res["Value"]["Successful"]) if not res["OK"]: return res failed.update(res["Value"]["Failed"])