-
Notifications
You must be signed in to change notification settings - Fork 0
/
purple-air.py
executable file
·172 lines (151 loc) · 5.26 KB
/
purple-air.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python3
"""Purple-Air Sensor Reader
This script loads the current data (PM2.5, humidity, temperature,
pressure) from a given PurpleAir sensor and then calculates the
AQI, its descriptors (name, color, description), and a color value to
apply to an Inovelli switch.
This expects the sensor ID to be passed as the first argument to the
script.
"""
import urllib.request
import json
import math
import sys
# The sensor ID is the single required command-line argument.
if len(sys.argv) != 2:
    print('Usage: ./purple-air.py SENSORID', file=sys.stderr)
    # Use sys.exit rather than the bare exit() builtin: exit() is injected by
    # the site module and is not available under `python -S` or frozen builds.
    sys.exit(1)
SensorID = sys.argv[1]
# Category names ordered from most to least severe.  The lookup helpers walk
# this list front to back and stop at the first category whose lower bound
# the value clears, so the ordering is significant.
AQINames = [
    'Very Hazardous',
    'Hazardous',
    'Very Unhealthy',
    'Unhealthy',
    'Unhealthy for Sensitive Groups',
    'Moderate',
    'Good',
]
# Per-category lookup table, keyed by category name.  Each entry holds:
#   'aqi'    - (low, high) AQI range covered by the category
#   'pm2.5'  - (low, high) PM2.5 concentration range mapped onto that AQI range
#   'colorv' - (value at low AQI, value at high AQI) color value range
#   'name'   - category name (same as the key)
#   'color'  - human-readable color name
#   'desc'   - human-readable description of the category
AQICalculations = {
    'Good': {
        'aqi': (0, 50),
        'pm2.5': (0, 12),
        'colorv': (84, 42),
        'name': 'Good',
        'color': 'Green',
        'desc': (
            'Air quality is considered satisfactory, and air pollution poses '
            'little or no risk.'
        ),
    },
    'Moderate': {
        'aqi': (51, 100),
        'pm2.5': (12.1, 35.4),
        'colorv': (42, 21),
        'name': 'Moderate',
        'color': 'Yellow',
        'desc': (
            'Air quality is acceptable; however, for some pollutants there '
            'may be a moderate health concern for a very small number of '
            'people who are unusually sensitive to air pollution.'
        ),
    },
    'Unhealthy for Sensitive Groups': {
        'aqi': (101, 150),
        'pm2.5': (35.5, 55.4),
        'colorv': (21, 1),
        'name': 'Unhealthy for Sensitive Groups',
        'color': 'Orange',
        'desc': (
            'Members of sensitive groups may experience health effects. '
            'The general public is not likely to be affected.'
        ),
    },
    'Unhealthy': {
        'aqi': (151, 200),
        'pm2.5': (55.5, 150.4),
        'colorv': (255, 210),
        'name': 'Unhealthy',
        'color': 'Red',
        'desc': (
            'Everyone may begin to experience health effects; members of '
            'sensitive groups may experience more serious health effects.'
        ),
    },
    'Very Unhealthy': {
        'aqi': (201, 300),
        'pm2.5': (150.5, 250.4),
        'colorv': (210, 189),
        'name': 'Very Unhealthy',
        'color': 'Purple',
        'desc': (
            'Health warnings of emergency conditions. '
            'The entire population is more likely to be affected.'
        ),
    },
    'Hazardous': {
        'aqi': (301, 400),
        'pm2.5': (250.5, 350.4),
        'colorv': (189, 189),
        'name': 'Hazardous',
        'color': 'Black',
        'desc': 'Health alert: everyone may experience more serious health effects.',
    },
    'Very Hazardous': {
        'aqi': (401, 500),
        'pm2.5': (350.5, 500),
        'colorv': (189, 189),
        'name': 'Very Hazardous',
        'color': 'Black',
        'desc': 'Health alert: everyone may experience more serious health effects.',
    },
}
def aqiFromPM(pm):
    """Convert a PM2.5 concentration into an AQI value.

    Walks the categories from most to least severe and linearly interpolates
    inside the first one whose lower PM2.5 breakpoint is at or below ``pm``.
    Readings above the top breakpoint are extrapolated along the highest
    category's line rather than clamped.

    Args:
        pm: PM2.5 concentration, on the same scale as the 'pm2.5' ranges in
            AQICalculations.

    Returns:
        The AQI rounded to an int, or 0 when ``pm`` is below every lower
        breakpoint (i.e. negative).
    """
    for name in AQINames:
        band = AQICalculations[name]
        pm_low, pm_high = band['pm2.5']
        aqi_low, aqi_high = band['aqi']
        # Inclusive comparison: a reading exactly on a lower breakpoint
        # (e.g. 12.1 or 35.5) belongs to that category, not the one below it.
        if pm >= pm_low:
            return calcAvgRange(pm, aqi_high, aqi_low, pm_high, pm_low)
    return 0
def getColorV(aqi):
    """Return the color value for ``aqi``.

    The value is interpolated across the 'colorv' range of the category that
    getName() selects for ``aqi``, in step with the position of ``aqi`` inside
    that category's AQI range.
    """
    band = AQICalculations[getName(aqi)]
    color_at_low, color_at_high = band['colorv']
    aqi_low, aqi_high = band['aqi']
    return calcAvgRange(aqi, color_at_high, color_at_low, aqi_high, aqi_low)
def getName(aqi):
    """Return the name of the AQI category that ``aqi`` falls into.

    Categories are checked from most to least severe; the first one whose
    lower AQI bound is at or below ``aqi`` wins.

    Args:
        aqi: AQI value to classify.

    Returns:
        The category name (a key of AQICalculations).  Values below every
        lower bound (i.e. negative) fall back to 'Good'.
    """
    for name in AQINames:
        # Inclusive comparison: an AQI sitting exactly on a category's lower
        # bound (51, 101, 151, ...) belongs to that category.  A strict ">"
        # would report the next-lower category instead.
        if aqi >= AQICalculations[name]['aqi'][0]:
            return name
    return 'Good'
def getColor(aqi):
    """Return the color name of the category that ``aqi`` falls into."""
    category = getName(aqi)
    return AQICalculations[category]['color']
def getDesc(aqi):
    """Return the description of the category that ``aqi`` falls into."""
    category = getName(aqi)
    return AQICalculations[category]['desc']
def calcAvgRange(Cp, Ih, Il, BPh, BPl):
    """Linearly map ``Cp`` from the range [BPl, BPh] onto [Il, Ih].

    Args:
        Cp: Current value.
        Ih: High output.
        Il: Low output.
        BPh: Value high (upper end of the input range).
        BPl: Value low (lower end of the input range).

    Returns:
        The interpolated output, rounded to the nearest int.

    Raises:
        ZeroDivisionError: If ``BPh`` equals ``BPl``.
    """
    slope = (Ih - Il) / (BPh - BPl)
    offset = Cp - BPl
    return round(slope * offset + Il)
# Fetch the sensor's current readings as JSON.
# NOTE(review): this is the unauthenticated www.purpleair.com/json endpoint -
# confirm it is still served before relying on it.
with urllib.request.urlopen('https://www.purpleair.com/json?show={}'.format(SensorID)) as response:
    data = json.loads(response.read().decode())

# A missing or empty 'results' list both mean there is nothing to report.
results = data.get('results') or []
if not results:
    print('Sensor {} not found on PurpleAir servers'.format(SensorID), file=sys.stderr)
    # sys.exit instead of the site-injected exit() builtin, which is not
    # guaranteed to exist (e.g. under `python -S`).
    sys.exit(1)

# Average PM2.5 over the channels that were actually returned.  Indexing
# results[1] unconditionally raises IndexError when only one entry comes back,
# so use at most the first two entries and divide by how many there are.
readings = [float(channel['PM2_5Value']) for channel in results[:2]]
pm2 = sum(readings) / len(readings)
aqi = aqiFromPM(pm2)

# Environmental readings are taken from the first result entry.
primary = results[0]
output = {
    # Raw PM2.5 (µg/m³)
    # Raw PM2.5 measurements in micrograms per cubic meter of air
    'pm2.5': pm2,
    # Humidity (%)
    # Humidity, corrected by adding 4% as a general correction to account for
    # heat generated by the WiFi module in a PurpleAir sensor. This will not
    # be accurate for all situations.
    'humidity': float(primary['humidity']) + 4,
    # Temperature (°F)
    # A correction of -8°F is applied to the temperature readings in a
    # PurpleAir sensor. This is a general correction and may not be
    # applicable to all situations.
    'temp_f': float(primary['temp_f']) - 8,
    # Raw Pressure (mbar)
    # Atmospheric pressure in millibars
    'pressure': float(primary['pressure']),
    'aqi': aqi,
    'name': getName(aqi),
    'description': getDesc(aqi),
    'color': getColor(aqi),
    'colorv': getColorV(aqi),
}
print(json.dumps(output))