我在Python中使用一个简单的OPC-UA服务器,我的麻烦是我需要读取一个变量“astParam”,它是一个结构ST_NameValue的数组[20],以便在我的OPC-UA方法中使用它。我真的不知道如何创建这样的 OPC-UA 变量。
import time
import subprocess
import sys
sys.path.insert(0, "..")
from opcua import ua, Server, uamethod
import threading
# Définir la structure ST_NameValue
class ST_NameValue:
def __init__(self, name="", value=""):
self.Name = name
self.Value = value
def lire_adc():
return 100
def set_line_state(gpiochip, toto, tata):
print("set_line_state called")
def indexeurMoveDown(choixIndexeur):
valeur_adc = lire_adc()
print("Valeur ADC avant déplacement: ", valeur_adc)
match choixIndexeur:
case 1: #indexeur 1
gpiochipNbrSens1 = "gpiochip2"
lineNbrSens1 = 15
gpiochipNbrSens2 = "gpiochip2"
lineNbrSens2 = 3
case 2: #indexeur 2
gpiochipNbrSens1 = "gpiochip3"
lineNbrSens1 = 25
gpiochipNbrSens2 = "gpiochip0"
lineNbrSens2 = 11
case 3: #indexeur 3
gpiochipNbrSens1 = "gpiochip0"
lineNbrSens1 = 8
gpiochipNbrSens2 = "gpiochip0"
lineNbrSens2 = 9
#Sens d'avance
set_line_state(gpiochipNbrSens1, lineNbrSens1, 1) #exemple avec indexeur 1 : GPIO_1_P_3.3V -> enable sens 1
set_line_state(gpiochipNbrSens2, lineNbrSens2, 0) #exemple avec indexeur 1 : GPIO_1_N_3.3V -> disable sens 2
time.sleep(0.5) #simu déplacement
while valeur_adc > 120:
valeur_adc = lire_adc()
set_line_state(gpiochipNbrSens1, lineNbrSens1, 0)
print("Valeur ADC après déplacement: ", valeur_adc)
def indexeurMoveUp(choixIndexeur):
valeur_adc = lire_adc()
print("Valeur ADC avant déplacement: ", valeur_adc)
match choixIndexeur:
case 1: #indexeur 1
gpiochipNbrSens1 = 2
lineNbrSens1 = 15
gpiochipNbrSens2 = 2
lineNbrSens2 = 3
case 2: #indexeur 2
gpiochipNbrSens1 = 3
lineNbrSens1 = 25
gpiochipNbrSens2 = 0
lineNbrSens2 = 11
case 3: #indexeur 3
gpiochipNbrSens1 = 0
lineNbrSens1 = 8
gpiochipNbrSens2 = 0
lineNbrSens2 = 9
#Sens d'avance
set_line_state(gpiochipNbrSens1, lineNbrSens1, 0) #exemple avec indexeur 1 : GPIO_1_P_3.3V -> disable sens 1
set_line_state(gpiochipNbrSens2, lineNbrSens2, 1) #exemple avec indexeur 1 : GPIO_1_N_3.3V -> enable sens 2
while valeur_adc < 1560:
valeur_adc = lire_adc()
set_line_state(gpiochipNbrSens2, lineNbrSens2, 0)
print("Valeur ADC après déplacement: ", valeur_adc)
@uamethod
def mAckError(parent):
print("mAckError called")
xError_var.set_value(False)
return "xAckError"
@uamethod
def mInitialize(parent):
print("mInitialize called")
# ici il faut descendre l'indexeur car un index vide est à l'état bas.
# il faut aussi init toutes les variables opc ua
xBusy_var.set_value(False)
xReady_var.set_value(True)
xError_var.set_value(False)
thread = threading.Thread(target=thread_mInitialize, args=(parent,))
thread.start()
return "xAckInit"
def thread_mInitialize(parent): #permet de mettre l'indexeur en position basse
xBusy_var.set_value(True)
xReady_var.set_value(False)
#indexeurMoveDown("all") à revoir car il faut envoyer tout les indexeurs vers le bas
xBusy_var.set_value(False)
xReady_var.set_value(True)
@uamethod
def mReset(parent):
print("mReset called")
@uamethod
def mSafetyStop(parent):
print("mSafetyStop called")
@uamethod
def mRetractUnload(parent):
print("mRetractUnload called")
return "xAckRetractUnload"
@uamethod
def mUnloadElement(parent):
print("mLoadElement called")
#ici il faudra en input pouvoir préciser quel indexeurs il faut déplacer (1 à 3)
thread = threading.Thread(target=thread_unLoadElement, args=(parent,))
thread.start()
return "xAckUnloadElement"
def thread_unLoadElement(parent):
xBusy_var.set_value(True)
xReady_var.set_value(False)
astParams_node_id = ua.NodeId("astParams", 10)
astParams_var = server.get_node(astParams_node_id)
astParams_array = astParams_var.get_value()
print("astParams: "+str(astParams_array))
# Parcourir le tableau pour trouver l'élément correspondant au nom choixIndexeur
for item in astParams_array:
if item.name == "choixIndexeur":
choixIndexeur = item.value
break
indexeurMoveDown(choixIndexeur)
xBusy_var.set_value(False)
xReady_var.set_value(True)
@uamethod
def mLoadElement(parent):
print("mUnloadElement called")
thread = threading.Thread(target=thread_LoadElement, args=(parent,))
thread.start()
return "xAckLoadElement"
def thread_LoadElement(parent):
xBusy_var.set_value(True)
xReady_var.set_value(False)
astParams_node_id = ua.NodeId("astParams", 10)
astParams_var = server.get_node(astParams_node_id)
astParams_array = astParams_var.get_value()
print("astParams: "+str(astParams_array))
# Parcourir le tableau pour trouver l'élément correspondant au nom choixIndexeur
for item in astParams_array:
if item.name == "choixIndexeur":
choixIndexeur = item.value
break
if(lire_adc > 150): #valider que l'indexeur est bien en position basse
indexeurMoveDown(choixIndexeur)
xBusy_var.set_value(False)
xReady_var.set_value(True)
@uamethod
def mRetractElement(parent):
print("mRetractElement called")
thread = threading.Thread(target=thread_RetractElement, args=(parent,))
thread.start()
return "xAckRetractElement"
def thread_RetractElement(parent):
xBusy_var.set_value(True)
xReady_var.set_value(False)
astParams_node_id = ua.NodeId("astParams", 10)
astParams_var = server.get_node(astParams_node_id)
astParams_array = astParams_var.get_value()
print("astParams: "+str(astParams_array))
# Parcourir le tableau pour trouver l'élément correspondant au nom choixIndexeur
for item in astParams_array:
if item.name == "choixIndexeur":
choixIndexeur = item.value
break
indexeurMoveUp(choixIndexeur)
xBusy_var.set_value(False)
xReady_var.set_value(True)
if __name__ == "__main__":
print("Début du programme")
#OPC-UA SERVER
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
base = server.get_objects_node()
base.add_method(1, "mLoadElement", mLoadElement, [], [ua.VariantType.String])
base.add_method(2, "mUnloadElement",mUnloadElement,[],[ua.VariantType.String])
base.add_method(3, "mAckError", mAckError,[],[ua.VariantType.String])
base.add_method(4, "mInitialize", mInitialize,[],[ua.VariantType.String])
base.add_method(5, "mReset", mReset,[],[])
base.add_method(6, "mSafetyStop", mSafetyStop,[],[])
base.add_method(1, "mRetractUnload", mRetractUnload, [], [ua.VariantType.String])
base.add_method(1, "mRetractElement", mRetractElement, [], [ua.VariantType.String])
xBusy_var = base.add_variable(ua.NodeId("xBusy", 7), "xBusy", False)
xReady_var = base.add_variable(ua.NodeId("xReady", 8), "xReady", True)
xError_var = base.add_variable(ua.NodeId("xError", 9), "xError", False)
initial_values = [ST_NameValue("nameTest","valueTest") for _ in range(20)]
astParams_var = base.add_variable(ua.NodeId("astParams", 10), "astParams", initial_values,varianttype=ua.VariantType.ExtensionObject)
astParams_var.set_array_dimensions([20])
server.start()
这是我得到的错误:
Traceback (most recent call last):
File "/home/dev/opcuaServer.py", line 227, in <module>
astParams_var = base.add_variable(ua.NodeId("astParams", 10), "astParams", initial_values,varianttype=ua.VariantType.ExtensionObject)
File "/home/.local/lib/python3.10/site-packages/opcua/common/node.py", line 664, in add_variable
return opcua.common.manage_nodes.create_variable(self, nodeid, bname, val, varianttype, datatype)
File "/home/.local/lib/python3.10/site-packages/opcua/common/manage_nodes.py", line 89, in create_variable
return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False))
File "/home/.local/lib/python3.10/site-packages/opcua/common/manage_nodes.py", line 232, in _create_variable
attrs.DataType = _guess_datatype(var)
File "/home/.local/lib/python3.10/site-packages/opcua/common/manage_nodes.py", line 370, in _guess_datatype
return ua.NodeId(getattr(ua.ObjectIds, classname))
AttributeError: type object 'ObjectIds' has no attribute 'ST_NameValue'
我尝试创建一个 int 数组,然后使用测试客户端将我的 Struct 的一些值放入其中,例如:
toto = ST_NameValue("choixIndexeur", 1)
astParams_node_id = ua.NodeId("astParams", 10)
astParams_var = client.get_node(astParams_node_id)
astParams_var.set_value([toto])
但我有以下错误:
Traceback (most recent call last):
File "/home/dev/opcuaClient.py", line 36, in <module>
astParams_var.set_value([toto])
File "/home/.local/lib/python3.10/site-packages/opcua/common/node.py", line 217, in set_value
self.set_attribute(ua.AttributeIds.Value, datavalue)
File "/home/.local/lib/python3.10/site-packages/opcua/common/node.py", line 262, in set_attribute
result = self.server.write(params)
File "/home/.local/lib/python3.10/site-packages/opcua/client/ua_client.py", line 367, in write
data = self._uasocket.send_request(request)
File "/home/.local/lib/python3.10/site-packages/opcua/client/ua_client.py", line 81, in send_request
future = self._send_request(request, callback, timeout, message_type)
File "/home/.local/lib/python3.10/site-packages/opcua/client/ua_client.py", line 55, in _send_request
binreq = struct_to_binary(request)
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 261, in struct_to_binary
packet.append(to_binary(uatype, val))
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 284, in to_binary
return struct_to_binary(val)
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 256, in struct_to_binary
packet.append(list_to_binary(uatype[6:], val))
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 296, in list_to_binary
pack = [to_binary(uatype, el) for el in val]
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 296, in <listcomp>
pack = [to_binary(uatype, el) for el in val]
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 284, in to_binary
return struct_to_binary(val)
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 261, in struct_to_binary
packet.append(to_binary(uatype, val))
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 274, in to_binary
return pack_uatype(vtype, val)
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 194, in pack_uatype
return struct_to_binary(value)
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 261, in struct_to_binary
packet.append(to_binary(uatype, val))
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 274, in to_binary
return pack_uatype(vtype, val)
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 192, in pack_uatype
return variant_to_binary(value)
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 373, in variant_to_binary
b.append(pack_uatype_array(var.VariantType, ua.flatten(var.Value)))
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 224, in pack_uatype_array
b = [pack_uatype(vtype, val) for val in array]
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 224, in <listcomp>
b = [pack_uatype(vtype, val) for val in array]
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 188, in pack_uatype
return extensionobject_to_binary(value)
File "/home/.local/lib/python3.10/site-packages/opcua/ua/ua_binary.py", line 458, in extensionobject_to_binary
TypeId = ua.extension_object_ids[obj.__class__.__name__]
KeyError: 'ST_NameValue'
不能使用Python结构。您必须创建特定的 OPC UA 数据类型:
snode1, _ = await new_struct(server, idx, "ST_NameValue", [
new_struct_field("Name", ua.VariantType.String),
new_struct_field("Value", ua.VariantType.String),
])
要使用它们,请使用
ua.ST_NameValue
:
initial_values = [ua.ST_NameValue("nameTest","valueTest") for _ in range(20)]
astParams_var = base.add_variable(ua.NodeId("astParams", 10), "astParams", initial_values,varianttype=ua.VariantType.ExtensionObject)
有关更完整的示例,请参阅 https://github.com/FreeOpcUa/opcua-asyncio/blob/master/examples/server-custom-structs-and-enums.py。