urbit/fuse
2016-05-04 16:25:08 -04:00

196 lines
6.3 KiB
Python
Executable File

#!/usr/bin/python
# write script for fuse/api/urb demo
# vere should start this as a child process
# filenames (.)
# speed
# could say if no . then dir (else check)
# drop size, thus no roundtrip on getattr
import os
import sys
import stat
import errno
import fuse
import logging
import json
import requests
import datetime
import collections
logging.basicConfig(level=logging.DEBUG,
format='%(levelname)s %(funcName)s %(lineno)s - %(message)s',
filename='log')
fuse.fuse_python_api = (0, 2)
url = "http://localhost:12321"
CacheEntry = collections.namedtuple('CacheEntry', ['when', 'what'])
Path = collections.namedtuple('Path',
['vane', 'ship', 'desk', 'case', 'spur', 'mark'])
class Stat(fuse.Stat):
def __init__(self):
self.st_mode = 0
self.st_ino = 0
self.st_dev = 0
self.st_nlink = 0
self.st_uid = 0
self.st_gid = 0
self.st_size = 0
self.st_atime = 0
self.st_mtime = 0
self.st_ctime = 0
class FS(fuse.Fuse):
def __init__(self, **kwargs):
self.xcache = {}
super(FS, self).__init__(**kwargs)
def _path_to_path(self, path):
logging.debug('_path_to_path %s' % path)
parts = path.split('/')[1:]
if len(parts) < 4:
logging.warn('path too short! %s' % path)
return Path('c', '=', '=', '=', '', '')
else:
return Path(parts[0],
parts[1],
parts[2],
parts[3],
'/' + '/'.join(parts[4:]) if parts[4:] else '',
parts[-1] if len(parts) > 4 else '')
def _path_to_beam(self, care, path):
return '%%%s%s /%s/%s/%s%s' \
% (path.vane, care, path.ship, path.desk, path.case, path.spur)
def _get_x(self, path):
payload = \
{'source':
{'as': {'mark': 'tang', 'next':
{'hoon': {'code': '|=(mime [leaf+(trip q.q)]~)', 'next':
{'as': {'mark': 'mime', 'next':
{'as': {'mark': path.mark, 'next':
{'dojo': '.^(noun %s)' % self._path_to_beam('x', path)}}}}}}}}},
'sink': {'stdout': None}}
r = requests.post(url, data=json.dumps(payload))
if r.text[0] == '"':
return r.text[1:-1].decode('string_escape').encode('ascii') + '\n'
else:
logging.warn('unrecognized response')
return '\n'
# we're a file iff we have contents and no children. no
# children and no contents is an implicitly-created
# directory. no contents but children is clearly a
# directory. contents and children is a directory, else
# children would be inaccessible.
def _is_dir(self, path):
logging.debug('_is_dir %s' % self._path_to_beam('a', path))
payload = \
{'source':
{'hoon': {'code': '|=(a/arch !?=({^ $~} a))', 'next':
{'dojo': '.^(arch %s)' % self._path_to_beam('y', path)}}},
'sink':
{'stdout': None}}
r = requests.post(url, data=json.dumps(payload))
logging.debug('%s is%s a directory: %s' %
(self._path_to_beam('a', path),
'' if r.text == '"%.y"' else ' not', r.text))
return r.text == '"%.y"'
def _direntries(self, path):
payload = \
{'source':
{'hoon': {'code': '|=((list {knot $~}) (turn +< head))', 'next':
{'hoon': {'code': '|=((map knot $~) (~(tap by +<)))', 'next':
{'hoon': {'code': 'tail', 'next':
{'dojo': '.^(arch %s)' % self._path_to_beam('y', path)}}}}}}},
'sink':
{'stdout': None}}
r = requests.post(url, data=json.dumps(payload))
logging.debug([x.encode('ascii') for x in r.text[1:-1].split('/')[1:]])
return [x.encode('ascii') for x in r.text[1:-1].split('/')[1:]]
def _read_file(self, path):
logging.debug('_read_file %s' % self._path_to_beam('a', path))
logging.debug('xcache %s' % str(self.xcache))
if self.xcache.get(path) is None or \
datetime.datetime.now() - self.xcache.get(path).when > \
datetime.timedelta(0, 2):
logging.debug('None %s' % self._path_to_beam('a', path))
result = self._get_x(path)
logging.debug(self.xcache)
self.xcache[path] = CacheEntry(datetime.datetime.now(), result)
logging.debug(self.xcache)
return result
else:
logging.debug('not None %s %s' %
(datetime.datetime.now() - self.xcache.get(path).when,
self.xcache.get(path)))
return self.xcache.get(path).what
def getattr(self, path):
logging.debug('getattr %s' % path)
path = self._path_to_path(path)
st = Stat()
if self._is_dir(path):
st.st_mode = stat.S_IFDIR | 0755
st.st_nlink = 2
else:
st.st_mode = stat.S_IFREG | 0444
st.st_nlink = 1
st.st_size = len(self._read_file(path))
return st
def readdir(self, path, offset):
logging.debug('readdir %s' % path)
path = self._path_to_path(path)
for child in ['.', '..'] + self._direntries(path):
yield fuse.Direntry(child)
def open(self, path, flags):
logging.debug('open %s' % path)
path = self._path_to_path(path)
accmode = os.O_RDONLY | os.O_WRONLY | os.O_RDWR
if (flags & accmode) != os.O_RDONLY:
return -errno.EACCES
def read(self, path, size, offset):
logging.debug('read %s' % path)
path = self._path_to_path(path)
text = self._read_file(path)
# with open('/home/philip/urbit/urb/log', 'w') as f:
# f.write(str([text, len(text), "\\'dull the pain\\'", len("\\'dull the pain\\'")]))
length = len(text)
if offset < length:
if offset + size > length:
size = length - offset
buf = text[offset:offset+size]
else:
buf = ''
return buf
def main():
usage="""
Just Do It
""" + fuse.Fuse.fusage
server= FS(version="%prog " + fuse.__version__,
usage=usage,
dash_s_do='setsingle')
server.parse(errex=1)
server.main()
if __name__ == '__main__':
main()