189 lines
4.3 KiB
Python
189 lines
4.3 KiB
Python
import datetime
|
|
import json
|
|
import platform
|
|
from time import sleep
|
|
|
|
# Pause, in seconds, passed to sleep() between retries after a sensor error.
DELAY = 0.1
|
|
|
|
|
|
def is_raspberry_pi() -> bool:
    """Report whether the host machine type is ``armv7l`` or ``armv6l``.

    The module uses the answer to decide whether the hardware-only
    packages (board, busio, adafruit_ms8607) should be imported.
    """
    arm_machine_types = ("armv7l", "armv6l")
    machine = platform.machine()
    return machine in arm_machine_types
|
|
|
|
|
|
if is_raspberry_pi():
|
|
import board
|
|
import busio
|
|
import adafruit_ms8607 as ms8607
|
|
|
|
|
|
class Reading:
    """One temperature / pressure / humidity sample with its capture time.

    Instance attributes:
        time: naive ``datetime`` taken from ``datetime.datetime.now()``.
        temp, press, hum: the three measured (or normalised) values.
        normal: True when the values have already been normalised.
    """

    # Only C89 strftime directives are guaranteed on every platform; this is
    # the portable spelling of the glibc shorthand "%F %T %Z".
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S %Z"

    def __init__(self, t, p, h, normal=False):
        """Store a sample, range-checking raw (non-normalised) values.

        Args:
            t: temperature; a raw value must lie within [-40.0, 85.0].
            p: pressure; a raw value must lie within [10, 2000].
            h: relative humidity; a raw value must lie within [0.0, 100.0].
            normal: set when the values are already normalised, in which
                case the range checks are skipped.

        Raises:
            TemperatureError, HumidityError, PressureError: when a raw value
                falls outside its range (checked in that order).
        """
        if not normal:
            if t < -40.0 or t > 85.0:
                raise TemperatureError(t)
            if h < 0.0 or h > 100.0:
                raise HumidityError(h)
            if p < 10 or p > 2000:
                raise PressureError(p)

        self.time = datetime.datetime.now()
        self.temp = t
        self.press = p
        self.hum = h
        self.normal = normal

    def __repr__(self):
        return f'Reading@{self.time.timestamp()}(t={self.temp},p={self.press},h={self.hum})'

    def __str__(self):
        """Return a comma-separated line: time text, float epoch, t, p, h."""
        return (
            f'{self.time.strftime(self._TIME_FORMAT)},{self.time.timestamp()},'
            f'{self.temp},{self.press},{self.hum}'
        )

    def timestamp(self):
        """Return the capture time as whole seconds since the epoch."""
        return int(self.time.timestamp())

    def list(self):
        """Return ``[time text, integer epoch, temp, press, hum]``."""
        return [
            self.time.strftime(self._TIME_FORMAT),
            self.timestamp(),
            self.temp,
            self.press,
            self.hum,
        ]

    def normalize(self):
        """Return a Reading whose values went through the ``normal_*`` helpers.

        An already-normalised reading is returned unchanged (same object).
        """
        if self.normal:
            return self
        normalized = Reading(
            normal_temp(self.temp),
            normal_press(self.press),
            normal_humidity(self.hum),
            normal=True,
        )
        # The derived reading describes the same sample, so it keeps the
        # original capture time instead of the moment it was normalised.
        normalized.time = self.time
        return normalized

    def average(self):
        """Return the mean of the three normalised components."""
        if not self.normal:
            return self.normalize().average()
        return average([self.temp, self.press, self.hum])

    def json(self) -> bytes:
        """Serialise the reading as a UTF-8 encoded JSON object."""
        payload = {
            'timestamp': self.timestamp(),
            'temperature': self.temp,
            'pressure': self.press,
            'relative_humidity': self.hum,
        }
        # str.encode() already yields bytes; no extra bytes() wrapper needed.
        return json.dumps(payload).encode('utf-8')
|
|
|
|
|
|
class SensorError(Exception):
    """Error describing a sensor component whose value is out of range.

    Attributes:
        component: name of the quantity that was rejected.
        value: the rejected value.
    """

    def __init__(self, component, value):
        self.component, self.value = component, value

    def __str__(self):
        # str() and repr() intentionally produce the same message.
        return repr(self)

    def __repr__(self):
        return f'SensorError: {self.component} out of range: {self.value}'
|
|
|
|
|
|
class TemperatureError(SensorError):
    """SensorError whose component is fixed to 'temperature'."""

    def __init__(self, value):
        super().__init__(component='temperature', value=value)
|
|
|
|
|
|
class PressureError(SensorError):
    """SensorError whose component is fixed to 'pressure'."""

    def __init__(self, value):
        super().__init__(component='pressure', value=value)
|
|
|
|
|
|
class HumidityError(SensorError):
    """SensorError whose component is fixed to 'humidity'."""

    def __init__(self, value):
        super().__init__(component='humidity', value=value)
|
|
|
|
|
|
def clamp(val):
    """Limit *val* to the closed interval [0.0, 1.0]."""
    return min(1.0, max(0.0, val))


def normalize(minval, maxval, val):
    """Linearly rescale *val* from [minval, maxval] onto [0.0, 1.0].

    Values at or below *minval* map to 0.0 and values at or above *maxval*
    map to 1.0; everything in between is interpolated.

    Args:
        minval: lower bound of the input range.
        maxval: upper bound of the input range; must exceed *minval*.
        val: the value to rescale.

    Returns:
        The rescaled value, clamped to [0.0, 1.0].

    Raises:
        ValueError: if *maxval* is not greater than *minval*.
    """
    span = maxval - minval
    if span <= 0:
        raise ValueError(f'invalid range: minval={minval}, maxval={maxval}')
    # Divide by the width of the range rather than by maxval, otherwise
    # val == maxval only reaches 1.0 when minval happens to be zero.
    return clamp((val - minval) / span)
|
|
|
|
|
|
def normal_temp(temperature):
    """Normalise a temperature via ``normalize`` with bounds 0.0 and 45.0."""
    return normalize(minval=0.0, maxval=45.0, val=temperature)
|
|
|
|
|
|
def normal_press(pressure):
    """Normalise a pressure via ``normalize`` with bounds 1000.0 and 1050.0."""
    return normalize(minval=1000.0, maxval=1050.0, val=pressure)
|
|
|
|
|
|
def normal_humidity(hum):
    """Normalise a relative humidity via ``normalize`` with bounds 20 and 90."""
    return normalize(minval=20, maxval=90, val=hum)
|
|
|
|
|
|
def average(values):
    """Return the arithmetic mean of a sized collection of numbers.

    An empty collection raises ZeroDivisionError.
    """
    total = sum(values)
    count = len(values)
    return total / count
|
|
|
|
|
|
class Sensor:
    """Base class for sensors; subclasses override ``reading`` and/or ``json``."""

    def __init__(self):
        """The base sensor keeps no state."""

    def reading(self) -> Reading:
        """Build a placeholder Reading from all-zero values.

        NOTE(review): Reading's range check rejects a pressure below 10, so
        constructing this placeholder raises PressureError rather than
        returning - confirm subclasses are always expected to override it.
        """
        placeholder = Reading(0, 0, 0)
        return placeholder

    def json(self) -> bytes:
        """Placeholder serialiser; the base implementation yields None."""
        return None
|
|
|
|
|
|
class TestSensor(Sensor):
    """Stand-in sensor that reports the fixed values given at construction.

    Note the constructor order is (t, h, p), whereas Reading takes (t, p, h).
    """

    def __init__(self, t, h, p):
        """Remember the temperature, relative humidity and pressure to report."""
        super().__init__()
        self.temperature = t
        self.relative_humidity = h
        self.pressure = p

    def reading(self) -> Reading:
        """Return a Reading built from the configured values.

        Without this override the inherited placeholder ignores the
        configured values and always fails Reading's pressure check.

        Raises:
            SensorError: if a configured value is outside Reading's ranges.
        """
        return Reading(self.temperature, self.pressure, self.relative_humidity)

    def json(self) -> bytes:
        """Serialise the configured values as a UTF-8 encoded JSON object."""
        payload = {
            'temperature': self.temperature,
            'pressure': self.pressure,
            'relative_humidity': self.relative_humidity,
        }
        # bytes(str) without an encoding raises TypeError, so encode instead.
        return json.dumps(payload).encode('utf-8')
|
|
|
|
|
|
class MS8607(Sensor):
    """Sensor backed by an MS8607 device reached over an I2C bus."""

    def __init__(self, i2c=None):
        """Attach to the device.

        Args:
            i2c: an existing I2C bus object; when omitted a bus is opened on
                ``board.SCL`` / ``board.SDA``.
        """
        super().__init__()
        bus = i2c if i2c is not None else busio.I2C(board.SCL, board.SDA)
        self.sensor = get_sensor(bus)

    def reading(self) -> Reading:
        """Return the next valid Reading, retrying until one is obtained.

        OSError, ValueError and SensorError are printed and followed by a
        DELAY-second pause before the sensor is sampled again.
        """
        while True:
            try:
                return Reading(
                    self.sensor.temperature,
                    self.sensor.pressure,
                    self.sensor.relative_humidity,
                )
            except (OSError, ValueError, SensorError) as failure:
                print(failure)
                sleep(DELAY)
|
|
|
|
|
|
def get_sensor(i2c):
    """Create the MS8607 driver on *i2c*, retrying until construction succeeds.

    Each OSError or ValueError raised by the driver constructor is printed,
    then the call is retried after a DELAY-second pause.
    """
    while True:
        try:
            return ms8607.MS8607(i2c)
        except (OSError, ValueError) as failure:
            print(failure)
            sleep(DELAY)
|