1
0
personal-data/personal_data/fetchers/youtube.py

85 lines
2.5 KiB
Python

import datetime
import json
import logging
import subprocess
from dataclasses import dataclass
from typing import ClassVar
from personal_data.data import DeduplicateMode, Scraper
from personal_data.secrets import YOUTUBE_AUTH
from ..util import safe_del
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# ID of the YouTube playlist that scrape() lists when called with
# watch_history=False (it is interpolated into the playlist URL).
PLAYLIST_ID = 'PLAfDVJvDKCvOMvfoTL7eW8GkWNJwd90eV'
def scrape(watch_history: bool) -> list[dict[str, str]]:
    """Fetch video metadata from YouTube by running ``yt-dlp``.

    ``yt-dlp`` is invoked with ``--dump-json`` and its stdout is read as one
    JSON object per line; each object is decoded and lightly normalized.

    Args:
        watch_history: If true, fetch ``https://www.youtube.com/feed/history``
            and pass ``YOUTUBE_AUTH`` as the ``--cookies`` argument.
            Otherwise fetch the playlist identified by ``PLAYLIST_ID`` with
            ``--flat-playlist``.

    Returns:
        One dict per non-blank JSON line printed by ``yt-dlp``, in output
        order. Each dict gains a ``thumbnail`` key (the ``url`` of the last
        entry in ``thumbnails``) when thumbnails are present and, for watch
        history only, a ``watch_datetime`` key holding ``timestamp`` as an
        ISO 8601 string in UTC. The ``_type``, ``_version`` and
        ``thumbnails`` keys are handed to ``safe_del`` (presumably removing
        them when present). Values are whatever the JSON decoded to, so they
        are not necessarily strings despite the annotation.

    Raises:
        RuntimeError: If ``yt-dlp`` exits with a non-zero return code; the
            message carries the return code and the captured stderr.
    """
    if watch_history:
        url = 'https://www.youtube.com/feed/history'
        ytdlp_args = [
            'yt-dlp',
            '--dump-json',
            '--cookies',
            YOUTUBE_AUTH,
            url,
        ]
    else:
        url = f'https://www.youtube.com/playlist?list={PLAYLIST_ID}'
        ytdlp_args = [
            'yt-dlp',
            '--flat-playlist',
            '--dump-json',
            url,
        ]

    # check=False: the return code is inspected manually below so that the
    # captured stderr can be included in the raised error.
    result = subprocess.run(
        ytdlp_args,
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        message = (
            f'Non-zero returncode in command: {result.returncode}'
            f'\n\n{result.stderr}'
        )
        raise RuntimeError(message)

    output = []
    for line in result.stdout.splitlines():
        # A blank line is not a JSON document; skip it instead of letting
        # json.loads fail on it.
        if not line.strip():
            continue
        data = json.loads(line)

        # Entries are not guaranteed to carry thumbnails, so guard the lookup
        # in both modes rather than only for watch history.
        thumbnails = data.get('thumbnails')
        if thumbnails:
            data['thumbnail'] = thumbnails[-1]['url']

        # NOTE(review): yt-dlp documents `timestamp` as the moment the video
        # became available, which may differ from when it was watched;
        # confirm that `watch_datetime` is the right name for this value.
        if watch_history and data.get('timestamp'):
            data['watch_datetime'] = datetime.datetime.fromtimestamp(
                int(data['timestamp']),
                tz=datetime.timezone.utc,
            ).isoformat()

        safe_del(data, '_type', '_version', 'thumbnails')
        output.append(data)
    return output
@dataclass(frozen=True)
class YoutubeFavoritesScraper(Scraper):
    """Scraper for the ``youtube_favorites`` dataset.

    Delegates to the module-level ``scrape`` function with
    ``watch_history=False``.
    """

    dataset_name: str = 'youtube_favorites'
    deduplicate_mode: DeduplicateMode = DeduplicateMode.BY_ALL_COLUMNS
    deduplicate_ignore_columns: ClassVar[list[str]] = []

    def scrape(self) -> list[dict]:
        """Yield each entry returned by ``scrape(watch_history=False)``."""
        # NOTE(review): because of `yield from`, calling this method returns
        # a generator rather than the annotated list[dict].
        # Inside the method body the bare name `scrape` resolves to the
        # module-level function, not to this method.
        entries = scrape(watch_history=False)
        yield from entries
@dataclass(frozen=True)
class YoutubeWatchHistoryScraper(Scraper):
    """Scraper for the ``youtube_watch_history`` dataset.

    Delegates to the module-level ``scrape`` function with
    ``watch_history=True``.
    """

    dataset_name: str = 'youtube_watch_history'
    deduplicate_mode: DeduplicateMode = DeduplicateMode.BY_ALL_COLUMNS
    deduplicate_ignore_columns: ClassVar[list[str]] = []

    def scrape(self) -> list[dict]:
        """Yield each entry returned by ``scrape(watch_history=True)``."""
        # NOTE(review): because of `yield from`, calling this method returns
        # a generator rather than the annotated list[dict].
        # Inside the method body the bare name `scrape` resolves to the
        # module-level function, not to this method.
        entries = scrape(watch_history=True)
        yield from entries