Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions TransformationSystem/DB/TransformationDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ def __init__( self, dbname = None, dbconfig = None, maxQueueSize = 10, dbIn = No
'ParameterType'
]

self.isTransformationTasksInnoDB = True
res = self._query( "SELECT Engine FROM INFORMATION_SCHEMA.TABLES WHERE table_name = 'TransformationTasks'" )
if not res['OK']:
return res
else:
engine = res['Value'][0][0]
if engine.lower() != 'innodb':
self.isTransformationTasksInnoDB = False

def getName( self ):
""" Get the database name
"""
Expand Down Expand Up @@ -813,6 +822,26 @@ def __deleteTransformationFiles( self, transID, connection = False ):
gLogger.error( "Failed to delete transformation files", res['Message'] )
return res

###########################################################################
#
# These methods manipulate the TransformationFileTasks table
#

def __deleteTransformationFileTask( self, transID, taskID, connection = False ):
''' Delete the file associated to a given task of a given transformation
from the TransformationFileTasks table for transformation with TransformationID and TaskID
'''
req = "DELETE FROM TransformationFileTasks WHERE TransformationID=%d AND TaskID=%d" % ( transID, taskID )
return self._update( req, connection )

def __deleteTransformationFileTasks( self, transID, connection = False ):
''' Remove all associations between files, tasks and a transformation '''
req = "DELETE FROM TransformationFileTasks WHERE TransformationID = %d;" % transID
res = self._update( req, connection )
if not res['OK']:
gLogger.error( "Failed to delete transformation files/task history", res['Message'] )
return res

###########################################################################
#
# These methods manipulate the TransformationTasks table
Expand Down Expand Up @@ -1125,7 +1154,7 @@ def __insertTaskInputs( self, transID, taskID, lfns, connection = False ):
vector = str.join( ';', lfns )
fields = ['TransformationID', 'TaskID', 'InputVector']
values = [transID, taskID, vector]
res = self._insert( 'TaskInputs', fields, values, connection )
res = self.insertFields( 'TaskInputs', fields, values, connection )
if not res['OK']:
gLogger.error( "Failed to add input vector to task %d" % taskID )
return res
Expand Down Expand Up @@ -1208,6 +1237,7 @@ def __getAllFileIDs( self, connection = False ):

def __getFileIDsForLfns( self, lfns, connection = False ):
""" Get file IDs for the given list of lfns
warning: if the file is not present, we'll see no errors
"""
req = "SELECT LFN,FileID FROM DataFiles WHERE LFN in (%s);" % ( stringListToString( lfns ) )
res = self._query( req, connection )
Expand Down Expand Up @@ -1374,7 +1404,12 @@ def addTaskForTransformation( self, transID, lfns = [], se = 'Unknown', connecti
self.lock.release()
gLogger.error( "Failed to publish task for transformation", res['Message'] )
return res
res = self._query( "SELECT LAST_INSERT_ID();", connection )

if self.isTransformationTasksInnoDB:
res = self._query( "SELECT @last", connection )
else:
res = self._query( "SELECT LAST_INSERT_ID()", connection )

self.lock.release()
if not res['OK']:
return res
Expand Down Expand Up @@ -1423,15 +1458,21 @@ def cleanTransformation( self, transName, author = '', connection = False ):
return res
connection = res['Value']['Connection']
transID = res['Value']['TransformationID']
res = self.__deleteTransformationFiles( transID, connection = connection )
res = self.__deleteTransformationFileTasks( transID, connection = connection )
if not res['OK']:
return res
res = self.__deleteTransformationTasks( transID, connection = connection )
res = self.__deleteTransformationFiles( transID, connection = connection )
if not res['OK']:
return res
res = self.__deleteTransformationTaskInputs( transID, connection = connection )
if not res['OK']:
return res
res = self.__deleteTransformationTasks( transID, connection = connection )
if not res['OK']:
return res

self.__updateTransformationLogging( transID, "Transformation Cleaned", author, connection = connection )

return S_OK( transID )

def deleteTransformation( self, transName, author = '', connection = False ):
Expand Down Expand Up @@ -1462,10 +1503,13 @@ def __removeTransformationTask( self, transID, taskID, connection = False ):
res = self.__deleteTransformationTaskInputs( transID, taskID, connection = connection )
if not res['OK']:
return res
res = self.__deleteTransformationTask( transID, taskID, connection = connection )
res = self.__deleteTransformationFileTask( transID, taskID, connection = connection )
if not res['OK']:
return res
res = self.__resetTransformationFile( transID, taskID, connection = connection )
if not res['OK']:
return res
return self.__resetTransformationFile( transID, taskID, connection = connection )
return self.__deleteTransformationTask( transID, taskID, connection = connection )

def __checkUpdate( self, table, param, paramValue, selectDict = {}, connection = False ):
""" Check whether the update will perform an update """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def export_getTransformation( self, transName, extraParams = False ):
res = database.getTransformation( transName, extraParams = extraParams )
return self._parseRes( res )

types_getTransformationParameters = [transTypes, [ListType, TupleType]]
types_getTransformationParameters = [transTypes, list( StringTypes ) + [ListType, TupleType]]
def export_getTransformationParameters( self, transName, parameters ):
res = database.getTransformationParameters( transName, parameters )
return self._parseRes( res )
Expand Down