Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions canopen/objectdictionary/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def import_od(
source: Union[str, TextIO, None],
node_id: Optional[int] = None,
) -> ObjectDictionary:
"""Parse an EDS, DCF, or EPF file.
"""Parse an EDS, DCF, EPF or XDD file.

:param source:
The path to object dictionary file, a file like object, or an EPF XML tree.
Expand Down Expand Up @@ -106,9 +106,12 @@ def import_od(
elif suffix == ".epf":
from canopen.objectdictionary import epf
return epf.import_epf(source)
elif suffix == ".xdd":
from canopen.objectdictionary import xdd
return xdd.import_xdd(source, node_id)
else:
doc_type = suffix[1:]
allowed = ", ".join(["eds", "dcf", "epf"])
allowed = ", ".join(["eds", "dcf", "epf", "xdd"])
raise ValueError(
f"Cannot import from the {doc_type!r} format; "
f"supported formats: {allowed}"
Expand Down
357 changes: 357 additions & 0 deletions canopen/objectdictionary/xdd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,357 @@
import functools
import logging
import os
import re
import xml.etree.ElementTree as etree
from typing import Union, Optional
from canopen.objectdictionary import (
ODArray,
ODRecord,
ODVariable,
ObjectDictionary,
datatypes,
objectcodes,
)

logger = logging.getLogger(__name__)

autoint = functools.partial(int, base=0)
hex = functools.partial(int, base=16)

def import_xdd(
xdd: Union[etree.Element, str, bytes, os.PathLike],
node_id: Optional[int],
) -> ObjectDictionary:
od = ObjectDictionary()
if etree.iselement(xdd):
root = xdd
else:
root = etree.parse(xdd).getroot()

if node_id is None:
device_commissioning = root.find('.//{*}DeviceCommissioning')
if device_commissioning is not None:
od.node_id = int(device_commissioning['nodeID'], 0)
else:
od.node_id = None
else:
od.node_id = node_id

_add_device_information(od, root)
_add_object_list(od, root)
_add_dummy_objects(od, root)
return od


def _add_device_information(
od: ObjectDictionary,
root: etree.Element
):
device_identity = root.find('.//{*}DeviceIdentity')
if device_identity is not None:
for src_prop, dst_prop, f in [
("vendorName", "vendor_name", str),
("vendorID", "vendor_number", hex),
("productName", "product_name", str),
("productID", "product_number", hex),
]:
val = device_identity.find(f'{{*}}{src_prop}')
if val is not None and val.text:
setattr(od.device_information, dst_prop, f(val.text))

general_features = root.find('.//{*}CANopenGeneralFeatures')
if general_features is not None:
for src_prop, dst_prop, f, default in [
# properties without default value (default=None) are required
("granularity", "granularity", autoint, None),
("nrOfRxPDO", "nr_of_RXPDO", autoint, "0"),
("nrOfTxPDO", "nr_of_TXPDO", autoint, "0"),
("bootUpSlave", "simple_boot_up_slave", bool, 0),
]:
val = general_features.get(src_prop, default)
if val is None:
raise ValueError(f"Missing required '{src_prop}' property in XDD file")
setattr(od.device_information, dst_prop, f(val))


baud_rate = root.find('.//{*}PhysicalLayer/{*}baudRate')
for baud in baud_rate:
try:
rate = int(baud.get("value").replace(' Kbps', ''), 10) * 1000
od.device_information.allowed_baudrates.add(rate)
except (ValueError, TypeError):
pass

if default_baud := baud_rate.get('defaultValue', None):
try:
od.bitrate = int(default_baud.replace(' Kbps', ''), 10) * 1000
except (ValueError, TypeError):
pass


def _add_object_list(
od: ObjectDictionary,
root: etree.Element
):
# Process all CANopen objects in the file
for obj in root.findall('.//{*}CANopenObjectList/{*}CANopenObject'):
name = obj.get('name', '')
index = int(obj.get('index', '0'), 16)
object_type = int(obj.get('objectType', '0'))
sub_number = obj.get('subNumber')

# Simple variable
if object_type == objectcodes.VAR:
unique_id_ref = obj.get('uniqueIDRef', None)
parameters = root.find(f'.//{{*}}parameter[@uniqueID="{unique_id_ref}"]')

var = _build_variable(parameters, od.node_id, name, index)
_set_parameters_from_xdd_canopen_object(od.node_id, var, obj)
od.add_object(var)

# Array
elif object_type == objectcodes.ARRAY and sub_number:
array = ODArray(name, index)
for sub_obj in obj:
sub_name = sub_obj.get('name', '')
sub_index = int(sub_obj.get('subIndex'), 16)
sub_unique_id = sub_obj.get('uniqueIDRef', None)
sub_parameters = root.find(f'.//{{*}}parameter[@uniqueID="{sub_unique_id}"]')

sub_var = _build_variable(sub_parameters, od.node_id, sub_name, index, sub_index)
_set_parameters_from_xdd_canopen_object(od.node_id, sub_var, sub_obj)
array.add_member(sub_var)
od.add_object(array)

# Record/Struct
elif object_type == objectcodes.RECORD and sub_number:
record = ODRecord(name, index)
for sub_obj in obj:
sub_name = sub_obj.get('name', '')
sub_index = int(sub_obj.get('subIndex'), 16)
sub_unique_id = sub_obj.get('uniqueIDRef', None)
sub_parameters = root.find(f'.//{{*}}parameter[@uniqueID="{sub_unique_id}"]')
sub_var = _build_variable(sub_parameters, od.node_id, sub_name, index, sub_index)
_set_parameters_from_xdd_canopen_object(od.node_id, sub_var, sub_obj)
record.add_member(sub_var)
od.add_object(record)


def _add_dummy_objects(
od: ObjectDictionary,
root: etree.Element
):
dummy_section = root.find('.//{*}ApplicationLayers/{*}dummyUsage')
for dummy in dummy_section:
p = dummy.get('entry').split('=')
key = p[0]
value = int(p[1], 10)
index = int(key.replace('Dummy', ''), 10)
if value == 1:
var = ODVariable(key, index, 0)
var.data_type = index
var.access_type = "const"
od.add_object(var)


def _set_parameters_from_xdd_canopen_object(
node_id: Optional[int],
dst: ODVariable,
src: etree.Element
):
# PDO mapping of the object, optional, string
# Valid values:
# * no - not mappable
# * default - mapped by default
# * optional - optionally mapped
# * TPDO - may be mapped into TPDO only
# * RPDO - may be mapped into RPDO only
pdo_mapping = src.get('PDOmapping', 'no')
dst.pdo_mappable = pdo_mapping != 'no'

# Name of the object, optional, string
if var_name := src.get('name', None):
dst.name = var_name

# CANopen data type (two hex digits), optional
# data_type matches canopen library, no conversion needed
if var_data_type := src.get('dataType', None):
try:
dst.data_type = int(var_data_type, 16)
except (ValueError, TypeError):
pass

# Access type of the object; valid values, optional, string
# * const - read access only; the value is not changing
# * ro - read access only
# * wo - write access only
# * rw - both read and write access
# strings match with access_type in canopen library, no conversion needed
if access_type := src.get('accessType', None):
dst.access_type = access_type

# Low limit of the parameter value, optional, string
if min_value := src.get('lowLimit', None):
try:
dst.min = _convert_variable(node_id, dst.data_type, min_value)
except (ValueError, TypeError):
pass

# High limit of the parameter value, optional, string
if max_value := src.get('highLimit', None):
try:
dst.max = _convert_variable(node_id, dst.data_type, max_value)
except (ValueError, TypeError):
pass

# Default value of the object, optional, string
if default_value := src.get('defaultValue', None):
try:
dst.default_raw = default_value
if '$NODEID' in dst.default_raw:
dst.relative = True
dst.default = _convert_variable(node_id, dst.data_type, dst.default_raw)
except (ValueError, TypeError):
pass


def _build_variable(
par_tree: Optional[etree.Element],
node_id: Optional[int],
name: str,
index: int,
subindex: int = 0
) -> ODVariable:
var = ODVariable(name, index, subindex)
# Set default parameters
var.default_raw = None
var.access_type = 'ro'
if par_tree is None:
return var

var.description = par_tree.get('description', '')

# Extract data type
data_types = {
'BOOL': datatypes.BOOLEAN,
'SINT': datatypes.INTEGER8,
'INT': datatypes.INTEGER16,
'DINT': datatypes.INTEGER32,
'LINT': datatypes.INTEGER64,
'USINT': datatypes.UNSIGNED8,
'UINT': datatypes.UNSIGNED16,
'UDINT': datatypes.UNSIGNED32,
'ULINT': datatypes.UNSIGNED32,
'REAL': datatypes.REAL32,
'LREAL': datatypes.REAL64,
'STRING': datatypes.VISIBLE_STRING,
'BITSTRING': datatypes.DOMAIN,
'WSTRING': datatypes.UNICODE_STRING
}

for k, v in data_types.items():
if par_tree.find(f'{{*}}{k}') is not None:
var.data_type = v

# Extract access type
if access_type_str := par_tree.get('access', None):
# Defines which operations are valid for the parameter:
# * const - read access only; the value is not changing
# * read - read access only (default value)
# * write - write access only
# * readWrite - both read and write access
# * readWriteInput - both read and write access, but represents process input data
# * readWriteOutput - both read and write access, but represents process output data
# * noAccess - access denied
access_types = {
'const': 'const',
'read': 'ro',
'write': 'wo',
'readWrite': 'rw',
'readWriteInput': 'rw',
'readWriteOutput': 'rw',
'noAccess': 'const',
}
var.access_type = access_types.get(access_type_str)

# Extract default value
default_value = par_tree.find('{*}defaultValue')
if default_value is not None:
try:
var.default_raw = default_value.get('value')
if '$NODEID' in var.default_raw:
var.relative = True
var.default = _convert_variable(node_id, var.data_type, var.default_raw)
except (ValueError, TypeError):
pass

# Extract allowed values range
min_value = par_tree.find('{*}allowedValues/{*}range/{*}minValue')
if min_value is not None:
try:
var.min = _convert_variable(node_id, var.data_type, min_value.get('value'))
except (ValueError, TypeError):
pass

max_value = par_tree.find('{*}allowedValues/{*}range/{*}maxValue')
if max_value is not None:
try:
var.max = _convert_variable(node_id, var.data_type, max_value.get('value'))
except (ValueError, TypeError):
pass
return var


def _calc_bit_length(
data_type: int
) -> int:
if data_type == datatypes.INTEGER8:
return 8
elif data_type == datatypes.INTEGER16:
return 16
elif data_type == datatypes.INTEGER32:
return 32
elif data_type == datatypes.INTEGER64:
return 64
else:
raise ValueError(f"Invalid data_type '{data_type}', expecting a signed integer data_type.")


def _signed_int_from_hex(
hex_str: str,
bit_length: int
) -> int:
number = int(hex_str, 0)
max_value = (1 << (bit_length - 1)) - 1

if number > max_value:
return number - (1 << bit_length)
else:
return number


def _convert_variable(
node_id: Optional[int],
var_type: int,
value: str
) -> Optional[Union[bytes, str, float, int]]:
if var_type in (datatypes.OCTET_STRING, datatypes.DOMAIN):
return bytes.fromhex(value)
elif var_type in (datatypes.VISIBLE_STRING, datatypes.UNICODE_STRING):
return str(value)
elif var_type in datatypes.FLOAT_TYPES:
return float(value)
else:
# COB-ID can contain '$NODEID+' so replace this with node_id before converting
value = value.replace(" ", "").upper()
if '$NODEID' in value:
if node_id is None:
logger.warn("Cannot convert value with $NODEID, skipping conversion")
return None
else:
return int(re.sub(r'\+?\$NODEID\+?', '', value), 0) + node_id
else:
if var_type in datatypes.SIGNED_TYPES:
return _signed_int_from_hex(value, _calc_bit_length(var_type))
else:
return int(value, 0)
Loading