summaryrefslogtreecommitdiffstats
path: root/esilib.py
blob: e68b6cde9c27de9261cce2da559a0e23f1ab3443 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
#!/usr/bin/python
# vim: set fileencoding=utf-8 tabstop=8 expandtab shiftwidth=4 softtabstop=4 :

""" esilib.py
Python-library to write the Device Profile part of an ESI XML file
Florian Bruhin
"""

from lxml import etree
import logging
import warnings
import datetime

import canlib

class ESIWarning(UserWarning):
    """ Base class for all errors in this module """
    pass

class DatatypeNotImplementedWarning(ESIWarning):
    """ Gets raised when a datatype is not implemented yet """
    pass

class NoSignInformationWarning(ESIWarning):
    """ Gets raised when there is a datatype without sign information """
    pass

class InvalidSizeWarning(ESIWarning):
    """ Gets raised when a datatype has an invalid size """
    pass

class UnknownBitsizeWarning(ESIWarning):
    """ Gets raised when a bitsize of a datatype can't be determinated """
    pass

class DoubleIndexWarning(ESIWarning):
    """ Gets raised when a index already exists """
    pass

class InvalidESIWarning(ESIWarning):
    """ Gets raised when the ESI validation fails """
    pass

class IgnoredAttributeWarning(ESIWarning):
    """ Gets raised when an attribute gets ignored """
    pass

class ESIFile:
    """ Represents the device profile part of an ESI XML file """
    def __init__(self, versioninfo=None):
        self.dicttree = etree.Element("Dictionary")
        infoline = "Generated by {} on {}".format(
            versioninfo, datetime.datetime.now().ctime())
        _append_comment(self.dicttree, infoline)
        self.dt_simple = etree.Element("DataTypes")
        self.dt_middle = etree.Element("DataTypes")
        self.dt_complex = etree.Element("DataTypes")
        self.dttree = etree.SubElement(self.dicttree, "DataTypes")
        self.objtree = etree.SubElement(self.dicttree, "Objects")
        self.bitsizes = {}
        self.added_objs = []

    def flush_datatypes(self):
        """ Flushes the datatype-"caches" to the final tree """
        for elem in self.dt_simple:
            self.dttree.append(elem)
        for elem in self.dt_middle:
            self.dttree.append(elem)
        for elem in self.dt_complex:
            self.dttree.append(elem)

    def totree(self):
        """ Converts dicttree to an ElementTree if it's an _Element """
        if type(self.dicttree) is etree._Element:
            self.dicttree = etree.ElementTree(self.dicttree)

    def tofile(self, filename, pretty_print=True):
        """ Writes the XML-tree to a file """
        self.dicttree.write(filename, pretty_print=pretty_print,
                            xml_declaration=True)

    def dump(self, pretty_print=True):
        """ Dumps the XML-tree to stdout """
        bytestring = etree.tostring(self.dicttree, encoding='utf-8',
                                    pretty_print=pretty_print)
        print(bytestring.decode('utf-8'))

    def validate(self, schema):
        """ Validates the generated ESI with an XML schema """
        try:
            xmlschema_doc = etree.parse(schema)
        except IOError:
            logging.error("Could not read schema file \"{}\", skipping "
                          "validation!".format(schema))
        except etree.XMLSyntaxError:
            logging.error("Schema file is not valid XML, skipping "
                          "validation!")
        else:
            try:
                xmlschema = etree.XMLSchema(xmlschema_doc)
            except etree.XMLSchemaParseError:
                logging.error("Schema file is not a valid XML schema, "
                              "skipping validation!")
            else:
                if not xmlschema.validate(self.dicttree):
                    error = xmlschema.error_log
                    logging.error("Validation failed!\n")
                    prevmsg = None
                    rcnt = 0
                    for message in error:
                        if str(prevmsg) == str(message):
                            rcnt += 1
                        else:
                            if rcnt != 0:
                                logging.error("=== Previous message repated "
                                              "{} times ===".format(rcnt))
                                rcnt = 0
                            logging.error(message)
                        prevmsg = message
                    warnings.warn("The output is not a valid ESI!",
                                  InvalidESIWarning)

    def merge_from_template(self, filename):
        """ Replaces dicttree with the tree of the template file, with the
        dictionary-part replaced by the new data
        """
        xpath = ("/EtherCATInfo/Descriptions/Devices/Device[1]/Profile[1]/"
                 "Dictionary")
        parser = etree.XMLParser(remove_blank_text=True, strip_cdata=False)
        try:
            templatetree = etree.parse(filename, parser)
        except IOError:
            logging.error("Could not read template file \"{}\", will ignore "
                          "template option and only output dictionary-"
                          "tree".format(filename))
        except etree.XMLSyntaxError:
            logging.error("Template file is not valid XML, will ignore "
                          "template option and only output dictionary-tree")
        else:
            try:
                templatedict = templatetree.getroot().xpath(xpath)[0]
            except IndexError:
                logging.error("Could not find Dictionary-tree in template "
                              "XML, will ignore template option and only "
                              "output dictionary-tree")
            else:
                templatedict.getparent().replace(templatedict, self.dicttree)
                self.dicttree = templatetree

    def add_var_datatype(self, datatype):
        """ Adds a new basic/var datatype to the ESI """
        (esitype, esisize) = type_to_esitype(datatype)
        if esitype not in self.bitsizes: # Datatype does not already exist
            properties = [("Name", esitype), ("BitSize", esisize)]
            _append_comment(self.dt_simple, esitype)
            _append_elements(self.dt_simple, "DataType", properties)
            self.bitsizes[esitype] = esisize
        return (esitype, esisize)

    def add_record_datatype(self, esiname):
        """ Adds a datatype for a record.
        esiname: A human readable name like "DT1003"
        """
        # Bitsize will be set later
        properties = [ ("Name", esiname), ("BitSize", 'x') ]
        _append_comment(self.dt_complex, esiname)
        record = _append_elements(self.dt_complex, "DataType", properties)
        return record

    def add_array_datatype(self, esiname, basetype, elements, lbound=0,
                           octet=False):
        """ Adds a datatype for an array.
        datatype:           Basetype of the array
        elements:           Number of elements
        lbound:             ???, <LBound>-Tag
        """
        (esitype, esisize) = type_to_esitype(basetype)
        if octet:
            bitsize = esisize * elements
            target = self.dt_middle
        else:
            bitsize = esisize
            target = self.dt_complex
        if esiname not in self.bitsizes:
            properties = [
                ("Name", esiname),
                ("BaseType", esitype),
                ("BitSize", bitsize),
            ]
            arrayinfo = [ ("LBound", lbound), ("Elements", elements) ]
            _append_comment(target, esiname)
            array = _append_elements(target, "DataType", properties)
            _append_elements(array, "ArrayInfo", arrayinfo)
            self.bitsizes[esiname] = bitsize
            retval = array
        else:
            retval = None
        return retval

    def add_object(self, canobj):
        """ Adds a new object to the tree """
        _append_comment(self.objtree, canobj.index)
        if canobj.index in self.added_objs:
            warnings.warn("Object {} already exists!".format(canobj.index),
                          DoubleIndexWarning)
        else:
            self.added_objs.append(canobj.index)
        esisize = self._get_esisize(canobj.datatype)
        properties = [
            ("Index", canobj.index),
            ("Name", canobj.name),
            ("Type", canobj.datatype),
            ("BitSize", esisize),
        ]
        info = _get_info_tree(canobj)
        flags = [
            ("Access", canobj.accesstype),
            ("Category", canobj.category),
            ("PdoMapping", canobj.pdomapping),
        ]
        ignored = [
            ("objFlags setting (see CiA-306)", canobj.objflags)
        ]
        obj = _add_to_xml(self.objtree, "Object", properties, ignored)
        if canobj.infocomment is not None:
            _append_comment(obj, canobj.infocomment)
        _append_elements(obj, "Info", info, force=True)
        _append_elements(obj, "Flags", flags)
        return obj

    def add_subobject(self, canobj, parent=None):
        """ Adds a new object to the tree """
        esiname = parent.xpath('Name')[0].text
        bitoffset = self._get_bitoffset(esiname)
        esisize = self._get_esisize(canobj.datatype)
        properties = [
            ("SubIdx", canobj.subindex),
            ("Name", canobj.name),
            ("Type", canobj.datatype),
            ("BitSize", esisize),
            ("BitOffs", bitoffset),
        ]
        flags = [
            ("Access", canobj.accesstype),
            ("Category", canobj.category),
            ("PdoMapping", canobj.pdomapping),
        ]
        ignored = [
            ("objFlags setting (see CiA-306)", canobj.objflags)
        ]
        obj = _add_to_xml(parent, "SubItem", properties, ignored)
        _append_elements(obj, "Flags", flags)
        self._set_bitoffset(parent, esiname, bitoffset, esisize)
        return parent

    def clear_bitoffset(self, esiname):
        """ Clears the bitoffset of an esiname """
        self.bitsizes[esiname] = 0

    def write_infotree(self, canobj, xmlobj):
        """ Writes the info-trees of an object """
        infotree = xmlobj.xpath('Info')[0]
        for subobj in canobj.subobjects:
            name = [('Name', subobj.name)]
            info = _get_info_tree(subobj)
            if info is not None:
                subitem = _append_elements(infotree, "SubItem", name)
                _append_elements(subitem, "Info", info)

    def _set_bitoffset(self, parent, esiname, bitoffset, esisize):
        """ Sets the bitoffset of an esiname """
        if esisize != 'x':
            bitoffset += esisize
        # The next bit offset after all elements is the total size
        bitsize = parent.xpath('BitSize')[0]
        bitsize.text = str(bitoffset)
        self.bitsizes[esiname] = bitoffset

    def _get_bitoffset(self, esiname):
        """ Gets the bitoffset of an esiname """
        # Offset of the object we're inserting
        if esiname in self.bitsizes:
            # size of the whole record is also the offset for the next object
            bitoffset = self.bitsizes[esiname]
        else:
            # we're inserting the first subobject
            bitoffset = 0
        return bitoffset

    def _get_esisize(self, datatype):
        """ Returns the ESI-size of a datatype """
        try:
            esisize = self.bitsizes[datatype]
        except KeyError:
            warnings.warn("Can't find the bitsize of {}".format(datatype),
                          UnknownBitsizeWarning)
            esisize = 'x'
        return esisize

def type_to_esitype(datatype):
    """ Takes a py-datatype and returns the corresponding esi-type """
    esitype = ""
    esisize = 0
    convfuncs = {
        'boolean': _esitype_bool,
        'integer': _esitype_int,
        'string':  _esitype_str,
        'octet':   _esitype_oct,
        'unicode': _esitype_oct,
        'float':   _esitype_float,
    }
    try:
        convfunc = convfuncs[datatype.type_]
    except KeyError:
        warnings.warn("Datatype {} of type {} is not implemented!".format(
                       datatype.name, repr(datatype.type_)),
                       DatatypeNotImplementedWarning)
        esitype = "UNKNOWN ({})".format(datatype.name)
        esisize = "x"
    else:
        (esitype, esisize) = convfunc(datatype)
    return (esitype, esisize)

def _esitype_int(datatype):
    """ Converts integer-py-datatypes to ESI """
    esitype = ""
    signstrings = { False: 'U', True: '' }
    sizestrings = {  8: 'SINT',  16: 'INT',   24: 'INT24', 32: 'DINT',
                    40: 'INT40', 48: 'INT48', 56: 'INT56', 64: 'LINT' }
    try:
        signstring = signstrings[datatype.signed]
    except KeyError:
        warnings.warn(
            "Got datatype {} of type int without sign information, assuming "
            "signed!".format(datatype.name), NoSignInformationWarning)
        signstring = ''
    try:
        sizestring = sizestrings[datatype.size]
    except KeyError:
        warnings.warn(
            "Got datatype {} of type int with an invalid size, assuming 16 "
            "bit!".format(datatype.name), InvalidSizeWarning)
        esisize = 16
        sizestring = sizestrings[esisize]
    else:
        esisize = datatype.size
    esitype = signstring + sizestring
    return (esitype, esisize)

def _esitype_float(datatype):
    """ Converts float-py-datatypes to ESI """
    allowed_sizes = [ 32, 64 ]
    if datatype.size not in allowed_sizes:
        warnings.warn(
            "Got datatype {} of type float with an invalid size, assuming 32 "
            "bit!".format(datatype.name), InvalidSizeWarning)
        esisize = 32
    else:
        esisize = datatype.size
    esitype = 'REAL{}'.format(esisize)
    return (esitype, esisize)

def _esitype_str(datatype):
    """ Converts string-py-datatypes to ESI """
    esilength = datatype.length
    esisize = datatype.size
    esitype = "STRING({})".format(esilength)
    return (esitype, esisize)

def _esitype_bool(datatype):
    """ Converts bool-py-datatypes to ESI """
    esitype = 'BOOL'
    esisize = datatype.size
    return (esitype, esisize)

def _esitype_oct(datatype):
    """ Converts octetstring-py-datatypes to ESI """
    esilength = datatype.length
    esisize = datatype.size
    if datatype.type_ == 'octet':
        pybasetype = canlib.Datatype(False, 'integer', 8, "OCTET({})".format(
                                     esilength), esilength)
    elif datatype.type_ == 'unicode':
        pybasetype = canlib.Datatype(False, 'integer', 16, "UNICODE({})".
                                     format(esilength), esilength)
    esibasetype = type_to_esitype(pybasetype)[0]
    esitype = "ARRAY [0..{}] OF {}".format(esilength - 1, esibasetype)
    return (esitype, esisize)

def _append_comment(root, comment):
    """ Appends a comment to the tree """
    root.append(etree.Comment(comment))

def _append_elements(root, subnode, attrs, force=False):
    """ Appends new datatype or object to the tree.
    root:       dttree, objtree, ...
    subnode:   "DataType", "Object", "SubItem", ...
    attrs:     [ ("Name", "UDINT"), ("BitSize", 32), ... ]
    force:     If true, add subnode even if attrs are None
    An example: If the objtree is empty at the moment, and you do:
    _append_elements(objtree, "Object", [("Name", "foobar")]
    it gets:
    <Object><Name>foobar</Name></Object>
    """
    # If all values in properties are None,
    # we don't create the subtree-tag
    if ((attrs is not None and any(attr[1] is not None for attr in attrs))
        or force):
        subtree = etree.SubElement(root, subnode)
        for attr in attrs:
            # If the value in the attr is None, the tag is omitted
            if attr[1] is not None:
                elem = etree.SubElement(subtree, str(attr[0]))
                elem.text = str(attr[1])
        retval = subtree
    else:
        retval = None
    return retval

def _add_to_xml(xmlparent, xmlnodename, properties, ignored):
    """ Adds a new node with it's properties to the XML """
    obj = _append_elements(xmlparent, xmlnodename, properties)
    for elem in ignored:
        if elem[1] is not None:
            _append_comment(obj, "ignored: {} (value: {})".format(elem[0],
                            elem[1]))
            warnings.warn("Ignored attribute '{}' (value: {})".format(
                           elem[0], elem[1]), IgnoredAttributeWarning)
    return obj

def _get_info_tree(canobj):
    """ Returns the info-tree of an object """
    if getattr(canobj.sibling.datatype, 'type_', None) == 'octet':
        return None
    elif canobj.dataorvalue == "data":
        info = [
            ("MinData", canobj.minimum),
            ("MaxData", canobj.maximum),
            ("DefaultData", canobj.default),
        ]
    elif canobj.dataorvalue == "value":
        info = [
            ("MinValue", canobj.minimum),
            ("MaxValue", canobj.maximum),
            ("DefaultValue", canobj.default),
        ]
    return info