1
0
personal-data/personal_data/fetchers/home_assistant.py

53 lines
1.7 KiB
Python

import dataclasses
from decimal import Decimal
import datetime
import logging
from collections.abc import Iterator, Mapping
from frozendict import frozendict
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base URL of the Home Assistant instance; used as the prefix of the
# history API URL built in this module.
HA_ROOT = secrets.HOME_ASSISTANT_ROOT
# Credential sent as the Bearer token in the Authorization header
# (presumably a Home Assistant long-lived access token -- TODO confirm).
HA_LLAK= secrets.HOME_ASSISTANT_LLAK
@dataclasses.dataclass(frozen=True)
class HomeAssistantScaleWeight(Scraper):
    """Scrape weight samples from a Home Assistant bathroom-scale sensor.

    Queries the Home Assistant history API for the entity
    ``sensor.bathroom_scale_mass`` over the last 90 days and yields one
    row per plausible reading.
    """

    dataset_name = 'health_weight'
    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
    deduplicate_ignore_columns = []

    def scrape(self) -> Iterator[Mapping[str, object]]:
        """Yield weight samples recorded during the last 90 days.

        Each yielded mapping has two keys:

        * ``weight.sample_time`` -- ``datetime`` parsed from the entry's
          ``last_updated`` field.
        * ``weight.kg`` -- the sensor state as a ``Decimal``.

        Entries whose state is a non-numeric placeholder (``unavailable``
        or ``unknown``) are skipped, as are readings outside the inclusive
        range 30..300 (presumably implausible body weights in kg).

        Raises:
            Whatever ``response.raise_for_status()`` raises when the server
            answers with an HTTP error status.
        """
        headers = {
            'Authorization': f'Bearer {HA_LLAK}',
            'Content-Type': 'application/json',
        }

        # Use timezone-aware ISO 8601 timestamps (YYYY-MM-DDThh:mm:ssTZD), the
        # format documented for the history endpoint. A naive local time would
        # be ambiguous if this host and the server are in different timezones.
        end_time = datetime.datetime.now(tz=datetime.timezone.utc)
        start_time = end_time - datetime.timedelta(days=90)
        url = '{}/api/history/period/{}'.format(
            HA_ROOT,
            start_time.isoformat(timespec='seconds'),
        )
        logger.debug('Fetching Home Assistant history: %s', url)

        params = {
            'filter_entity_id': 'sensor.bathroom_scale_mass',
            'end_time': end_time.isoformat(timespec='seconds'),
        }
        response = self.session.get(url, params=params, headers=headers)
        response.raise_for_status()
        data = response.json()

        # An empty response means there is no history for the entity in the
        # requested period; there is nothing to yield.
        if not data:
            return

        # Placeholder states that carry no numeric reading.
        non_numeric_states = ('unavailable', 'unknown')
        lowest_accepted, highest_accepted = Decimal(30), Decimal(300)

        # Only one entity is requested, so its state changes are read from the
        # first element of the response.
        for entry in data[0]:
            raw_state = entry['state']
            if raw_state in non_numeric_states:
                continue
            state = Decimal(raw_state)
            if lowest_accepted <= state <= highest_accepted:
                yield {
                    'weight.sample_time': datetime.datetime.fromisoformat(
                        entry['last_updated'],
                    ),
                    'weight.kg': state,
                }