Compare commits
3 Commits
1f3bef0772
...
7bf85498ae
Author | SHA1 | Date | |
---|---|---|---|
7bf85498ae | |||
73baf6bd29 | |||
a99f2c134d |
|
@ -59,7 +59,10 @@ def assert_good_value_repr(r):
|
|||
|
||||
|
||||
def to_quickstatements_v1_item(
|
||||
subject, lines, skip_impossible=True, skip_already_syncronized=True
|
||||
subject,
|
||||
lines,
|
||||
skip_impossible=True,
|
||||
skip_already_syncronized=True,
|
||||
):
|
||||
# assert '@id' not in subject, 'TODO: Linked subjects'
|
||||
subject_id = fmt_value(subject, True) if '@id' in subject else 'LAST'
|
||||
|
@ -103,12 +106,14 @@ def to_quickstatements_v1_item(
|
|||
|
||||
if skip_impossible and predicate_str.startswith('"'):
|
||||
logging.warning(
|
||||
'Bad line: %s (Lines must not start with ")', predicate_str
|
||||
'Bad line: %s (Lines must not start with ")',
|
||||
predicate_str,
|
||||
)
|
||||
continue
|
||||
if '' in line and skip_impossible:
|
||||
logging.warning(
|
||||
'Bad line: %s (Lines must not contain empty names)', line
|
||||
'Bad line: %s (Lines must not contain empty names)',
|
||||
line,
|
||||
)
|
||||
continue
|
||||
assert 'None' not in line, line
|
||||
|
|
|
@ -1,14 +1,19 @@
|
|||
import urllib.parse
|
||||
from collections.abc import Iterator
|
||||
from typing import Any
|
||||
|
||||
STRICT_VALIDATION = True
|
||||
|
||||
Key = int | str | urllib.parse.ParseResult
|
||||
Context = str # TODO
|
||||
|
||||
def canonical_keys(base_key, context):
|
||||
|
||||
def canonical_keys(base_key: Key, context: Context | None) -> list[Any]:
|
||||
if isinstance(base_key, urllib.parse.ParseResult):
|
||||
return [base_key]
|
||||
if not isinstance(base_key, str):
|
||||
return [base_key]
|
||||
elif base_key.startswith('@'):
|
||||
if base_key.startswith('@'):
|
||||
return [base_key]
|
||||
if context is None:
|
||||
return [base_key]
|
||||
|
@ -16,19 +21,19 @@ def canonical_keys(base_key, context):
|
|||
|
||||
|
||||
class Concept:
|
||||
def __init__(self, context, pairs):
|
||||
def __init__(self, context: Context | None, pairs: dict[Key, str]) -> None:
|
||||
self.pairs = []
|
||||
for k, v in pairs.items():
|
||||
keys = canonical_keys(k, context)
|
||||
self.pairs.append(
|
||||
{'canonical_key': keys[0], 'keys': set(keys), 'values': v}
|
||||
{'canonical_key': keys[0], 'keys': set(keys), 'values': v},
|
||||
)
|
||||
self.regenerate_by_keys()
|
||||
|
||||
def regenerate_by_keys(self):
|
||||
def regenerate_by_keys(self) -> None:
|
||||
self.by_keys = {k: pair for pair in self.pairs for k in pair['keys']}
|
||||
|
||||
def __copy__(self):
|
||||
def __copy__(self) -> 'Concept':
|
||||
new = Concept(None, {})
|
||||
for p in self.pairs:
|
||||
new.pairs.append(
|
||||
|
@ -36,46 +41,54 @@ class Concept:
|
|||
'canonical_key': p['canonical_key'],
|
||||
'keys': set(p['keys']),
|
||||
'values': p['values'],
|
||||
}
|
||||
},
|
||||
)
|
||||
new.regenerate_by_keys()
|
||||
return new
|
||||
|
||||
def get(self, key, default=None):
|
||||
def get(self, key: Key, default=None):
|
||||
pairs = self.by_keys.get(key, None)
|
||||
return pairs['values'] if pairs is not None else default
|
||||
|
||||
def getlist(self, key):
|
||||
def getlist(self, key: Key) -> list[Any]:
|
||||
result = self.get(key)
|
||||
if result is None:
|
||||
return []
|
||||
assert isinstance(result, list), 'Not a list: ' + str(result)
|
||||
if not isinstance(result, list):
|
||||
msg = f'Not a list: {result}'
|
||||
raise TypeError(msg)
|
||||
return [r['value'] for r in result]
|
||||
|
||||
def keys(self):
|
||||
def keys(self) -> Iterator[Key]:
|
||||
for pair in self.pairs:
|
||||
yield pair['canonical_key']
|
||||
|
||||
def setdefault(self, key, value):
|
||||
def setdefault(self, key: Key, value):
|
||||
if key not in self.by_keys:
|
||||
self[key] = value
|
||||
return self.by_keys[key]['values']
|
||||
|
||||
def to_dict(self):
|
||||
def to_dict(self) -> dict[Key, Any]:
|
||||
return {p['canonical_key']: p['values'] for p in self.pairs}
|
||||
|
||||
def __getitem__(self, key):
|
||||
def __getitem__(self, key: Key):
|
||||
return self.by_keys[key]['values']
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
def __setitem__(self, key: Key, value) -> None:
|
||||
if STRICT_VALIDATION:
|
||||
if not isinstance(key, str) or key != '@id':
|
||||
assert isinstance(value, list), value
|
||||
if not isinstance(value, list):
|
||||
raise TypeError(value)
|
||||
for v in value:
|
||||
assert isinstance(v, dict), value
|
||||
assert 'value' in v, value
|
||||
if not isinstance(value, dict):
|
||||
raise TypeError(value)
|
||||
if 'value' not in v:
|
||||
raise TypeError(value)
|
||||
for subk in v:
|
||||
assert not isinstance(v[subk], list), value
|
||||
if isinstance(v[subk], list):
|
||||
raise TypeError(value)
|
||||
del subk
|
||||
del v
|
||||
|
||||
if key in self.by_keys:
|
||||
self.by_keys[key]['values'] = value
|
||||
|
@ -84,23 +97,23 @@ class Concept:
|
|||
self.pairs.append(pair)
|
||||
self.by_keys[key] = pair
|
||||
|
||||
def __contains__(self, key):
|
||||
def __contains__(self, key: Key) -> bool:
|
||||
return key in self.by_keys
|
||||
|
||||
def __delitem__(self, key):
|
||||
def __delitem__(self, key: Key) -> None:
|
||||
self.pairs.remove(self.by_keys[key])
|
||||
del self.by_keys[key]
|
||||
|
||||
def __repr__(self):
|
||||
if id := self.by_keys.get('@id'):
|
||||
return 'Concept {{ @id = {} }}'.format(id['values'])
|
||||
def __repr__(self) -> str:
|
||||
if object_id := self.by_keys.get('@id'):
|
||||
return 'Concept {{ @id = {} }}'.format(object_id['values'])
|
||||
|
||||
return 'Concept ' + str({p['canonical_key']: p['values'] for p in self.pairs})
|
||||
|
||||
def __str__(self):
|
||||
def __str__(self) -> str:
|
||||
return repr(self)
|
||||
|
||||
def set_canonical_key(self, new_canonical_key, key=None):
|
||||
def set_canonical_key(self, new_canonical_key: Key, key: Key | None = None):
|
||||
if key is None:
|
||||
key = new_canonical_key
|
||||
self.by_keys[key]['canonical_key'] = new_canonical_key
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
import logging
|
||||
import urllib.parse
|
||||
from collections.abc import Iterator
|
||||
from typing import Any
|
||||
|
||||
import ratelimit
|
||||
import requests
|
||||
|
@ -8,38 +10,43 @@ import wikidata.entity
|
|||
REQUEST_SESSION = None # TODO?
|
||||
|
||||
|
||||
def concept_uri(obj: wikidata.entity.Entity) -> urllib.parse.ParseResult:
    """Return the parsed Wikidata URI for an entity, chosen by its id prefix.

    Ids starting with ``P`` map to
    ``http://www.wikidata.org/prop/direct/<id>`` and ids starting with ``Q``
    map to ``http://www.wikidata.org/entity/<id>``.

    Raises:
        ValueError: If the id starts with neither ``P`` nor ``Q``.
    """
    entity_id = obj.id
    if entity_id.startswith('P'):
        return urllib.parse.urlparse(
            f'http://www.wikidata.org/prop/direct/{entity_id}',
        )
    if entity_id.startswith('Q'):
        return urllib.parse.urlparse(f'http://www.wikidata.org/entity/{entity_id}')

    # Raise explicitly instead of asserting: asserts are stripped under -O,
    # which would let an unsupported id fall through and return None.
    msg = f'Object id scheme not supported: {entity_id}'
    raise ValueError(msg)
|
||||
|
||||
|
||||
def format_value_for_triple_request(obj: Any, prefer_obj=False) -> str:
    """Format a value as the string used for a triple query parameter.

    Args:
        obj: ``None``, a string, a ``urllib.parse.ParseResult`` or a
            ``wikidata.entity.Entity``.
        prefer_obj: For a ``ParseResult``, return the bare URL when true and
            the URL wrapped in double quotes when false.

    Returns:
        ``''`` for ``None``; the string wrapped in double quotes for ``str``;
        the (optionally quoted) URL for a ``ParseResult``; the bare concept
        URI for an entity.

    Raises:
        TypeError: If ``obj`` is of any other type.
    """
    if obj is None:
        return ''
    if isinstance(obj, str):
        return f'"{obj}"'
    if isinstance(obj, urllib.parse.ParseResult):
        url = obj.geturl()
        return url if prefer_obj else f'"{url}"'
    if isinstance(obj, wikidata.entity.Entity):
        # Entities are always rendered as their bare (unquoted) concept URI.
        return concept_uri(obj).geturl()

    # Raise explicitly instead of asserting: asserts are stripped under -O,
    # which would let an unsupported type silently return None.
    msg = f'Type cannot be formatted: {type(obj)}'
    raise TypeError(msg)
|
||||
|
||||
|
||||
@ratelimit.sleep_and_retry
def fetch_by_url(url: str, headers: dict[str, str]):
    """Fetch ``url`` with a GET request through the module-level session.

    Args:
        url: Address to request.
        headers: Headers passed along with the request.

    Returns:
        The response object, or ``None`` (after logging the status code and
        body) when the response reports an error status.

    Raises:
        RuntimeError: If ``REQUEST_SESSION`` has not been set.
    """
    logging.debug('Fetching: %s', url)
    if REQUEST_SESSION is None:
        msg = 'REQUEST_SESSION must be set, before calling fetch_by_url'
        raise RuntimeError(msg)

    response = REQUEST_SESSION.get(url, headers=headers)
    # `ok` is an attribute of the response, not of the integer status code;
    # reading it off `status_code` raises AttributeError.
    # NOTE(review): assumes the session returns requests-style responses.
    if not response.ok:
        logging.error('Got %s error message: %s', response.status_code, response.text)
        return None
    return response
|
||||
|
@ -49,22 +56,27 @@ ITEMS_PER_PAGE = 'http://www.w3.org/ns/hydra/core#itemsPerPage'
|
|||
TOTAL_ITEMS = 'http://www.w3.org/ns/hydra/core#totalItems'
|
||||
|
||||
|
||||
def fmt_params(subject, predicate, object):
|
||||
derp = [x for x in [subject, predicate, object] if x]
|
||||
assert len(derp) >= 1, str(derp)
|
||||
params = {
|
||||
'subject': fmt_triple_value(subject, prefer_obj=True),
|
||||
'predicate': fmt_triple_value(predicate, prefer_obj=True),
|
||||
'object': fmt_triple_value(object, prefer_obj=True),
|
||||
def fmt_params(subject: Any, predicate: Any, object_: Any) -> dict[str, str | int]:
|
||||
entities = [x for x in [subject, predicate, object_] if x]
|
||||
if len(entities) == 0:
|
||||
msg = 'There are no entities for this query!'
|
||||
raise RuntimeError(msg)
|
||||
return {
|
||||
'subject': format_value_for_triple_request(subject, prefer_obj=True),
|
||||
'predicate': format_value_for_triple_request(predicate, prefer_obj=True),
|
||||
'object': format_value_for_triple_request(object_, prefer_obj=True),
|
||||
'page': 1,
|
||||
}
|
||||
return params
|
||||
|
||||
|
||||
def get_triples_count(subject=None, predicate=None, object=None):
|
||||
def get_triples_count(
|
||||
subject: Any = None,
|
||||
predicate: Any = None,
|
||||
object_: Any = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Fetches first page in order to determine amount of items."""
|
||||
params = fmt_params(subject, predicate, object)
|
||||
url = (
|
||||
params = fmt_params(subject, predicate, object_)
|
||||
url: str = (
|
||||
requests.Request(url='https://query.wikidata.org/bigdata/ldf', params=params)
|
||||
.prepare()
|
||||
.url
|
||||
|
@ -84,17 +96,20 @@ def get_triples_count(subject=None, predicate=None, object=None):
|
|||
'items_total': item[TOTAL_ITEMS],
|
||||
'num_pages': int((item[TOTAL_ITEMS] - 1) / item[ITEMS_PER_PAGE] + 1),
|
||||
}
|
||||
assert False
|
||||
|
||||
msg = 'Could not determine triple count'
|
||||
raise RuntimeError(msg)
|
||||
|
||||
|
||||
def get_triples_internal(subject, predicate, object):
|
||||
params = fmt_params(subject, predicate, object)
|
||||
pagination_data = get_triples_count(subject, predicate, object)
|
||||
def get_triples_internal(subject: Any, predicate: Any, object_: Any) -> Iterator[dict]:
|
||||
params = fmt_params(subject, predicate, object_)
|
||||
pagination_data = get_triples_count(subject, predicate, object_)
|
||||
for current_page in range(1, pagination_data['num_pages'] + 1):
|
||||
params['page'] = current_page
|
||||
url = (
|
||||
requests.Request(
|
||||
url='https://query.wikidata.org/bigdata/ldf', params=params
|
||||
url='https://query.wikidata.org/bigdata/ldf',
|
||||
params=params,
|
||||
)
|
||||
.prepare()
|
||||
.url
|
||||
|
@ -118,30 +133,46 @@ def get_triples_internal(subject, predicate, object):
|
|||
SCHEMA_ABOUT = urllib.parse.urlparse('http://schema.org/about')
|
||||
|
||||
|
||||
def get_wikidata_concept_for_wikipedia_page(
    client,
    wikipage: str,
) -> wikidata.entity.Entity | None:
    """Look up the entity that a Wikipedia page is ``schema:about``.

    Queries triples with ``wikipage`` as subject and ``SCHEMA_ABOUT`` as
    predicate and resolves the first result through ``client.get``.

    Returns:
        The result of ``client.get(..., load=False)`` for the first triple,
        or ``None`` when the query yields no triples.
    """
    # Iterate lazily: only the first triple is used, so there is no need to
    # consume the whole iterator before returning.
    for item in get_triples_internal(wikipage, SCHEMA_ABOUT, None):
        # Drop the first three characters of the value
        # (presumably a `wd:` prefix - confirm against the response format).
        return client.get(item['about'][3:], load=False)
    return None
|
||||
|
||||
|
||||
def get_triples(
    client,
    subject: Any = None,
    predicate: Any = None,
    object_: Any = None,
) -> Iterator[
    tuple[wikidata.entity.Entity, wikidata.entity.Entity, wikidata.entity.Entity]
]:
    """Yield ``(subject, predicate, object)`` tuples for matching triples.

    Only items whose ``'@id'`` starts with ``wd:`` and that contain the key
    ``predicate.id`` are yielded.

    Args:
        client: Used to resolve the subject via ``client.get(id, load=False)``
            when ``subject`` is not given.
        subject: Fixed subject, or ``None`` to take it from each item.
        predicate: Must expose an ``id`` attribute; matching reads
            ``predicate.id``, so the ``None`` default is not usable once
            items are returned.
        object_: Fixed object; when falsy, the item's value stored under
            ``predicate.id`` is used instead.
    """
    for item in get_triples_internal(subject, predicate, object_):
        if not (item['@id'].startswith('wd:') and predicate.id in item):
            continue

        s = subject
        if s is None:
            # Strip the `wd:` prefix checked above to get the bare id.
            s = client.get(item['@id'][3:], load=False)
        o = object_ or item[predicate.id]
        yield (s, predicate, o)
    # No explicit `del` of the loop variables: they are unbound when the
    # iterator is empty, and the generator frame releases them when it ends.
|
||||
|
||||
|
||||
def get_backlinks(
    client,
    predicate: Any,
    object_: Any,
) -> Iterator[wikidata.entity.Entity]:
    """Yield the subject of every triple matching ``predicate`` and ``object_``.

    Delegates to ``get_triples`` with no fixed subject and discards the
    predicate and object of each returned tuple.
    """
    matches = get_triples(
        client,
        subject=None,
        predicate=predicate,
        # Keyword must match the `object_` parameter name of `get_triples`.
        object_=object_,
    )
    for subject, _, _ in matches:
        yield subject
|
||||
|
|
|
@ -14,13 +14,13 @@ def test_version():
|
|||
def test_get_triples():
|
||||
client = wikidata.client.Client()
|
||||
|
||||
EQV_PROPERTY = client.get('P1628')
|
||||
eqv_property = client.get('P1628')
|
||||
schema_root = 'https://schema.org/'
|
||||
schema_prop = 'image'
|
||||
|
||||
triples_iter = datagraph.wikidata_ext.get_triples(
|
||||
client=client,
|
||||
predicate=EQV_PROPERTY,
|
||||
predicate=eqv_property,
|
||||
object=f'{schema_root}{schema_prop}',
|
||||
)
|
||||
assert triples_iter is not None
|
||||
|
|
Loading…
Reference in New Issue
Block a user