forked from IanDesuyo/CloudflareGatewayAdBlock
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cloudflare.py
120 lines (84 loc) · 3.34 KB
/
cloudflare.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from typing import List
import requests
import logging
import os
logger = logging.getLogger("cloudflare")
CF_API_TOKEN = os.environ["CF_API_TOKEN"]
CF_IDENTIFIER = os.environ["CF_IDENTIFIER"]
if not CF_API_TOKEN or not CF_IDENTIFIER:
raise Exception("Missing Cloudflare credentials")
session = requests.Session()
session.headers.update({"Authorization": f"Bearer {CF_API_TOKEN}"})
def get_lists(name_prefix: str):
r = session.get(
f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/lists",
)
logger.debug(f"[get_lists] {r.status_code}")
if r.status_code != 200:
raise Exception("Failed to get Cloudflare lists")
lists = r.json()["result"] or []
return [l for l in lists if l["name"].startswith(name_prefix)]
def create_list(name: str, domains: List[str]):
r = session.post(
f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/lists",
json={
"name": name,
"description": "Created by script.",
"type": "DOMAIN",
"items": [*map(lambda d: {"value": d}, domains)],
},
)
logger.debug(f"[create_list] {r.status_code}")
if r.status_code != 200:
raise Exception("Failed to create Cloudflare list")
return r.json()["result"]
def delete_list(list_id: str):
r = session.delete(
f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/lists/{list_id}",
)
logger.debug(f"[delete_list] {r.status_code}")
if r.status_code != 200:
raise Exception("Failed to delete Cloudflare list")
return r.json()["result"]
def get_firewall_policies(name_prefix: str):
r = session.get(
f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/rules",
)
logger.debug(f"[get_firewall_policies] {r.status_code}")
if r.status_code != 200:
raise Exception("Failed to get Cloudflare firewall policies")
lists = r.json()["result"] or []
return [l for l in lists if l["name"].startswith(name_prefix)]
def create_gateway_policy(name: str, list_ids: List[str]):
r = session.post(
f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/rules",
json={
"name": name,
"description": "Created by script.",
"action": "block",
"enabled": True,
"filters": ["dns"],
"traffic": "or".join([f"any(dns.domains[*] in ${l})" for l in list_ids]),
"rule_settings": {
"block_page_enabled": False,
},
},
)
logger.debug(f"[create_gateway_policy] {r.status_code}")
if r.status_code != 200:
raise Exception("Failed to create Cloudflare firewall policy")
return r.json()["result"]
def update_gateway_policy(name: str, policy_id: str, list_ids: List[str]):
r = session.put(
f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/rules/{policy_id}",
json={
"name": name,
"action": "block",
"enabled": True,
"traffic": "or".join([f"any(dns.domains[*] in ${l})" for l in list_ids]),
},
)
logger.debug(f"[update_gateway_policy] {r.status_code}")
if r.status_code != 200:
raise Exception("Failed to update Cloudflare firewall policy")
return r.json()["result"]