OPC-UA Python asyncua,使用证书进行身份验证

问题描述 投票:0回答:1

我正在 python 中测试 asyncua lib (opcua-asyncua)。
我想用证书测试身份验证:客户端 - 服务器。
我使用本地 CA 从客户端和服务器签署我的 CSR :

openssl verify -CAfile ca.pem ../server/asyncua_opcua_server_cert.pem

../server/asyncua_opcua_server_cert.pem:好的

openssl verify -CAfile ca.pem ../client/asyncua_opcua_client_cert.pem

../client/asyncua_opcua_client_cert.pem:好的
我还使用了 GitHub 上的示例:
[服务器-with.cryption.py][1]
[客户端加密.py][2]

我对这两个文件都做了一些修改:
服务器

import asyncio
import sys
from pathlib import Path
import socket

import logging

sys.path.insert(0, "..")
from asyncua import Server
from asyncua import ua
from asyncua.server.user_managers import CertificateUserManager
from asyncua.crypto.cert_gen import setup_self_signed_certificate
from asyncua.crypto.validator import CertificateValidator, CertificateValidatorOptions
from cryptography.x509.oid import ExtendedKeyUsageOID
from asyncua.crypto.truststore import TrustStore
from asyncua.common.methods import uamethod


logging.basicConfig(level=logging.INFO)


USE_TRUST_STORE = True 

@uamethod
def func(parent, value):
    return value * 2

async def main():
    _logger = logging.getLogger(__name__)
    cert_base = Path(__file__).parent
    certificate_local_path = Path(cert_base/ "certificate/server/asyncua_opcua_server_cert_subjectAltName.pem")
    private_key_local_path =Path(cert_base/ "certificate/server/asyncua_opcua_server_key.pem")
    host_name = socket.gethostname()
    server_app_uri = "asyncua:python:DNAMIC:server"

    server = Server()
    await server.init()

    await server.set_application_uri(server_app_uri)
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
    server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_Sign, ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
    policyIDs = ["Basic256Sha256"]         
    server.set_security_IDs(policyIDs)
    _logger.info("Security IDs du serveur : %s",policyIDs)
    # load server certificate and private key. This enables endpoints
    # with signing and encryption.
    await server.load_certificate(certificate_local_path)
    await server.load_private_key(private_key_local_path)

    if USE_TRUST_STORE:
        trust_store = TrustStore([Path(cert_base/ "certificate/ca")], [])
        await trust_store.load()
        validator = CertificateValidator(
            options=CertificateValidatorOptions.TRUSTED_VALIDATION | CertificateValidatorOptions.PEER_CLIENT,
            trust_store=trust_store,
        )
    else:
        validator = CertificateValidator(
            options=CertificateValidatorOptions.EXT_VALIDATION | CertificateValidatorOptions.PEER_CLIENT
        )
    server.set_certificate_validator(validator)

    idx = 0

    # populating our address space
    myobj = await server.nodes.objects.add_object(idx, "MyObject")
    myvar = await myobj.add_variable(idx, "MyVariable", 0.0)
    await myvar.set_writable()  # Set MyVariable to be writable by clients
    await server.nodes.objects.add_method(
        ua.NodeId("ServerMethod", idx),
        ua.QualifiedName("ServerMethod", idx),
        func,
        [ua.VariantType.Int64],
        [ua.VariantType.Int64],
    )

    # starting!
    async with server:
        while True:
            await asyncio.sleep(1)
            current_val = await myvar.get_value()
            count = current_val + 0.1
            await myvar.write_value(count)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())  

客户:

import asyncio
import logging
import sys
import socket
from pathlib import Path
from cryptography.x509.oid import ExtendedKeyUsageOID

sys.path.insert(0, "..")
from asyncua import Client
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
from asyncua.crypto.cert_gen import setup_self_signed_certificate
from asyncua.crypto.validator import CertificateValidator, CertificateValidatorOptions
from asyncua.crypto.truststore import TrustStore
from asyncua import ua


logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger(__name__)

USE_TRUST_STORE = True

cert_idx = 4

async def task(loop):
    host_name = socket.gethostname()
    client_app_uri = f"urn:{host_name}:foobar:myselfsignedclient"
    url = "opc.tcp://localhost:4840/freeopcua/server/"

    cert_base = Path(__file__).parent
    cert = Path(cert_base / "certificate/client/asyncua_opcua_client_cert_subjectAltName.pem")
    private_key = Path(cert_base / "certificate/client/asyncua_opcua_client_key.pem")
    ca_cert = Path(cert_base /"certificate/ca/ca.pem")

    client = Client(url=url)
    #client.application_uri = client_app_uri
    client.application_uri = "asyncua:python:DNAMIC:client"
    await client.set_security(
        policy=SecurityPolicyBasic256Sha256,
        certificate=str(cert),
        private_key=str(private_key),
        server_certificate=str(ca_cert),
    )

    if USE_TRUST_STORE:
        trust_store = TrustStore([Path(cert_base / "certificate/ca")], [])
        await trust_store.load()
        validator = CertificateValidator(
            CertificateValidatorOptions.TRUSTED_VALIDATION | CertificateValidatorOptions.PEER_SERVER, trust_store
        )
    else:
        validator = CertificateValidator(
            CertificateValidatorOptions.EXT_VALIDATION | CertificateValidatorOptions.PEER_SERVER
        )
    client.certificate_validator = validator
    try:
        async with client:
            objects = client.nodes.objects
            child = await objects.get_child(["0:MyObject", "0:MyVariable"])
            _logger.info(f"child : {child}")
        #test lecture variable OPC-UA
            print(await child.get_value())
        #test écriture variable OPC-UA
            #await child.write_value(42)
        #test appel méthode OPC-UA
            res = await client.nodes.objects.call_method("ServerMethod", 7)
            print(f"Calling ServerMethod returned {res}")
    except ua.UaError as exp:
        _logger.error(exp)


def main():
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    loop.run_until_complete(task(loop))
    loop.close()


if __name__ == "__main__":
    main()

当我尝试将客户端连接到服务器时,我从客户端控制台得到:

asyncio.exceptions.TimeoutError  

服务器控制台:

INFO:asyncua.server.binary_server_asyncio:New connection from ('127.0.0.1', 46434)
INFO:asyncua.uaprotocol:updating server limits to: TransportLimits(max_recv_buffer=65535, max_send_buffer=65535, max_chunk_count=1601, max_message_size=104857600)
ERROR:asyncua.server.binary_server_asyncio:Exception raised while processing message from client
Traceback (most recent call last):
  File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/server/binary_server_asyncio.py", line 99, in _process_received_message_loop
    await self._process_one_msg(header, buf)
  File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/server/binary_server_asyncio.py", line 105, in _process_one_msg
    ret = await self.processor.process(header, buf)
  File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/server/uaprocessor.py", line 104, in process
    msg = self._connection.receive_from_header_and_body(header, body)
  File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/common/connection.py", line 415, in receive_from_header_and_body
    chunk = MessageChunk.from_header_and_body(self.security_policy, header, body, use_prev_key=False)
  File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/common/connection.py", line 125, in from_header_and_body
    decrypted = crypto.decrypt(data.read(len(data)))
  File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/crypto/security_policies.py", line 198, in decrypt
    return self.Decryptor.decrypt(data)
  File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/crypto/security_policies.py", line 309, in decrypt
    decrypted += self.decryptor(self.client_pk,
  File "/home/arnaud/.local/lib/python3.10/site-packages/asyncua/crypto/uacrypto.py", line 200, in decrypt_rsa_oaep
    text = private_key.decrypt(
ValueError: Decryption failed
INFO:asyncua.server.binary_server_asyncio:Lost connection from ('127.0.0.1', 46434), None
INFO:asyncua.server.uaprocessor:Cleanup client connection: ('127.0.0.1', 46434)

我认为我们没有任何关于 asyncua 的文档,所以我很难找到我做错了什么任何想法?
[1]:https://github.com/FreeOpcUa/opcua-asyncio/blob/master/examples/server-with-encryption.py [2]: https://github.com/FreeOpcUa/opcua-asyncio/blob/master/examples/client-with-encryption.py

python security encryption python-asyncio opc-ua
1个回答
0
投票

好吧,我发现你必须在客户端代码中加载你的证书和私钥:

await client.load_client_certificate(cert)
await client.load_private_key(private_key)

现在我可以使用我的证书和 TrustStore 连接到我的服务器 但是,我不确定我是否正确使用了这些机制:

我尝试使用自签名客户端证书进行连接,服务器拒绝我的连接,这似乎是一个好点。

但是,如果我禁用客户端和服务器端的信任库并使用由我的 CA 签名的客户端证书,我仍然能够对自己进行身份验证并连接,这是正常的吗?

据我了解,certificateUserManager 仅用于管理自签名证书,这不是我的用例。

© www.soinside.com 2019 - 2024. All rights reserved.