diff --git a/asyncua/client/client.py b/asyncua/client/client.py index e2ba6b25e..e95799e3f 100644 --- a/asyncua/client/client.py +++ b/asyncua/client/client.py @@ -165,9 +165,9 @@ async def set_security_string(self, string: str) -> None: Set SecureConnection mode. :param string: Mode format ``Policy,Mode,certificate,private_key[,server_certificate]`` where: - - ``Policy`` is ``Basic128Rsa15``, ``Basic256`` or ``Basic256Sha256`` + - ``Policy`` is ``Basic256Sha256``, ``Aes128Sha256RsaOaep`` or ``Aes256Sha256RsaPss`` - ``Mode`` is ``Sign`` or ``SignAndEncrypt`` - - ``certificate`` and ``server_private_key`` are paths to ``.pem`` or ``.der`` files + - ``certificate`` and ``server_certificate`` are paths to ``.pem`` or ``.der`` files - ``private_key`` may be a path to a ``.pem`` or ``.der`` file or a conjunction of ``path``::``password`` where ``password`` is the private key password. Call this before connect() @@ -669,18 +669,11 @@ def _add_certificate_auth(self, params, certificate, challenge): params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate) # specs part 4, 5.6.3.1: the data to sign is created by appending # the last serverNonce to the serverCertificate - params.UserTokenSignature = ua.SignatureData() - # use signature algorithm that was used for certificate generation - if certificate.signature_hash_algorithm.name == "sha256": - params.UserIdentityToken.PolicyId = self.server_policy(ua.UserTokenType.Certificate).PolicyId - sig = uacrypto.sign_sha256(self.user_private_key, challenge) - params.UserTokenSignature.Algorithm = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256" - params.UserTokenSignature.Signature = sig - else: - params.UserIdentityToken.PolicyId = self.server_policy(ua.UserTokenType.Certificate).PolicyId - sig = uacrypto.sign_sha1(self.user_private_key, challenge) - params.UserTokenSignature.Algorithm = "http://www.w3.org/2000/09/xmldsig#rsa-sha1" - params.UserTokenSignature.Signature = sig + policy = self.server_policy(ua.UserTokenType.Certificate) + sig, alg = security_policies.sign_asymmetric(self.user_private_key, challenge, policy.SecurityPolicyUri) + params.UserIdentityToken.PolicyId = policy.PolicyId + params.UserTokenSignature.Algorithm = alg + params.UserTokenSignature.Signature = sig def _add_user_auth(self, params, username: str, password: str): params.UserIdentityToken = ua.UserNameIdentityToken() diff --git a/asyncua/crypto/security_policies.py b/asyncua/crypto/security_policies.py index 1a583dc64..8cb23394f 100644 --- a/asyncua/crypto/security_policies.py +++ b/asyncua/crypto/security_policies.py @@ -564,6 +564,10 @@ class SecurityPolicyAes128Sha256RsaOaep(SecurityPolicy): def encrypt_asymmetric(pubkey, data): return uacrypto.encrypt_rsa_oaep(pubkey, data) + @staticmethod + def sign_asymmetric(privkey, data): + return uacrypto.sign_sha256(privkey, data) + def __init__(self, peer_cert, host_cert, host_privkey, mode, permission_ruleset=None): if isinstance(peer_cert, bytes): peer_cert = uacrypto.x509_from_der(peer_cert) @@ -638,6 +642,10 @@ class SecurityPolicyAes256Sha256RsaPss(SecurityPolicy): def encrypt_asymmetric(pubkey, data): return uacrypto.encrypt_rsa_oaep_sha256(pubkey, data) + @staticmethod + def sign_asymmetric(privkey, data): + return uacrypto.sign_pss_sha256(privkey, data) + def __init__(self, peer_cert, host_cert, host_privkey, mode, permission_ruleset=None): if isinstance(peer_cert, bytes): peer_cert = uacrypto.x509_from_der(peer_cert) @@ -718,6 +726,10 @@ class SecurityPolicyBasic128Rsa15(SecurityPolicy): def encrypt_asymmetric(pubkey, data): return uacrypto.encrypt_rsa15(pubkey, data) + @staticmethod + def sign_asymmetric(privkey, data): + return uacrypto.sign_sha1(privkey, data) + def __init__(self, peer_cert, host_cert, host_privkey, mode, permission_ruleset=None): _logger.warning("DEPRECATED! Do not use SecurityPolicyBasic128Rsa15 anymore!") @@ -798,6 +810,10 @@ class SecurityPolicyBasic256(SecurityPolicy): def encrypt_asymmetric(pubkey, data): return uacrypto.encrypt_rsa_oaep(pubkey, data) + @staticmethod + def sign_asymmetric(privkey, data): + return uacrypto.sign_sha1(privkey, data) + def __init__(self, peer_cert, host_cert, host_privkey, mode, permission_ruleset=None): _logger.warning("DEPRECATED! Do not use SecurityPolicyBasic256 anymore!") @@ -878,6 +894,10 @@ class SecurityPolicyBasic256Sha256(SecurityPolicy): def encrypt_asymmetric(pubkey, data): return uacrypto.encrypt_rsa_oaep(pubkey, data) + @staticmethod + def sign_asymmetric(privkey, data): + return uacrypto.sign_sha256(privkey, data) + def __init__(self, peer_cert, host_cert, host_privkey, mode, permission_ruleset=None): if isinstance(peer_cert, bytes): peer_cert = uacrypto.x509_from_der(peer_cert) @@ -961,6 +981,26 @@ def create(self, peer_certificate): ) +def sign_asymmetric(privkey, data, policy_uri): + """ + Sign data with privkey using an asymmetric algorithm. + The algorithm is selected by policy_uri. + Returns a tuple (signature, algorithm_uri) + """ + for cls in [ + SecurityPolicyBasic256Sha256, + SecurityPolicyBasic256, + SecurityPolicyBasic128Rsa15, + SecurityPolicyAes128Sha256RsaOaep, + SecurityPolicyAes256Sha256RsaPss, + ]: + if policy_uri == cls.URI: + return (cls.sign_asymmetric(privkey, data), cls.AsymmetricSignatureURI) + if not policy_uri or policy_uri == SecurityPolicyNone.URI: + return data, "" + raise UaError(f"Unsupported security policy `{policy_uri}`") + + # policy, mode, security_level SECURITY_POLICY_TYPE_MAP = { SecurityPolicyType.NoSecurity: [SecurityPolicyNone, MessageSecurityMode.None_, 0], diff --git a/asyncua/server/internal_server.py b/asyncua/server/internal_server.py index d03085786..e1ea82c0d 100644 --- a/asyncua/server/internal_server.py +++ b/asyncua/server/internal_server.py @@ -409,3 +409,34 @@ def decrypt_user_token(self, isession, token): password = password.decode("utf-8") return user_name, password + + def verify_x509_token(self, isession, token, signature): + """ + verify certificate signature + """ + cert = uacrypto.x509_from_der(token.CertificateData) + alg = signature.Algorithm + sig = signature.Signature + + # TODO check if algorithm is allowed, throw BadSecurityPolicyRejected if not + + challenge = b"" + if self.certificate is not None: + challenge += uacrypto.der_from_x509(self.certificate) + if isession.nonce is not None: + challenge += isession.nonce + + if not (alg and sig): + raise ValueError("No signature") + + if alg == "http://www.w3.org/2000/09/xmldsig#rsa-sha1": + uacrypto.verify_sha1(cert, challenge, sig) + elif alg == "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256": + uacrypto.verify_sha256(cert, challenge, sig) + elif alg == "http://opcfoundation.org/UA/security/rsa-pss-sha2-256": + uacrypto.verify_pss_sha256(cert, challenge, sig) + else: + self.logger.warning("Unknown certificate signature algorithm %s", alg) + raise ValueError("Unknown algorithm") + + return token.CertificateData diff --git a/asyncua/server/internal_session.py b/asyncua/server/internal_session.py index 15a62e0d1..6c171f5f1 100644 --- a/asyncua/server/internal_session.py +++ b/asyncua/server/internal_session.py @@ -126,8 +126,7 @@ def activate_session(self, params, peer_certificate): if isinstance(id_token, ua.UserNameIdentityToken): username, password = self.iserver.decrypt_user_token(self, id_token) elif isinstance(id_token, ua.X509IdentityToken): - # TODO implement verify_x509_token - peer_certificate = id_token.CertificateData + peer_certificate = self.iserver.verify_x509_token(self, id_token, params.UserTokenSignature) username, password = None, None else: username, password = None, None diff --git a/asyncua/server/server.py b/asyncua/server/server.py index 56a95ff40..d3bf038e0 100644 --- a/asyncua/server/server.py +++ b/asyncua/server/server.py @@ -5,7 +5,6 @@ import asyncio import logging import math -from cryptography import x509 from datetime import timedelta, datetime import socket from urllib.parse import urlparse @@ -113,7 +112,6 @@ def __init__(self, iserver: InternalServer = None, user_manager=None): ] # allow all certificates by default self._permission_ruleset = SimpleRoleRuleset() - self.certificate: Optional[x509.Certificate] = None # Use acceptable limits buffer_sz = 65535 max_msg_sz = 100 * 1024 * 1024 # 100mb @@ -233,7 +231,7 @@ async def load_certificate(self, path_or_content: Union[str, bytes, Path], forma """ load server certificate from file, either pem or der """ - self.certificate = await uacrypto.load_certificate(path_or_content, format) + self.iserver.certificate = await uacrypto.load_certificate(path_or_content, format) async def load_private_key(self, path_or_content: Union[str, Path, bytes], password=None, format=None): self.iserver.private_key = await uacrypto.load_private_key(path_or_content, password, format) @@ -385,7 +383,7 @@ async def _setup_server_nodes(self): for policy_type in self._security_policy: policy, mode, level = security_policies.SECURITY_POLICY_TYPE_MAP[policy_type] if policy is not security_policies.SecurityPolicyNone and not ( - self.certificate and self.iserver.private_key + self.iserver.certificate and self.iserver.private_key ): no_cert = True continue @@ -394,7 +392,7 @@ async def _setup_server_nodes(self): security_policies.SecurityPolicyFactory( policy, mode, - self.certificate, + self.iserver.certificate, self.iserver.private_key, permission_ruleset=self._permission_ruleset, ) @@ -417,9 +415,21 @@ def _set_endpoints(self, policy, mode, level): idtoken = ua.UserTokenPolicy() idtoken.PolicyId = "certificate" idtoken.TokenType = ua.UserTokenType.Certificate - idtoken.SecurityPolicyUri = policy.URI - # TODO request signing if mode == ua.MessageSecurityMode.None_ (also need to verify signature then) - idtokens.append(idtoken) + # always request signing + if mode == ua.MessageSecurityMode.None_: + # find first policy with signing + for token_policy_type in self._security_policy: + token_policy, token_mode, _ = security_policies.SECURITY_POLICY_TYPE_MAP[token_policy_type] + if token_mode == ua.MessageSecurityMode.None_: + continue + idtoken.SecurityPolicyUri = token_policy.URI + idtokens.append(idtoken) + break + else: + _logger.warning("No signing policy available, user certificate cannot get verified") + else: + idtoken.SecurityPolicyUri = policy.URI + idtokens.append(idtoken) if ua.UserNameIdentityToken in tokens: idtoken = ua.UserTokenPolicy() @@ -432,7 +442,7 @@ def _set_endpoints(self, policy, mode, level): # use same policy for encryption idtoken.SecurityPolicyUri = policy.URI # try to avoid plaintext password, find first policy with encryption - elif self.certificate and self.iserver.private_key: + elif self.iserver.certificate and self.iserver.private_key: for token_policy_type in self._security_policy: token_policy, token_mode, _ = security_policies.SECURITY_POLICY_TYPE_MAP[token_policy_type] if token_mode != ua.MessageSecurityMode.SignAndEncrypt: @@ -457,8 +467,8 @@ def _set_endpoints(self, policy, mode, level): edp = ua.EndpointDescription() edp.EndpointUrl = self.endpoint.geturl() edp.Server = appdesc - if self.certificate: - edp.ServerCertificate = uacrypto.der_from_x509(self.certificate) + if self.iserver.certificate: + edp.ServerCertificate = uacrypto.der_from_x509(self.iserver.certificate) edp.SecurityMode = mode edp.SecurityPolicyUri = policy.URI edp.UserIdentityTokens = idtokens @@ -473,9 +483,9 @@ async def start(self): """ Start to listen on network """ - if self.certificate is not None: + if self.iserver.certificate is not None: # Log warnings about the certificate - uacrypto.check_certificate(self.certificate, self._application_uri, socket.gethostname()) + uacrypto.check_certificate(self.iserver.certificate, self._application_uri, socket.gethostname()) await self._setup_server_nodes() await self.iserver.start() try: diff --git a/tests/test_permissions.py b/tests/test_permissions.py index 61ba83267..4171cf3c3 100644 --- a/tests/test_permissions.py +++ b/tests/test_permissions.py @@ -10,17 +10,10 @@ pytestmark = pytest.mark.asyncio -port_num1 = 48515 -port_num2 = 48512 -port_num3 = 48516 -uri_crypto = "opc.tcp://127.0.0.1:{0:d}".format(port_num1) -uri_no_crypto = "opc.tcp://127.0.0.1:{0:d}".format(port_num2) -uri_crypto_cert = "opc.tcp://127.0.0.1:{0:d}".format(port_num3) +uri_crypto_cert = "opc.tcp://127.0.0.1:48516" BASE_DIR = Path(__file__).parent.parent EXAMPLE_PATH = BASE_DIR / "examples" -srv_crypto_params = [ - (EXAMPLE_PATH / "private-key-example.pem", EXAMPLE_PATH / "certificate-example.der"), -] +srv_crypto_params = (EXAMPLE_PATH / "private-key-example.pem", EXAMPLE_PATH / "certificate-example.der") admin_peer_creds = { "certificate": EXAMPLE_PATH / "certificates/peer-certificate-example-1.der", @@ -38,20 +31,20 @@ } -@pytest.fixture(params=srv_crypto_params) +@pytest.fixture(scope="module") async def srv_crypto_one_cert(request): cert_user_manager = CertificateUserManager() admin_peer_certificate = admin_peer_creds["certificate"] user_peer_certificate = user_peer_creds["certificate"] anonymous_peer_certificate = anonymous_peer_creds["certificate"] - key, cert = request.param + key, cert = srv_crypto_params await cert_user_manager.add_admin(admin_peer_certificate, name="Admin") await cert_user_manager.add_user(user_peer_certificate, name="User") await cert_user_manager.add_role(anonymous_peer_certificate, name="Anonymous", user_role=UserRole.Anonymous) srv = Server(user_manager=cert_user_manager) srv.set_endpoint(uri_crypto_cert) - srv.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt]) + srv.set_security_policy([ua.SecurityPolicyType.NoSecurity, ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt]) await srv.init() await srv.load_certificate(cert) await srv.load_private_key(key) @@ -67,14 +60,14 @@ async def srv_crypto_one_cert(request): await srv.stop() -async def test_permissions_admin(srv_crypto_one_cert): +async def test_client_admin(srv_crypto_one_cert): clt = Client(uri_crypto_cert) await clt.set_security( security_policies.SecurityPolicyBasic256Sha256, admin_peer_creds["certificate"], admin_peer_creds["private_key"], None, - server_certificate=srv_crypto_params[0][1], + server_certificate=srv_crypto_params[1], mode=ua.MessageSecurityMode.SignAndEncrypt, ) @@ -87,14 +80,14 @@ async def test_permissions_admin(srv_crypto_one_cert): await child.add_property(0, "MyProperty1", 3) -async def test_permissions_user(srv_crypto_one_cert): +async def test_client_user(srv_crypto_one_cert): clt = Client(uri_crypto_cert) await clt.set_security( security_policies.SecurityPolicyBasic256Sha256, user_peer_creds["certificate"], user_peer_creds["private_key"], None, - server_certificate=srv_crypto_params[0][1], + server_certificate=srv_crypto_params[1], mode=ua.MessageSecurityMode.SignAndEncrypt, ) async with clt: @@ -107,17 +100,62 @@ async def test_permissions_user(srv_crypto_one_cert): await child.add_property(0, "MyProperty2", 3) -async def test_permissions_anonymous(srv_crypto_one_cert): +async def test_client_anonymous(srv_crypto_one_cert): clt = Client(uri_crypto_cert) await clt.set_security( security_policies.SecurityPolicyBasic256Sha256, anonymous_peer_creds["certificate"], anonymous_peer_creds["private_key"], None, - server_certificate=srv_crypto_params[0][1], + server_certificate=srv_crypto_params[1], mode=ua.MessageSecurityMode.SignAndEncrypt, ) async with clt: await clt.get_endpoints() with pytest.raises(ua.uaerrors.BadUserAccessDenied): await clt.nodes.objects.get_children() + + +async def test_x509identity_user(srv_crypto_one_cert): + clt = Client(uri_crypto_cert) + await clt.load_client_certificate(user_peer_creds["certificate"]) + await clt.load_private_key(user_peer_creds["private_key"]) + async with clt: + assert await clt.get_objects_node().get_children() + objects = clt.nodes.objects + child = await objects.get_child(["0:MyObject", "0:MyVariable"]) + await child.set_value(46.0) + assert await child.read_value() == 46.0 + with pytest.raises(ua.uaerrors.BadUserAccessDenied): + await child.add_property(0, "MyProperty3", 3) + + +async def test_x509identity_anonymous(srv_crypto_one_cert): + clt = Client(uri_crypto_cert) + await clt.load_client_certificate(anonymous_peer_creds["certificate"]) + await clt.load_private_key(anonymous_peer_creds["private_key"]) + async with clt: + await clt.get_endpoints() + with pytest.raises(ua.uaerrors.BadUserAccessDenied): + await clt.nodes.objects.get_children() + + +async def test_client_user_x509identity_admin(srv_crypto_one_cert): + clt = Client(uri_crypto_cert) + await clt.set_security( + security_policies.SecurityPolicyBasic256Sha256, + user_peer_creds["certificate"], + user_peer_creds["private_key"], + None, + server_certificate=srv_crypto_params[1], + mode=ua.MessageSecurityMode.SignAndEncrypt, + ) + await clt.load_client_certificate(admin_peer_creds["certificate"]) + await clt.load_private_key(admin_peer_creds["private_key"]) + async with clt: + assert await clt.get_objects_node().get_children() + objects = clt.nodes.objects + child = await objects.get_child(["0:MyObject", "0:MyVariable"]) + await child.set_value(48.0) + assert await child.read_value() == 48.0 + await child.add_property(0, "MyProperty4", 3)