1
0
crypto-tax/crypto_tax/__main__.py

118 lines
3.6 KiB
Python
Raw Normal View History

2024-12-14 22:38:06 +00:00
import sys
import requests
import fin_depo
from . import secrets
from fin_defs import CryptoCurrency, AssetAmount, MPC, Asset, USDT
from decimal import Decimal
from collections import deque
from fin_depo.data import *
import datetime
import dataclasses
KUCOIN_CLIENT = fin_depo.defi_kucoin.KucoinDepoFetcher(
secrets.KUCOIN_KEY,
secrets.KUCOIN_SECRET,
secrets.KUCOIN_PASS,
)
@dataclasses.dataclass
class BoughtAndSold:
amount: AssetAmount
time_bought: datetime.datetime
time_sold: datetime.datetime
@dataclasses.dataclass
class BoughtAndNotYetSold:
amount: AssetAmount
time_bought: datetime.datetime
def compute_tax(transfers: list):
bought_and_sold_for: list[BoughtAndSold] = []
prices_bought_for: dict[Asset, deque[BoughtAndNotYetSold]] = {}
prices_bought_for[MPC] = deque()
prices_bought_for[MPC].append(BoughtAndNotYetSold(AssetAmount(MPC, Decimal(100)), datetime.datetime(2000, 1,1,1,1,1,1)))
prices_bought_for[USDT] = deque()
prices_bought_for[USDT].append(BoughtAndNotYetSold(AssetAmount(USDT, Decimal(100)), datetime.datetime(2000, 1,1,1,1,1,1)))
def sell(amount: AssetAmount, executed_time: datetime.datetime):
print(f'Selling: {amount}')
while amount.amount > 0:
if len(prices_bought_for[amount.asset]) == 0:
raise Exception('Miscalculation: ' + str(amount))
partial = prices_bought_for[amount.asset].popleft()
print(f'Fuck : {amount} - {partial.amount}')
amount_covered_by_partial = min(partial.amount, amount)
amount -= amount_covered_by_partial
new_partial_amount = partial.amount - amount_covered_by_partial
print(f' => {amount} ({new_partial_amount}')
# Re-add partial
if new_partial_amount.amount != 0:
new_partial = BoughtAndNotYetSold(new_partial_amount, partial.time_bought)
print(f' => new partial: {new_partial}')
prices_bought_for[amount.asset].appendleft(new_partial)
del new_partial
# Add FIFO like
bought_and_sold_for.append(BoughtAndSold(amount_covered_by_partial, partial.time_bought, executed_time))
del partial, amount_covered_by_partial
for transfer in transfers:
print(transfer)
# Buy
if output := transfer.output:
print('Buy')
if output.asset not in prices_bought_for:
prices_bought_for[output.asset] = deque()
prices_bought_for[output.asset].append(BoughtAndNotYetSold(output, transfer.executed_time))
del output
# Sell
if _input := transfer.input:
print('Sell')
sell(_input, transfer.executed_time)
del transfer, _input
if True:
print('Bought for:')
for asset, prices in prices_bought_for.items():
price_sum = sum((p.amount for p in prices), start=AssetAmount.ZERO)
sys.stdout.write(f' - {asset.raw_short_name():10} ({price_sum}): ')
for price in prices:
sys.stdout.write(str(price.amount.amount))
sys.stdout.write(', ')
del price
sys.stdout.write('\n')
del asset, prices
print('-'*80)
for bas in bought_and_sold_for:
print(f'{bas.amount} ({bas.time_bought} ----> {bas.time_sold})')
for c, l in prices_bought_for.items():
print(c)
print(l)
def main():
"""Main function."""
TRANSFERS = list(KUCOIN_CLIENT._get_double_registers())
compute_tax(TRANSFERS)
if __name__ == '__main__':
main()