1
0

[FUSE]: Collection directories WIP
All checks were successful
Test Python / Test (push) Successful in 26s

This commit is contained in:
Jon Michael Aanes 2024-10-02 17:17:14 +02:00
parent 17ab429ed3
commit a476c6eac4
2 changed files with 41 additions and 17 deletions

View File

@ -16,6 +16,7 @@ from .favro_data_model import (
CustomFieldId,
CustomFieldInfo,
OrganizationId,
CollectionId,
SeqId,
TagId,
TagInfo,
@ -115,9 +116,10 @@ class FavroClient:
self,
*,
seq_id: SeqId | None = None,
collection_id: CollectionId | None = None,
todo_list=False,
) -> Iterator[Card]:
request = self._get_cards_request(seq_id=seq_id,todo_list=todo_list)
request = self._get_cards_request(seq_id=seq_id,todo_list=todo_list,collection_id=collection_id)
yield from self._get_paginated(request, Card.from_json)
def get_collections(self) -> Iterator[Collection]:
@ -134,18 +136,15 @@ class FavroClient:
self,
seq_id: SeqId | None = None,
todo_list: bool = False,
request_id: None | str = None,
page: None | int = None,
collection_id: CollectionId | None = None,
) -> requests.Request:
params = {'descriptionFormat': 'markdown'}
if seq_id is not None:
params['cardSequentialId'] = str(seq_id.raw_id)
if todo_list is True:
params['todoList'] = 'true'
if request_id:
params['requestId'] = request_id
if page:
params['page'] = page
if collection_id is not None:
params['collectionId'] = str(collection_id.raw_id)
return requests.Request('GET', URL_GET_CARDS, params=params)

View File

@ -104,14 +104,19 @@ class RootFileSystemItem(FileSystemItem):
def from_path_segment(segment: str) -> 'RootFileSystemItem':
return RootFileSystemItem()
def __str__(self):
return '/'
@dataclasses.dataclass(frozen=True)
class OrganizationFileSystemItem(FileSystemItem):
name: str
class CollectionFileSystemItem(FileSystemItem):
collection_name: str
@staticmethod
def from_path_segment(segment: str) -> 'OrganizationFileSystemItem':
return OrganizationFileSystemItem(segment)
def from_path_segment(segment: str) -> 'CollectionFileSystemItem':
return CollectionFileSystemItem(segment)
def __str__(self):
return self.collection_name
@dataclasses.dataclass(frozen=True)
class CardFileSystemItem(FileSystemItem):
@ -123,6 +128,9 @@ class CardFileSystemItem(FileSystemItem):
return CardFileSystemItem(SeqId(int(m.group(1))))
return None
def __str__(self):
return CARD_FILENAME_FORMAT.format(seq_id=self.seq_id.raw_id)
def path_to_file_system_item(path_str: str, path_components: list[type[FileSystemItem]]) -> FileSystemItem | None:
path = re.findall(r'[^/]+', path_str)
@ -142,15 +150,15 @@ class FavroFuse(fuse.Fuse):
self.favro_client = favro_client
self.formatter = formatter
self.wiped_cards = set()
#self.path_components = [RootFileSystemItem, OrganizationFileSystemItem, CardFileSystemItem]
self.path_components = [RootFileSystemItem, CardFileSystemItem]
self.path_components = [RootFileSystemItem, CollectionFileSystemItem, CardFileSystemItem]
super().__init__(**kwargs)
def getattr(self, path: str) -> FavroStat | int:
file_system_item = path_to_file_system_item(path, self.path_components)
print(file_system_item )
st = FavroStat()
if isinstance(file_system_item, RootFileSystemItem):
if isinstance(file_system_item, RootFileSystemItem | CollectionFileSystemItem):
st.st_mode = stat.S_IFDIR | 0o755
st.st_nlink = 2
elif isinstance(file_system_item, CardFileSystemItem):
@ -166,12 +174,29 @@ class FavroFuse(fuse.Fuse):
return st
def readdir(self, path: str, offset: int) -> Iterator[fuse.Direntry]:
logger.warning('readdir(path=%s, offset=%s)', path, offset)
file_system_item = path_to_file_system_item(path, self.path_components)
yield fuse.Direntry('.')
yield fuse.Direntry('..')
for card in self.favro_client.get_todo_list_cards():
yield fuse.Direntry(CARD_FILENAME_FORMAT.format(seq_id=card.seq_id.raw_id))
if isinstance(file_system_item, RootFileSystemItem):
for collection in self.favro_client.get_collections():
print(collection)
yield fuse.Direntry(str(CollectionFileSystemItem(collection.name)))
del collection
elif isinstance(file_system_item, CollectionFileSystemItem):
# TODO: move into own function
for collection in self.favro_client.get_collections():
if collection.name == file_system_item.collection_name:
collection_id = collection.collection_id
del collection
print('Collection', collection_id)
for card in self.favro_client.get_cards(collection_id=collection_id):
yield fuse.Direntry(str(CardFileSystemItem(card.seq_id)))
del card
def open(self, path: str, flags: int) -> int | None:
file_system_item = path_to_file_system_item(path, self.path_components)