1
0
personal-data/personal_data/fetchers/home_assistant.py

58 lines
1.7 KiB
Python
Raw Normal View History

2024-06-06 21:49:52 +00:00
import dataclasses
import datetime
import logging
from collections.abc import Iterator, Mapping
2024-06-06 21:50:29 +00:00
from decimal import Decimal
2024-06-06 21:49:52 +00:00
from personal_data.data import DeduplicateMode, Scraper
from .. import secrets
logger = logging.getLogger(__name__)
HA_ROOT = secrets.HOME_ASSISTANT_ROOT
2024-06-06 21:50:29 +00:00
HA_LLAK = secrets.HOME_ASSISTANT_LLAK
2024-06-06 21:49:52 +00:00
@dataclasses.dataclass(frozen=True)
class HomeAssistantScaleWeight(Scraper):
dataset_name = 'health_weight'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
deduplicate_ignore_columns = []
def scrape(self) -> Iterator[Mapping[str, object]]:
headers = {
2024-06-06 21:50:29 +00:00
'Authorization': 'Bearer ' + HA_LLAK,
'Content-Type': 'application/json',
2024-06-06 21:49:52 +00:00
}
end_time = datetime.datetime.now()
2024-06-06 21:50:29 +00:00
start_time = end_time - datetime.timedelta(days=90)
url = f'{HA_ROOT}/api/history/period/{start_time}'
2024-06-06 21:49:52 +00:00
print(url)
params = {
2024-06-06 21:50:29 +00:00
'filter_entity_id': 'sensor.bathroom_scale_mass',
'end_time': end_time,
2024-06-06 21:49:52 +00:00
}
2024-06-06 21:50:29 +00:00
response = self.session.get(url, params=params, headers=headers)
2024-06-06 21:49:52 +00:00
response.raise_for_status()
data = response.json()
state_range_for_consideration = (Decimal(30), Decimal(300))
for d in data[0]:
if d['state'] == 'unavailable':
continue
state = Decimal(d['state'])
2024-06-06 21:50:29 +00:00
if (
state_range_for_consideration[0]
<= state
<= state_range_for_consideration[1]
):
2024-06-06 21:49:52 +00:00
yield {
2024-06-06 21:50:29 +00:00
'weight.sample_time': datetime.datetime.fromisoformat(
d['last_updated'],
),
'weight.kg': state,
2024-06-06 21:49:52 +00:00
}