1
0

Kucoin coverage
All checks were successful
Build container / Package-Python (push) Successful in 23s
Build container / Package-Container (push) Successful in 1m19s

This commit is contained in:
Jon Michael Aanes 2024-05-18 22:03:34 +02:00
parent 62db705b3e
commit 53f307e40c
Signed by: Jmaa
SSH Key Fingerprint: SHA256:Ab0GfHGCblESJx7JRE4fj4bFy/KRpeLhi41y4pF3sNA
2 changed files with 52 additions and 11 deletions

View File

@ -42,6 +42,18 @@ class KucoinAccountBalances(Scraper):
yield frozendict(data_point) yield frozendict(data_point)
def addresses_to_data_points(addresses) -> frozendict:
data_point = {}
data_point['account.num_deposit_addresses'] = len(addresses)
data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC)
if len(addresses) > 0:
for k, v in addresses[-1].items():
data_point[f'deposit.{k}'] = v
return frozendict(data_point)
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class KucoinDepositAddresses(Scraper): class KucoinDepositAddresses(Scraper):
dataset_name = 'defi_kucoin_deposit_address' dataset_name = 'defi_kucoin_deposit_address'
@ -49,14 +61,6 @@ class KucoinDepositAddresses(Scraper):
deduplicate_ignore_columns = ['account.update_time'] deduplicate_ignore_columns = ['account.update_time']
def scrape(self) -> Iterator[Mapping[str, object]]: def scrape(self) -> Iterator[Mapping[str, object]]:
data_point = {} # addresses = client.get_deposit_address('MPC')
addresses = list()
addresses = client.get_deposit_address('MPC') yield addresses_to_data_points(addresses)
data_point['account.num_deposit_addresses'] = len(addresses)
data_point['account.update_time'] = datetime.datetime.now(tz=datetime.UTC)
for k, v in addresses[-1].items():
data_point[f'deposit.{k}'] = v
yield frozendict(data_point)

37
test/test_kucoin.py Normal file
View File

@ -0,0 +1,37 @@
from frozendict import frozendict
from personal_data.fetchers.defi_kucoin import addresses_to_data_points
def test_addresses_to_data_points_none():
data_point = addresses_to_data_points([])
assert data_point['account.update_time'] is not None
expected = {
'account.num_deposit_addresses': 0,
'account.update_time': data_point['account.update_time'],
}
assert data_point == frozendict(expected)
def test_addresses_to_data_points_one():
data_point = addresses_to_data_points([{'a': 1, 'b': 2}])
assert data_point['account.update_time'] is not None
expected = {
'account.num_deposit_addresses': 1,
'account.update_time': data_point['account.update_time'],
'deposit.a': 1,
'deposit.b': 2,
}
assert data_point == frozendict(expected)
def test_addresses_to_data_points_two():
data_point = addresses_to_data_points([{'a': 1, 'b': 2}, {'a': 4, 'b': 9}])
assert data_point['account.update_time'] is not None
expected = {
'account.num_deposit_addresses': 2,
'account.update_time': data_point['account.update_time'],
'deposit.a': 4,
'deposit.b': 9,
}
assert data_point == frozendict(expected)