1
0

Canonical key system

This commit is contained in:
Jon Michael Aanes 2023-09-17 12:09:17 +02:00
parent c45bbc6157
commit 1d9715a9bf
4 changed files with 94 additions and 33 deletions

View File

@ -1,12 +1,16 @@
import schemeld
import datagraph.schemeld
import urllib.parse
import wikidata.entity
import datetime
import logging
# Property identifiers that this module treats as "reference" properties.
# NOTE(review): presumably Wikidata property IDs used when citing a source --
# confirm the meaning of each ID against Wikidata before extending the set.
# The original literal listed 'P813' twice; the duplicate had no effect.
REFERENCE_PROPERTIES = {'P813', 'P854', 'P248', 'P143'}
def fmt_value(c, prefer_reference = False):
if isinstance(c, str):
return '"{}"'.format(c) # TODO: Escape
elif isinstance(c, schemeld.Concept):
elif isinstance(c, datagraph.schemeld.Concept):
if '@id' in c:
return fmt_value(c['@id'], prefer_reference)
else:
@ -43,9 +47,14 @@ def fmt_predicate(pred, object):
assert False, pred
return fmt_value(pred, prefer_reference = True)
def assert_good_value_repr(r):
    """Check that the formatted value *r* contains no curly braces.

    NOTE(review): presumably a brace means an unfilled format template or a
    dict leaked into the output -- confirm with the author.

    Raises:
        AssertionError: if ``'{'`` or ``'}'`` occurs in *r*.
    """
    # Raised explicitly (instead of via an ``assert`` statement) so that the
    # check is not stripped when Python runs with ``-O``.
    for brace in ('{', '}'):
        if brace in r:
            raise AssertionError(
                'Value representation contains {!r}: {!r}'.format(brace, r))
def to_quickstatements_v1_item(subject, lines, skip_impossible = True, skip_already_syncronized = True):
#assert '@id' not in subject, 'TODO: Linked subjects'
subject_id = fmt_value(subject, True) if '@id' in subject else 'LAST'
assert_good_value_repr(subject_id)
if subject_id == 'LAST':
lines.append(['CREATE'])
@ -88,7 +97,7 @@ def to_quickstatements_v1_item(subject, lines, skip_impossible = True, skip_alre
lines.append(line)
def to_quickstatements_v1(concepts):
if isinstance(concepts, schemeld.Concept):
if isinstance(concepts, datagraph.schemeld.Concept):
concepts = [concepts]
lines = []

View File

@ -1,6 +1,7 @@
import schemeld
import urllib
import json
def determine_concepts_internal(json, context, outputs):
if isinstance(json, list):
@ -22,4 +23,13 @@ def determine_concepts(json):
determine_concepts_internal(json, '', concepts)
return concepts
def determine_concepts_in_soup(soup):
    """Collect the concepts found in every JSON-LD ``<script>`` element of *soup*.

    Args:
        soup: Object exposing ``find_all(name, type=...)`` whose results have
            a ``string`` attribute (presumably a BeautifulSoup document --
            the type is not checked here).

    Returns:
        A list with the concepts of all ``application/ld+json`` scripts, in
        document order. Empty when there are no such scripts.

    Raises:
        json.JSONDecodeError: if a script's text is not valid JSON.
    """
    # TODO: Check type
    concepts = []
    for element in soup.find_all('script', type="application/ld+json"):
        raw = element.string
        if raw is None:
            # An element without a single text child has nothing to parse;
            # passing None to json.loads would raise TypeError.
            continue
        concepts.extend(determine_concepts(json.loads(raw)))
    return concepts

View File

@ -10,26 +10,61 @@ from enum import Enum
STRICT_VALIDATION = True
def canonical_keys(base_key, context):
    """Return the list of keys under which *base_key* should be reachable.

    The first element is the canonical key. Only a plain string key (one
    that does not start with ``'@'``) combined with a non-``None`` *context*
    is expanded: the result is then the context with its ``path`` replaced
    by the key, followed by the key itself. Every other input is returned
    unchanged as a single-element list.
    """
    # ParseResult objects are tuples, not strings, so they fall under the
    # "not a plain name" case just like any other non-string key.
    is_plain_name = isinstance(base_key, str) and not base_key.startswith('@')
    if context is None or not is_plain_name:
        return [base_key]
    qualified = context._replace(path = base_key)
    return [qualified, base_key]
class Concept(object):
def __init__(self, context, pairs):
    """Build a concept from the mapping *pairs*, resolving keys in *context*.

    Each entry of *pairs* becomes one record in ``self.pairs`` holding its
    canonical key, the set of all keys it answers to, and its values. The
    key -> record index ``self.by_keys`` is then built from those records.

    Args:
        context: Passed to ``canonical_keys`` for every key; may be ``None``.
        pairs: Mapping of key -> values.
    """
    self.pairs = []
    for k, v in pairs.items():
        keys = canonical_keys(k, context)
        # keys[0] is the canonical key; the remaining ones are aliases.
        self.pairs.append({'canonical_key': keys[0], 'keys': set(keys), 'values': v})
    self.regenerate_by_keys()
def get(self, key, *args, **kwargs):
return self.data.get(self.canonical_key(key, *args, **kwargs))
def regenerate_by_keys(self):
self.by_keys = {k: pair for pair in self.pairs for k in pair['keys']}
def __copy__(self):
new = Concept(None, {})
for p in self.pairs:
new.pairs.append({'canonical_key': p['canonical_key'], 'keys': set(p['keys']), 'values': p['values']})
new.regenerate_by_keys()
return new
def get(self, key, default = None):
pairs = self.by_keys.get(key, None)
return pairs['values'] if pairs is not None else default
def getlist(self, key):
result = self.get(key)
if result is None:
return []
assert isinstance(result, list), 'Not a list: ' + str(result)
return [r['value'] for r in result]
def keys(self):
return self.data.keys()
for pair in self.pairs:
yield pair['canonical_key']
def setdefault(self, key, value):
return self.data.setdefault(self.canonical_key(key), value)
if key not in self.by_keys:
self[key] = value
return self.by_keys[key]['values']
def to_dict(self):
return {k:v for k,v in self.data.items()}
return {p['canonical_key']:p['values'] for p in self.pairs}
def __getitem__(self, key):
return self.data[self.canonical_key(key)]
return self.by_keys[key]['values']
def __setitem__(self, key, value):
if STRICT_VALIDATION:
@ -40,31 +75,33 @@ class Concept(object):
assert 'value' in v, value
for subk in v:
assert not isinstance(v[subk], list), value
self.data[self.canonical_key(key)] = value
if key in self.by_keys:
self.by_keys[key]['values'] = value
else:
pair = {'canonical_key': key, 'keys': {key}, 'values': value}
self.pairs.append(pair)
self.by_keys[key] = pair
def __contains__(self, key):
return self.canonical_key(key) in self.data
return key in self.by_keys
def __delitem__(self, key):
del self.data[self.canonical_key(key)]
def canonical_key(self, key):
if isinstance(key, urllib.parse.ParseResult):
return key
if not isinstance(key, str):
return key
elif key.startswith('@'):
return key
if self.context is None:
return key
return self.context._replace(path = key)
self.pairs.remove(self.by_keys[key])
del self.by_keys[key]
def __repr__(self):
if id := self.data.get('@id'):
return 'Concept {{ @id = {} }}'.format(id)
if id := self.by_keys.get('@id'):
return 'Concept {{ @id = {} }}'.format(id['values'])
return 'Concept '+str(self.data)
return 'Concept '+str({p['canonical_key']:p['values'] for p in self.pairs})
def __str__(self):
return repr(self)
def set_canonical_key(self, new_canonical_key, key = None):
if key is None:
key = new_canonical_key
self.by_keys[key]['canonical_key'] = new_canonical_key

View File

@ -31,12 +31,11 @@ def fmt_triple_value(obj, prefer_obj = False):
assert False, type(obj)
@ratelimit.sleep_and_retry
@ratelimit.limits(calls=10, period=60)
def fetch_by_url(url, headers):
    """GET *url* through the shared request session.

    The decorators limit this function to 10 calls per period of 60
    (presumably seconds, with callers blocked until a slot is free -- confirm
    against the ``ratelimit`` documentation).

    Args:
        url: Address to fetch.
        headers: Request headers passed to the session.

    Returns:
        The response object when the status code is 200, otherwise ``None``
        (the failure is logged together with the response body).
    """
    logging.debug('Fetching: %s', url)
    response = REQUEST_SESSION.get(url, headers = headers)
    if response.status_code != 200:
        logging.error('Got %s error message: %s', response.status_code, response.text)
        return None
    return response
@ -49,7 +48,7 @@ def fmt_params(subject, predicate, object):
params = {
'subject': fmt_triple_value(subject, prefer_obj = True),
'predicate': fmt_triple_value(predicate, prefer_obj = True),
'object': fmt_triple_value(object),
'object': fmt_triple_value(object, prefer_obj = True),
'page': 1,
}
return params
@ -61,6 +60,12 @@ def get_triples_count(subject = None, predicate = None, object = None):
params = fmt_params(subject, predicate, object)
url = requests.Request(url = 'https://query.wikidata.org/bigdata/ldf', params = params).prepare().url
response = fetch_by_url(url, headers = {'accept': 'application/ld+json'})
if response is None:
return {
'items_per_page': 0,
'items_total': 0,
'num_pages': 0,
}
json_data = response.json()
for item in json_data['@graph']:
if TOTAL_ITEMS in item: