1
0
personal-data/personal_data/fetchers/home_assistant.py
Jon Michael Aanes a6e6c6dfee
All checks were successful
Test Python / Test (push) Successful in 27s
Ruff
2024-06-06 23:50:29 +02:00

58 lines
1.7 KiB
Python

import dataclasses
import datetime
import logging
from collections.abc import Iterator, Mapping
from decimal import Decimal, InvalidOperation

from personal_data.data import DeduplicateMode, Scraper

from .. import secrets
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base URL of the Home Assistant instance; API paths are appended to it.
HA_ROOT = secrets.HOME_ASSISTANT_ROOT
# Credential sent as the Bearer token on Home Assistant API requests
# (presumably a long-lived access token, judging by the name -- confirm).
HA_LLAK = secrets.HOME_ASSISTANT_LLAK
@dataclasses.dataclass(frozen=True)
class HomeAssistantScaleWeight(Scraper):
    """Scraper for scale readings stored in Home Assistant's state history.

    Fetches the last 90 days of states for ``sensor.bathroom_scale_mass``
    through the Home Assistant history REST endpoint and yields one row per
    plausible reading.
    """

    dataset_name = 'health_weight'
    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
    deduplicate_ignore_columns = []

    def scrape(self) -> Iterator[Mapping[str, object]]:
        """Yield weight samples from the Home Assistant history API.

        Yields:
            Mappings with the keys ``weight.sample_time`` (the entry's
            ``last_updated`` parsed as a ``datetime``) and ``weight.kg``
            (the state as a ``Decimal``).

        Raises:
            Whatever ``response.raise_for_status()`` raises when the request
            does not succeed.
        """
        headers = {
            'Authorization': 'Bearer ' + HA_LLAK,
            'Content-Type': 'application/json',
        }

        # Use timezone-aware UTC timestamps in explicit ISO 8601 form so the
        # requested window does not depend on the local timezone of the
        # machine running the scraper.
        end_time = datetime.datetime.now(tz=datetime.timezone.utc)
        start_time = end_time - datetime.timedelta(days=90)

        url = f'{HA_ROOT}/api/history/period/{start_time.isoformat()}'
        logger.debug('Fetching Home Assistant history from %s', url)

        params = {
            'filter_entity_id': 'sensor.bathroom_scale_mass',
            'end_time': end_time.isoformat(),
        }
        response = self.session.get(url, params=params, headers=headers)
        response.raise_for_status()

        data = response.json()
        if not data:
            # No history was returned for the entity in the requested window.
            return

        # Readings outside this inclusive range are discarded
        # (presumably to filter out spurious measurements).
        state_range_for_consideration = (Decimal(30), Decimal(300))

        # Only one entity is requested, so its states are the first element.
        for d in data[0]:
            try:
                state = Decimal(d['state'])
            except InvalidOperation:
                # Non-numeric states such as 'unavailable' or 'unknown'.
                continue

            # NaN/Infinity parse successfully but cannot be range-checked.
            if not state.is_finite():
                continue

            if (
                state_range_for_consideration[0]
                <= state
                <= state_range_for_consideration[1]
            ):
                yield {
                    'weight.sample_time': datetime.datetime.fromisoformat(
                        d['last_updated'],
                    ),
                    'weight.kg': state,
                }