chime/chime/sensor.py

186 lines
4.3 KiB
Python

import datetime
import json
import platform
from time import sleep
DELAY = 0.1
def is_raspberry_pi() -> bool:
return platform.machine() in ("armv7l", "armv6l")
if is_raspberry_pi():
import board
import busio
import adafruit_ms8607 as ms8607
class Reading:
def __init__(self, t, p, h, normal=False):
if not normal:
if t < -40.0 or t > 85.0:
raise TemperatureError(t)
if h < 0.0 or h > 100.0:
raise HumidityError(h)
if p < 10 or p > 2000:
raise PressureError(p)
self.time = datetime.datetime.now()
self.temp = t
self.press = p
self.hum = h
self.normal = normal
def __repr__(self):
return f'Reading@{self.time.timestamp()}(t={self.temp},p={self.press},h={self.hum})'
def __str__(self):
return f'{self.time.strftime("%F %T %Z")},{self.time.timestamp()},{self.temp},{self.press},{self.hum}'
def list(self):
return [
self.time.strftime("%F %T %Z"),
self.time.timestamp(),
self.temp,
self.press,
self.hum
]
def normalize(self):
"""
normal returns a normalised average of PHT.
"""
if self.normal:
return self
return Reading(
normal_temp(self.temp),
normal_press(self.press),
normal_humidity(self.hum),
normal=True
)
def average(self):
if not self.normal:
return self.normalize().average()
return average([self.temp, self.press, self.hum])
def json(self) -> bytes:
return bytes(json.dumps({
'timestamp': self.time.timestamp(),
'temperature': self.temp,
'pressure': self.press,
'relative_humidity': self.hum,
}).encode('utf-8'))
class SensorError(Exception):
def __init__(self, component, value):
self.component = component
self.value = value
def __repr__(self):
return f'SensorError: {self.component} out of range: {self.value}'
def __str__(self):
return repr(self)
class TemperatureError(SensorError):
def __init__(self, value):
super().__init__('temperature', value)
class PressureError(SensorError):
def __init__(self, value):
super().__init__('pressure', value)
class HumidityError(SensorError):
def __init__(self, value):
super().__init__('humidity', value)
def clamp(val):
return min(1.0, max(0.0, val))
def normalize(minval, maxval, val):
return clamp((val - minval) / maxval)
def normal_temp(temperature):
return normalize(0.0, 45.0, temperature)
def normal_press(pressure):
return normalize(1000.0, 1050.0, pressure)
def normal_humidity(hum):
return normalize(20, 90, hum)
def average(values):
return sum(values) / len(values)
class Sensor:
def __init__(self):
pass
def reading(self) -> Reading:
return Reading(0, 0, 0)
def json(self) -> bytes:
pass
class TestSensor(Sensor):
def __init__(self, t, h, p):
super().__init__()
self.temperature = t
self.relative_humidity = h
self.pressure = p
def json(self) -> bytes:
return bytes(json.dumps({
'temperature': self.temperature,
'pressure': self.pressure,
'relative_humidity': self.relative_humidity,
}))
class MS8607(Sensor):
def __init__(self, i2c=None):
super().__init__()
if i2c is None:
i2c = busio.I2C(board.SCL, board.SDA)
self.sensor = get_sensor(i2c)
def reading(self) -> Reading:
while True:
try:
reading = Reading(self.sensor.temperature, self.sensor.pressure, self.sensor.relative_humidity)
except (OSError, ValueError, SensorError) as err:
print(err)
sleep(DELAY)
else:
return reading
def get_sensor(i2c):
while True:
try:
sensor = ms8607.MS8607(i2c)
except OSError as os_error:
print(os_error)
sleep(DELAY)
except ValueError as i2c_error:
print(i2c_error)
sleep(DELAY)
else:
return sensor