From ceeb2a2a2be107033835bc726f8f4c2355b1296d Mon Sep 17 00:00:00 2001 From: Vladimir Prusakov Date: Fri, 17 May 2024 09:40:08 +0300 Subject: [PATCH] pull_request_28 --- docs/conf.py | 2 +- fortigate_api/fortigate.py | 12 ++-- fortigate_api/fortigate_api.py | 8 +-- fortigate_api/fortigate_base.py | 38 +++++-------- pyproject.toml | 4 +- tests/test__fortigate_base.py | 99 +++++++++++++++++++++------------ 6 files changed, 88 insertions(+), 75 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index f6fa9ff6..5643e0b3 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -8,7 +8,7 @@ project = "fortigate-api" copyright = "2021, Vladimirs Prusakovs" author = "Vladimirs Prusakovs" -release = "2.0.1" +release = "2.0.2" extensions = [ "sphinx.ext.autodoc", diff --git a/fortigate_api/fortigate.py b/fortigate_api/fortigate.py index c1b1a125..c6d888bd 100644 --- a/fortigate_api/fortigate.py +++ b/fortigate_api/fortigate.py @@ -44,16 +44,16 @@ def __init__( # pylint: disable=too-many-arguments :param int timeout: Session timeout (minutes). Default is 15. :param bool verify: Transport Layer Security. - `True` - A TLS certificate required, + `True` - A trusted TLS certificate is required. `False` - Requests will accept any TLS certificate. Default is `False`. :param str vdom: Name of the virtual domain. Default is `root`. :param bool logging: Logging REST API response. - `Ture` - Enable response logging, `False` - otherwise. Default is `False`. + `True` - Enable response logging, `False` - otherwise. Default is `False`. :param bool logging_error: Logging only the REST API response with error. - `Ture` - Enable errors logging, `False` - otherwise. Default is `False`. + `True` - Enable errors logging, `False` - otherwise. Default is `False`. """ kwargs = { "host": host, @@ -74,8 +74,8 @@ def __init__( # pylint: disable=too-many-arguments def login(self) -> None: # pylint: disable=useless-parent-delegation """Login to the Fortigate using REST API and creates a Session object. - - Validate 'token' if object has been initialized with `token` parameter. - - Validate `password` if object has been initialized with `username` parameter. + - Validate `token` if object has been initialized with `token` parameter. + - Validate `password` if object has been initialized with `username` parameter. :return: None. Creates Session object. """ @@ -84,7 +84,7 @@ def login(self) -> None: # pylint: disable=useless-parent-delegation def logout(self) -> None: # pylint: disable=useless-parent-delegation """Logout from the Fortigate using REST API, deletes Session object. - - No need to logo ut if object has been initialized with `token` parameter. + - No need to log out if object has been initialized with `token` parameter. - Log out if object has been initialized with `username` parameter. :return: None. Deletes Session object diff --git a/fortigate_api/fortigate_api.py b/fortigate_api/fortigate_api.py index 6ae46d76..351164e8 100644 --- a/fortigate_api/fortigate_api.py +++ b/fortigate_api/fortigate_api.py @@ -44,17 +44,17 @@ def __init__( :param int timeout: Session timeout (minutes). Default is 15. :param bool verify: Transport Layer Security. - `True` - A TLS certificate required, + `True` - A trusted TLS certificate is required. `False` - Requests will accept any TLS certificate. Default is `False`. :param str vdom: Name of the virtual domain. Default is `root`. :param bool logging: Logging REST API response. - `Ture` - Enable response logging, `False` - otherwise. Default is `False`. + `True` - Enable response logging, `False` - otherwise. Default is `False`. :param bool logging_error: Logging only the REST API response with error. - `Ture` - Enable errors logging, `False` - otherwise. Default is `False`. + `True` - Enable errors logging, `False` - otherwise. Default is `False`. """ api_params = { "host": host, @@ -117,7 +117,7 @@ def login(self) -> None: """Login to the Fortigate using REST API and creates a Session. - Validate `token` if object has been initialized with `token` parameter. - - Validate `password` if object has been initialized with `username` parameter. + - Validate `password` if object has been initialized with `username` parameter. :return: None. Creates Session. """ diff --git a/fortigate_api/fortigate_base.py b/fortigate_api/fortigate_base.py index a9b7dd09..76b6b6b9 100644 --- a/fortigate_api/fortigate_base.py +++ b/fortigate_api/fortigate_base.py @@ -53,10 +53,10 @@ def __init__(self, **kwargs): :param str vdom: Name of the virtual domain. Default is `root`. :param bool logging: Logging REST API response. - `Ture` - Enable response logging, `False` - otherwise. Default is `False`. + `True` - Enable response logging, `False` - otherwise. Default is `False`. :param bool logging_error: Logging only the REST API response with error. - `Ture` - Enable errors logging, `False` - otherwise. Default is `False`. + `True` - Enable errors logging, `False` - otherwise. Default is `False`. """ self.host = str(kwargs.get("host")) self.username = str(kwargs.get("username")) @@ -133,8 +133,8 @@ def url(self) -> str: def login(self) -> None: """Login to the Fortigate using REST API and creates a Session object. - - Validate 'token' if object has been initialized with `token` parameter. - - Validate `password` if object has been initialized with `username` parameter. + - Validate `token` if object has been initialized with `token` parameter. + - Validate `password` if object has been initialized with `username` parameter. :return: None. Creates Session object. """ @@ -144,7 +144,7 @@ def login(self) -> None: if self.token: try: response: Response = session.get( - url=f"{self.url}/api/v2/cmdb/system/status", + url=f"{self.url}/api/v2/monitor/system/status", headers=self._bearer_token(), verify=self.verify, ) @@ -156,7 +156,7 @@ def login(self) -> None: # password try: - session.post( + response = session.post( url=f"{self.url}/logincheck", data=urlencode([("username", self.username), ("secretkey", self.password)]), timeout=self.timeout, @@ -164,12 +164,9 @@ def login(self) -> None: ) except Exception as ex: raise self._hide_secret_ex(ex) - + response.raise_for_status() token = self._get_token_from_cookies(session) session.headers.update({"X-CSRFTOKEN": token}) - - response = session.get(url=f"{self.url}/api/v2/cmdb/system/vdom") - response.raise_for_status() self._session = session def logout(self) -> None: @@ -220,21 +217,12 @@ def _get_token_from_cookies(session: Session) -> str: :raises ValueError: If the ccsrftoken cookie is absent. """ - while True: - # fortios < v7 - cookie_name = "ccsrftoken" - if cookies := [o for o in session.cookies if o and o.name == cookie_name]: - break - - # fortios >= v7 - cookie_name += "_" - if cookies := [o for o in session.cookies if o and o.name.startswith(cookie_name)]: - break - - raise ValueError("Invalid login credentials. Cookie 'ccsrftoken' is missing.") - - token = str(cookies[0].value).strip('"') - return token + cookie_prefix = "ccsrftoken" + if cookies := [o for o in session.cookies if o and o.name.startswith(cookie_prefix)]: + token = str(cookies[0].value) + token = token.strip('"') + return token + raise ValueError("Invalid login credentials. Cookie 'ccsrftoken' is missing.") def _hide_secret(self, string: str) -> str: """Hide password, secretkey in text (for safe logging).""" diff --git a/pyproject.toml b/pyproject.toml index a5435013..df39c6e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fortigate_api" -version = "2.0.1" +version = "2.0.2" description = "Python package to configure Fortigate (Fortios) devices using REST API and SSH" authors = ["Vladimirs Prusakovs "] readme = "README.rst" @@ -55,7 +55,7 @@ test = ["pytest"] [tool.poetry.urls] "Bug Tracker" = "https://github.com/vladimirs-git/fortigate-api/issues" -"Download URL" = "https://github.com/vladimirs-git/fortigate-api/archive/refs/tags/2.0.1.tar.gz" +"Download URL" = "https://github.com/vladimirs-git/fortigate-api/archive/refs/tags/2.0.2.tar.gz" [tool.pylint] max-line-length = 100 diff --git a/tests/test__fortigate_base.py b/tests/test__fortigate_base.py index 10d872e4..b14a1ff4 100644 --- a/tests/test__fortigate_base.py +++ b/tests/test__fortigate_base.py @@ -1,9 +1,12 @@ """Test fortigate_base.py""" +from collections import OrderedDict from unittest.mock import patch import pytest +import requests_mock from pytest_mock import MockerFixture from requests import Session +from requests_mock import Mocker from fortigate_api import fortigate_base from fortigate_api.fortigate import FortiGate @@ -96,12 +99,53 @@ def test__url(scheme, host, port, expected): assert actual == expected +# ============================ login ============================= + +# noinspection PyUnresolvedReferences +@pytest.mark.parametrize("token, expected, headers", [ + ("", "TOKEN", ("X-CSRFTOKEN", "TOKEN")), + ("TOKEN", "", None), +]) +def test__login(token, expected, headers): + """FortiGateBase.login() for username.""" + api = FortiGate(host="HOST", token=token) + with requests_mock.Mocker() as mock: + if token: + mock.get("https://host/api/v2/monitor/system/status") + else: + mock.post("https://host/logincheck") + + with patch("fortigate_api.FortiGate._get_token_from_cookies", return_value=expected): + api.login() + assert isinstance(api._session, Session) + store: OrderedDict = getattr(api._session.headers, "_store") + actual = store.get("x-csrftoken") + assert actual == headers + + +# =========================== helpers ============================ + +def test__get_session(api: FortiGate, mocker: MockerFixture): + """FortiGateBase._get_session()""" + # session + api._session = Session() + session = api._get_session() + assert isinstance(session, Session) + + # login + api._session = None + mock_response = mocker.Mock() + mocker.patch(target="requests.Session.post", return_value=mock_response) + mocker.patch(target="requests.Session.get", return_value=tst.crate_response(200)) + with patch("fortigate_api.FortiGate._get_token_from_cookies", return_value="token"): + session = api._get_session() + assert isinstance(session, Session) is True + + @pytest.mark.parametrize("name, expected", [ ("ccsrftoken", "token"), # < v7 ("ccsrftoken_443", "token"), # >= v7 ("ccsrftoken_443_3334d10", "token"), # >= v7 - ("ccsrftokenother-name", ValueError), - ("ccsrftoken-other-name", ValueError), ("other-name", ValueError), ]) def test__get_token_from_cookies(api: FortiGate, name, expected): @@ -115,6 +159,22 @@ def test__get_token_from_cookies(api: FortiGate, name, expected): api._get_token_from_cookies(session=session) +@pytest.mark.parametrize("string, password, expected", [ + ("", "", ""), + ("", "secret", ""), + ("text", "", "text"), + ("text", "secret", "text"), + ("_secret_", "secret", "__"), + ("_%5B_", "secret", "_%5B_"), + ("_secret%5B_", "secret[", "__"), +]) +def test__hide_secret(string, password, expected): + """FortiGateBase._hide_secret()""" + fgt = FortiGate(host="host", password=password) + actual = fgt._hide_secret(string=string) + assert actual == expected + + @pytest.mark.parametrize("kwargs, scheme, expected", [ ({}, "https", 443), ({}, "http", 80), @@ -160,41 +220,6 @@ def test__valid_url(kwargs, url, expected): assert actual == expected -@pytest.mark.parametrize("string, password, expected", [ - ("", "", ""), - ("", "secret", ""), - ("text", "", "text"), - ("text", "secret", "text"), - ("_secret_", "secret", "__"), - ("_%5B_", "secret", "_%5B_"), - ("_secret%5B_", "secret[", "__"), -]) -def test__hide_secret(string, password, expected): - """FortiGateBase._hide_secret()""" - fgt = FortiGate(host="host", password=password) - actual = fgt._hide_secret(string=string) - assert actual == expected - - -# =========================== helpers ============================ - -def test__get_session(api: FortiGate, mocker: MockerFixture): - """FortiGateBase._get_session()""" - # session - api._session = Session() - session = api._get_session() - assert isinstance(session, Session) - - # login - api._session = None - mock_response = mocker.Mock() - mocker.patch(target="requests.Session.post", return_value=mock_response) - mocker.patch(target="requests.Session.get", return_value=tst.crate_response(200)) - with patch("fortigate_api.FortiGate._get_token_from_cookies", return_value="token"): - session = api._get_session() - assert isinstance(session, Session) is True - - # =========================== helpers ============================ @pytest.mark.parametrize("kwargs, expected", [