Redistributed library in a cursed fashion.
This commit is contained in:
parent
d39fbbfed3
commit
b561352ec4
12
__init__.py
Normal file
12
__init__.py
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
|
||||||
|
# Package
|
||||||
|
|
||||||
|
import os
import sys

# The sibling modules are imported below (and import each other) with plain
# top-level statements such as ``import schemeld``, so this package's own
# directory is appended to the module search path first.
sys.path.append(os.path.dirname(__file__))
|
||||||
|
|
||||||
|
import format
|
||||||
|
import parse
|
||||||
|
import schemeld
|
||||||
|
import wikidata_ext
|
||||||
|
|
||||||
|
|
111
format.py
Normal file
111
format.py
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
|
||||||
|
import schemeld
|
||||||
|
|
||||||
|
# Wikidata property ids that fmt_value rewrites from a "P" prefix to an "S"
# prefix (the QuickStatements v1 spelling for source/reference properties):
#   P813 = retrieved, P854 = reference URL,
#   P248 = stated in, P143 = imported from Wikimedia project.
REFERENCE_PROPERTIES = {'P813', 'P854', 'P248', 'P143'}
|
||||||
|
|
||||||
|
def fmt_value(c, prefer_reference = False):
    """Render one value as a QuickStatements v1 token.

    Args:
        c: Value to render. Strings are wrapped in double quotes; a
            ``schemeld.Concept`` is rendered through its ``'@id'``; a
            ``wikidata.entity.Entity`` is rendered as its id; a parsed URL
            (``urllib.parse.ParseResult``) is rendered from ``geturl()``;
            ``datetime.datetime`` / ``datetime.date`` are rendered as
            ``+<iso>/11``; anything else goes through ``str``.
        prefer_reference: When true, a parsed URL is returned bare instead
            of as a quoted string. The flag is forwarded when a concept's
            ``'@id'`` is formatted.

    Returns:
        The formatted token, or ``''`` (after logging an error) for a
        concept that has no ``'@id'``.
    """
    # The module header only imports ``schemeld``. Without these local
    # imports every value that is neither a string nor a concept with an
    # '@id' ended in a NameError.
    import datetime
    import logging
    import urllib.parse

    if isinstance(c, str):
        return '"{}"'.format(c)  # TODO: Escape

    if isinstance(c, schemeld.Concept):
        if '@id' in c:
            return fmt_value(c['@id'], prefer_reference)
        logging.error('Could not determine useful id for %s', c)
        return ''

    # Loaded lazily so that strings and concepts can be formatted without
    # touching the wikidata client package.
    import wikidata.entity

    if isinstance(c, wikidata.entity.Entity):
        entity_id = c.id
        if isinstance(entity_id, int):
            # Bare integer ids are spelled as property ids.
            entity_id = 'P{}'.format(entity_id)
        if entity_id in REFERENCE_PROPERTIES:
            # Reference properties use an "S" prefix instead of "P".
            entity_id = entity_id.replace('P', 'S', 1)
        return entity_id

    if isinstance(c, urllib.parse.ParseResult):
        url = c.geturl()
        return url if prefer_reference else fmt_value(url, prefer_reference)

    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(c, datetime.datetime):
        # NOTE(review): isoformat() of a naive datetime has no trailing 'Z'
        # (the date branch below adds one) -- confirm QuickStatements
        # accepts this spelling.
        return '+{}/11'.format(c.isoformat())
    if isinstance(c, datetime.date):
        return '+{}T00:00:00Z/11'.format(c.isoformat())

    return str(c)
|
||||||
|
|
||||||
|
def fmt_predicate(pred, object):
    """Render a predicate as a QuickStatements v1 column name.

    Args:
        pred: The predicate. A ``urllib.parse.ParseResult`` on the
            ``schema.org`` host is mapped to a label/alias/description/
            sitelink command; anything else is formatted by ``fmt_value``
            with ``prefer_reference=True``.
        object: The object the predicate points at. Only consulted for
            schema.org predicates, where its ``'__language'`` entry selects
            the language code (``'en'`` when missing or falsy).

    Returns:
        ``'L<lang>'``, ``'A<lang>'``, ``'D<lang>'`` or ``'S<lang>wiki'`` for
        the schema.org ``name``, ``alternateName``, ``description`` and
        ``sameAs`` paths, otherwise the ``fmt_value`` rendering of ``pred``.

    Raises:
        AssertionError: For any other schema.org path.
    """
    # The module header does not import urllib, so it is imported here;
    # without it the isinstance check below raised NameError.
    import urllib.parse

    if isinstance(pred, urllib.parse.ParseResult) and pred.netloc == 'schema.org':
        lang = object.get('__language') or 'en'
        text_prefixes = {'/name': 'L', '/alternateName': 'A', '/description': 'D'}
        if pred.path in text_prefixes:
            return text_prefixes[pred.path] + lang
        if pred.path == '/sameAs':
            return 'S{}wiki'.format(lang)
        assert False, pred
    return fmt_value(pred, prefer_reference = True)
|
||||||
|
|
||||||
|
def to_quickstatements_v1_item(subject, lines, skip_impossible = True, skip_already_syncronized = True):
    """Append the QuickStatements v1 rows for one subject to ``lines``.

    Args:
        subject: Concept-like object; it is tested with ``'@id' in subject``
            and its ``data`` mapping (predicate -> list of objects) is
            iterated.
        lines: List that receives one list of column strings per row. It is
            mutated in place.
        skip_impossible: When true, rows whose predicate renders as a quoted
            string, or that contain an empty column, are logged and dropped.
        skip_already_syncronized: When true, objects whose
            ``'__synchronized_with_wikidata'`` entry is truthy are skipped.

    Returns:
        None.
    """
    # The module header does not import logging; without this import the
    # warning branches below raised NameError.
    import logging

    #assert '@id' not in subject, 'TODO: Linked subjects'
    subject_id = fmt_value(subject, True) if '@id' in subject else 'LAST'

    # A subject without an id is created first and then addressed as LAST.
    if subject_id == 'LAST':
        lines.append(['CREATE'])

    def append_columns(value, row):
        # Flatten lists, expand {'value': ...} dicts into the value followed
        # by (predicate, value) pairs for every non-string key, and format
        # anything else directly.
        if isinstance(value, list):
            for element in value:
                append_columns(element, row)
            return
        if isinstance(value, dict) and 'value' in value:
            row.append(fmt_value(value['value']))
            for extra_key, extra_value in value.items():
                if extra_key is None or extra_value is None:
                    continue
                if not isinstance(extra_key, str):
                    row.append(fmt_predicate(extra_key, extra_value))
                    row.append(fmt_value(extra_value))
            return
        row.append(fmt_value(value))

    for predicate, pred_objects in subject.data.items():
        # '@id' and '__'-prefixed string keys are bookkeeping, not statements.
        if isinstance(predicate, str) and (predicate == '@id' or predicate.startswith('__')):
            continue

        assert isinstance(pred_objects, list)
        for pred_object in pred_objects:
            if pred_object.get('__synchronized_with_wikidata', False) and skip_already_syncronized:
                continue

            predicate_str = fmt_predicate(predicate, pred_object)
            line = [subject_id, predicate_str]
            append_columns(pred_object, line)

            if skip_impossible and predicate_str.startswith('"'):
                logging.warning('Bad line: %s (Lines must not start with ")', predicate_str)
                continue
            if '' in line and skip_impossible:
                logging.warning('Bad line: %s (Lines must not contain empty names)', line)
                continue

            assert 'None' not in line, line
            lines.append(line)
|
||||||
|
|
||||||
|
def to_quickstatements_v1(concepts):
    """Serialize concepts as QuickStatements v1 command text.

    Args:
        concepts: A single ``schemeld.Concept`` or a sized iterable of
            concepts (``len`` is taken for the log message).

    Returns:
        One row per line; columns within a row are separated by tabs and
        rows by newlines.
    """
    # The module header does not import logging; without this import the
    # summary log call below raised NameError.
    import logging

    if isinstance(concepts, schemeld.Concept):
        concepts = [concepts]

    lines = []
    for concept in concepts:
        to_quickstatements_v1_item(concept, lines)

    logging.info("Produced %s statements for %s concepts", len(lines), len(concepts))
    commands = '\n'.join('\t'.join(columns) for columns in lines)

    assert '\tNone\t' not in commands, 'TODO'
    return commands
|
||||||
|
|
||||||
|
def commands_to_quickstatements_v1_url(commands):
    """Build a QuickStatements v1 URL that carries ``commands``.

    Args:
        commands: Command text with tab-separated columns and
            newline-separated rows.

    Returns:
        ``https://quickstatements.toolforge.org/#/v1=`` followed by the
        commands, with tabs turned into ``|``, newlines into ``||`` and the
        result percent-encoded with no characters left as safe.
    """
    # The module header does not import urllib; without this import the
    # quote call below raised NameError.
    import urllib.parse

    separators = str.maketrans({'\t': '|', '\n': '||'})
    encoded = urllib.parse.quote(commands.translate(separators), safe = '')
    return 'https://quickstatements.toolforge.org/#/v1=' + encoded
|
||||||
|
|
||||||
|
def to_quickstatements_v1_url(concepts):
    """Return the QuickStatements v1 URL for ``concepts``.

    The concepts are first serialized by ``to_quickstatements_v1`` and the
    resulting command text is then wrapped by
    ``commands_to_quickstatements_v1_url``.
    """
    commands = to_quickstatements_v1(concepts)
    return commands_to_quickstatements_v1_url(commands)
|
24
parse.py
Normal file
24
parse.py
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
|
||||||
|
import schemeld
|
||||||
|
|
||||||
|
def determine_concepts_internal(json, context, outputs):
    """Collect a ``schemeld.Concept`` for every node found in ``json``.

    Args:
        json: A decoded JSON-LD value: a dict, or a list that is walked
            recursively.
        context: Context inherited from the enclosing node. Either a URL
            string or an already parsed ``urllib.parse.ParseResult``; a
            node's own ``'@context'`` entry takes precedence.
        outputs: List that receives the created concepts. It is mutated in
            place.

    Raises:
        AssertionError: If a non-list value is not a dict, or the effective
            context is not on the ``schema.org`` host.
    """
    # The module header does not import urllib; without this import the
    # urlparse call below raised NameError.
    import urllib.parse

    if isinstance(json, list):
        for member in json:
            determine_concepts_internal(member, context, outputs)
        return

    assert isinstance(json, dict), type(json)

    context = json.get('@context', context)
    # Members of an '@graph' inherit the parent's parsed context; passing a
    # ParseResult through urlparse again raises AttributeError, so only
    # values that are not parsed yet are parsed here.
    if not isinstance(context, urllib.parse.ParseResult):
        context = urllib.parse.urlparse(context)
    assert context.netloc == 'schema.org'

    if '@graph' in json:
        determine_concepts_internal(json['@graph'], context, outputs)
    else:
        outputs.append(schemeld.Concept(context, json))
|
||||||
|
|
||||||
|
def determine_concepts(json):
    """Return the list of concepts found in the decoded JSON-LD ``json``.

    Delegates to ``determine_concepts_internal`` with an empty inherited
    context and a fresh result list.
    """
    found = []
    determine_concepts_internal(json, '', found)
    return found
|
||||||
|
|
||||||
|
|
129
schemeld.py
129
schemeld.py
|
@ -68,132 +68,3 @@ class Concept(object):
|
||||||
def __str__(self):
    """Return the same text as ``repr(self)``."""
    return repr(self)
|
||||||
|
|
||||||
def determine_concepts_internal(json, context, outputs):
    """Collect a ``Concept`` for every node found in ``json``.

    Args:
        json: A decoded JSON-LD value: a dict, or a list that is walked
            recursively.
        context: Context inherited from the enclosing node. Either a URL
            string or an already parsed ``urllib.parse.ParseResult``; a
            node's own ``'@context'`` entry takes precedence.
        outputs: List that receives the created concepts. It is mutated in
            place.

    Raises:
        AssertionError: If a non-list value is not a dict, or the effective
            context is not on the ``schema.org`` host.
    """
    if isinstance(json, list):
        for member in json:
            determine_concepts_internal(member, context, outputs)
        return

    assert isinstance(json, dict), type(json)

    context = json.get('@context', context)
    # Members of an '@graph' inherit the parent's parsed context; passing a
    # ParseResult through urlparse again raises AttributeError, so only
    # values that are not parsed yet are parsed here.
    if not isinstance(context, urllib.parse.ParseResult):
        context = urllib.parse.urlparse(context)
    assert context.netloc == 'schema.org'

    if '@graph' in json:
        determine_concepts_internal(json['@graph'], context, outputs)
    else:
        outputs.append(Concept(context, json))
|
|
||||||
|
|
||||||
def determine_concepts(json):
    """Return the list of concepts found in the decoded JSON-LD ``json``.

    Delegates to ``determine_concepts_internal`` with an empty inherited
    context and a fresh result list.
    """
    found = []
    determine_concepts_internal(json, '', found)
    return found
|
|
||||||
|
|
||||||
# Wikidata property ids that fmt_value rewrites from a "P" prefix to an "S"
# prefix (the QuickStatements v1 spelling for source/reference properties):
#   P813 = retrieved, P854 = reference URL,
#   P248 = stated in, P143 = imported from Wikimedia project.
REFERENCE_PROPERTIES = {'P813', 'P854', 'P248', 'P143'}
|
|
||||||
|
|
||||||
def fmt_value(c, prefer_reference = False):
    """Render one value as a QuickStatements v1 token.

    Args:
        c: Value to render. Strings are wrapped in double quotes; a
            ``Concept`` is rendered through its ``'@id'``; a
            ``wikidata.entity.Entity`` is rendered as its id; a parsed URL
            (``urllib.parse.ParseResult``) is rendered from ``geturl()``;
            ``datetime.datetime`` / ``datetime.date`` are rendered as
            ``+<iso>/11``; anything else goes through ``str``.
        prefer_reference: When true, a parsed URL is returned bare instead
            of as a quoted string. The flag is forwarded when a concept's
            ``'@id'`` is formatted.

    Returns:
        The formatted token, or ``''`` (after logging an error) for a
        concept that has no ``'@id'``.
    """
    if isinstance(c, str):
        return '"{}"'.format(c)  # TODO: Escape

    if isinstance(c, Concept):
        if '@id' in c:
            return fmt_value(c['@id'], prefer_reference)
        logging.error('Could not determine useful id for %s', c)
        return ''

    if isinstance(c, wikidata.entity.Entity):
        entity_id = c.id
        if isinstance(entity_id, int):
            # Bare integer ids are spelled as property ids.
            entity_id = 'P{}'.format(entity_id)
        if entity_id in REFERENCE_PROPERTIES:
            # Reference properties use an "S" prefix instead of "P".
            entity_id = entity_id.replace('P', 'S', 1)
        return entity_id

    if isinstance(c, urllib.parse.ParseResult):
        url = c.geturl()
        return url if prefer_reference else fmt_value(url, prefer_reference)

    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(c, datetime.datetime):
        return '+{}/11'.format(c.isoformat())
    if isinstance(c, datetime.date):
        return '+{}T00:00:00Z/11'.format(c.isoformat())

    return str(c)
|
|
||||||
|
|
||||||
def fmt_predicate(pred, object):
    """Render a predicate as a QuickStatements v1 column name.

    Args:
        pred: The predicate. A ``urllib.parse.ParseResult`` on the
            ``schema.org`` host is mapped to a label/alias/description/
            sitelink command; anything else is formatted by ``fmt_value``
            with ``prefer_reference=True``.
        object: The object the predicate points at. Only consulted for
            schema.org predicates, where its ``'__language'`` entry selects
            the language code (``'en'`` when missing or falsy).

    Returns:
        ``'L<lang>'``, ``'A<lang>'``, ``'D<lang>'`` or ``'S<lang>wiki'`` for
        the schema.org ``name``, ``alternateName``, ``description`` and
        ``sameAs`` paths, otherwise the ``fmt_value`` rendering of ``pred``.

    Raises:
        AssertionError: For any other schema.org path.
    """
    if isinstance(pred, urllib.parse.ParseResult) and pred.netloc == 'schema.org':
        lang = object.get('__language') or 'en'
        text_prefixes = {'/name': 'L', '/alternateName': 'A', '/description': 'D'}
        if pred.path in text_prefixes:
            return text_prefixes[pred.path] + lang
        if pred.path == '/sameAs':
            return 'S{}wiki'.format(lang)
        assert False, pred
    return fmt_value(pred, prefer_reference = True)
|
|
||||||
|
|
||||||
def to_quickstatements_v1_item(subject, lines, skip_impossible = True, skip_already_syncronized = True):
    """Append the QuickStatements v1 rows for one subject to ``lines``.

    Args:
        subject: Concept-like object; it is tested with ``'@id' in subject``
            and its ``data`` mapping (predicate -> list of objects) is
            iterated.
        lines: List that receives one list of column strings per row. It is
            mutated in place.
        skip_impossible: When true, rows whose predicate renders as a quoted
            string, or that contain an empty column, are logged and dropped.
        skip_already_syncronized: When true, objects whose
            ``'__synchronized_with_wikidata'`` entry is truthy are skipped.

    Returns:
        None.
    """
    #assert '@id' not in subject, 'TODO: Linked subjects'
    subject_id = fmt_value(subject, True) if '@id' in subject else 'LAST'

    # A subject without an id is created first and then addressed as LAST.
    if subject_id == 'LAST':
        lines.append(['CREATE'])

    def append_columns(value, row):
        # Flatten lists, expand {'value': ...} dicts into the value followed
        # by (predicate, value) pairs for every non-string key, and format
        # anything else directly.
        if isinstance(value, list):
            for element in value:
                append_columns(element, row)
            return
        if isinstance(value, dict) and 'value' in value:
            row.append(fmt_value(value['value']))
            for extra_key, extra_value in value.items():
                if extra_key is None or extra_value is None:
                    continue
                if not isinstance(extra_key, str):
                    row.append(fmt_predicate(extra_key, extra_value))
                    row.append(fmt_value(extra_value))
            return
        row.append(fmt_value(value))

    for predicate, pred_objects in subject.data.items():
        # '@id' and '__'-prefixed string keys are bookkeeping, not statements.
        if isinstance(predicate, str) and (predicate == '@id' or predicate.startswith('__')):
            continue

        assert isinstance(pred_objects, list)
        for pred_object in pred_objects:
            if pred_object.get('__synchronized_with_wikidata', False) and skip_already_syncronized:
                continue

            predicate_str = fmt_predicate(predicate, pred_object)
            line = [subject_id, predicate_str]
            append_columns(pred_object, line)

            if skip_impossible and predicate_str.startswith('"'):
                logging.warning('Bad line: %s (Lines must not start with ")', predicate_str)
                continue
            if '' in line and skip_impossible:
                logging.warning('Bad line: %s (Lines must not contain empty names)', line)
                continue

            assert 'None' not in line, line
            lines.append(line)
|
|
||||||
|
|
||||||
def to_quickstatements_v1(concepts):
    """Serialize concepts as QuickStatements v1 command text.

    Args:
        concepts: A single ``Concept`` or a sized iterable of concepts
            (``len`` is taken for the log message).

    Returns:
        One row per line; columns within a row are separated by tabs and
        rows by newlines.
    """
    if isinstance(concepts, Concept):
        concepts = [concepts]

    lines = []
    for concept in concepts:
        to_quickstatements_v1_item(concept, lines)

    logging.info("Produced %s statements for %s concepts", len(lines), len(concepts))
    commands = '\n'.join('\t'.join(columns) for columns in lines)

    assert '\tNone\t' not in commands, 'TODO'
    return commands
|
|
||||||
|
|
||||||
def commands_to_quickstatements_v1_url(commands):
    """Build a QuickStatements v1 URL that carries ``commands``.

    Args:
        commands: Command text with tab-separated columns and
            newline-separated rows.

    Returns:
        ``https://quickstatements.toolforge.org/#/v1=`` followed by the
        commands, with tabs turned into ``|``, newlines into ``||`` and the
        result percent-encoded with no characters left as safe.
    """
    separators = str.maketrans({'\t': '|', '\n': '||'})
    encoded = urllib.parse.quote(commands.translate(separators), safe = '')
    return 'https://quickstatements.toolforge.org/#/v1=' + encoded
|
|
||||||
|
|
||||||
def to_quickstatements_v1_url(concepts):
    """Return the QuickStatements v1 URL for ``concepts``.

    The concepts are first serialized by ``to_quickstatements_v1`` and the
    resulting command text is then wrapped by
    ``commands_to_quickstatements_v1_url``.
    """
    commands = to_quickstatements_v1(concepts)
    return commands_to_quickstatements_v1_url(commands)
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user