"""Convert JSON-LD (schema.org) concepts into QuickStatements v1 commands."""

import logging
import urllib.parse
import wikidata.entity
import datetime
from dataclasses import dataclass
from enforce_typing import enforce_types
from typing import List, Set, Optional, Union
from enum import Enum

# When true, Concept.__setitem__ asserts the expected shape of assigned values.
STRICT_VALIDATION = True


class Concept(object):
    """Dictionary-like wrapper whose keys are canonicalized against a context.

    Plain string keys (other than those starting with ``@``) are turned into
    ``urllib.parse.ParseResult`` objects by replacing the path of ``context``
    with the key; see :meth:`canonical_key`.
    """

    def __init__(self, context, data):
        """Store ``context`` and copy ``data`` with canonicalized keys.

        Note that the initial ``data`` is not passed through the
        ``STRICT_VALIDATION`` checks of ``__setitem__``.
        """
        # context must be set first: canonical_key reads it.
        self.context = context
        self.data = {self.canonical_key(k): v for (k, v) in data.items()}

    def get(self, key, *args, **kwargs):
        """Return the value for ``key``, or the optional default if absent.

        Extra arguments are forwarded to ``dict.get`` (they were previously
        handed to ``canonical_key``, which made any call with a default raise
        ``TypeError``).
        """
        return self.data.get(self.canonical_key(key), *args, **kwargs)

    def keys(self):
        """Return a view of the canonicalized keys."""
        return self.data.keys()

    def setdefault(self, key, value):
        """Insert ``value`` under ``key`` if absent and return the stored value."""
        return self.data.setdefault(self.canonical_key(key), value)

    def to_dict(self):
        """Return a shallow copy of the underlying dictionary."""
        return dict(self.data)

    def __getitem__(self, key):
        return self.data[self.canonical_key(key)]

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, validating its shape in strict mode.

        With ``STRICT_VALIDATION`` enabled, every key except ``'@id'`` must
        map to a list of dicts, each containing a ``'value'`` entry and no
        list-typed entries.
        """
        if STRICT_VALIDATION:
            if not isinstance(key, str) or key != '@id':
                assert isinstance(value, list), value
                for entry in value:
                    assert isinstance(entry, dict), value
                    assert 'value' in entry, value
                    for sub_key in entry:
                        assert not isinstance(entry[sub_key], list), value
        self.data[self.canonical_key(key)] = value

    def __contains__(self, key):
        return self.canonical_key(key) in self.data

    def __delitem__(self, key):
        del self.data[self.canonical_key(key)]

    def canonical_key(self, key):
        """Return the canonical form of ``key``.

        Already-parsed URLs, non-string keys and strings starting with ``@``
        are returned unchanged, as is every key when no context is set.
        Any other string becomes ``context`` with its path replaced by it.
        """
        if isinstance(key, urllib.parse.ParseResult):
            return key
        if not isinstance(key, str):
            return key
        if key.startswith('@'):
            return key
        if self.context is None:
            return key
        return self.context._replace(path=key)

    def __repr__(self):
        if concept_id := self.data.get('@id'):
            return 'Concept {{ @id = {} }}'.format(concept_id)
        return 'Concept ' + str(self.data)

    def __str__(self):
        return repr(self)


def determine_concepts_internal(json, context, outputs):
    """Recursively collect a Concept for every leaf object in ``json``.

    Args:
        json: A dict, or a (possibly nested) list of dicts.
        context: The inherited context, either a URL string or an
            already-parsed ``ParseResult``. A dict's own ``'@context'``
            takes precedence over it.
        outputs: List that the created ``Concept`` objects are appended to.

    Raises:
        AssertionError: If a non-list element is not a dict, or if the
            effective context's netloc is not ``schema.org``.
    """
    if isinstance(json, list):
        for member in json:
            determine_concepts_internal(member, context, outputs)
        return
    assert isinstance(json, dict), type(json)
    raw_context = json.get('@context', context)
    # Members of a '@graph' inherit a context that is already parsed;
    # urlparse() cannot take a ParseResult, so only parse raw values.
    if isinstance(raw_context, urllib.parse.ParseResult):
        context = raw_context
    else:
        context = urllib.parse.urlparse(raw_context)
    assert context.netloc == 'schema.org'
    if '@graph' in json:
        determine_concepts_internal(json['@graph'], context, outputs)
    else:
        outputs.append(Concept(context, json))


def determine_concepts(json):
    """Return the list of Concepts found in ``json``, starting with an empty context."""
    concepts = []
    determine_concepts_internal(json, '', concepts)
    return concepts


# Property ids that fmt_value rewrites from a 'P' prefix to an 'S' prefix.
REFERENCE_PROPERTIES = {'P813', 'P854', 'P248', 'P143'}


def fmt_value(c, prefer_reference=False):
    """Format ``c`` as a single QuickStatements v1 field.

    Args:
        c: A string, Concept, wikidata Entity, parsed URL, date/datetime or
            any other object (formatted with ``str``).
        prefer_reference: If true, parsed URLs are emitted bare instead of
            as quoted strings. Forwarded when resolving a Concept's ``@id``.

    Returns:
        The formatted field, or ``''`` for a Concept without an ``@id``
        (an error is logged in that case).
    """
    if isinstance(c, str):
        return '"{}"'.format(c)  # TODO: Escape
    if isinstance(c, Concept):
        if '@id' in c:
            return fmt_value(c['@id'], prefer_reference)
        logging.error('Could not determine useful id for %s', c)
        return ''
    if isinstance(c, wikidata.entity.Entity):
        entity_id = c.id
        if isinstance(entity_id, int):
            entity_id = 'P{}'.format(entity_id)
        if entity_id in REFERENCE_PROPERTIES:
            entity_id = entity_id.replace('P', 'S', 1)
        return entity_id
    if isinstance(c, urllib.parse.ParseResult):
        url = c.geturl()
        return url if prefer_reference else fmt_value(url, prefer_reference)
    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(c, datetime.datetime):
        # NOTE(review): unlike the date branch below, this emits no trailing
        # 'Z' and keeps the time of day - confirm QuickStatements accepts it.
        return '+{}/11'.format(c.isoformat())
    if isinstance(c, datetime.date):
        return '+{}T00:00:00Z/11'.format(c.isoformat())
    return str(c)


def fmt_predicate(pred, object):
    """Format the predicate ``pred`` for the statement value ``object``.

    schema.org predicates are mapped to language-tagged commands using
    ``object['__language']`` (default ``'en'``): ``/name`` -> ``L<lang>``,
    ``/alternateName`` -> ``A<lang>``, ``/description`` -> ``D<lang>`` and
    ``/sameAs`` -> ``S<lang>wiki``. Everything else is formatted with
    ``fmt_value(pred, prefer_reference=True)``.

    Raises:
        AssertionError: For any other schema.org path.
    """
    if isinstance(pred, urllib.parse.ParseResult) and pred.netloc == 'schema.org':
        lang = object.get('__language') or 'en'
        if pred.path == '/name':
            return 'L' + lang
        if pred.path == '/alternateName':
            return 'A' + lang
        if pred.path == '/description':
            return 'D' + lang
        if pred.path == '/sameAs':
            return 'S{}wiki'.format(lang)
        # Raised explicitly so the check is not stripped under ``python -O``.
        raise AssertionError(pred)
    return fmt_value(pred, prefer_reference=True)


def to_quickstatements_v1_item(subject, lines, skip_impossible=True, skip_already_syncronized=True):
    """Append the QuickStatements v1 lines describing ``subject`` to ``lines``.

    Args:
        subject: The Concept to serialize. Without an ``'@id'`` a ``CREATE``
            line is emitted and statements refer to ``LAST``.
        lines: Output list; each appended line is a list of string fields.
        skip_impossible: If true, lines whose predicate starts with ``"`` or
            that contain an empty field are logged and dropped.
        skip_already_syncronized: If true, statement values flagged with
            ``'__synchronized_with_wikidata'`` are skipped.

    Raises:
        AssertionError: If a predicate's values are not a list, or a line
            contains the field ``'None'``.
    """
    # TODO: Linked subjects (an "'@id' not in subject" assert used to be here)
    subject_id = fmt_value(subject, True) if '@id' in subject else 'LAST'
    if subject_id == 'LAST':
        lines.append(['CREATE'])

    def fmt_key_value_pair(v, line):
        """Append the value ``v`` and any qualifier pairs to ``line``."""
        if isinstance(v, list):
            for element in v:
                fmt_key_value_pair(element, line)
            return
        if isinstance(v, dict) and 'value' in v:
            line.append(fmt_value(v['value']))
            for sub_k, sub_v in v.items():
                if sub_k is None or sub_v is None:
                    continue
                # String keys ('value', '__language', ...) are metadata;
                # only non-string keys are emitted as predicate/value pairs.
                if not isinstance(sub_k, str):
                    line.append(fmt_predicate(sub_k, sub_v))
                    line.append(fmt_value(sub_v))
        else:
            line.append(fmt_value(v))

    for predicate, pred_objects in subject.data.items():
        if isinstance(predicate, str) and (predicate == '@id' or predicate.startswith('__')):
            continue
        assert isinstance(pred_objects, list)
        for pred_object in pred_objects:
            if pred_object.get('__synchronized_with_wikidata', False) and skip_already_syncronized:
                continue
            predicate_str = fmt_predicate(predicate, pred_object)
            line = [subject_id, predicate_str]
            fmt_key_value_pair(pred_object, line)
            if skip_impossible and predicate_str.startswith('"'):
                logging.warning('Bad line: %s (Lines must not start with ")', predicate_str)
                continue
            if '' in line and skip_impossible:
                logging.warning('Bad line: %s (Lines must not contain empty names)', line)
                continue
            assert 'None' not in line, line
            lines.append(line)


def to_quickstatements_v1(concepts):
    """Return the QuickStatements v1 command text for ``concepts``.

    Accepts a single Concept or a sized collection of Concepts. Fields are
    tab-separated and commands are newline-separated.
    """
    if isinstance(concepts, Concept):
        concepts = [concepts]
    lines = []
    for concept in concepts:
        to_quickstatements_v1_item(concept, lines)
    logging.info("Produced %s statements for %s concepts", len(lines), len(concepts))
    commands = '\n'.join('\t'.join(fields) for fields in lines)
    assert '\tNone\t' not in commands, 'TODO'
    return commands


def commands_to_quickstatements_v1_url(commands):
    """Return a QuickStatements URL that pre-loads ``commands``.

    Tabs become ``|`` and newlines become ``||`` before the text is fully
    percent-encoded.
    """
    url = commands.replace('\t', '|').replace('\n', '||')
    url = urllib.parse.quote(url, safe='')
    return 'https://quickstatements.toolforge.org/#/v1=' + url


def to_quickstatements_v1_url(concepts):
    """Return the QuickStatements URL for ``concepts``."""
    return commands_to_quickstatements_v1_url(to_quickstatements_v1(concepts))