From 61ceb06d9121df35d86a1e0514223fa3863d63b8 Mon Sep 17 00:00:00 2001 From: Jon Michael Aanes Date: Mon, 8 Jul 2024 18:54:14 +0200 Subject: [PATCH] Ruff format --- datagraph/__init__.py | 10 +++--- datagraph/_version.py | 2 +- datagraph/format.py | 49 ++++++++++++++++++--------- datagraph/parse.py | 5 ++- datagraph/schemeld.py | 30 ++++++++++------- datagraph/wikidata_ext.py | 70 +++++++++++++++++++++++++-------------- test/test_datagraph.py | 8 +++-- 7 files changed, 111 insertions(+), 63 deletions(-) diff --git a/datagraph/__init__.py b/datagraph/__init__.py index 7ba4fbc..896ba1f 100644 --- a/datagraph/__init__.py +++ b/datagraph/__init__.py @@ -4,11 +4,11 @@ Utility for working with scheme+ld and other data-graph and semantic web formats """ __all__ = [ - '__version__', - 'format', - 'parse', - 'schemeld', - 'wikidata_ext', + '__version__', + 'format', + 'parse', + 'schemeld', + 'wikidata_ext', ] import os.path diff --git a/datagraph/_version.py b/datagraph/_version.py index 71d6f5c..e6d0c4f 100644 --- a/datagraph/_version.py +++ b/datagraph/_version.py @@ -1 +1 @@ -__version__ = '0.1.12' \ No newline at end of file +__version__ = '0.1.12' diff --git a/datagraph/format.py b/datagraph/format.py index 1dfe393..f767362 100644 --- a/datagraph/format.py +++ b/datagraph/format.py @@ -1,4 +1,3 @@ - import datetime import logging import urllib.parse @@ -9,9 +8,10 @@ import datagraph.schemeld REFERENCE_PROPERTIES = {'P813', 'P854', 'P248', 'P143', 'P813'} -def fmt_value(c, prefer_reference = False): + +def fmt_value(c, prefer_reference=False): if isinstance(c, str): - return f'"{c}"' # TODO: Escape + return f'"{c}"' # TODO: Escape elif isinstance(c, datagraph.schemeld.Concept): if '@id' in c: return fmt_value(c['@id'], prefer_reference) @@ -26,7 +26,9 @@ def fmt_value(c, prefer_reference = False): s = s.replace('P', 'S', 1) return s elif isinstance(c, urllib.parse.ParseResult): - return c.geturl() if prefer_reference else fmt_value(c.geturl(), prefer_reference) + return ( + c.geturl() if prefer_reference else fmt_value(c.geturl(), prefer_reference) + ) elif isinstance(c, datetime.datetime): return f'+{c.isoformat()}/11' elif isinstance(c, datetime.date): @@ -34,27 +36,32 @@ def fmt_value(c, prefer_reference = False): return str(c) + def fmt_predicate(pred, object): if isinstance(pred, urllib.parse.ParseResult) and pred.netloc == 'schema.org': lang = object.get('__language') or 'en' if pred.path == '/name': - return 'L'+lang + return 'L' + lang elif pred.path == '/alternateName': - return 'A'+lang + return 'A' + lang elif pred.path == '/description': - return 'D'+lang + return 'D' + lang elif pred.path == '/sameAs': return f'S{lang}wiki' else: assert False, pred - return fmt_value(pred, prefer_reference = True) + return fmt_value(pred, prefer_reference=True) + def assert_good_value_repr(r): assert '{' not in r assert '}' not in r -def to_quickstatements_v1_item(subject, lines, skip_impossible = True, skip_already_syncronized = True): - #assert '@id' not in subject, 'TODO: Linked subjects' + +def to_quickstatements_v1_item( + subject, lines, skip_impossible=True, skip_already_syncronized=True +): + # assert '@id' not in subject, 'TODO: Linked subjects' subject_id = fmt_value(subject, True) if '@id' in subject else 'LAST' assert_good_value_repr(subject_id) @@ -78,26 +85,36 @@ def to_quickstatements_v1_item(subject, lines, skip_impossible = True, skip_alre line.append(fmt_value(v)) for predicate, pred_objects in subject.data.items(): - if isinstance(predicate, str) and (predicate == '@id' or predicate.startswith('__')): + if isinstance(predicate, str) and ( + predicate == '@id' or predicate.startswith('__') + ): continue assert isinstance(pred_objects, list) for pred_object in pred_objects: - if pred_object.get('__synchronized_with_wikidata', False) and skip_already_syncronized: + if ( + pred_object.get('__synchronized_with_wikidata', False) + and skip_already_syncronized + ): continue predicate_str = fmt_predicate(predicate, pred_object) line = [subject_id, predicate_str] fmt_key_value_pair(pred_object, line) if skip_impossible and predicate_str.startswith('"'): - logging.warning('Bad line: %s (Lines must not start with ")', predicate_str) + logging.warning( + 'Bad line: %s (Lines must not start with ")', predicate_str + ) continue if '' in line and skip_impossible: - logging.warning('Bad line: %s (Lines must not contain empty names)', line) + logging.warning( + 'Bad line: %s (Lines must not contain empty names)', line + ) continue assert 'None' not in line, line lines.append(line) + def to_quickstatements_v1(concepts): if isinstance(concepts, datagraph.schemeld.Concept): concepts = [concepts] @@ -113,10 +130,12 @@ def to_quickstatements_v1(concepts): assert '\tNone\t' not in commands, 'TODO' return commands + def commands_to_quickstatements_v1_url(commands): url = commands.replace('\t', '|').replace('\n', '||') - url = urllib.parse.quote(url, safe = '') + url = urllib.parse.quote(url, safe='') return 'https://quickstatements.toolforge.org/#/v1=' + url + def to_quickstatements_v1_url(concepts): return commands_to_quickstatements_v1_url(to_quickstatements_v1(concepts)) diff --git a/datagraph/parse.py b/datagraph/parse.py index 4b7084d..ba767c6 100644 --- a/datagraph/parse.py +++ b/datagraph/parse.py @@ -1,4 +1,3 @@ - import json import urllib @@ -20,11 +19,13 @@ def determine_concepts_internal(json, context, outputs): else: outputs.append(schemeld.Concept(context, json)) + def determine_concepts(json): concepts = [] determine_concepts_internal(json, '', concepts) return concepts + def determine_concepts_in_soup(soup): # TODO: Check type ld_json_elements = soup.find_all('script', type='application/ld+json') @@ -33,5 +34,3 @@ def determine_concepts_in_soup(soup): json_data = json.loads(e.string) concepts.extend(determine_concepts(json_data)) return concepts - - diff --git a/datagraph/schemeld.py b/datagraph/schemeld.py index 828da80..0e80438 100644 --- a/datagraph/schemeld.py +++ b/datagraph/schemeld.py @@ -1,8 +1,8 @@ - import urllib.parse STRICT_VALIDATION = True + def canonical_keys(base_key, context): if isinstance(base_key, urllib.parse.ParseResult): return [base_key] @@ -12,15 +12,17 @@ def canonical_keys(base_key, context): return [base_key] if context is None: return [base_key] - return [context._replace(path = base_key), base_key] + return [context._replace(path=base_key), base_key] + class Concept: - def __init__(self, context, pairs): self.pairs = [] - for k, v in pairs.items(): + for k, v in pairs.items(): keys = canonical_keys(k, context) - self.pairs.append({'canonical_key': keys[0], 'keys': set(keys), 'values': v}) + self.pairs.append( + {'canonical_key': keys[0], 'keys': set(keys), 'values': v} + ) self.regenerate_by_keys() def regenerate_by_keys(self): @@ -29,11 +31,17 @@ class Concept: def __copy__(self): new = Concept(None, {}) for p in self.pairs: - new.pairs.append({'canonical_key': p['canonical_key'], 'keys': set(p['keys']), 'values': p['values']}) + new.pairs.append( + { + 'canonical_key': p['canonical_key'], + 'keys': set(p['keys']), + 'values': p['values'], + } + ) new.regenerate_by_keys() return new - def get(self, key, default = None): + def get(self, key, default=None): pairs = self.by_keys.get(key, None) return pairs['values'] if pairs is not None else default @@ -54,7 +62,7 @@ class Concept: return self.by_keys[key]['values'] def to_dict(self): - return {p['canonical_key']:p['values'] for p in self.pairs} + return {p['canonical_key']: p['values'] for p in self.pairs} def __getitem__(self, key): return self.by_keys[key]['values'] @@ -87,14 +95,12 @@ class Concept: if id := self.by_keys.get('@id'): return 'Concept {{ @id = {} }}'.format(id['values']) - return 'Concept '+str({p['canonical_key']:p['values'] for p in self.pairs}) + return 'Concept ' + str({p['canonical_key']: p['values'] for p in self.pairs}) def __str__(self): return repr(self) - def set_canonical_key(self, new_canonical_key, key = None): + def set_canonical_key(self, new_canonical_key, key=None): if key is None: key = new_canonical_key self.by_keys[key]['canonical_key'] = new_canonical_key - - diff --git a/datagraph/wikidata_ext.py b/datagraph/wikidata_ext.py index b61e52c..ec3451b 100644 --- a/datagraph/wikidata_ext.py +++ b/datagraph/wikidata_ext.py @@ -1,4 +1,3 @@ - import logging import urllib.parse @@ -6,7 +5,8 @@ import ratelimit import requests import wikidata.entity -REQUEST_SESSION = None # TODO? +REQUEST_SESSION = None # TODO? + def concept_uri(obj): assert isinstance(obj, wikidata.entity.Entity), obj @@ -17,7 +17,8 @@ def concept_uri(obj): else: assert False, 'TODO: ' + ojb.id -def fmt_triple_value(obj, prefer_obj = False): + +def fmt_triple_value(obj, prefer_obj=False): if obj is None: return '' if isinstance(obj, str): @@ -30,36 +31,45 @@ def fmt_triple_value(obj, prefer_obj = False): else: assert False, type(obj) + @ratelimit.sleep_and_retry def fetch_by_url(url, headers): logging.debug('Fetching: %s', url) - assert REQUEST_SESSION is not None, 'REQUEST_SESSION must be set, before calling fetch_by_url' - response = REQUEST_SESSION.get(url, headers = headers) + assert ( + REQUEST_SESSION is not None + ), 'REQUEST_SESSION must be set, before calling fetch_by_url' + response = REQUEST_SESSION.get(url, headers=headers) if response.status_code != 200: logging.error('Got %s error message: %s', response.status_code, response.text) return None return response + ITEMS_PER_PAGE = 'http://www.w3.org/ns/hydra/core#itemsPerPage' TOTAL_ITEMS = 'http://www.w3.org/ns/hydra/core#totalItems' + def fmt_params(subject, predicate, object): derp = [x for x in [subject, predicate, object] if x] assert len(derp) >= 1, str(derp) params = { - 'subject': fmt_triple_value(subject, prefer_obj = True), - 'predicate': fmt_triple_value(predicate, prefer_obj = True), - 'object': fmt_triple_value(object, prefer_obj = True), + 'subject': fmt_triple_value(subject, prefer_obj=True), + 'predicate': fmt_triple_value(predicate, prefer_obj=True), + 'object': fmt_triple_value(object, prefer_obj=True), 'page': 1, } return params -def get_triples_count(subject = None, predicate = None, object = None): - """Fetches first page in order to determine amount of items. - """ + +def get_triples_count(subject=None, predicate=None, object=None): + """Fetches first page in order to determine amount of items.""" params = fmt_params(subject, predicate, object) - url = requests.Request(url = 'https://query.wikidata.org/bigdata/ldf', params = params).prepare().url - response = fetch_by_url(url, headers = {'accept': 'application/ld+json'}) + url = ( + requests.Request(url='https://query.wikidata.org/bigdata/ldf', params=params) + .prepare() + .url + ) + response = fetch_by_url(url, headers={'accept': 'application/ld+json'}) if response is None: return { 'items_per_page': 0, @@ -76,13 +86,20 @@ def get_triples_count(subject = None, predicate = None, object = None): } assert False + def get_triples_internal(subject, predicate, object): params = fmt_params(subject, predicate, object) pagination_data = get_triples_count(subject, predicate, object) - for current_page in range(1, pagination_data['num_pages']+1): + for current_page in range(1, pagination_data['num_pages'] + 1): params['page'] = current_page - url = requests.Request(url = 'https://query.wikidata.org/bigdata/ldf', params = params).prepare().url - response = fetch_by_url(url, headers = {'accept': 'application/ld+json'}) + url = ( + requests.Request( + url='https://query.wikidata.org/bigdata/ldf', params=params + ) + .prepare() + .url + ) + response = fetch_by_url(url, headers={'accept': 'application/ld+json'}) json_data = response.json() for item in json_data['@graph']: @@ -97,29 +114,34 @@ def get_triples_internal(subject, predicate, object): # Bookkeeping del url, response, json_data + SCHEMA_ABOUT = urllib.parse.urlparse('http://schema.org/about') + def get_wikidata_concept_for_wikipedia_page(client, wikipage): - triples = get_triples_internal(wikipage, SCHEMA_ABOUT, None); + triples = get_triples_internal(wikipage, SCHEMA_ABOUT, None) triples = list(triples) for item in triples: s = item['about'][3:] - return client.get(s, load = False) + return client.get(s, load=False) -def get_triples(client, subject = None, predicate = None, object = None): + +def get_triples(client, subject=None, predicate=None, object=None): triples = [] iterator = get_triples_internal(subject, predicate, object) for item in iterator: is_looking_for = item['@id'].startswith('wd:') and predicate.id in item - if is_looking_for : + if is_looking_for: s = subject if s is None: - s = client.get(item['@id'][3:], load = False) + s = client.get(item['@id'][3:], load=False) o = object or item[predicate.id] yield (s, predicate, o) del item, is_looking_for -def get_backlinks(client, predicate, object): - for subject, _, _ in get_triples(client, subject = None, predicate = predicate, object = object): - yield subject +def get_backlinks(client, predicate, object): + for subject, _, _ in get_triples( + client, subject=None, predicate=predicate, object=object + ): + yield subject diff --git a/test/test_datagraph.py b/test/test_datagraph.py index f674688..f35b089 100644 --- a/test/test_datagraph.py +++ b/test/test_datagraph.py @@ -6,9 +6,11 @@ import datagraph.wikidata_ext datagraph.wikidata_ext.REQUEST_SESSION = requests_cache.CachedSession('output/testing') + def test_version(): assert datagraph.__version__ is not None + def test_get_triples(): client = wikidata.client.Client() @@ -17,8 +19,8 @@ def test_get_triples(): schema_prop = 'image' triples_iter = datagraph.wikidata_ext.get_triples( - client = client, - predicate = EQV_PROPERTY, - object = f'{schema_root}{schema_prop}', + client=client, + predicate=EQV_PROPERTY, + object=f'{schema_root}{schema_prop}', ) assert triples_iter is not None