"""Withings API fetcher. Supports downloading activity summary from the [Withings API](https://developer.withings.com/api-reference/) using the [non-official Withings API Python Client](https://pypi.org/project/withings-api/). """ import dataclasses import datetime import logging import pickle import subprocess from pathlib import Path import withings_api from withings_api.common import CredentialsType from personal_data import secrets from personal_data.data import DeduplicateMode, Scraper logger = logging.getLogger(__name__) CREDENTIALS_FILE = Path('secrets/withings_oath_creds') def save_credentials(credentials: CredentialsType) -> None: """Save credentials to a file.""" logger.info('Saving credentials in: %s', CREDENTIALS_FILE) with open(CREDENTIALS_FILE, 'wb') as file_handle: pickle.dump(credentials, file_handle) def load_credentials() -> CredentialsType: """Load credentials from a file.""" logger.info('Using credentials saved in: %s', CREDENTIALS_FILE) try: with open(CREDENTIALS_FILE, 'rb') as file_handle: return pickle.load(file_handle) except FileNotFoundError: return None @dataclasses.dataclass(frozen=True) class WithingsActivity(Scraper): dataset_name = 'withings_activity' deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS @staticmethod def requires_cfscrape() -> bool: return False def oauth_flow(self) -> CredentialsType: if creds := load_credentials(): return creds auth = withings_api.WithingsAuth( client_id=secrets.WITHINGS_CLIENTID, consumer_secret=secrets.WITHINGS_SECRET, callback_uri=secrets.WITHINGS_CALLBACK_URI, scope=( withings_api.AuthScope.USER_ACTIVITY, withings_api.AuthScope.USER_METRICS, withings_api.AuthScope.USER_INFO, withings_api.AuthScope.USER_SLEEP_EVENTS, ), ) authorize_url = auth.get_authorize_url() subprocess.run(['firefox', '--new-tab', authorize_url]) credentials_code = input('Please insert your code here: ').strip() creds = auth.get_credentials(credentials_code) save_credentials(creds) return creds def scrape(self): credentials = self.oauth_flow() # Now you are ready to make calls for data. api = withings_api.WithingsApi(credentials) start = datetime.date.today() - datetime.timedelta(days=200) end = datetime.date.today() activity_result = api.measure_get_activity( startdateymd=start, enddateymd=end, ) for activity in activity_result.activities: sample = dict(activity) sample['date'] = activity.date.date() del sample['timezone'], sample['is_tracker'] yield sample del activity, sample