from HilscherFramework.Test import Testcase, TestFailed from HilscherFramework.Packet import createResponsePacket from HilscherFramework.Utils.Helpers import StructureFieldToString from struct import pack,unpack_from,calcsize,error from HilscherFramework.Stacks.rcX.Header.rcX_Public import RCX_REGISTER_APP_REQ_T, RCX_REGISTER_APP_CNF,\ RCX_UNREGISTER_APP_CNF, RCX_UNREGISTER_APP_REQ_T from HilscherFramework.Stacks.PNS.Header.PNSIF_API import PNS_IF_ADD_CHANNEL_DIAG_REQ_T, PNS_IF_SEND_DIAG_ALARM_REQ_T,\ PNS_IF_SEND_DIAG_ALARM_CNF, PNS_IF_REMOVE_DIAG_REQ_T, PNS_IF_REMOVE_DIAG_CNF, PNS_IF_ADD_EXTENDED_DIAG_REQ_T from HilscherFramework.Stacks.PNM.Header.PNM_PNIOAPCTL_Public import PNIO_APCTL_CMD_APPL_DIAG_DATA_IND import HilscherFramework.Stacks.PNM.Device as PNM import HilscherFramework.Stacks.PNS.Device as PNS import collections Diag = collections.namedtuple("Diag", ["api", "slot", "subslot", "channel_num", "channel_prop", "channel_err_type"]) DiagExt = collections.namedtuple("Diag", ["api", "slot", "subslot", "channel_num", "channel_prop", "channel_err_type", "channel_err_type2", "channel_add_value"]) DiagResponse = collections.namedtuple("DiagResponse", ["usi", "channel_num", "channel_prop", "channel_err_type"]) DiagResponseExt = collections.namedtuple("DiagResponseExt", ["usi", "channel_num", "channel_prop", "channel_err_type", "channel_err_type2", "channel_add_value"]) DiagResponseQualif = collections.namedtuple("DiagResponseQualif", ["usi", "channel_num", "channel_prop", "channel_err_type", "channel_err_type2", "channel_add_value", "channel_qualifier"]) def create_diag_response(r): if len(r) == 4: return DiagResponse(*r) elif len(r) == 6: return DiagResponseExt(*r) elif len(r) == 7: return DiagResponseQualif(*r) else: raise TestFailed("Failed to create the DiagResponse with {}.".format(r)) def add_diag(iod, diag): """Add a diagnosis on the device. Return the diagnosis handler""" req = PNS_IF_ADD_CHANNEL_DIAG_REQ_T() req.tData.ulApi = diag.api req.tData.ulSlot = diag.slot req.tData.ulSubslot = diag.subslot req.tData.usChannelNum = diag.channel_num req.tData.usChannelProp = diag.channel_prop req.tData.usChannelErrType = diag.channel_err_type iod.sendPacket(req) cnf = iod.waitPacket() cnf.checkPacketStatus() return cnf.tData.hDiagHandle def add_ext_diag(iod, diag): """Add an extended diagnosis on the device. Return the diagnosis handler""" req = PNS_IF_ADD_EXTENDED_DIAG_REQ_T() req.tData.ulApi = diag.api req.tData.ulSlot = diag.slot req.tData.ulSubslot = diag.subslot req.tData.usChannelNum = diag.channel_num req.tData.usChannelProp = diag.channel_prop req.tData.usChannelErrType = diag.channel_err_type req.tData.usExtChannelErrType = diag.channel_err_type2 req.tData.ulExtChannelAddValue = diag.channel_add_value iod.sendPacket(req) cnf = iod.waitPacket() cnf.checkPacketStatus() return cnf.tData.hDiagHandle def test_send(ioc, iod, diag_handler, expected_responses): """ Test the capacity to send this diagnosis. The controller will react with an indication. expected_response is a list of DiagResponse, DiagResponseExt and DiagResponseQualif. """ # Send the diagnosis from the device side req = PNS_IF_SEND_DIAG_ALARM_REQ_T() req.tData.hAlarmHandle = 0 req.tData.hDiagHandle = diag_handler iod.sendPacket(req) try: # Handle the diagnosis on the controller side rs = ioc.handle_diag_data() responses = [create_diag_response(r) for r in rs] if responses != expected_responses: raise TestFailed("Invalid Channel Diagnosis Alarm Content(expected: {0}, received: {1})" .format(expected_responses, responses)) finally: iod.waitPacket(PNS_IF_SEND_DIAG_ALARM_CNF) return diag_handler def remove_diag(ioc, iod, diag_handler, expected_responses): """ Remove a diagnosis on the device. The controller will react with an indication. expected_response is a list of DiagResponse, DiagResponseExt and DiagResponseQualif. """ # Remove the diagnosis on the device side req = PNS_IF_REMOVE_DIAG_REQ_T() req.tData.hDiagHandle = diag_handler iod.sendPacket(req) try: # Handle the remove of the diagnosis on the controller side rs = ioc.handle_diag_data() responses = [create_diag_response(r) for r in rs] if set(responses) != set(expected_responses): raise TestFailed("Invalid Channel Diagnosis Alarm Content(expected: {0}, received: {1})" .format(expected_responses, responses)) finally: iod.waitPacket(PNS_IF_REMOVE_DIAG_CNF) class Coding_DiagnosisAlarm(Testcase): ''' @brief Validate the encoding of diagnosis alarms @testenv -# 1 Cifx card used as Profinet Controller -# 1 Cifx card used as Profinet Device @ingroup Device, Conformance @author AndreasMe @date 05.10.2010 @mantis 11104 ''' _device_iod_ = PNS.PNIODeviceV3 _device_ioc_ = PNM.PNIOController def prepare(self, iod, ioc): self.log.trace("Configuring devices") iod.configure(input_length=8, output_length=8) # By default the ControllerV3 is configured to manage the diagnosis # alarms internally. # If the controller is configured with alarmhandling_flags to # PNM_AP_CFG_ALARM_HANDLING_FLAG_DIAGNOSIS to zero, diagnsis alarms will be # forwarded to the application. if isinstance(ioc, PNM.PNIOControllerV3): ioc.configure(slaves=[iod], alarmhandling_flags=~(0x1 | 0x40 | 0x80)) else: ioc.configure(slaves=[iod]) iod.waitCommunicating(timeout=30) ioc.waitCommunicating(timeout=30) ioc.sendPacket(RCX_REGISTER_APP_REQ_T()) ioc.waitPacket(RCX_REGISTER_APP_CNF).checkPacketStatus() def test(self, iod, ioc): h1 = add_diag(iod, Diag(api=0, slot=1, subslot=1, channel_num=0x8000, channel_prop=0, channel_err_type=1)) test_send(ioc, iod, h1, [ DiagResponse(usi=0x8000, channel_num=0x8000, channel_prop=0x0800, channel_err_type=1) ]) h2 = add_diag(iod, Diag(api=0, slot=1, subslot=1, channel_num=0x8000, channel_prop=0x0200, channel_err_type=2)) test_send(ioc, iod, h2, [ #DiagResponseQualif(usi=0x8100, channel_num=0x0F00, channel_prop=8, channel_err_type=1, # channel_err_type2=0, channel_add_value=0, channel_qualifier=0x1), DiagResponse(usi=0x8000, channel_num=0x8000, channel_prop=(0x0800|0x0200), channel_err_type=2), ]) h3 = add_diag(iod, Diag(api=0, slot=1, subslot=1, channel_num=0x8000, channel_prop=0x0400, channel_err_type=3)) test_send(ioc, iod, h3, [ #DiagResponseQualif(usi=0x8100, channel_num=0x0F00, channel_prop=8, channel_err_type=1, #channel_err_type2=0, channel_add_value=0, channel_qualifier=(0x1|0x2)), DiagResponse(usi=0x8000, channel_num=0x8000, channel_prop=(0x0800|0x0400), channel_err_type=3) ]) remove_diag(ioc, iod, h1, [ #DiagResponseQualif(usi=0x8100, channel_num=0x0F00, channel_prop=8, channel_err_type=1, # channel_err_type2=0, channel_add_value=0, channel_qualifier=(0x1|0x2)), DiagResponse(usi=0x8000, channel_num=0x8000, channel_prop=0x1000, channel_err_type=1) ]) remove_diag(ioc, iod, h2, [ #DiagResponseQualif(usi=0x8100, channel_num=0x0F00, channel_prop=8, channel_err_type=1, # channel_err_type2=0, channel_add_value=0, channel_qualifier=0x2), DiagResponse(usi=0x8000, channel_num=0x8000, channel_prop=0x1000|0x0200, channel_err_type=2) ]) remove_diag(ioc, iod, h3, [ DiagResponse(usi=0x8000, channel_num=0x8000, channel_prop=0x1000|0x0400, channel_err_type=3), ]) h1 = add_ext_diag(iod, DiagExt(api=0, slot=1, subslot=1, channel_num=0x8000, channel_prop=0, channel_err_type=1, channel_err_type2=4, channel_add_value=7)) test_send(ioc, iod, h1, [ DiagResponseExt(usi=0x8002, channel_num=0x8000, channel_prop=0x0800, channel_err_type=1, channel_err_type2=4, channel_add_value=7) ]) h2 = add_ext_diag(iod, DiagExt(api=0, slot=1, subslot=1, channel_num=0x8000, channel_prop=0x0200, channel_err_type=2, channel_err_type2=5, channel_add_value=8)) test_send(ioc, iod, h2, [ #DiagResponseQualif(usi=0x8100, channel_num=0x0F00, channel_prop=8, channel_err_type=1, # channel_err_type2=0, channel_add_value=0, channel_qualifier=0x1), DiagResponseExt(usi=0x8002, channel_num=0x8000, channel_prop=0x0800|0x0200, channel_err_type=2, channel_err_type2=5, channel_add_value=8), ]) h3 = add_ext_diag(iod, DiagExt(api=0, slot=1, subslot=1, channel_num=0x8000, channel_prop=0x0400, channel_err_type=3, channel_err_type2=6, channel_add_value=9)) test_send(ioc, iod, h3, [ #DiagResponseQualif(usi=0x8100, channel_num=0x0F00, channel_prop=8, channel_err_type=1, # channel_err_type2=0, channel_add_value=0, channel_qualifier=0x1|0x2), DiagResponseExt(usi=0x8002, channel_num=0x8000, channel_prop=0x0800|0x0400, channel_err_type=3, channel_err_type2=6, channel_add_value=9), ]) remove_diag(ioc, iod, h1, [ # DiagResponseQualif(usi=0x8100, channel_num=0x0F00, channel_prop=8, # channel_err_type=1, channel_err_type2=0, channel_add_value=0, channel_qualifier=0x1|0x2), DiagResponseExt(usi=0x8002, channel_num=0x8000, channel_prop=0x1000, channel_err_type=1, channel_err_type2=4, channel_add_value=7), ]) remove_diag(ioc, iod, h2, [ # DiagResponseQualif(usi=0x8100, channel_num=0x0F00, channel_prop=8, # channel_err_type=1, channel_err_type2=0, channel_add_value=0, channel_qualifier=0x2), DiagResponseExt(usi=0x8002, channel_num=0x8000, channel_prop=0x1000|0x0200, channel_err_type=2, channel_err_type2=5, channel_add_value=8), ]) remove_diag(ioc, iod, h3, [ DiagResponseExt(usi=0x8002, channel_num=0x8000, channel_prop=0x1000|0x0400, channel_err_type=3, channel_err_type2=6, channel_add_value=9) ]) def cleanup(self, iod, ioc): ioc.sendPacket(RCX_UNREGISTER_APP_REQ_T()) ioc.waitPacket(RCX_UNREGISTER_APP_CNF) self.log.trace("Deconfiguring device") try: iod.deconfigure() finally: ioc.deconfigure()