forked from FreeOpcUa/opcua-asyncio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_external_server.py
142 lines (103 loc) · 3.41 KB
/
test_external_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
Test an OPC-UA server with freeopcua python client
"""
import sys
import asyncio
import logging
from datetime import datetime, timezone
from asyncua import ua
from asyncua import Client
import unittest
class MySubHandler:
"""
More advanced subscription client using Future, so we can wait for events in tests
"""
def __init__(self):
self.future = asyncio.Future()
def reset(self):
self.future = asyncio.Future()
def datachange_notification(self, node, val, data):
self.future.set_result((node, val, data))
def event_notification(self, event):
self.future.set_result(event)
class MySubHandler2(object):
def __init__(self):
self.results = []
def datachange_notification(self, node, val, data):
self.results.append((node, val))
def event_notification(self, event):
self.results.append(event)
def connect(func):
def wrapper(self):
try:
client = Client(URL)
client.connect()
func(self, client)
finally:
client.disconnect()
return wrapper
def test_connect_anonymous(self):
c = Client(URL)
c.connect()
c.disconnect()
def FINISH_test_connect_basic256(self):
c = Client(URL)
c.set_security_string("basic256,sign,XXXX")
c.connect()
c.disconnect()
def test_find_servers(self):
c = Client(URL)
res = c.connect_and_find_servers()
assert len(res) > 0
def test_find_endpoints(self):
c = Client(URL)
res = c.connect_and_get_server_endpoints()
assert len(res) > 0
# @connect
def test_get_root(self, client):
root = client.nodes.root
self.assertEqual(root.read_browse_name(), ua.QualifiedName("Root", 0))
# @connect
def test_get_root_children(self, client):
root = client.nodes.root
childs = root.get_children()
assert len(childs) > 2
# @connect
async def test_get_namespace_array(self, client):
array = await client.get_namespace_array()
assert len(array) > 0
# @connect
def test_get_server_node(self, client):
srv = client.nodes.server
self.assertEqual(srv.read_browse_name(), ua.QualifiedName("Server", 0))
#childs = srv.get_children()
#assert len(childs) > 4)
# @connect
def test_browsepathtonodeid(self, client):
root = client.nodes.root
node = root.get_child(["0:Objects", "0:Server", "0:ServerArray"])
self.assertEqual(node.read_browse_name(), ua.QualifiedName("ServerArray", 0))
# @connect
def test_subscribe_server_time(self, client):
msclt = MySubHandler()
server_time_node = client.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
sub = client.create_subscription(200, msclt)
handle = sub.subscribe_data_change(server_time_node)
node, val, data = msclt.future.result()
self.assertEqual(node, server_time_node)
delta = datetime.now(timezone.utc) - val
print("Timedelta is ", delta)
#assert delta < timedelta(seconds=2))
sub.unsubscribe(handle)
sub.delete()
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
# FIXME add better arguments parsing with possibility to specify
# username and password and encryption
if len(sys.argv) < 2:
print("This script is meant to test compatibilty to a server with freeopcua python client library")
print("Usage: test_server.py url")
sys.exit(1)
else:
URL = sys.argv[1]
unittest.main(verbosity=30, argv=sys.argv[:1])