From b561352ec4a21fbe6187fc252f6026abad2cfc16 Mon Sep 17 00:00:00 2001 From: Jon Michael Aanes Date: Tue, 7 Mar 2023 00:03:47 +0100 Subject: [PATCH] Redistributed library in a cursed fashion. --- __init__.py | 12 +++++ format.py | 111 ++++++++++++++++++++++++++++++++++++++++++++ parse.py | 24 ++++++++++ schemeld.py | 129 ---------------------------------------------------- 4 files changed, 147 insertions(+), 129 deletions(-) create mode 100644 __init__.py create mode 100644 format.py create mode 100644 parse.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..ab21368 --- /dev/null +++ b/__init__.py @@ -0,0 +1,12 @@ + +# Package + +import sys, os +sys.path.append(os.path.join(os.path.dirname(__file__))) + +import format +import parse +import schemeld +import wikidata_ext + + diff --git a/format.py b/format.py new file mode 100644 index 0000000..4c6da6f --- /dev/null +++ b/format.py @@ -0,0 +1,111 @@ + +import schemeld + +REFERENCE_PROPERTIES = {'P813', 'P854', 'P248', 'P143', 'P813'} + +def fmt_value(c, prefer_reference = False): + if isinstance(c, str): + return '"{}"'.format(c) # TODO: Escape + elif isinstance(c, schemeld.Concept): + if '@id' in c: + return fmt_value(c['@id'], prefer_reference) + else: + logging.error('Could not determine useful id for %s', c) + return '' + elif isinstance(c, wikidata.entity.Entity): + s = c.id + if isinstance(s, int): + s = 'P{}'.format(s) + if s in REFERENCE_PROPERTIES: + s = s.replace('P', 'S', 1) + return s + elif isinstance(c, urllib.parse.ParseResult): + return c.geturl() if prefer_reference else fmt_value(c.geturl(), prefer_reference) + elif isinstance(c, datetime.datetime): + return '+{}/11'.format(c.isoformat()) + elif isinstance(c, datetime.date): + return '+{}T00:00:00Z/11'.format(c.isoformat()) + + return str(c) + +def fmt_predicate(pred, object): + if isinstance(pred, urllib.parse.ParseResult) and pred.netloc == 'schema.org': + lang = object.get('__language') or 'en' + if pred.path == '/name': + return 'L'+lang + elif pred.path == '/alternateName': + return 'A'+lang + elif pred.path == '/description': + return 'D'+lang + elif pred.path == '/sameAs': + return 'S{}wiki'.format(lang) + else: + assert False, pred + return fmt_value(pred, prefer_reference = True) + +def to_quickstatements_v1_item(subject, lines, skip_impossible = True, skip_already_syncronized = True): + #assert '@id' not in subject, 'TODO: Linked subjects' + subject_id = fmt_value(subject, True) if '@id' in subject else 'LAST' + + if subject_id == 'LAST': + lines.append(['CREATE']) + + def fmt_key_value_pair(v, line): + if isinstance(v, list): + for e in v: + fmt_key_value_pair(e, line) + return + elif isinstance(v, dict) and 'value' in v: + line.append(fmt_value(v['value'])) + for sub_k, sub_v in v.items(): + if sub_k is None or sub_v is None: + continue + if not isinstance(sub_k, str): + line.append(fmt_predicate(sub_k, sub_v)) + line.append(fmt_value(sub_v)) + else: + line.append(fmt_value(v)) + + for predicate, pred_objects in subject.data.items(): + if isinstance(predicate, str) and (predicate == '@id' or predicate.startswith('__')): + continue + + assert isinstance(pred_objects, list) + for pred_object in pred_objects: + if pred_object.get('__synchronized_with_wikidata', False) and skip_already_syncronized: + continue + predicate_str = fmt_predicate(predicate, pred_object) + line = [subject_id, predicate_str] + fmt_key_value_pair(pred_object, line) + + if skip_impossible and predicate_str.startswith('"'): + logging.warning('Bad line: %s (Lines must not start with ")', predicate_str) + continue + if '' in line and skip_impossible: + logging.warning('Bad line: %s (Lines must not contain empty names)', line) + continue + assert 'None' not in line, line + lines.append(line) + +def to_quickstatements_v1(concepts): + if isinstance(concepts, schemeld.Concept): + concepts = [concepts] + + lines = [] + + for concept in concepts: + to_quickstatements_v1_item(concept, lines) + + logging.info("Produced %s statements for %s concepts", len(lines), len(concepts)) + commands = '\n'.join(['\t'.join(l) for l in lines]) + + assert '\tNone\t' not in commands, 'TODO' + return commands + +def commands_to_quickstatements_v1_url(commands): + url = commands.replace('\t', '|').replace('\n', '||') + url = urllib.parse.quote(url, safe = '') + return 'https://quickstatements.toolforge.org/#/v1=' + url + +def to_quickstatements_v1_url(concepts): + return commands_to_quickstatements_v1_url(to_quickstatements_v1(concepts)) diff --git a/parse.py b/parse.py new file mode 100644 index 0000000..85f7ba6 --- /dev/null +++ b/parse.py @@ -0,0 +1,24 @@ + +import schemeld + +def determine_concepts_internal(json, context, outputs): + if isinstance(json, list): + for m in json: + determine_concepts_internal(m, context, outputs) + return + + assert isinstance(json, dict), type(json) + context = urllib.parse.urlparse(json.get('@context', context)) + assert context.netloc == 'schema.org' + + if '@graph' in json: + determine_concepts_internal(json['@graph'], context, outputs) + else: + outputs.append(schemeld.Concept(context, json)) + +def determine_concepts(json): + concepts = [] + determine_concepts_internal(json, '', concepts) + return concepts + + diff --git a/schemeld.py b/schemeld.py index d5bc4ff..a40c277 100644 --- a/schemeld.py +++ b/schemeld.py @@ -68,132 +68,3 @@ class Concept(object): def __str__(self): return repr(self) -def determine_concepts_internal(json, context, outputs): - if isinstance(json, list): - for m in json: - determine_concepts_internal(m, context, outputs) - return - - assert isinstance(json, dict), type(json) - context = urllib.parse.urlparse(json.get('@context', context)) - assert context.netloc == 'schema.org' - - if '@graph' in json: - determine_concepts_internal(json['@graph'], context, outputs) - else: - outputs.append(Concept(context, json)) - -def determine_concepts(json): - concepts = [] - determine_concepts_internal(json, '', concepts) - return concepts - -REFERENCE_PROPERTIES = {'P813', 'P854', 'P248', 'P143', 'P813'} - -def fmt_value(c, prefer_reference = False): - if isinstance(c, str): - return '"{}"'.format(c) # TODO: Escape - elif isinstance(c, Concept): - if '@id' in c: - return fmt_value(c['@id'], prefer_reference) - else: - logging.error('Could not determine useful id for %s', c) - return '' - elif isinstance(c, wikidata.entity.Entity): - s = c.id - if isinstance(s, int): - s = 'P{}'.format(s) - if s in REFERENCE_PROPERTIES: - s = s.replace('P', 'S', 1) - return s - elif isinstance(c, urllib.parse.ParseResult): - return c.geturl() if prefer_reference else fmt_value(c.geturl(), prefer_reference) - elif isinstance(c, datetime.datetime): - return '+{}/11'.format(c.isoformat()) - elif isinstance(c, datetime.date): - return '+{}T00:00:00Z/11'.format(c.isoformat()) - - return str(c) - -def fmt_predicate(pred, object): - if isinstance(pred, urllib.parse.ParseResult) and pred.netloc == 'schema.org': - lang = object.get('__language') or 'en' - if pred.path == '/name': - return 'L'+lang - elif pred.path == '/alternateName': - return 'A'+lang - elif pred.path == '/description': - return 'D'+lang - elif pred.path == '/sameAs': - return 'S{}wiki'.format(lang) - else: - assert False, pred - return fmt_value(pred, prefer_reference = True) - -def to_quickstatements_v1_item(subject, lines, skip_impossible = True, skip_already_syncronized = True): - #assert '@id' not in subject, 'TODO: Linked subjects' - subject_id = fmt_value(subject, True) if '@id' in subject else 'LAST' - - if subject_id == 'LAST': - lines.append(['CREATE']) - - def fmt_key_value_pair(v, line): - if isinstance(v, list): - for e in v: - fmt_key_value_pair(e, line) - return - elif isinstance(v, dict) and 'value' in v: - line.append(fmt_value(v['value'])) - for sub_k, sub_v in v.items(): - if sub_k is None or sub_v is None: - continue - if not isinstance(sub_k, str): - line.append(fmt_predicate(sub_k, sub_v)) - line.append(fmt_value(sub_v)) - else: - line.append(fmt_value(v)) - - for predicate, pred_objects in subject.data.items(): - if isinstance(predicate, str) and (predicate == '@id' or predicate.startswith('__')): - continue - - assert isinstance(pred_objects, list) - for pred_object in pred_objects: - if pred_object.get('__synchronized_with_wikidata', False) and skip_already_syncronized: - continue - predicate_str = fmt_predicate(predicate, pred_object) - line = [subject_id, predicate_str] - fmt_key_value_pair(pred_object, line) - - if skip_impossible and predicate_str.startswith('"'): - logging.warning('Bad line: %s (Lines must not start with ")', predicate_str) - continue - if '' in line and skip_impossible: - logging.warning('Bad line: %s (Lines must not contain empty names)', line) - continue - assert 'None' not in line, line - lines.append(line) - -def to_quickstatements_v1(concepts): - if isinstance(concepts, Concept): - concepts = [concepts] - - lines = [] - - for concept in concepts: - to_quickstatements_v1_item(concept, lines) - - logging.info("Produced %s statements for %s concepts", len(lines), len(concepts)) - commands = '\n'.join(['\t'.join(l) for l in lines]) - - assert '\tNone\t' not in commands, 'TODO' - return commands - -def commands_to_quickstatements_v1_url(commands): - url = commands.replace('\t', '|').replace('\n', '||') - url = urllib.parse.quote(url, safe = '') - return 'https://quickstatements.toolforge.org/#/v1=' + url - -def to_quickstatements_v1_url(concepts): - return commands_to_quickstatements_v1_url(to_quickstatements_v1(concepts)) -