84 lines
2.2 KiB
Python
84 lines
2.2 KiB
Python
import os, stat, errno, fuse
|
|
from pathlib import Path
|
|
from collections.abc import Iterator
|
|
|
|
from .favro_client import FavroClient
|
|
|
|
# Declare the fuse-python API revision (0, 2) this module is written against.
fuse.fuse_python_api = (0, 2)
|
|
|
|
# Path, relative to the mount point, of the one file the filesystem can
# stat, open and read.
hello_path: str = "/hello"
# Byte content returned when that file is read.
hello_str: bytes = b"Hello World!\n"
|
|
|
|
class MyStat(fuse.Stat):
    """Stat record whose ``st_*`` fields all start out as zero.

    Callers fill in only the fields they care about (for example ``st_mode``,
    ``st_nlink`` and ``st_size``); everything else stays 0.
    """

    def __init__(self):
        """Initialise every stat field on the instance to 0."""
        zeroed_fields = (
            "st_mode",
            "st_ino",
            "st_dev",
            "st_nlink",
            "st_uid",
            "st_gid",
            "st_size",
            "st_atime",
            "st_mtime",
            "st_ctime",
        )
        for field_name in zeroed_fields:
            setattr(self, field_name, 0)
|
|
|
|
class FavroFuse(fuse.Fuse):
    """FUSE filesystem that lists Favro cards in its root directory.

    Directory listings contain one entry per card returned by
    ``favro_client.get_todo_list_cards()``. Independently of that listing, a
    single read-only file at ``hello_path`` can be stat-ed, opened and read.

    All handlers report failure by returning a negated ``errno`` value
    instead of raising.

    NOTE(review): the names yielded by ``readdir`` are not recognised by
    ``getattr``/``open``/``read`` (those only know ``'/'`` and
    ``hello_path``), and ``hello_path`` itself is never listed. Confirm
    whether card entries are meant to be stat-able.
    """

    def __init__(self, favro_client: FavroClient, **kwargs):
        """Keep a reference to *favro_client*; pass *kwargs* to ``fuse.Fuse``."""
        self.favro_client = favro_client
        super().__init__(**kwargs)

    def getattr(self, path: str) -> MyStat | int:
        """Return stat information for *path*.

        Returns:
            A ``MyStat`` describing a 0o755 directory (for ``'/'``) or a
            0o444 regular file of ``len(hello_str)`` bytes (for
            ``hello_path``); ``-errno.ENOENT`` for any other path.
        """
        if path == '/':
            mode, link_count, byte_size = stat.S_IFDIR | 0o755, 2, 0
        elif path == hello_path:
            mode, link_count, byte_size = stat.S_IFREG | 0o444, 1, len(hello_str)
        else:
            return -errno.ENOENT

        attrs = MyStat()
        attrs.st_mode = mode
        attrs.st_nlink = link_count
        attrs.st_size = byte_size
        return attrs

    def readdir(self, path: str, offset: int) -> Iterator[fuse.Direntry]:
        """Yield directory entries: ``.``, ``..``, then one per to-do card.

        *path* and *offset* are accepted but not consulted; the same listing
        is produced for every call. The card list is fetched from the client
        only after the two dot entries have been yielded.
        """
        for dot_name in ('.', '..'):
            yield fuse.Direntry(dot_name)

        for card in self.favro_client.get_todo_list_cards():
            yield fuse.Direntry(card.seq_id_with_prefix)

    def open(self, path: str, flags) -> int | None:
        """Permit read-only opens of ``hello_path``.

        Returns:
            ``-errno.ENOENT`` if *path* is not ``hello_path``;
            ``-errno.EACCES`` if *flags* request anything other than
            read-only access; ``None`` on success.
        """
        if path != hello_path:
            return -errno.ENOENT

        # Mask out everything except the access-mode bits before comparing.
        access_mask = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
        if (flags & access_mask) != os.O_RDONLY:
            return -errno.EACCES

        # Success is signalled by returning None, exactly as the implicit
        # fall-through did; the annotation now reflects that.
        return None

    def read(self, path: str, size: int, offset: int) -> bytes | int:
        """Return up to *size* bytes of ``hello_str`` starting at *offset*.

        Returns:
            ``-errno.ENOENT`` if *path* is not ``hello_path``; otherwise the
            requested slice, which is ``b''`` once *offset* is at or past the
            end of the content.
        """
        if path != hello_path:
            return -errno.ENOENT

        # Slicing already clamps the end index to the length of the data and
        # yields b'' when the start is at or beyond it, so no manual bounds
        # arithmetic is needed.
        return hello_str[offset:offset + size]
|
|
|
|
# Usage text handed to the option parser: a short banner followed by
# fuse-python's generic usage string.
HELP = "\nUserspace hello example\n\n" + fuse.Fuse.fusage
|
|
|
|
def start_favro_fuse(favro_client: FavroClient):
    """Build a ``FavroFuse`` around *favro_client*, parse options and run it.

    The filesystem is constructed with the module's ``HELP`` text as usage
    string, then ``parse(errex=1)`` and ``main()`` are called on it in that
    order. Nothing is returned.
    """
    # TODO: (placeholder kept from the original source; no details recorded)
    fuse_options = {
        'version': '%prog ' + fuse.__version__,
        'usage': HELP,
        'dash_s_do': 'setsingle',
    }
    filesystem = FavroFuse(favro_client=favro_client, **fuse_options)
    filesystem.parse(errex=1)
    filesystem.main()
|