Skip to content
63 changes: 54 additions & 9 deletions ssdutil/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,61 @@
#

try:
import argparse
import os
import sys
import argparse
import psutil
from blkinfo import BlkDiskInfo

from sonic_py_common import device_info, logger
except ImportError as e:
raise ImportError("%s - required module not found" % str(e))

DEFAULT_DEVICE="/dev/sda"
DEFAULT_DEVICE = "/dev/sda"
SYSLOG_IDENTIFIER = "ssdutil"
DISK_TYPE_SSD = "sata"

# Global logger instance
log = logger.Logger(SYSLOG_IDENTIFIER)


def get_default_disk():
"""Check default disk"""
default_device = DEFAULT_DEVICE
host_mnt = '/host'
host_partition = None
partitions = psutil.disk_partitions()

if partitions is None:
return (default_device, None)

for parts in partitions:
if parts.mountpoint == host_mnt:
host_partition = parts
break

disk_major = os.major(os.stat(host_partition.device).st_rdev)
filters = {
'maj:min': '{}:0'.format(disk_major)
}

myblkd = BlkDiskInfo()
my_filtered_disks = myblkd.get_disks(filters)

if my_filtered_disks is None:
return (default_device, None)

json_output = my_filtered_disks[0]
blkdev = json_output['name']
disk_type = json_output['tran']
default_device = os.path.join("/dev/", blkdev)

# Disk Type Support for eMMC devices
disk_type = 'eMMC' if len(disk_type) == 0 and 'mmcblk' in host_partition.device else disk_type # noqa: E501

return default_device, disk_type


def import_ssd_api(diskdev):
"""
Loads platform specific or generic ssd_util module from source
Expand All @@ -37,44 +77,49 @@ def import_ssd_api(diskdev):
sys.path.append(os.path.abspath(platform_plugins_path))
from ssd_util import SsdUtil
except ImportError as e:
log.log_warning("Platform specific SsdUtil module not found. Falling down to the generic implementation")
log.log_warning("Platform specific SsdUtil module not found. Falling down to the generic implementation") # noqa: E501
try:
from sonic_platform_base.sonic_storage.ssd import SsdUtil
except ImportError as e:
log.log_error("Failed to import default SsdUtil. Error: {}".format(str(e)), True)
log.log_error("Failed to import default SsdUtil. Error: {}".format(str(e)), True) # noqa: E501
raise e

return SsdUtil(diskdev)


def is_number(s):
try:
float(s)
return True
except ValueError:
return False


# ==================== Entry point ====================
def ssdutil():
if os.geteuid() != 0:
print("Root privileges are required for this operation")
sys.exit(1)

parser = argparse.ArgumentParser()
parser.add_argument("-d", "--device", help="Device name to show health info", default=DEFAULT_DEVICE)
parser.add_argument("-v", "--verbose", action="store_true", default=False, help="Show verbose output (some additional parameters)")
parser.add_argument("-e", "--vendor", action="store_true", default=False, help="Show vendor output (extended output if provided by platform vendor)")
(default_device, disk_type) = get_default_disk()
parser.add_argument("-d", "--device", help="Device name to show health info", default=default_device) # noqa: E501
parser.add_argument("-v", "--verbose", action="store_true", default=False, help="Show verbose output (some additional parameters)") # noqa: E501
parser.add_argument("-e", "--vendor", action="store_true", default=False, help="Show vendor output (extended output if provided by platform vendor)") # noqa: E501
args = parser.parse_args()

print("Disk Type : {0}".format(disk_type.upper()))
ssd = import_ssd_api(args.device)

print("Device Model : {}".format(ssd.get_model()))
if args.verbose:
print("Firmware : {}".format(ssd.get_firmware()))
print("Serial : {}".format(ssd.get_serial()))
print("Health : {}{}".format(ssd.get_health(), "%" if is_number(ssd.get_health()) else ""))
print("Temperature : {}{}".format(ssd.get_temperature(), "C" if is_number(ssd.get_temperature()) else ""))
print("Health : {}{}".format(ssd.get_health(), "%" if is_number(ssd.get_health()) else "")) # noqa: E501
print("Temperature : {}{}".format(ssd.get_temperature(), "C" if is_number(ssd.get_temperature()) else "")) # noqa: E501
if args.vendor:
print(ssd.get_vendor_output())


if __name__ == '__main__':
ssdutil()
Empty file added tests/mocked_libs/__init__.py
Empty file.
90 changes: 90 additions & 0 deletions tests/mocked_libs/blkinfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
mock_json_op = \
[
{
"name": "sdx",
"kname": "sdx",
"fstype": "",
"label": "",
"mountpoint": "",
"size": "3965714432",
"maj:min": "8:0",
"rm": "0",
"model": "SMART EUSB",
"vendor": "SMART EUSB",
"serial": "SPG200807J1",
"hctl": "2:0:0:0",
"tran": "usb",
"rota": "1",
"type": "disk",
"ro": "0",
"owner": "",
"group": "",
"mode": "brw-rw----",
"children": [
{
"name": "sdx1",
"kname": "sdx1",
"fstype": "ext4",
"label": "",
"mountpoint": "/host",
"size": "3964665856",
"maj:min": "8:1",
"rm": "0",
"model": " ",
"vendor": " ",
"serial": "",
"hctl": "",
"tran": "",
"rota": "1",
"type": "part",
"ro": "0",
"owner": "",
"group": "",
"mode": "brw-rw----",
"children": [],
"parents": ["sdx"],
"statistics": {
"major": "8",
"minor": "1",
"kname": "sdx1",
"reads_completed": "22104",
"reads_merged": "5299",
"sectors_read": "1091502",
"time_spent_reading_ms": "51711",
"writes_completed": "11283",
"writes_merged": "13401",
"sectors_written": "443784",
"time_spent_ writing": "133398",
"ios_in_progress": "0",
"time_spent_doing_ios_ms": "112040",
"weighted_time_ios_ms": "112040",
},
}
],
"parents": [],
"statistics": {
"major": "8",
"minor": "0",
"kname": "sdx",
"reads_completed": "22151",
"reads_merged": "5299",
"sectors_read": "1093606",
"time_spent_reading_ms": "52005",
"writes_completed": "11283",
"writes_merged": "13401",
"sectors_written": "443784",
"time_spent_ writing": "133398",
"ios_in_progress": "0",
"time_spent_doing_ios_ms": "112220",
"weighted_time_ios_ms": "112220",
},
}
]


class BlkDiskInfo:
def __init__(self):
return

def get_disks(self, filters):
return mock_json_op
6 changes: 6 additions & 0 deletions tests/mocked_libs/psutil.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from collections import namedtuple


def disk_partitions():
sdiskpart = namedtuple('sdiskpart', ['mountpoint', 'device'])
return [sdiskpart(mountpoint="/host", device="/dev/sdx1")]
38 changes: 38 additions & 0 deletions tests/ssdutil_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
import os
import sys
import argparse
from unittest.mock import patch, MagicMock
import sonic_platform_base # noqa: F401

tests_path = os.path.dirname(os.path.abspath(__file__))

# Add mocked_libs path so that the file under test
# can load mocked modules from there
mocked_libs_path = os.path.join(tests_path, "mocked_libs") # noqa: E402,F401
sys.path.insert(0, mocked_libs_path)

from .mocked_libs import psutil # noqa: E402,F401
from .mocked_libs.blkinfo import BlkDiskInfo # noqa: E402,F401

sys.modules['os.stat'] = MagicMock()
sys.modules['os.major'] = MagicMock(return_value=8)
sys.modules['sonic_platform'] = MagicMock()
sys.modules['sonic_platform_base.sonic_ssd.ssd_generic'] = MagicMock()

Expand Down Expand Up @@ -32,8 +45,33 @@ def get_vendor_output(self):

class TestSsdutil:

@patch('os.geteuid', MagicMock(return_value=0))
@patch('os.stat', MagicMock(st_rdev=2049))
@patch('os.major', MagicMock(return_value=8))
def test_get_default_disk(self):
(default_device, disk_type) = ssdutil.get_default_disk()

assert default_device == "/dev/sdx"
assert disk_type == 'usb'

@patch('os.geteuid', MagicMock(return_value=0))
@patch('os.stat', MagicMock(st_rdev=2049))
@patch('os.major', MagicMock(return_value=8))
@patch('psutil.disk_partitions', MagicMock(return_value=None))
def test_get_default_disk_none_partitions(self):
(default_device, disk_type) = ssdutil.get_default_disk()

assert default_device == "/dev/sda"
assert disk_type is None

def test_is_number_valueerror(self):
outcome = ssdutil.is_number("nope")
assert outcome is False

@patch('sonic_py_common.device_info.get_paths_to_platform_and_hwsku_dirs', MagicMock(return_value=("test_path", ""))) # noqa: E501
@patch('os.geteuid', MagicMock(return_value=0))
@patch('os.stat', MagicMock(st_rdev=2049))
@patch('os.major', MagicMock(return_value=8))
def test_sonic_storage_path(self):

with patch('argparse.ArgumentParser.parse_args', MagicMock()) as mock_args: # noqa: E501
Expand Down