我正在 python 中测试 asyncua lib (opcua-asyncua)。
我想用证书测试身份验证:客户端 - 服务器。
我使用本地 CA 从客户端和服务器签署我的 CSR :
openssl verify -CAfile ca.pem ../server/asyncua_opcua_server_cert.pem
../server/asyncua_opcua_server_cert.pem:好的
openssl verify -CAfile ca.pem ../client/asyncua_opcua_client_cert.pem
../client/asyncua_opcua_client_cert.pem:好的
我还使用了 GitHub 上的示例:
[服务器-with.cryption.py][1]
[客户端加密.py][2]
我对这两个文件都做了一些修改:
服务器:
import asyncio
import sys
from pathlib import Path
import socket
import logging
sys.path.insert(0, "..")
from asyncua import Server
from asyncua import ua
from asyncua.server.user_managers import CertificateUserManager
from asyncua.crypto.cert_gen import setup_self_signed_certificate
from asyncua.crypto.validator import CertificateValidator, CertificateValidatorOptions
from cryptography.x509.oid import ExtendedKeyUsageOID
from asyncua.crypto.truststore import TrustStore
from asyncua.common.methods import uamethod
logging.basicConfig(level=logging.INFO)
USE_TRUST_STORE = True
@uamethod
def func(parent, value):
return value * 2
async def main():
_logger = logging.getLogger(__name__)
cert_base = Path(__file__).parent
certificate_local_path = Path(cert_base/ "certificate/server/asyncua_opcua_server_cert_subjectAltName.pem")
private_key_local_path =Path(cert_base/ "certificate/server/asyncua_opcua_server_key.pem")
host_name = socket.gethostname()
server_app_uri = "asyncua:python:DNAMIC:server"
server = Server()
await server.init()
await server.set_application_uri(server_app_uri)
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_Sign, ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
policyIDs = ["Basic256Sha256"]
server.set_security_IDs(policyIDs)
_logger.info("Security IDs du serveur : %s",policyIDs)
# load server certificate and private key. This enables endpoints
# with signing and encryption.
await server.load_certificate(certificate_local_path)
await server.load_private_key(private_key_local_path)
if USE_TRUST_STORE:
trust_store = TrustStore([Path(cert_base/ "certificate/ca")], [])
await trust_store.load()
validator = CertificateValidator(
options=CertificateValidatorOptions.TRUSTED_VALIDATION | CertificateValidatorOptions.PEER_CLIENT,
trust_store=trust_store,
)
else:
validator = CertificateValidator(
options=CertificateValidatorOptions.EXT_VALIDATION | CertificateValidatorOptions.PEER_CLIENT
)
server.set_certificate_validator(validator)
idx = 0
# populating our address space
myobj = await server.nodes.objects.add_object(idx, "MyObject")
myvar = await myobj.add_variable(idx, "MyVariable", 0.0)
await myvar.set_writable() # Set MyVariable to be writable by clients
await server.nodes.objects.add_method(
ua.NodeId("ServerMethod", idx),
ua.QualifiedName("ServerMethod", idx),
func,
[ua.VariantType.Int64],
[ua.VariantType.Int64],
)
# starting!
async with server:
while True:
await asyncio.sleep(1)
current_val = await myvar.get_value()
count = current_val + 0.1
await myvar.write_value(count)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
客户:
import asyncio
import logging
import sys
import socket
from pathlib import Path
from cryptography.x509.oid import ExtendedKeyUsageOID
sys.path.insert(0, "..")
from asyncua import Client
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
from asyncua.crypto.cert_gen import setup_self_signed_certificate
from asyncua.crypto.validator import CertificateValidator, CertificateValidatorOptions
from asyncua.crypto.truststore import TrustStore
from asyncua import ua
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)
USE_TRUST_STORE = True
cert_idx = 4
async def task(loop):
host_name = socket.gethostname()
client_app_uri = f"urn:{host_name}:foobar:myselfsignedclient"
url = "opc.tcp://localhost:4840/freeopcua/server/"
cert_base = Path(__file__).parent
cert = Path(cert_base / "certificate/client/asyncua_opcua_client_cert_subjectAltName.pem")
private_key = Path(cert_base / "certificate/client/asyncua_opcua_client_key.pem")
ca_cert = Path(cert_base /"certificate/ca/ca.pem")
client = Client(url=url)
#client.application_uri = client_app_uri
client.application_uri = "asyncua:python:DNAMIC:client"
await client.set_security(
policy=SecurityPolicyBasic256Sha256,
certificate=str(cert),
private_key=str(private_key),
server_certificate=str(ca_cert),
)
if USE_TRUST_STORE:
trust_store = TrustStore([Path(cert_base / "certificate/ca")], [])
await trust_store.load()
validator = CertificateValidator(
CertificateValidatorOptions.TRUSTED_VALIDATION | CertificateValidatorOptions.PEER_SERVER, trust_store
)
else:
validator = CertificateValidator(
CertificateValidatorOptions.EXT_VALIDATION | CertificateValidatorOptions.PEER_SERVER
)
client.certificate_validator = validator
try:
async with client:
objects = client.nodes.objects
child = await objects.get_child(["0:MyObject", "0:MyVariable"])
_logger.info(f"child : {child}")
#test lecture variable OPC-UA
print(await child.get_value())
#test écriture variable OPC-UA
#await child.write_value(42)
#test appel méthode OPC-UA
res = await client.nodes.objects.call_method("ServerMethod", 7)
print(f"Calling ServerMethod returned {res}")
except ua.UaError as exp:
_logger.error(exp)
def main():
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(task(loop))
loop.close()
if __name__ == "__main__":
main()
当我尝试将客户端连接到服务器时,我从客户端控制台得到:
asyncio.exceptions.TimeoutError
服务器控制台:
INFO:asyncua.server.binary_server_asyncio:New connection from ('127.0.0.1', 46434)
INFO:asyncua.uaprotocol:updating server limits to: TransportLimits(max_recv_buffer=65535, max_send_buffer=65535, max_chunk_count=1601, max_message_size=104857600)
ERROR:asyncua.server.binary_server_asyncio:Exception raised while processing message from client
Traceback (most recent call last):
File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/server/binary_server_asyncio.py", line 99, in _process_received_message_loop
await self._process_one_msg(header, buf)
File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/server/binary_server_asyncio.py", line 105, in _process_one_msg
ret = await self.processor.process(header, buf)
File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/server/uaprocessor.py", line 104, in process
msg = self._connection.receive_from_header_and_body(header, body)
File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/common/connection.py", line 415, in receive_from_header_and_body
chunk = MessageChunk.from_header_and_body(self.security_policy, header, body, use_prev_key=False)
File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/common/connection.py", line 125, in from_header_and_body
decrypted = crypto.decrypt(data.read(len(data)))
File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/crypto/security_policies.py", line 198, in decrypt
return self.Decryptor.decrypt(data)
File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/crypto/security_policies.py", line 309, in decrypt
decrypted += self.decryptor(self.client_pk,
File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/crypto/uacrypto.py", line 200, in decrypt_rsa_oaep
text = private_key.decrypt(
ValueError: Decryption failed
INFO:asyncua.server.binary_server_asyncio:Lost connection from ('127.0.0.1', 46434), None
INFO:asyncua.server.uaprocessor:Cleanup client connection: ('127.0.0.1', 46434)
我认为我们没有任何关于 asyncua 的文档,所以我很难找到我做错了什么任何想法?
[1]:https://github.com/FreeOpcUa/opcua-asyncio/blob/master/examples/server-with-encryption.py
[2]: https://github.com/FreeOpcUa/opcua-asyncio/blob/master/examples/client-with-encryption.py
好吧,我发现你必须在客户端代码中加载你的证书和私钥:
await client.load_client_certificate(cert)
await client.load_private_key(private_key)
现在我可以使用我的证书和 TrustStore 连接到我的服务器 但是,我不确定我是否正确使用了这些机制:
我尝试使用自签名客户端证书进行连接,服务器拒绝我的连接,这似乎是一个好点。
但是,如果我禁用客户端和服务器端的信任库并使用由我的 CA 签名的客户端证书,我仍然能够对自己进行身份验证并连接,这是正常的吗?
据我了解,certificateUserManager 仅用于管理自签名证书,这不是我的用例。