Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ DIRAC_M2CRYPTO_SSL_CIPHERS
DIRAC_M2CRYPTO_SSL_METHODS
If set, overwrites the default SSL methods accepted. It should be a colon separated list. See :py:mod:`DIRAC.Core.DISET`

DIRAC_MYSQL_OPTIMIZER_TRACES_PATH
If set, it should point to an existing directory, where MySQL Optimizer traces will be stored. See :py:func:`DIRAC.Core.Utilities.MySQL.captureOptimizerTraces`

DIRAC_NO_CFG
If set to anything, cfg files on the command line must be passed to the command using the --cfg option.

Expand Down
4 changes: 2 additions & 2 deletions src/DIRAC/AccountingSystem/DB/AccountingDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,10 +510,10 @@ def __addKeyValue(self, typeName, keyName, keyValue):
return retVal
connection = retVal["Value"]
self.log.info(f"Value {keyValue} for key {keyName} didn't exist, inserting")
retVal = self.insertFields(keyTable, ["id", "value"], [0, keyValue], connection)
retVal = self.insertFields(keyTable, ["id", "value"], [0, keyValue], conn=connection)
if not retVal["OK"] and retVal["Message"].find("Duplicate key") == -1:
return retVal
result = self.__getIdForKeyValue(typeName, keyName, keyValue, connection)
result = self.__getIdForKeyValue(typeName, keyName, keyValue, conn=connection)
if not result["OK"]:
return result
keyCache[keyValue] = result["Value"]
Expand Down
196 changes: 168 additions & 28 deletions src/DIRAC/Core/Utilities/MySQL.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@

"""
import collections
import functools
import json
import os
import time
import threading
import MySQLdb
Expand Down Expand Up @@ -198,6 +201,127 @@ def _quotedList(fieldList=None):
return ", ".join(quotedFields)


def captureOptimizerTraces(meth):
"""If enabled, this will dump the optimizer trace for each query performed.
Obviously, it has a performance cost...

In order to enable the tracing, the environment variable ``DIRAC_MYSQL_OPTIMIZER_TRACES_PATH``
should be set and point to an existing directory where the files will be stored.

It makes sense to enable it when preparing the migration to a newer major version
of mysql: you run your integration tests (or whever scenario you prepared) with the old version,
then the same tests with the new version, and compare the output files.

The file produced are called "optimizer_trace_<timestamp>_<hash>.json"
The hash is here to minimize the risk of concurence for the same file.
The timestamp is to maintain the execution order. For easier comparison between two executions,
you can rename the files with a sequence number.

.. code-block:: bash

cd ${DIRAC_MYSQL_OPTIMIZER_TRACES_PATH}
c=0; for i in $(ls); do newFn=$(echo $i | sed -E "s/_trace_[0-9]+.[0-9]+_(.*)/_trace_${c}_\1/g"); mv $i $newFn; c=$(( c + 1 )); done
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
c=0; for i in $(ls); do newFn=$(echo $i | sed -E "s/_trace_[0-9]+.[0-9]+_(.*)/_trace_${c}_\1/g"); mv $i $newFn; c=$(( c + 1 )); done
c=0; for i in *; do newFn=$(echo $i | sed -E "s/_trace_[0-9]+.[0-9]+_(.*)/_trace_${c}_\1/g"); mv $i $newFn; c=$(( c + 1 )); done


This tool is useful then to compare the files https://github.com/cosmicanant/recursive-diff

Note that this method is far from pretty:

* error handling is not really done. Of course, I could add a lot of safety and try/catch and what not,
but if you are using this, it means you reaaaally want to profile something. And in that case, you want things
to go smoothly. And if they don't, you want to see it, and you want it to crash.
* it mangles a bit with the connection pool to be able to capture the traces

All the docs related to the optimizer tracing is available here https://dev.mysql.com/doc/internals/en/optimizer-tracing.html

The generated file contains one of the following:

* ``{"EmptyTrace": arguments}``: some method like "show tables" do not generate a trace
* A list of dictionaries, one per trace for the specific call:

* ``{ "Query": <query executed>, "Trace" : <optimizer analysis>}`` if all is fine
* ``{"Error": <the error>}`` in case something goes wrong. See the lower in the code
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* ``{"Error": <the error>}`` in case something goes wrong. See the lower in the code
* ``{"Error": <the error>}`` in case something goes wrong. See below in this function

Assuming this is what you meant (lines 294ff)

for the description of errors


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add something like this, (if I understand correctly)?

Suggested change
To make use of this functionality in a database implementation, calls to :func:`~DIRAC.Core.Utilities.MySQL.MySQL._query`, :func:`~DIRAC.Core.Utilities.MySQL.MySQL_update`, and :func:`~DIRAC.Core.Utilities.MySQL.MySQL.executeStoredProcedure` must explicitly make use of the ``conn`` keyword-argument.

"""

optimizerTracingFolder = os.environ.get("DIRAC_MYSQL_OPTIMIZER_TRACES_PATH")

@functools.wraps(meth)
def innerMethod(self, *args, **kwargs):

# First, get a connection to the DB, and enable the tracing
connection = self.__connectionPool.get(self.__dbName)
connection.cursor().execute('SET optimizer_trace="enabled=on";')
# We also set some options that worked for my use case.
# you may need to tune these parameters if you have huge traces
# or more recursive calls.
# I doubt it though....

connection.cursor().execute(
"SET optimizer_trace_offset=0, optimizer_trace_limit=20, optimizer_trace_max_mem_size=131072;"
)

# Because we can only trace on a per session base, give the same connection object
# to the actual method
kwargs["conn"] = connection

# Execute the method
res = meth(self, *args, **kwargs)

# Turn of the tracing
connection.cursor().execute('SET optimizer_trace="enabled=off";')

# Get all the traces
cursor = connection.cursor()
if cursor.execute("SELECT * FROM INFORMATION_SCHEMA.OPTIMIZER_TRACE;"):
queryTraces = cursor.fetchall()
else:
queryTraces = ()

# Generate a filename stored in DIRAC_MYSQL_OPTIMIZER_TRACES_PATH
methHash = hash(f"{args},{kwargs}")
# optimizer_trace_<timestamp>_<hash>.json
jsonFn = os.path.join(optimizerTracingFolder, f"optimizer_trace_{time.time()}_{methHash}.json")

with open(jsonFn, "wt") as f:
# Some calls do not generate a trace, like "show tables"
if not queryTraces:
json.dump({"EmptyTrace": args}, f)

jsonTraces = []

for trace_query, trace_analysis, trace_missingBytes, trace_privilegeError in queryTraces:
# if trace_privilegeError is True, it's a permission error
# https://dev.mysql.com/doc/internals/en/privilege-checking.html
# It may particularly happen with stored procedures.
# Although it is not really a good practice, for profiling purposes,
# I would go with the no brainer solution: GRANT ALL ON <yourDB>.* TO 'Dirac'@'%';
if trace_privilegeError:
# f.write(f"ERROR: {args}")
jsonTraces.append({"Error": f"PrivilegeError: {args}"})
continue

# The memory is not large enough to store all the traces, so it is truncated
# https://dev.mysql.com/doc/internals/en/tracing-memory-usage.html
if trace_missingBytes:
jsonTraces.append({"Error": f"MissingBytes {trace_missingBytes}"})
continue

jsonTraces.append(
{"Query": trace_query, "Trace": json.loads(trace_analysis) if trace_analysis else None}
)

json.dump(jsonTraces, f)

return res

if optimizerTracingFolder:
return innerMethod
else:
return meth


class ConnectionPool:
"""
Management of connections per thread
Expand Down Expand Up @@ -583,7 +707,8 @@ def _connect(self):
self._connected = True
return S_OK()

def _query(self, cmd, conn=None, debug=True):
@captureOptimizerTraces
def _query(self, cmd, *, conn=None, debug=True):
"""
execute MySQL query command

Expand All @@ -596,10 +721,13 @@ def _query(self, cmd, conn=None, debug=True):

self.log.debug("_query: %s" % self._safeCmd(cmd))

retDict = self._getConnection()
if not retDict["OK"]:
return retDict
connection = retDict["Value"]
if conn:
connection = conn
else:
retDict = self._getConnection()
if not retDict["OK"]:
return retDict
connection = retDict["Value"]

try:
cursor = connection.cursor()
Expand Down Expand Up @@ -627,7 +755,8 @@ def _query(self, cmd, conn=None, debug=True):

return retDict

def _update(self, cmd, conn=None, debug=True):
@captureOptimizerTraces
def _update(self, cmd, *, conn=None, debug=True):
"""execute MySQL update command

:param debug: print or not the errors
Expand All @@ -637,11 +766,13 @@ def _update(self, cmd, conn=None, debug=True):
"""

self.log.debug("_update: %s" % self._safeCmd(cmd))

retDict = self._getConnection()
if not retDict["OK"]:
return retDict
connection = retDict["Value"]
if conn:
connection = conn
else:
retDict = self._getConnection()
if not retDict["OK"]:
return retDict
connection = retDict["Value"]

try:
cursor = connection.cursor()
Expand Down Expand Up @@ -965,7 +1096,7 @@ def countEntries(
return S_ERROR(DErrno.EMYSQL, x)

cmd = f"SELECT COUNT(*) FROM {table} {cond}"
res = self._query(cmd, connection)
res = self._query(cmd, conn=connection)
if not res["OK"]:
return res

Expand Down Expand Up @@ -1008,7 +1139,7 @@ def getCounters(
return S_ERROR(DErrno.EMYSQL, x)

cmd = f"SELECT {attrNames}, COUNT(*) FROM {table} {cond} GROUP BY {attrNames} ORDER BY {attrNames}"
res = self._query(cmd, connection)
res = self._query(cmd, conn=connection)
if not res["OK"]:
return res

Expand Down Expand Up @@ -1057,7 +1188,7 @@ def getDistinctAttributeValues(
return S_ERROR(DErrno.EMYSQL, exc)

cmd = f"SELECT DISTINCT( {attributeName} ) FROM {table} {cond} ORDER BY {attributeName}"
res = self._query(cmd, connection)
res = self._query(cmd, conn=connection)
if not res["OK"]:
return res
attr_list = [x[0] for x in res["Value"]]
Expand Down Expand Up @@ -1290,7 +1421,7 @@ def getFields(
except Exception as x:
return S_ERROR(DErrno.EMYSQL, x)

return self._query(f"SELECT {quotedOutFields} FROM {table} {condition}", conn)
return self._query(f"SELECT {quotedOutFields} FROM {table} {condition}", conn=conn)

#############################################################################
def deleteEntries(
Expand Down Expand Up @@ -1334,7 +1465,7 @@ def deleteEntries(
except Exception as x:
return S_ERROR(DErrno.EMYSQL, x)

return self._update(f"DELETE FROM {table} {condition}", conn)
return self._update(f"DELETE FROM {table} {condition}", conn=conn)

#############################################################################
def updateFields(
Expand Down Expand Up @@ -1420,7 +1551,7 @@ def updateFields(
[f"{_quotedList([updateFields[k]])} = {updateValues[k]}" for k in range(len(updateFields))]
)

return self._update(f"UPDATE {table} SET {updateString} {condition}", conn)
return self._update(f"UPDATE {table} SET {updateString} {condition}", conn=conn)

#############################################################################
def insertFields(self, tableName, inFields=None, inValues=None, conn=None, inDict=None):
Expand Down Expand Up @@ -1475,14 +1606,18 @@ def insertFields(self, tableName, inFields=None, inValues=None, conn=None, inDic
# self.log.debug('insertFields:', 'inserting %s into table %s'
# % (inFieldString, table))

return self._update(f"INSERT INTO {table} {inFieldString} VALUES {inValueString}", conn)
return self._update(f"INSERT INTO {table} {inFieldString} VALUES {inValueString}", conn=conn)

def executeStoredProcedure(self, packageName, parameters, outputIds):
conDict = self._getConnection()
if not conDict["OK"]:
return conDict
@captureOptimizerTraces
def executeStoredProcedure(self, packageName, parameters, outputIds, *, conn=None):
if conn:
connection = conn
else:
conDict = self._getConnection()
if not conDict["OK"]:
return conDict

connection = conDict["Value"]
connection = conDict["Value"]
cursor = connection.cursor()
try:
cursor.callproc(packageName, parameters)
Expand All @@ -1503,12 +1638,17 @@ def executeStoredProcedure(self, packageName, parameters, outputIds):
return retDict

# For the procedures that execute a select without storing the result
def executeStoredProcedureWithCursor(self, packageName, parameters):
conDict = self._getConnection()
if not conDict["OK"]:
return conDict
@captureOptimizerTraces
def executeStoredProcedureWithCursor(self, packageName, parameters, *, conn=None):
if conn:
connection = conn
else:
conDict = self._getConnection()
if not conDict["OK"]:
return conDict

connection = conDict["Value"]

connection = conDict["Value"]
cursor = connection.cursor()
try:
# execStr = "call %s(%s);" % ( packageName, ",".join( map( str, parameters ) ) )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ def _findDatasets(self, datasets, connection=False):

resultDict = {"Successful": {}, "Failed": {}}
if fullNames:
result = self.__findFullPathDatasets(fullNames, connection)
result = self.__findFullPathDatasets(fullNames, connection=connection)
if not result["OK"]:
return result
resultDict = result["Value"]

if shortNames:
result = self.__findNoPathDatasets(shortNames, connection)
result = self.__findNoPathDatasets(shortNames, connection=connection)
if not result["OK"]:
return result
resultDict["Successful"].update(result["Value"]["Successful"])
Expand Down Expand Up @@ -209,7 +209,7 @@ def __findFullPathDatasets(self, datasets, connection):
wheres.append("( DirID=%d AND DatasetName IN (%s) )" % (dirID, stringListToString(dsNames)))

req = "SELECT DatasetName,DirID,DatasetID FROM FC_MetaDatasets WHERE %s" % " OR ".join(wheres)
result = self.db._query(req, connection)
result = self.db._query(req, conn=connection)
if not result["OK"]:
return result
for dsName, dirID, dsID in result["Value"]:
Expand All @@ -230,7 +230,7 @@ def __findNoPathDatasets(self, nodirDatasets, connection):
dsIDs = {}
req = "SELECT COUNT(DatasetName),DatasetName,DatasetID FROM FC_MetaDatasets WHERE DatasetName in "
req += "( %s ) GROUP BY DatasetName,DatasetID" % stringListToString(nodirDatasets)
result = self.db._query(req, connection)
result = self.db._query(req, conn=connection)
if not result["OK"]:
return result
for dsCount, dsName, dsID in result["Value"]:
Expand All @@ -243,7 +243,7 @@ def __findNoPathDatasets(self, nodirDatasets, connection):
req = "SELECT DatasetName,DatasetID,DirID FROM FC_MetaDatasets WHERE DatasetID in (%s)" % ",".join(
dsIDs.values()
)
result = self.db._query(req, connection)
result = self.db._query(req, conn=connection)
if not result["OK"]:
return result
for dsName, dsID, dirID in result["Value"]:
Expand All @@ -259,7 +259,7 @@ def addDatasetAnnotation(self, datasets, credDict):
"""Add annotation to the given dataset"""
connection = self._getConnection()
successful = {}
result = self._findDatasets(list(datasets), connection)
result = self._findDatasets(list(datasets), connection=connection)
if not result["OK"]:
return result
failed = result["Value"]["Failed"]
Expand All @@ -270,7 +270,7 @@ def addDatasetAnnotation(self, datasets, credDict):
annotation,
datasetDict[dataset]["DatasetID"],
)
result = self.db._update(req, connection)
result = self.db._update(req, conn=connection)
if not result["OK"]:
failed[dataset] = "Failed to add annotation"
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def getPathIDsByID(self, dirID):
def getChildren(self, path, connection=False):
"""Get child directory IDs for the given directory"""
if isinstance(path, str):
result = self.findDir(path, connection)
result = self.findDir(path, connection=connection)
if not result["OK"]:
return result
if not result["Value"]:
Expand Down
Loading