Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ The Monitoring system is used to monitor various components of DIRAC. Currently,

- WMSHistory: for monitoring the DIRAC WMS
- PilotsHistory: for monitoring of DIRAC pilots
- Component Monitoring: for monitoring DIRAC components such as services, agents, etc.
- Agent Monitoring: for monitoring DIRAC agents
- Service Monitoring: for monitoring DIRAC services
- RMS Monitoring: for monitoring the DIRAC RequestManagement System (mostly the Request Executing Agent).
- PilotSubmission Monitoring: for monitoring the DIRAC pilot submission statistics from SiteDirector agents
- DataOperation Monitoring: for monitoring the DIRAC data operation statistics
Expand Down Expand Up @@ -128,16 +129,13 @@ Enable PilotsHistory monitoring
In order to enable PilotsHistory monitoring you need to set the flag ``monitoringEnabled = True`` in Operations/Defaults.


Enable Component monitoring
===========================

.. warning::

not yet fully working/ready
Enable Monitoring of DIRAC Agents and Services
==============================================

You have to set ``EnableActivityMonitoring=True`` in the CS.
It can be done globally, the ``Operations`` section, or per single component.


Enable RMS Monitoring
=====================

Expand Down
53 changes: 24 additions & 29 deletions src/DIRAC/Core/Base/AgentModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties={}):

self.__basePath = gConfig.getValue("/LocalSite/InstancePath", rootPath)
self.__agentModule = None
self.agentName = agentName
self.__codeProperties = {}
self.__getCodeInfo()

Expand Down Expand Up @@ -295,7 +296,7 @@ def __initializeMonitor(self):
# this class (see https://github.com/DIRACGrid/DIRAC/issues/4793)
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter

self.activityMonitoringReporter = MonitoringReporter(monitoringType="ComponentMonitoring")
self.activityMonitoringReporter = MonitoringReporter(monitoringType="AgentMonitoring")
# With the help of this periodic task we commit the data to ES at an interval of 100 seconds.
gThreadScheduler.addPeriodicTask(100, self.__activityMonitoringReporting)
self.__monitorLastStatsUpdate = time.time()
Expand Down Expand Up @@ -340,10 +341,11 @@ def am_go(self):
signal.signal(signal.SIGALRM, signal.SIG_DFL)
signal.alarm(watchdogInt)
elapsedTime = time.time()
cpuStats = self._startReportToMonitoring()
if self.activityMonitoring:
initialWallTime, initialCPUTime, mem = self._startReportToMonitoring()
cycleResult = self.__executeModuleCycle()
if cpuStats:
self._endReportToMonitoring(*cpuStats)
if self.activityMonitoring and initialWallTime and initialCPUTime:
cpuPercentage = self._endReportToMonitoring(initialWallTime, initialCPUTime)
# Increment counters
self.__moduleProperties["cyclesDone"] += 1
# Show status
Expand All @@ -362,15 +364,15 @@ def am_go(self):
self.log.notice(" Cycle was successful")
if self.activityMonitoring:
# Here we record the data about the cycle duration along with some basic details about the
# component and right now it isn't committed to the ES backend.
# agent and right now it isn't committed to the ES backend.
self.activityMonitoringReporter.addRecord(
{
"timestamp": int(Time.toEpoch()),
"host": Network.getFQDN(),
"componentType": "agent",
"component": "_".join(self.__moduleProperties["fullName"].split("/")),
"cycleDuration": elapsedTime,
"cycles": 1,
"AgentName": self.agentName,
"Timestamp": int(Time.toEpoch()),
"Host": Network.getFQDN(),
"MemoryUsage": mem,
"CpuPercentage": cpuPercentage,
"CycleDuration": elapsedTime,
}
)
else:
Expand All @@ -383,24 +385,17 @@ def am_go(self):
return cycleResult

def _startReportToMonitoring(self):
try:
if not self.activityMonitoring:
now = time.time()
stats = os.times()
cpuTime = stats[0] + stats[2]
if now - self.__monitorLastStatsUpdate < 10:
return (now, cpuTime)
# Send CPU consumption mark
self.__monitorLastStatsUpdate = now
# Send Memory consumption mark
membytes = MemStat.VmB("VmRSS:")
if membytes:
mem = membytes / (1024.0 * 1024.0)
return (now, cpuTime)
else:
return False
except Exception:
return False
now = time.time()
stats = os.times()
mem = None
cpuTime = stats[0] + stats[2]
if now - self.__monitorLastStatsUpdate < 10:
return (now, cpuTime, mem)
self.__monitorLastStatsUpdate = now
membytes = MemStat.VmB("VmRSS:")
if membytes:
mem = membytes / (1024.0 * 1024.0)
return (now, cpuTime, mem)

def _endReportToMonitoring(self, initialWallTime, initialCPUTime):
wallTime = time.time() - initialWallTime
Expand Down
64 changes: 41 additions & 23 deletions src/DIRAC/Core/DISET/private/Service.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def initialize(self):
# this class (see https://github.com/DIRACGrid/DIRAC/issues/4793)
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter

self.activityMonitoringReporter = MonitoringReporter(monitoringType="ComponentMonitoring")
self.activityMonitoringReporter = MonitoringReporter(monitoringType="ServiceMonitoring")
gThreadScheduler.addPeriodicTask(100, self.__activityMonitoringReporting)
self._initMonitoring()
# Call static initialization function
Expand All @@ -134,16 +134,15 @@ def initialize(self):
gLogger.exception(e)
gLogger.exception(errMsg)
return S_ERROR(errMsg)
if self.activityMonitoring:
gThreadScheduler.addPeriodicTask(30, self.__reportActivity)

# Load actions after the handler has initialized itself
result = self._loadActions()
if not result["OK"]:
return result
self._actions = result["Value"]

if not self.activityMonitoring:
gThreadScheduler.addPeriodicTask(30, self.__reportThreadPoolContents)

return S_OK()

def __searchInitFunctions(self, handlerClass, currentClass=None):
Expand Down Expand Up @@ -254,10 +253,25 @@ def _initMonitoring(self):
self._validNames.append(secondaryName)
return S_OK()

def __reportThreadPoolContents(self):
# TODO: remove later
def __reportActivity(self):
initialWallTime, initialCPUTime, mem = self.__startReportToMonitoring()
pendingQueries = self._threadPool._work_queue.qsize()
activeQuereies = len(self._threadPool._threads)
percentage = self.__endReportToMonitoring(initialWallTime, initialCPUTime)
self.activityMonitoringReporter.addRecord(
{
"timestamp": int(Time.toEpoch()),
"Host": Network.getFQDN(),
"ServiceName": "_".join(self._name.split("/")),
"Location": self._cfg.getURL(),
"MemoryUsage": mem,
"CpuPercentage": percentage,
"PendingQueries": pendingQueries,
"ActiveQueries": activeQuereies,
"RunningThreads": threading.activeCount(),
"MaxFD": self.__maxFD,
}
)
self.__maxFD = 0

def getConfig(self):
Expand Down Expand Up @@ -351,7 +365,7 @@ def _processInThread(self, clientTransport):
finally:
self._lockManager.unlockGlobal()
if monReport:
self.__endReportToMonitoring(*monReport)
self.__endReportToMonitoring(monReport[0], monReport[1])

@staticmethod
def _createIdentityString(credDict, clientTransport=None):
Expand Down Expand Up @@ -547,18 +561,21 @@ def _mbConnect(self, trid, handlerObj=None):

def _executeAction(self, trid, proposalTuple, handlerObj):
try:
initialWallTime, initialCPUTime, mem = self.__startReportToMonitoring()
response = handlerObj._rh_executeAction(proposalTuple)
if not response["OK"]:
return response
if self.activityMonitoring:
percentage = self.__endReportToMonitoring(initialWallTime, initialCPUTime)
self.activityMonitoringReporter.addRecord(
{
"timestamp": int(Time.toEpoch()),
"host": Network.getFQDN(),
"componentType": "service",
"component": "_".join(self._name.split("/")),
"componentLocation": self._cfg.getURL(),
"ServiceResponseTime": response["Value"][1],
"Host": Network.getFQDN(),
"serviceName": "_".join(self._name.split("/")),
"Location": self._cfg.getURL(),
"ResponseTime": response["Value"][1],
"MemoryUsage": mem,
"CpuPercentage": percentage,
}
)
return response["Value"][0]
Expand All @@ -567,6 +584,7 @@ def _executeAction(self, trid, proposalTuple, handlerObj):
return S_ERROR("Server error while executing action: %s" % str(e))

def _mbReceivedMsg(self, trid, msgObj):
initialWallTime, initialCPUTime, mem = self.__startReportToMonitoring()
result = self._authorizeProposal(
("Message", msgObj.getName()), trid, self._transportPool.get(trid).getConnectingCredentials()
)
Expand All @@ -578,14 +596,16 @@ def _mbReceivedMsg(self, trid, msgObj):
handlerObj = result["Value"]
response = handlerObj._rh_executeMessageCallback(msgObj)
if self.activityMonitoring and response["OK"]:
percentage = self.__endReportToMonitoring(initialWallTime, initialCPUTime)
self.activityMonitoringReporter.addRecord(
{
"timestamp": int(Time.toEpoch()),
"host": Network.getFQDN(),
"componentType": "service",
"component": "_".join(self._name.split("/")),
"componentLocation": self._cfg.getURL(),
"ServiceResponseTime": response["Value"][1],
"Host": Network.getFQDN(),
"ServiceName": "_".join(self._name.split("/")),
"Location": self._cfg.getURL(),
"ResponseTime": response["Value"][1],
"MemoryUsage": mem,
"CpuPercentage": percentage,
}
)
if response["OK"]:
Expand All @@ -606,22 +626,20 @@ def __activityMonitoringReporting(self):

:return: True / False
"""
result = self.activityMonitoringReporter.commit()
return result["OK"]
return self.activityMonitoringReporter.commit()

def __startReportToMonitoring(self):
now = time.time()
stats = os.times()
cpuTime = stats[0] + stats[2]
mem = None
if now - self.__monitorLastStatsUpdate < 0:
return (now, cpuTime)
# Send CPU consumption mark
return (now, cpuTime, mem)
self.__monitorLastStatsUpdate = now
# Send Memory consumption mark
membytes = MemStat.VmB("VmRSS:")
if membytes:
mem = membytes / (1024.0 * 1024.0)
return (now, cpuTime)
return (now, cpuTime, mem)

def __endReportToMonitoring(self, initialWallTime, initialCPUTime):
wallTime = time.time() - initialWallTime
Expand Down
65 changes: 41 additions & 24 deletions src/DIRAC/Core/Tornado/Server/TornadoServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import DIRAC
from DIRAC import gConfig, gLogger, S_OK
from DIRAC.Core.Security import Locations
from DIRAC.Core.Utilities import MemStat
from DIRAC.Core.Utilities import MemStat, Time, Network
from DIRAC.Core.Tornado.Server.HandlerManager import HandlerManager
from DIRAC.ConfigurationSystem.Client import PathFinder
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations

sLog = gLogger.getSubLogger(__name__)
DEBUG_M2CRYPTO = os.getenv("DIRAC_DEBUG_M2CRYPTO", "No").lower() in ("yes", "true")
Expand Down Expand Up @@ -85,6 +86,7 @@ def __init__(self, services=True, endpoints=False, port=None):
:param int port: Port to listen to.
If ``None``, the port is resolved following the logic described in the class documentation
"""
self.__startTime = time.time()
# Application metadata, routes and settings mapping on the ports
self.__appsSettings = {}
# Default port, if enother is not discover
Expand All @@ -101,12 +103,17 @@ def __init__(self, services=True, endpoints=False, port=None):
self.__monitorLastStatsUpdate = None
self.__monitoringLoopDelay = 60 # In secs

self.activityMonitoring = False
self.monitoringOption = Operations().getValue("MonitoringBackends", ["Accounting"])
if "Monitoring" in self.monitoringOption:
self.activityMonitoring = True
# If services are defined, load only these ones (useful for debug purpose or specific services)
retVal = self.handlerManager.loadServicesHandlers()
if not retVal["OK"]:
sLog.error(retVal["Message"])
raise ImportError("Some services can't be loaded, check the service names and configuration.")

# Response time to load services
self.__elapsedTime = time.time() - self.__startTime
retVal = self.handlerManager.loadEndpointsHandlers()
if not retVal["OK"]:
sLog.error(retVal["Message"])
Expand Down Expand Up @@ -170,7 +177,6 @@ def startTornado(self):
Starts the tornado server when ready.
This method never returns.
"""

# If there is no services loaded:
if not self.__calculateAppSettings():
raise Exception("There is no services loaded, please check your configuration")
Expand All @@ -192,17 +198,22 @@ def startTornado(self):
}

# Init monitoring
self._initMonitoring()
self.__monitorLastStatsUpdate = time.time()
self.__report = self.__startReportToMonitoringLoop()

# Starting monitoring, IOLoop waiting time in ms, __monitoringLoopDelay is defined in seconds
tornado.ioloop.PeriodicCallback(self.__reportToMonitoring, self.__monitoringLoopDelay * 1000).start()

# If we are running with python3, Tornado will use asyncio,
# and we have to convince it to let us run in a different thread
# Doing this ensures a consistent behavior between py2 and py3
asyncio.set_event_loop_policy(tornado.platform.asyncio.AnyThreadEventLoopPolicy())
if self.activityMonitoring:
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter

self.activityMonitoringReporter = MonitoringReporter(monitoringType="ServiceMonitoring")
self.__monitorLastStatsUpdate = time.time()
self.__report = self.__startReportToMonitoringLoop()
# Response time
# Starting monitoring, IOLoop waiting time in ms, __monitoringLoopDelay is defined in seconds
tornado.ioloop.PeriodicCallback(
self.__reportToMonitoring(self.__elapsedTime), self.__monitoringLoopDelay * 1000
).start()

# If we are running with python3, Tornado will use asyncio,
# and we have to convince it to let us run in a different thread
# Doing this ensures a consistent behavior between py2 and py3
asyncio.set_event_loop_policy(tornado.platform.asyncio.AnyThreadEventLoopPolicy())

for port, app in self.__appsSettings.items():
sLog.debug(" - %s" % "\n - ".join(["%s = %s" % (k, ssl_options[k]) for k in ssl_options]))
Expand All @@ -224,19 +235,25 @@ def startTornado(self):

tornado.ioloop.IOLoop.current().start()

def _initMonitoring(self):
def __reportToMonitoring(self, responseTime):
"""
Initialize the monitoring
"""

def __reportToMonitoring(self):
"""
Periodically report to the monitoring of the CPU and MEM
Periodically reports to Monitoring
"""

# Calculate CPU usage by comparing realtime and cpu time since last report
self.__endReportToMonitoringLoop(*self.__report)

percentage = self.__endReportToMonitoringLoop(self.__report[0], self.__report[1])
# Send record to Monitoring
self.activityMonitoringReporter.addRecord(
{
"timestamp": int(Time.toEpoch()),
"Host": Network.getFQDN(),
"ServiceName": "Tornado",
"MemoryUsage": self.__report[2],
"CpuPercentage": percentage,
"ResponseTime": responseTime,
}
)
self.activityMonitoringReporter.commit()
# Save memory usage and save realtime/CPU time for next call
self.__report = self.__startReportToMonitoringLoop()

Expand All @@ -262,7 +279,7 @@ def __startReportToMonitoringLoop(self):
membytes = MemStat.VmB("VmRSS:")
if membytes:
mem = membytes / (1024.0 * 1024.0)
return (now, cpuTime)
return (now, cpuTime, mem)

def __endReportToMonitoringLoop(self, initialWallTime, initialCPUTime):
"""
Expand Down
Loading