1
0

Ruff format

This commit is contained in:
Jon Michael Aanes 2024-07-08 18:54:14 +02:00
parent d7ad42890b
commit 61ceb06d91
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
7 changed files with 111 additions and 63 deletions

View File

@@ -4,11 +4,11 @@ Utility for working with scheme+ld and other data-graph and semantic web formats
"""
__all__ = [
'__version__',
'format',
'parse',
'schemeld',
'wikidata_ext',
'__version__',
'format',
'parse',
'schemeld',
'wikidata_ext',
]
import os.path

View File

@@ -1 +1 @@
__version__ = '0.1.12'
__version__ = '0.1.12'

View File

@@ -1,4 +1,3 @@
import datetime
import logging
import urllib.parse
@@ -9,9 +8,10 @@ import datagraph.schemeld
REFERENCE_PROPERTIES = {'P813', 'P854', 'P248', 'P143', 'P813'}
def fmt_value(c, prefer_reference = False):
def fmt_value(c, prefer_reference=False):
if isinstance(c, str):
return f'"{c}"' # TODO: Escape
return f'"{c}"' # TODO: Escape
elif isinstance(c, datagraph.schemeld.Concept):
if '@id' in c:
return fmt_value(c['@id'], prefer_reference)
@@ -26,7 +26,9 @@ def fmt_value(c, prefer_reference = False):
s = s.replace('P', 'S', 1)
return s
elif isinstance(c, urllib.parse.ParseResult):
return c.geturl() if prefer_reference else fmt_value(c.geturl(), prefer_reference)
return (
c.geturl() if prefer_reference else fmt_value(c.geturl(), prefer_reference)
)
elif isinstance(c, datetime.datetime):
return f'+{c.isoformat()}/11'
elif isinstance(c, datetime.date):
@@ -34,27 +36,32 @@ def fmt_value(c, prefer_reference = False):
return str(c)
def fmt_predicate(pred, object):
if isinstance(pred, urllib.parse.ParseResult) and pred.netloc == 'schema.org':
lang = object.get('__language') or 'en'
if pred.path == '/name':
return 'L'+lang
return 'L' + lang
elif pred.path == '/alternateName':
return 'A'+lang
return 'A' + lang
elif pred.path == '/description':
return 'D'+lang
return 'D' + lang
elif pred.path == '/sameAs':
return f'S{lang}wiki'
else:
assert False, pred
return fmt_value(pred, prefer_reference = True)
return fmt_value(pred, prefer_reference=True)
def assert_good_value_repr(r):
assert '{' not in r
assert '}' not in r
def to_quickstatements_v1_item(subject, lines, skip_impossible = True, skip_already_syncronized = True):
#assert '@id' not in subject, 'TODO: Linked subjects'
def to_quickstatements_v1_item(
subject, lines, skip_impossible=True, skip_already_syncronized=True
):
# assert '@id' not in subject, 'TODO: Linked subjects'
subject_id = fmt_value(subject, True) if '@id' in subject else 'LAST'
assert_good_value_repr(subject_id)
@@ -78,26 +85,36 @@ def to_quickstatements_v1_item(subject, lines, skip_impossible = True, skip_alre
line.append(fmt_value(v))
for predicate, pred_objects in subject.data.items():
if isinstance(predicate, str) and (predicate == '@id' or predicate.startswith('__')):
if isinstance(predicate, str) and (
predicate == '@id' or predicate.startswith('__')
):
continue
assert isinstance(pred_objects, list)
for pred_object in pred_objects:
if pred_object.get('__synchronized_with_wikidata', False) and skip_already_syncronized:
if (
pred_object.get('__synchronized_with_wikidata', False)
and skip_already_syncronized
):
continue
predicate_str = fmt_predicate(predicate, pred_object)
line = [subject_id, predicate_str]
fmt_key_value_pair(pred_object, line)
if skip_impossible and predicate_str.startswith('"'):
logging.warning('Bad line: %s (Lines must not start with ")', predicate_str)
logging.warning(
'Bad line: %s (Lines must not start with ")', predicate_str
)
continue
if '' in line and skip_impossible:
logging.warning('Bad line: %s (Lines must not contain empty names)', line)
logging.warning(
'Bad line: %s (Lines must not contain empty names)', line
)
continue
assert 'None' not in line, line
lines.append(line)
def to_quickstatements_v1(concepts):
if isinstance(concepts, datagraph.schemeld.Concept):
concepts = [concepts]
@@ -113,10 +130,12 @@ def to_quickstatements_v1(concepts):
assert '\tNone\t' not in commands, 'TODO'
return commands
def commands_to_quickstatements_v1_url(commands):
url = commands.replace('\t', '|').replace('\n', '||')
url = urllib.parse.quote(url, safe = '')
url = urllib.parse.quote(url, safe='')
return 'https://quickstatements.toolforge.org/#/v1=' + url
def to_quickstatements_v1_url(concepts):
return commands_to_quickstatements_v1_url(to_quickstatements_v1(concepts))

View File

@@ -1,4 +1,3 @@
import json
import urllib
@@ -20,11 +19,13 @@ def determine_concepts_internal(json, context, outputs):
else:
outputs.append(schemeld.Concept(context, json))
def determine_concepts(json):
concepts = []
determine_concepts_internal(json, '', concepts)
return concepts
def determine_concepts_in_soup(soup):
# TODO: Check type
ld_json_elements = soup.find_all('script', type='application/ld+json')
@@ -33,5 +34,3 @@ def determine_concepts_in_soup(soup):
json_data = json.loads(e.string)
concepts.extend(determine_concepts(json_data))
return concepts

View File

@@ -1,8 +1,8 @@
import urllib.parse
STRICT_VALIDATION = True
def canonical_keys(base_key, context):
if isinstance(base_key, urllib.parse.ParseResult):
return [base_key]
@@ -12,15 +12,17 @@ def canonical_keys(base_key, context):
return [base_key]
if context is None:
return [base_key]
return [context._replace(path = base_key), base_key]
return [context._replace(path=base_key), base_key]
class Concept:
def __init__(self, context, pairs):
self.pairs = []
for k, v in pairs.items():
for k, v in pairs.items():
keys = canonical_keys(k, context)
self.pairs.append({'canonical_key': keys[0], 'keys': set(keys), 'values': v})
self.pairs.append(
{'canonical_key': keys[0], 'keys': set(keys), 'values': v}
)
self.regenerate_by_keys()
def regenerate_by_keys(self):
@@ -29,11 +31,17 @@ class Concept:
def __copy__(self):
new = Concept(None, {})
for p in self.pairs:
new.pairs.append({'canonical_key': p['canonical_key'], 'keys': set(p['keys']), 'values': p['values']})
new.pairs.append(
{
'canonical_key': p['canonical_key'],
'keys': set(p['keys']),
'values': p['values'],
}
)
new.regenerate_by_keys()
return new
def get(self, key, default = None):
def get(self, key, default=None):
pairs = self.by_keys.get(key, None)
return pairs['values'] if pairs is not None else default
@@ -54,7 +62,7 @@ class Concept:
return self.by_keys[key]['values']
def to_dict(self):
return {p['canonical_key']:p['values'] for p in self.pairs}
return {p['canonical_key']: p['values'] for p in self.pairs}
def __getitem__(self, key):
return self.by_keys[key]['values']
@@ -87,14 +95,12 @@ class Concept:
if id := self.by_keys.get('@id'):
return 'Concept {{ @id = {} }}'.format(id['values'])
return 'Concept '+str({p['canonical_key']:p['values'] for p in self.pairs})
return 'Concept ' + str({p['canonical_key']: p['values'] for p in self.pairs})
def __str__(self):
return repr(self)
def set_canonical_key(self, new_canonical_key, key = None):
def set_canonical_key(self, new_canonical_key, key=None):
if key is None:
key = new_canonical_key
self.by_keys[key]['canonical_key'] = new_canonical_key

View File

@@ -1,4 +1,3 @@
import logging
import urllib.parse
@@ -6,7 +5,8 @@ import ratelimit
import requests
import wikidata.entity
REQUEST_SESSION = None # TODO?
REQUEST_SESSION = None # TODO?
def concept_uri(obj):
assert isinstance(obj, wikidata.entity.Entity), obj
@@ -17,7 +17,8 @@ def concept_uri(obj):
else:
assert False, 'TODO: ' + ojb.id
def fmt_triple_value(obj, prefer_obj = False):
def fmt_triple_value(obj, prefer_obj=False):
if obj is None:
return ''
if isinstance(obj, str):
@@ -30,36 +31,45 @@ def fmt_triple_value(obj, prefer_obj = False):
else:
assert False, type(obj)
@ratelimit.sleep_and_retry
def fetch_by_url(url, headers):
logging.debug('Fetching: %s', url)
assert REQUEST_SESSION is not None, 'REQUEST_SESSION must be set, before calling fetch_by_url'
response = REQUEST_SESSION.get(url, headers = headers)
assert (
REQUEST_SESSION is not None
), 'REQUEST_SESSION must be set, before calling fetch_by_url'
response = REQUEST_SESSION.get(url, headers=headers)
if response.status_code != 200:
logging.error('Got %s error message: %s', response.status_code, response.text)
return None
return response
ITEMS_PER_PAGE = 'http://www.w3.org/ns/hydra/core#itemsPerPage'
TOTAL_ITEMS = 'http://www.w3.org/ns/hydra/core#totalItems'
def fmt_params(subject, predicate, object):
derp = [x for x in [subject, predicate, object] if x]
assert len(derp) >= 1, str(derp)
params = {
'subject': fmt_triple_value(subject, prefer_obj = True),
'predicate': fmt_triple_value(predicate, prefer_obj = True),
'object': fmt_triple_value(object, prefer_obj = True),
'subject': fmt_triple_value(subject, prefer_obj=True),
'predicate': fmt_triple_value(predicate, prefer_obj=True),
'object': fmt_triple_value(object, prefer_obj=True),
'page': 1,
}
return params
def get_triples_count(subject = None, predicate = None, object = None):
"""Fetches first page in order to determine amount of items.
"""
def get_triples_count(subject=None, predicate=None, object=None):
"""Fetches first page in order to determine amount of items."""
params = fmt_params(subject, predicate, object)
url = requests.Request(url = 'https://query.wikidata.org/bigdata/ldf', params = params).prepare().url
response = fetch_by_url(url, headers = {'accept': 'application/ld+json'})
url = (
requests.Request(url='https://query.wikidata.org/bigdata/ldf', params=params)
.prepare()
.url
)
response = fetch_by_url(url, headers={'accept': 'application/ld+json'})
if response is None:
return {
'items_per_page': 0,
@@ -76,13 +86,20 @@ def get_triples_count(subject = None, predicate = None, object = None):
}
assert False
def get_triples_internal(subject, predicate, object):
params = fmt_params(subject, predicate, object)
pagination_data = get_triples_count(subject, predicate, object)
for current_page in range(1, pagination_data['num_pages']+1):
for current_page in range(1, pagination_data['num_pages'] + 1):
params['page'] = current_page
url = requests.Request(url = 'https://query.wikidata.org/bigdata/ldf', params = params).prepare().url
response = fetch_by_url(url, headers = {'accept': 'application/ld+json'})
url = (
requests.Request(
url='https://query.wikidata.org/bigdata/ldf', params=params
)
.prepare()
.url
)
response = fetch_by_url(url, headers={'accept': 'application/ld+json'})
json_data = response.json()
for item in json_data['@graph']:
@@ -97,29 +114,34 @@ def get_triples_internal(subject, predicate, object):
# Bookkeeping
del url, response, json_data
SCHEMA_ABOUT = urllib.parse.urlparse('http://schema.org/about')
def get_wikidata_concept_for_wikipedia_page(client, wikipage):
triples = get_triples_internal(wikipage, SCHEMA_ABOUT, None);
triples = get_triples_internal(wikipage, SCHEMA_ABOUT, None)
triples = list(triples)
for item in triples:
s = item['about'][3:]
return client.get(s, load = False)
return client.get(s, load=False)
def get_triples(client, subject = None, predicate = None, object = None):
def get_triples(client, subject=None, predicate=None, object=None):
triples = []
iterator = get_triples_internal(subject, predicate, object)
for item in iterator:
is_looking_for = item['@id'].startswith('wd:') and predicate.id in item
if is_looking_for :
if is_looking_for:
s = subject
if s is None:
s = client.get(item['@id'][3:], load = False)
s = client.get(item['@id'][3:], load=False)
o = object or item[predicate.id]
yield (s, predicate, o)
del item, is_looking_for
def get_backlinks(client, predicate, object):
for subject, _, _ in get_triples(client, subject = None, predicate = predicate, object = object):
yield subject
def get_backlinks(client, predicate, object):
for subject, _, _ in get_triples(
client, subject=None, predicate=predicate, object=object
):
yield subject

View File

@@ -6,9 +6,11 @@ import datagraph.wikidata_ext
datagraph.wikidata_ext.REQUEST_SESSION = requests_cache.CachedSession('output/testing')
def test_version():
assert datagraph.__version__ is not None
def test_get_triples():
client = wikidata.client.Client()
@@ -17,8 +19,8 @@ def test_get_triples():
schema_prop = 'image'
triples_iter = datagraph.wikidata_ext.get_triples(
client = client,
predicate = EQV_PROPERTY,
object = f'{schema_root}{schema_prop}',
client=client,
predicate=EQV_PROPERTY,
object=f'{schema_root}{schema_prop}',
)
assert triples_iter is not None