Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add server-side session watchdog #1655

Merged
merged 7 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 69 additions & 3 deletions asyncua/server/internal_session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio
import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import Iterable, Optional, List, Tuple, TYPE_CHECKING
from functools import wraps
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple

from asyncua import ua
from asyncua.common.session_interface import AbstractSession
Expand Down Expand Up @@ -48,6 +51,11 @@ def __init__(self, internal_server: "InternalServer", aspace: AddressSpace, subm
self.auth_token = ua.NodeId(self._auth_counter)
InternalSession._auth_counter += 1
self.logger.info('Created internal session %s', self.name)
self.session_timeout = None
self._closing: bool = False
self._session_watchdog_task: Optional[asyncio.Task] = None
self._watchdog_interval: float = 1.0
self._last_action_time: Optional[datetime] = None

def __str__(self):
return f'InternalSession(name:{self.name},' \
Expand All @@ -59,13 +67,44 @@ async def get_endpoints(self, params=None, sockname=None):
def is_activated(self) -> bool:
return self.state == SessionState.Activated

def update_last_action_time(func):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand what you want to do, but I have bad experience with that kind of decorators. they make stack trace a mess and completlu breaks typing checks. if we want t omerge that we at least need typing and check things with mypy (which probbaly only does not complain because since you do not have typing on decorator so it disabled itself)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it is better to update some timestamp at lower level when we receive a request which is dispatched to thata internal session. IS that possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I moved it into the UA processor

"""Decorator to set last action time."""
if asyncio.iscoroutinefunction(func):
@wraps(func)
async def wrapper(self, *args, **kwargs):
self._last_action_time = datetime.now()
result = await func(self, *args, **kwargs)
return result
else:
@wraps(func)
def wrapper(self, *args, **kwargs):
self._last_action_time = datetime.now()
result = func(self, *args, **kwargs)
return result
return wrapper

async def _session_watchdog_loop(self):
"""
Checks if the session is alive
"""
timeout = min(self.session_timeout / 2, self._watchdog_interval)
timeout_timedelta = timedelta(seconds=self.session_timeout)
while not self._closing:
await asyncio.sleep(timeout)
print(self._last_action_time)
if (datetime.now() - timeout_timedelta) > self._last_action_time:
self.logger.warning('Session timed out after %ss of inactivity', timeout_timedelta.total_seconds())
await self.close_session()

@update_last_action_time
async def create_session(self, params: ua.CreateSessionParameters, sockname: Optional[Tuple[str, int]] = None):
self.logger.info('Create session request')
result = ua.CreateSessionResult()
result.SessionId = self.session_id
result.AuthenticationToken = self.auth_token
result.RevisedSessionTimeout = params.RequestedSessionTimeout
result.MaxRequestMessageSize = 65536
self.session_timeout = result.RevisedSessionTimeout / 1000

if self.iserver.certificate_validator and params.ClientCertificate:
await self.iserver.certificate_validator(x509.load_der_x509_certificate(params.ClientCertificate), params.ClientDescription)
Expand All @@ -76,18 +115,22 @@ async def create_session(self, params: ua.CreateSessionParameters, sockname: Opt
ep_params = ua.GetEndpointsParameters()
ep_params.EndpointUrl = params.EndpointUrl
result.ServerEndpoints = await self.get_endpoints(params=ep_params, sockname=sockname)

self._session_watchdog_task = asyncio.create_task(self._session_watchdog_loop())
return result

@update_last_action_time
async def close_session(self, delete_subs=True):
self.logger.info('close session %s', self.name)
self._closing = True
if self.state == SessionState.Activated:
InternalSession._current_connections -= 1
if InternalSession._current_connections < 0:
InternalSession._current_connections = 0
self.state = SessionState.Closed
await self.delete_subscriptions(self.subscriptions)
if delete_subs:
await self.delete_subscriptions(self.subscriptions)

@update_last_action_time
def activate_session(self, params, peer_certificate):
self.logger.info('activate session')
result = ua.ActivateSessionResult()
Expand Down Expand Up @@ -128,6 +171,7 @@ def activate_session(self, params, peer_certificate):
self.logger.info("Activated internal session %s for user %s", self.name, self.user)
return result

@update_last_action_time
async def read(self, params):
if self.user is None:
user = User()
Expand All @@ -140,9 +184,11 @@ async def read(self, params):
ServerItemCallback(params, results, user, self.external))
return results

@update_last_action_time
async def history_read(self, params) -> List[ua.HistoryReadResult]:
return await self.iserver.history_manager.read_history(params)

@update_last_action_time
async def write(self, params):
if self.user is None:
user = User()
Expand All @@ -155,51 +201,64 @@ async def write(self, params):
ServerItemCallback(params, write_result, user, self.external))
return write_result

@update_last_action_time
async def browse(self, params):
return self.iserver.view_service.browse(params)

@update_last_action_time
async def browse_next(self, parameters: ua.BrowseNextParameters) -> List[ua.BrowseResult]:
# TODO
# ContinuationPoint: https://reference.opcfoundation.org/v104/Core/docs/Part4/7.6/
# Add "ContinuationPoints" and some form of management for them to current sessionimplementation
# BrowseNext: https://reference.opcfoundation.org/Core/Part4/v104/5.8.3/
raise NotImplementedError

@update_last_action_time
async def register_nodes(self, nodes: List[ua.NodeId]) -> List[ua.NodeId]:
self.logger.info("Node registration not implemented")
return nodes

@update_last_action_time
async def unregister_nodes(self, nodes: List[ua.NodeId]) -> List[ua.NodeId]:
self.logger.info("Node registration not implemented")
return nodes

@update_last_action_time
async def translate_browsepaths_to_nodeids(self, params):
return self.iserver.view_service.translate_browsepaths_to_nodeids(params)

@update_last_action_time
async def add_nodes(self, params):
return self.iserver.node_mgt_service.add_nodes(params, self.user)

@update_last_action_time
async def delete_nodes(self, params):
return self.iserver.node_mgt_service.delete_nodes(params, self.user)

@update_last_action_time
async def add_references(self, params):
return self.iserver.node_mgt_service.add_references(params, self.user)

@update_last_action_time
async def delete_references(self, params):
return self.iserver.node_mgt_service.delete_references(params, self.user)

@update_last_action_time
def add_method_callback(self, methodid, callback):
return self.aspace.add_method_callback(methodid, callback)

@update_last_action_time
async def call(self, params):
"""COROUTINE"""
return await self.iserver.method_service.call(params)

@update_last_action_time
async def create_subscription(self, params, callback, request_callback=None):
result = await self.subscription_service.create_subscription(params, callback, request_callback=request_callback)
self.subscriptions.append(result.SubscriptionId)
return result

@update_last_action_time
async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
"""Returns Future"""
subscription_result = await self.subscription_service.create_monitored_items(params)
Expand All @@ -208,20 +267,24 @@ async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters
self.external))
return subscription_result

@update_last_action_time
async def modify_monitored_items(self, params):
subscription_result = self.subscription_service.modify_monitored_items(params)
await self.iserver.callback_service.dispatch(CallbackType.ItemSubscriptionModified,
ServerItemCallback(params, subscription_result, None,
self.external))
return subscription_result

@update_last_action_time
def republish(self, params):
return self.subscription_service.republish(params)

@update_last_action_time
async def delete_subscriptions(self, ids):
# This is an async method, dues to symmetry with client code
return await self.subscription_service.delete_subscriptions(ids)

@update_last_action_time
async def delete_monitored_items(self, params):
# This is an async method, dues to symmetry with client code
subscription_result = self.subscription_service.delete_monitored_items(params)
Expand All @@ -231,12 +294,15 @@ async def delete_monitored_items(self, params):

return subscription_result

@update_last_action_time
def publish(self, acks: Optional[Iterable[ua.SubscriptionAcknowledgement]] = None):
return self.subscription_service.publish(acks or [])

@update_last_action_time
def modify_subscription(self, params):
return self.subscription_service.modify_subscription(params)

@update_last_action_time
async def transfer_subscriptions(self, params: ua.TransferSubscriptionsParameters) -> List[ua.TransferResult]:
# Subscriptions aren't bound to a Session and can be transfered!
# https://reference.opcfoundation.org/Core/Part4/v104/5.13.7/
Expand Down
19 changes: 18 additions & 1 deletion tests/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import struct

from asyncua import Client, Server, ua
from asyncua.ua.uaerrors import BadMaxConnectionsReached
from asyncua.ua.uaerrors import BadMaxConnectionsReached, BadSessionNotActivated
from .conftest import port_num, find_free_port

pytestmark = pytest.mark.asyncio
Expand Down Expand Up @@ -77,3 +77,20 @@ def status_change_notification(self, status: ua.StatusChangeNotification):
# check if a exception is thrown when a normal function is called
with pytest.raises(ConnectionError):
await cl.get_namespace_array()


async def test_session_watchdog():
port = find_free_port()
srv = Server()
await srv.init()
srv.set_endpoint(f"opc.tcp://127.0.0.1:{port}")
await srv.start()
client = Client(f"opc.tcp://127.0.0.1:{port}", timeout=0.5, watchdog_intervall=1)
client.session_timeout = 1000 # 1 second
await client.connect()
client._closing = True # Kill the keepalive tasks
await asyncio.sleep(2) # Wait for the watchdog to terminate the session due to inactivity
with pytest.raises(BadSessionNotActivated):
server_time_node = client.get_node(ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
await server_time_node.read_value()
await client.disconnect()