-
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
96 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,52 +1,128 @@ | ||
import pytest | ||
import socket | ||
from unittest.mock import patch, MagicMock | ||
from testproxy import check_open_ports, get_ssl_info, check_http_headers, detect_proxy | ||
from proxy_detector import check_open_ports, get_ssl_info, check_http_headers, detect_waf, detect_proxy | ||
|
||
# Mock host to use in tests | ||
MOCK_HOST = 'example.com' | ||
|
||
def test_check_open_ports(): | ||
ports = [80, 443, 8080] | ||
# Assuming the ports are closed, we expect an empty list in return | ||
with patch('socket.create_connection', side_effect=socket.timeout): | ||
with patch('socket.create_connection') as mock_connection: | ||
mock_connection.side_effect = [socket.timeout, MagicMock(), socket.timeout] | ||
open_ports = check_open_ports(MOCK_HOST, ports) | ||
assert open_ports == [] | ||
assert open_ports == [443] | ||
|
||
def test_get_ssl_info(): | ||
# Mock the SSL socket and cert data | ||
mock_cert = { | ||
'subject': ((('commonName', 'example.com'),),), | ||
'issuer': ((('countryName', 'US'),), (('organizationName', 'DigiCert Inc'),), (('commonName', 'DigiCert SHA2 Secure Server CA'),)), | ||
'version': 3 | ||
} | ||
mock_cert = MagicMock() | ||
mock_cert.subject.rfc4514_string.return_value = "CN=example.com" | ||
mock_cert.issuer.rfc4514_string.return_value = "C=US,O=DigiCert Inc,CN=DigiCert SHA2 Secure Server CA" | ||
mock_cert.version = 3 | ||
mock_cert.not_valid_before.strftime.return_value = "2023-01-01 00:00:00" | ||
mock_cert.not_valid_after.strftime.return_value = "2024-01-01 00:00:00" | ||
mock_cert.serial_number = 12345678901234567890 | ||
mock_cert.signature_algorithm_oid._name = "sha256WithRSAEncryption" | ||
|
||
with patch('ssl.create_default_context'), \ | ||
patch('socket.create_connection', MagicMock()), \ | ||
patch('ssl.SSLSocket.getpeercert', return_value=mock_cert): | ||
patch('socket.create_connection'), \ | ||
patch('ssl.SSLSocket.getpeercert', return_value=mock_cert), \ | ||
patch('cryptography.x509.load_der_x509_certificate', return_value=mock_cert): | ||
|
||
ssl_info = get_ssl_info(MOCK_HOST) | ||
assert ssl_info['subject'] == ((('commonName', 'example.com'),),) | ||
assert ssl_info['subject'] == "CN=example.com" | ||
assert ssl_info['issuer'] == "C=US,O=DigiCert Inc,CN=DigiCert SHA2 Secure Server CA" | ||
assert ssl_info['version'] == 3 | ||
assert ssl_info['not_valid_before'] == "2023-01-01 00:00:00" | ||
assert ssl_info['not_valid_after'] == "2024-01-01 00:00:00" | ||
assert ssl_info['serial_number'] == 12345678901234567890 | ||
assert ssl_info['signature_algorithm'] == "sha256WithRSAEncryption" | ||
|
||
def test_check_http_headers(): | ||
mock_headers = { | ||
'Server': 'Apache', | ||
'Content-Type': 'text/html; charset=UTF-8' | ||
'Content-Type': 'text/html; charset=UTF-8', | ||
'X-Forwarded-For': '10.0.0.1' | ||
} | ||
|
||
with patch('requests.head') as mock_request: | ||
mock_response = MagicMock() | ||
mock_response.headers = mock_headers | ||
mock_response.status_code = 200 | ||
mock_response.history = [] | ||
mock_request.return_value = mock_response | ||
|
||
headers = check_http_headers(f'http://{MOCK_HOST}') | ||
headers, status_code, history = check_http_headers(f'http://{MOCK_HOST}') | ||
assert headers == mock_headers | ||
assert status_code == 200 | ||
assert history == [] | ||
|
||
def test_detect_waf(): | ||
headers_with_waf = { | ||
'Server': 'Apache', | ||
'X-WAF-Rate-Limit': '100', | ||
'cf-ray': '12345678901234567-IAD' | ||
} | ||
detected_wafs = detect_waf(headers_with_waf) | ||
assert 'Generic WAF' in detected_wafs | ||
assert 'Cloudflare WAF' in detected_wafs | ||
|
||
headers_without_waf = { | ||
'Server': 'Apache', | ||
'Content-Type': 'text/html; charset=UTF-8' | ||
} | ||
detected_wafs = detect_waf(headers_without_waf) | ||
assert len(detected_wafs) == 0 | ||
|
||
def test_detect_proxy(): | ||
# Mock detect_proxy by just checking for open ports and headers | ||
mock_results = { | ||
'host': MOCK_HOST, | ||
'ip': '93.184.216.34', | ||
'open_ports': [80, 443], | ||
'ssl_info': { | ||
'subject': 'CN=example.com', | ||
'issuer': 'C=US,O=DigiCert Inc,CN=DigiCert SHA2 Secure Server CA', | ||
'version': 3, | ||
'not_valid_before': '2023-01-01 00:00:00', | ||
'not_valid_after': '2024-01-01 00:00:00', | ||
'serial_number': 12345678901234567890, | ||
'signature_algorithm': 'sha256WithRSAEncryption' | ||
}, | ||
'http_headers': { | ||
'Server': 'Apache', | ||
'X-Forwarded-For': '10.0.0.1' | ||
}, | ||
'https_headers': { | ||
'Server': 'Apache', | ||
'Strict-Transport-Security': 'max-age=31536000' | ||
}, | ||
'proxy_indicators': ['X-Forwarded-For'], | ||
'waf_detected': ['Generic WAF'], | ||
'redirects': [] | ||
} | ||
|
||
with patch('socket.gethostbyname', return_value='93.184.216.34'), \ | ||
patch('test_proxy.check_open_ports', return_value=[80, 443]), \ | ||
patch('test_proxy.get_ssl_info', return_value=None), \ | ||
patch('test_proxy.check_http_headers', return_value=None): | ||
patch('proxy_detector.check_open_ports', return_value=[80, 443]), \ | ||
patch('proxy_detector.get_ssl_info', return_value=mock_results['ssl_info']), \ | ||
patch('proxy_detector.check_http_headers', side_effect=[ | ||
(mock_results['http_headers'], 200, []), | ||
(mock_results['https_headers'], 200, []) | ||
]), \ | ||
patch('proxy_detector.detect_waf', return_value=['Generic WAF']): | ||
|
||
results = detect_proxy(MOCK_HOST, 'json') | ||
assert isinstance(results, str) # Ensure JSON string is returned | ||
|
||
detect_proxy(MOCK_HOST) # We only want to make sure the function runs without exceptions | ||
import json | ||
parsed_results = json.loads(results) | ||
assert parsed_results['host'] == MOCK_HOST | ||
assert parsed_results['ip'] == '93.184.216.34' | ||
assert parsed_results['open_ports'] == [80, 443] | ||
assert parsed_results['ssl_info'] == mock_results['ssl_info'] | ||
assert parsed_results['http_headers'] == mock_results['http_headers'] | ||
assert parsed_results['https_headers'] == mock_results['https_headers'] | ||
assert parsed_results['proxy_indicators'] == ['X-Forwarded-For'] | ||
assert parsed_results['waf_detected'] == ['Generic WAF'] | ||
assert parsed_results['redirects'] == [] | ||
|
||
if __name__ == "__main__": | ||
pytest.main() |