diff --git a/personal_data/fetchers/home_assistant.py b/personal_data/fetchers/home_assistant.py
new file mode 100644
index 0000000..109c0bc
--- /dev/null
+++ b/personal_data/fetchers/home_assistant.py
@@ -0,0 +1,53 @@
+import dataclasses
+from decimal import Decimal
+import datetime
+import logging
+from collections.abc import Iterator, Mapping
+
+from frozendict import frozendict
+
+from personal_data.data import DeduplicateMode, Scraper
+
+from .. import secrets
+
+logger = logging.getLogger(__name__)
+
+HA_ROOT = secrets.HOME_ASSISTANT_ROOT
+HA_LLAK = secrets.HOME_ASSISTANT_LLAK
+
+@dataclasses.dataclass(frozen=True)
+class HomeAssistantScaleWeight(Scraper):
+    dataset_name = 'health_weight'
+    deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
+    deduplicate_ignore_columns = []
+
+    def scrape(self) -> Iterator[Mapping[str, object]]:
+        headers = {
+            'Authorization': 'Bearer ' + HA_LLAK,
+            'Content-Type': 'application/json',
+        }
+        end_time = datetime.datetime.now()
+        start_time = end_time - datetime.timedelta(days=90)
+        url = '{}/api/history/period/{}'.format(HA_ROOT, start_time)
+        logger.debug('Fetching %s', url)
+        params = {
+            'filter_entity_id': 'sensor.bathroom_scale_mass',
+            'end_time': end_time,
+        }
+        response = self.session.get(url, params=params, headers=headers)
+        response.raise_for_status()
+
+        data = response.json()
+
+        # Only yield readings within a plausible weight range (kg); skip glitched samples.
+        state_range_for_consideration = (Decimal(30), Decimal(300))
+
+        for d in data[0]:
+            if d['state'] == 'unavailable':
+                continue
+            state = Decimal(d['state'])
+            if state_range_for_consideration[0] <= state <= state_range_for_consideration[1]:
+                yield {
+                    'weight.sample_time': datetime.datetime.fromisoformat(d['last_updated']),
+                    'weight.kg': state,
+                }
diff --git a/personal_data/main.py b/personal_data/main.py
index 949fcdc..0e2b2c3 100644
--- a/personal_data/main.py
+++ b/personal_data/main.py
@@ -77,6 +77,25 @@ def equals_without_fields(
     return frozendict(a) == frozendict(b)
 
 
+def deduplicate_by_ignoring_certain_fields(dicts: list[dict],
+    deduplicate_ignore_columns: list[str],
+) -> list[dict]:
+    """Removes duplicates that occur when ignoring certain columns.
+
+    Output order is stable.
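+    The first occurrence of each duplicate group is kept.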
+    """
+    to_remove = set()
+    for idx1, first in enumerate(dicts):
+        for idx2, second in enumerate(dicts[idx1 + 1 :], idx1 + 1):
+            if equals_without_fields(first, second, deduplicate_ignore_columns):
+                to_remove.add(idx2)
+
+    to_remove = sorted(to_remove)
+    while to_remove:
+        del dicts[to_remove.pop()]
+
+    return dicts
 
 def deduplicate_dicts(
     dicts: Sequence[dict],
@@ -99,12 +118,8 @@
         ):
             del dicts[-1]
     elif deduplicate_mode == personal_data.data.DeduplicateMode.BY_ALL_COLUMNS:
-        to_remove = set()
-        for idx1, first in enumerate(dicts):
-            for second in dicts[idx1 + 1 :]:
-                if equals_without_fields(first, second, deduplicate_ignore_columns):
-                    to_remove.add(second)
-        dicts = set(dicts) - to_remove
+        dicts = deduplicate_by_ignoring_certain_fields(dicts,
+            deduplicate_ignore_columns)
     elif deduplicate_mode != personal_data.data.DeduplicateMode.NONE:
         dicts = set(dicts)
 
diff --git a/personal_data/secrets.py b/personal_data/secrets.py
index a68bd76..7984c1e 100644
--- a/personal_data/secrets.py
+++ b/personal_data/secrets.py
@@ -43,6 +43,10 @@ KUCOIN_PASS = load_secret('KUCOIN_PASS')
 KRAKEN_KEY = load_secret('KRAKEN_KEY')
 KRAKEN_SECRET = load_secret('KRAKEN_SECRET')
 
+# Home Assistant
+HOME_ASSISTANT_ROOT = load_secret('HOME_ASSISTANT_ROOT')
+HOME_ASSISTANT_LLAK = load_secret('HOME_ASSISTANT_LLAK')
+
 # Email configuration
 MAILGUN_API_KEY = load_secret('MAILGUN_API_KEY')
 MAILGUN_DOMAIN = load_secret('MAILGUN_DOMAIN')
diff --git a/test/test_deduplicate.py b/test/test_deduplicate.py
index 55729d7..f103097 100644
--- a/test/test_deduplicate.py
+++ b/test/test_deduplicate.py
@@ -1,5 +1,8 @@
 from frozendict import frozendict
 
+import datetime
+from decimal import Decimal
+
 from personal_data.data import DeduplicateMode
 from personal_data.main import deduplicate_dicts
 
@@ -23,7 +26,7 @@ def test_no_fields_to_ignore():
 def test_only_latest():
     ls, fields = deduplicate_dicts(LIST, DeduplicateMode.ONLY_LATEST, ['t'])
     assert fields == ['a', 'b', 't']
-    assert ls == ls[:3]
+    assert ls == LIST[:3]
 
 
 def test_all_fields():
@@ -34,3 +37,28 @@ def test_all_fields():
         frozendict({'a': 0, 'b': 12, 't': 300}),
         frozendict({'a': 1, 'b': 2, 't': 300}),
     ]
+
+def test_all_fields_with_duplicates():
+    ls, fields = deduplicate_dicts(LIST + LIST, DeduplicateMode.BY_ALL_COLUMNS, ['t'])
+    assert fields == ['a', 'b', 't']
+    print(ls)
+    assert ls == [
+        frozendict({'a': 0, 'b': 12, 't': 300}),
+        frozendict({'a': 1, 'b': 2, 't': 300}),
+    ]
+
+LIST_2 = [
+    frozendict({'weight.sample_time': datetime.datetime(2024, 5, 28, 6, 27, 31, 134506, tzinfo=datetime.timezone.utc), 'weight.kg': Decimal('73.6')}),
+    frozendict({'weight.sample_time': datetime.datetime(2024, 6, 1, 7, 36, 9, 590355, tzinfo=datetime.timezone.utc), 'weight.kg': Decimal('74.7')}),
+]
+
+def test_deduplicate_weight():
+    ls, fields = deduplicate_dicts(LIST_2, DeduplicateMode.BY_ALL_COLUMNS, [])
+    assert fields == ['weight.sample_time', 'weight.kg']
+    assert ls == LIST_2
+
+def test_deduplicate_weight_2():
+    ls, fields = deduplicate_dicts(LIST_2 + LIST_2, DeduplicateMode.BY_ALL_COLUMNS, [])
+    assert fields == ['weight.sample_time', 'weight.kg']
+    assert ls == LIST_2
+