Cleanup and refactoring of the CSV approach

This fixes a few minor issues and cleans up the changes introduced by
Paweł Jochym.
This commit is contained in:
Michał Leśniewski
2023-05-18 22:10:59 +02:00
committed by Michał Leśniewski
parent e770d37f2b
commit 3c5ce038a6

View File

@@ -1,15 +1,13 @@
#!/usr/bin/env python3
import csv
import datetime
from .session import Session
from collections import defaultdict
import csv
class ELicznik:
LOGIN_URL = "https://logowanie.tauron-dystrybucja.pl/login"
CHART_URL = "https://elicznik.tauron-dystrybucja.pl/energia/api"
READINGS_URL = "https://elicznik.tauron-dystrybucja.pl/odczyty/api"
DATA_URL = "https://elicznik.tauron-dystrybucja.pl/energia/do/dane"
def __init__(self, username, password):
@@ -35,90 +33,60 @@ class ELicznik:
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave the context manager without doing any cleanup.

    Returns ``None``, so an exception raised inside the ``with`` body is
    not suppressed.
    """
    return None
def _get_raw_readings(self, type_, start_date, end_date=None):
    """Yield ``(timestamp, value)`` pairs fetched from the chart API.

    Posts a request to ``CHART_URL`` for the given reading ``type_`` and
    date range, then walks the ``data.allData`` list of the JSON response.

    Args:
        type_: Value sent as the ``type`` form field.
        start_date: First day of the range; must support ``strftime``.
        end_date: Last day of the range; defaults to ``start_date``.

    Yields:
        Tuples of a naive ``datetime`` (the entry's ``Date`` plus ``Hour``
        hours) and the entry's raw ``EC`` value.
    """
    if not end_date:
        end_date = start_date

    payload = {
        "type": type_,
        "from": start_date.strftime("%d.%m.%Y"),
        "to": end_date.strftime("%d.%m.%Y"),
        "profile": "full time",
    }
    response = self.session.post(self.CHART_URL, data=payload)
    entries = response.json().get("data", {}).get("allData", {})

    for entry in entries:
        day_text = entry.get("Date")
        hours = int(entry.get("Hour"))
        # TODO: There's also an "Extra" field, which seems to be set to "T"
        # only for the one extra hour when switching from CEST to CET
        # (e.g. 3 AM on 2021-10-31)
        midnight = datetime.datetime.strptime(day_text, "%Y-%m-%d")
        yield midnight + datetime.timedelta(hours=hours), entry.get("EC")
def get_readings_production(self, start_date, end_date=None):
    """Return the ``"oze"`` readings for the range as a ``{timestamp: value}`` dict."""
    pairs = self._get_raw_readings("oze", start_date, end_date)
    return dict(pairs)
def get_readings_consumption(self, start_date, end_date=None):
    """Return the ``"consum"`` readings for the range as a ``{timestamp: value}`` dict."""
    pairs = self._get_raw_readings("consum", start_date, end_date)
    return dict(pairs)
def get_readings(self, start_date, end_date=None):
    """Return merged consumption and production readings, sorted by time.

    Args:
        start_date: First day of the range.
        end_date: Last day of the range; passed through unchanged to the
            per-series getters.

    Returns:
        A sorted list of ``(timestamp, consumed, produced)`` tuples. Values
        are converted to ``float``; a series that has no value for a given
        timestamp contributes ``None`` in its position.
    """
    consumed = self.get_readings_consumption(start_date, end_date)
    produced = self.get_readings_production(start_date, end_date)

    def as_float(value):
        # The union below can contain timestamps that only one of the two
        # series knows about; float(None) would raise TypeError for those.
        return None if value is None else float(value)

    return sorted(
        (
            timestamp,
            as_float(consumed.get(timestamp)),
            as_float(produced.get(timestamp)),
        )
        for timestamp in set(consumed) | set(produced)
    )
def get_raw_data(self, start_date, end_date=None):
    """Download the hourly CSV export and return it as a list of lines.

    Posts the export form to ``DATA_URL`` requesting both consumption and
    production series in CSV format.

    Args:
        start_date: First day of the range; must support ``strftime`` and
            subtraction of a ``timedelta``.
        end_date: Last day of the range; defaults to ``start_date``.

    Returns:
        The response body split into lines, without line terminators.
    """
    end_date = end_date or start_date
    if start_date == end_date:
        # A single-day request is widened to start one day earlier.
        # NOTE(review): presumably the export needs a non-empty range --
        # confirm. Callers must filter out the extra day themselves.
        start_date -= datetime.timedelta(days=1)

    response = self.session.post(
        self.DATA_URL,
        data={
            "form[from]": start_date.strftime("%d.%m.%Y"),
            "form[to]": end_date.strftime("%d.%m.%Y"),
            "form[type]": "godzin",  # or "dzien"
            "form[consum]": 1,
            "form[oze]": 1,
            "form[fileType]": "CSV",  # or "XLS"
        },
    )
    # splitlines() copes with both "\n" and "\r\n" terminators and does not
    # leave a trailing empty element the way split("\n") does.
    return response.text.splitlines()
def get_data(self, start_date, end_date=None):
@staticmethod
def parse_timestamp(timespec):
date, time = timespec.split(None, 1)
hour = int(time.split(":")[0]) - 1
return datetime.datetime.strptime(date, "%Y-%m-%d") + datetime.timedelta(
hours=hour
)
def get_readings(self, start_date, end_date=None):
    """Return hourly consumption and production readings from the CSV export.

    Args:
        start_date: First day of the range; compared against
            ``timestamp.date()``, so it should be a ``datetime.date``.
        end_date: Last day of the range; defaults to ``start_date``.

    Returns:
        A sorted list of ``(timestamp, consumed, produced)`` tuples with
        ``float`` values. A series that has no row for a given timestamp
        contributes ``None`` in its position.
    """
    end_date = end_date or start_date
    rows = csv.DictReader(self.get_raw_data(start_date, end_date), delimiter=";")

    consumed = {}
    produced = {}
    for row in rows:
        timestamp = self.parse_timestamp(row["Data"])
        # The export uses a decimal comma.
        value = float(row[" Wartość kWh"].replace(",", "."))

        # get_raw_data() widens a single-day request by one extra leading
        # day, so rows outside the requested range must be dropped here.
        if not start_date <= timestamp.date() <= end_date:
            continue

        kind = row["Rodzaj"]
        if kind == "pobór":  # energy drawn from the grid
            consumed[timestamp] = value
        elif kind == "oddanie":  # energy fed back into the grid
            produced[timestamp] = value

    # TODO: This probably drops the data from the double hour during the
    # DST change (two rows map to the same naive timestamp). Needs to be
    # investigated and fixed.
    return sorted(
        (timestamp, consumed.get(timestamp), produced.get(timestamp))
        for timestamp in set(consumed) | set(produced)
    )