1
0

partisia blockchain

This commit is contained in:
Jon Michael Aanes 2024-04-16 23:23:27 +02:00
parent c521dd35a6
commit 5f5e7e6776
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
7 changed files with 66 additions and 10 deletions

View File

@ -20,6 +20,7 @@ import personal_data.fetchers.crunchyroll
import personal_data.fetchers.ffxiv_lodestone import personal_data.fetchers.ffxiv_lodestone
import personal_data.fetchers.playstation import personal_data.fetchers.playstation
import personal_data.fetchers.psnprofiles import personal_data.fetchers.psnprofiles
import personal_data.fetchers.partisia_blockchain
from personal_data._version import __version__ from personal_data._version import __version__
CSV_DIALECT = 'one_true_dialect' CSV_DIALECT = 'one_true_dialect'
@ -133,12 +134,10 @@ def get_session(cookiejar, *, with_cfscrape: bool) -> requests.Session:
return session return session
def main(): def main(scraper_filter: frozenset[str]):
cookiejar = browsercookie.firefox() cookiejar = browsercookie.firefox()
logger.warning('Got cookiejar from firefox: %s cookies', len(cookiejar)) logger.warning('Got cookiejar from firefox: %s cookies', len(cookiejar))
scraper_filter = {'PsnProfilesScraper'}
for scraper_cls in personal_data.data.Scraper.__subclasses__(): for scraper_cls in personal_data.data.Scraper.__subclasses__():
session = get_session(cookiejar, with_cfscrape=scraper_cls.requires_cfscrape()) session = get_session(cookiejar, with_cfscrape=scraper_cls.requires_cfscrape())
scraper = scraper_cls(session) scraper = scraper_cls(session)

View File

@ -1,4 +1,15 @@
import personal_data import personal_data
import argparse
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('fetchers', metavar='FETCHER', type=str, nargs='+')
return parser.parse_args()
def main():
args = parse_arguments()
scraper_filter = frozenset(args.fetchers)
personal_data.main(scraper_filter)
if __name__ == '__main__': if __name__ == '__main__':
personal_data.main() main()

View File

@ -15,22 +15,18 @@ class DeduplicateMode(Enum):
class Scraper(abc.ABC): class Scraper(abc.ABC):
session: requests.Session session: requests.Session
@abc.abstractmethod
@staticmethod @staticmethod
def dataset_name() -> str: def dataset_name() -> str:
pass pass
@abc.abstractmethod
@staticmethod @staticmethod
def deduplicate_mode() -> DeduplicateMode: def deduplicate_mode() -> DeduplicateMode:
pass pass
@abc.abstractmethod
@staticmethod @staticmethod
def dataset_format() -> str: def dataset_format() -> str:
return 'list-of-dicts' return 'list-of-dicts'
@abc.abstractmethod
@staticmethod @staticmethod
def requires_cfscrape() -> bool: def requires_cfscrape() -> bool:
return False return False

View File

View File

@ -47,8 +47,6 @@ class LodestoneAchievementScraper(Scraper):
scripts=False, scripts=False,
) )
# print(soup)
for entry in soup.select('.ldst__achievement ul li.entry'): for entry in soup.select('.ldst__achievement ul li.entry'):
time_acquired = str(entry.script.text).strip() time_acquired = str(entry.script.text).strip()
time_acquired = re.search( time_acquired = re.search(

View File

@ -0,0 +1,52 @@
import dataclasses
import datetime
import logging
import re
import secrets
import bs4
import personal_data.html_util
import personal_data.parse_util
from personal_data.data import DeduplicateMode, Scraper
logger = logging.getLogger(__name__)
# mainnet: https://reader.partisiablockchain.com
# testnet: https://node1.testnet.partisiablockchain.com
HOSTNAME = 'reader.partisiablockchain.com'
URL_ACCOUNT_PLUGIN = 'https://{hostname}/{shard}blockchain/accountPlugin/local'
@dataclasses.dataclass(frozen=True)
class MpcBalance(Scraper):
dataset_name = 'defi_mpc_balance'
deduplicate_mode = DeduplicateMode.BY_ALL_COLUMNS
def scrape(self):
address = '0019e9a28c978dd65114cc4e0bcb876770805b0349' # TODO
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
}
url = URL_ACCOUNT_PLUGIN.format(
hostname = HOSTNAME,
shard = 'shards/Shard0/',
)
data = f"{{\"path\":[{{\"type\":\"field\",\"name\":\"accounts\"}},{{\"type\":\"avl\",\"keyType\":\"BLOCKCHAIN_ADDRESS\",\"key\":\"{address}\"}}]}}"
print(data)
response = self.session.post(url, headers = headers, data=data)
response.raise_for_status()
json = response.json()
print(json)
assert False

0
test/__init__.py Normal file
View File