Skip to content

Instantly share code, notes, and snippets.

@suchmememanyskill
Created August 18, 2024 10:40
Show Gist options
  • Select an option

  • Save suchmememanyskill/5c56d76f1a13aed860c494ef293bcb68 to your computer and use it in GitHub Desktop.

Select an option

Save suchmememanyskill/5c56d76f1a13aed860c494ef293bcb68 to your computer and use it in GitHub Desktop.
Script that creates an ESPHome filter based on 2 sensors (one reference, one target). Input is the CSV exported from a Home Assistant history graph ("Download Data").
import csv, sys, datetime, numpy
# Command-line handling: argv[1] is the CSV path (required), argv[2] the bin
# width "step" (optional, default 0.1), argv[3] the reference sensor name
# (optional, read further down in the file).
if len(sys.argv) <= 1:
    print(f"{sys.argv[0]} data.csv step[default=0.1] reference_sensor[default=None]")
    # sys.exit() is always available; the bare exit() helper is only injected
    # by the site module. Exit status stays 0, as before.
    sys.exit()

csv_file_path = sys.argv[1]

try:
    step = float(sys.argv[2]) if len(sys.argv) >= 3 else 0.1
except ValueError:
    sys.exit(f"step must be a number, got {sys.argv[2]!r}")

# The step is later used as a divisor and as a loop increment, so it has to be
# strictly positive ("not step > 0" also rejects NaN).
if not step > 0:
    sys.exit(f"step must be greater than 0, got {step}")

# Kept at module level: later code may use this flag to skip the CSV header row.
first = True
class Sensor:
    """Named series of timestamped numeric readings."""

    # State strings that carry no numeric reading and are therefore skipped.
    _NON_NUMERIC_STATES = ("unavailable", "unknown")

    def __init__(self, name):
        self.name = name
        # Entries are {"datetime": datetime.datetime, "value": float}, kept in
        # the order they were added.
        self.data = []

    def add_row(self, time, value):
        """Parse one reading and append it to ``data``.

        *time* must match ``%Y-%m-%dT%H:%M:%S.%fZ``; *value* must be parseable
        by ``float`` unless it is one of the skipped placeholder states.

        Raises:
            ValueError: if *time* or *value* cannot be parsed.
        """
        if value in self._NON_NUMERIC_STATES:
            return
        self.data.append({
            "datetime": datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%S.%fZ"),
            "value": float(value),
        })

    def closest_value(self, time: datetime.datetime):
        """Return ``(seconds_away, value)`` for the reading nearest to *time*.

        The scan stops as soon as the distance starts growing, which assumes
        readings were added in chronological order.

        Raises:
            ValueError: if the sensor has no readings.
        """
        if not self.data:
            raise ValueError(f"sensor {self.name!r} has no readings")

        best = None
        best_distance = None
        for entry in self.data:
            distance = abs((time - entry["datetime"]).total_seconds())
            if best is None or distance < best_distance:
                best, best_distance = entry, distance
            elif distance > best_distance:
                # Past the minimum: the previous best is the closest reading.
                # Equal distances (e.g. duplicate timestamps) keep scanning.
                break
        return (best_distance, best["value"])
class ValueSetForSensor:
    """Collection of values gathered around one reference point for a sensor."""

    def __init__(self, reference: float, sensor: "Sensor"):
        self.sensor = sensor
        # The reference is stored rounded to one decimal place.
        self.reference = round(reference, 1)
        self.values_near_reference = []

    def add(self, value: float):
        """Record one more value for this reference point."""
        self.values_near_reference.append(value)

    def avg(self):
        """Return the arithmetic mean of the recorded values."""
        total = sum(self.values_near_reference)
        count = len(self.values_near_reference)
        return total / count

    def is_empty(self):
        """Return True when no value has been recorded yet."""
        return not self.values_near_reference

    def advice(self):
        """Return ``(reference, mean rounded to one decimal)``."""
        rounded_mean = round(self.avg(), 1)
        return (self.reference, rounded_mean)

    def __str__(self) -> str:
        lowest = min(self.values_near_reference)
        highest = max(self.values_near_reference)
        return (
            f"Sensor: {self.sensor.name}, Reference value: {self.reference}, "
            f"Min: {lowest}, Max: {highest}, "
            f"Avg: {self.avg():.2f}, Delta: {highest - lowest:.2f}"
        )
def _load_sensors(path, reference_name=None):
    """Read the CSV at *path* into ``Sensor`` objects keyed by sensor name.

    The first row is treated as a header. Every later row is read as
    ``name, value, timestamp`` (columns 0, 1 and 2). When *reference_name* is
    given, that sensor is registered first so it becomes the reference even
    if another sensor appears earlier in the file.
    """
    loaded: dict[str, Sensor] = {}
    if reference_name is not None:
        loaded[reference_name] = Sensor(reference_name)

    # newline='' is what the csv module documents for files passed to reader().
    with open(path, 'r', newline='') as csv_file:
        reader = csv.reader(csv_file)
        next(reader, None)  # skip the header row; tolerates an empty file
        for row in reader:
            if len(row) < 3:
                # Blank or truncated line: nothing usable in it.
                continue
            print(row)
            name, value, timestamp = row[0], row[1], row[2]
            if name not in loaded:
                loaded[name] = Sensor(name)
            loaded[name].add_row(timestamp, value)
    return loaded


def _bin_reference_values(target, reference, step):
    """Group reference readings by the *step*-wide bin of the target reading.

    Each target sample is assigned to the bin centred on the nearest multiple
    of *step* (bin k covers [k*step - step/2, k*step + step/2)); the reference
    reading closest in time to that sample is added to the bin. Bins are
    derived from an integer index, so every sample lands in exactly one bin
    and no floating-point error accumulates across bins.

    Returns the non-empty bins ordered by ascending bin centre.
    """
    bins: dict[int, ValueSetForSensor] = {}
    for sample in target.data:
        # Floor of (value / step + 0.5) gives the index of the enclosing bin.
        index = int((sample["value"] / step + 0.5) // 1)
        value_set = bins.get(index)
        if value_set is None:
            value_set = bins[index] = ValueSetForSensor(index * step, target)
        value_set.add(reference.closest_value(sample["datetime"])[1])
    return [bins[index] for index in sorted(bins)]


def _print_filters(sensor_name, value_sets):
    """Print a constant-offset filter and a calibrate_linear filter as YAML."""
    # ValueSetForSensor rounds its reference, so several bins can share one
    # reference value; those are averaged into a single datapoint.
    grouped: dict[float, list[float]] = {}
    for value_set in value_sets:
        raw, corrected = value_set.advice()
        grouped.setdefault(raw, []).append(corrected)
    datapoints = {
        raw: round(sum(values) / len(values), 1)
        for raw, values in grouped.items()
    }

    deltas = [corrected - raw for raw, corrected in datapoints.items()]
    average_delta = round(sum(deltas) / len(deltas), 1)
    # Spread between the 5th and 95th percentile of the per-point corrections.
    spread = abs(numpy.percentile(deltas, 95) - numpy.percentile(deltas, 5))

    print()
    print(f"For sensor {sensor_name}, constant (Precision: {spread:0.1f}):")
    print("filters:")
    print(f"  - offset: {average_delta:0.1f}")
    print()
    print(f"For sensor {sensor_name}, linear:")
    print("filters:")
    # Nested keys must be indented deeper than the list dash to be valid YAML.
    print("  - calibrate_linear:")
    print("      method: exact")
    print("      datapoints:")
    for raw, corrected in datapoints.items():
        print(f"        - {raw} -> {corrected}")


sensors: dict[str, Sensor] = _load_sensors(
    csv_file_path,
    sys.argv[3] if len(sys.argv) >= 4 else None,
)
if not sensors:
    sys.exit(f"No sensor rows found in {csv_file_path}")

# The first registered sensor is the reference (dicts keep insertion order).
reference_sensor = next(iter(sensors.values()))
if not reference_sensor.data:
    sys.exit(f"Reference sensor {reference_sensor.name} has no numeric readings")

sensor_calibrations: dict[str, list[ValueSetForSensor]] = {}

for sensor_name, sensor in sensors.items():
    if sensor_name == reference_sensor.name:
        continue
    if not sensor.data:
        print(f"Skipping {sensor_name}: no numeric readings", file=sys.stderr)
        continue
    value_sets = _bin_reference_values(sensor, reference_sensor, step)
    for value_set in value_sets:
        print(str(value_set))
    sensor_calibrations[sensor_name] = value_sets

for sensor_name, value_sets in sensor_calibrations.items():
    _print_filters(sensor_name, value_sets)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment