100 lines
2.5 KiB
Python
100 lines
2.5 KiB
Python
import time, psutil, yaml, os
|
|
from subprocess import check_output, DEVNULL, CalledProcessError
|
|
from threading import Thread, Event
|
|
|
|
class ConfigFile():
    '''
    Presents a config yaml file as a dict-like object.

    The file is parsed once, on construction. Item reads and writes only
    touch the in-memory copy; call ``sync()`` to write the current state
    back to the same path.
    '''
    def __init__(self, path):
        '''
        Load the yaml document stored at ``path``.

        An empty document (for which ``yaml.safe_load`` returns ``None``)
        is treated as an empty mapping, so item access and assignment keep
        working. Errors from ``open`` and from yaml parsing propagate.
        '''
        self._path = path
        with open(self._path, 'r') as f:
            loaded = yaml.safe_load(f)
        # Only substitute for "no document at all"; any other loaded value
        # is kept exactly as parsed.
        self._dict = {} if loaded is None else loaded

    def __getitem__(self, key):
        '''Return the value stored under ``key`` in the loaded data.'''
        return self._dict[key]

    def __setitem__(self, key, value):
        '''Store ``value`` under ``key`` in memory only (see ``sync``).'''
        self._dict[key] = value

    def sync(self):
        '''
        Overwrite the file at the original path with the current data,
        dumped in block style (``default_flow_style=False``).
        '''
        with open(self._path, 'w') as f:
            yaml.dump(self._dict, f, default_flow_style=False)
|
|
|
|
class asynchronous:
    '''
    Wraps any function in a thread and starts the thread. Intended to be used
    as a decorator, e.g. ``@asynchronous(daemon=True)``.

    This class used to be called ``async``. That word is a reserved keyword
    from Python 3.7 onwards, so ``class async:`` no longer parses; the old
    name is still published as a module attribute below for code that looks
    it up by string.

    daemon -- passed through to ``threading.Thread`` for every call of the
              decorated function.
    '''
    def __init__(self, daemon=False):
        self._daemon = daemon

    def __call__(self, f):
        '''
        Return a wrapper around ``f``. Each call of the wrapper starts a new
        thread running ``f`` with the given arguments and returns that
        (already started) ``Thread`` so the caller may ``join()`` it. The
        return value of ``f`` itself is not available.
        '''
        from functools import wraps

        @wraps(f)  # keep the decorated function's name and docstring
        def wrapper(*args, **kwargs):
            t = Thread(target=f, daemon=self._daemon, args=args, kwargs=kwargs)
            t.start()
            return t
        return wrapper


# Backward-compatible alias: ``async`` cannot be written as an identifier on
# Python 3.7+, but it can still be exposed as a module attribute.
globals()['async'] = asynchronous
|
|
|
|
class CountdownTimer(Thread):
    '''
    Launches thread which self terminates after some time (given in seconds).
    Termination triggers some action (a function). Optionally, a sound can be
    assigned to each 'tick'.

    The thread is a daemon thread and is started by the constructor. Calling
    ``stop()`` at any point before the countdown has elapsed cancels it:
    the thread ends promptly and ``action`` is not called.

    countdownSeconds -- number of one-second ticks to wait
    action           -- zero-argument callable run when the countdown elapses
    sound            -- optional object with a ``play()`` method, played on
                        every tick except the first
    '''
    def __init__(self, countdownSeconds, action, sound=None):
        self._stopper = Event()

        def countdown():
            for remaining in range(countdownSeconds, 0, -1):
                if self._stopper.is_set():
                    return None
                # The first tick is silent; later ticks play the sound.
                if sound and remaining < countdownSeconds:
                    sound.play()
                # Event.wait doubles as an interruptible one-second sleep:
                # it returns True as soon as stop() is called, so a stop
                # during the last tick can no longer let action() fire.
                if self._stopper.wait(1):
                    return None
            action()

        super().__init__(target=countdown, daemon=True)
        self.start()

    def stop(self):
        '''Cancel the countdown; safe to call more than once.'''
        self._stopper.set()

    def __del__(self):
        # Make sure a timer that is being discarded does not fire later.
        self.stop()
|
|
|
|
def mkdirSafe(path, logger):
    '''
    Makes new dir if path does not exist, and aborts program if path exists and
    path is a file not a dir. Else does nothing.

    path   -- directory to create (parent directories are not created)
    logger -- object with an ``error(fmt, *args)`` method, used to report
              the file-in-the-way case

    Raises SystemExit (exit status 1) when ``path`` is an existing file.
    '''
    try:
        # Attempt the creation directly instead of testing for existence
        # first, so there is no window in which another process can create
        # the path between the check and the mkdir.
        os.mkdir(path)
    except FileExistsError:
        if os.path.isfile(path):
            logger.error('%s is present but is a file (vs a directory). ' \
                         'Please (re)move this file to prevent data loss', path)
            # Non-zero status: this is an error exit, not a clean one.
            raise SystemExit(1)
        if not os.path.exists(path):
            # Something occupies the name but does not resolve (e.g. a
            # dangling symlink); keep reporting that as an error.
            raise
|
|
|
|
def waitForPath(path, logger=None, timeout=30):
    '''
    Waits for a path to appear. Useful for procfs and sysfs where devices
    regularly (dis)appear. Timeout given in seconds.

    path    -- filesystem path to poll for, once per second
    logger  -- optional object with an ``error(fmt, *args)`` method; used to
               report a timeout
    timeout -- number of one-second waits before giving up; with 0 the path
               is checked exactly once

    Returns None as soon as the path exists. Raises SystemExit (exit status
    1) if the path has not appeared once the timeout has elapsed.
    '''
    for _ in range(timeout):
        if os.path.exists(path):
            return
        time.sleep(1)
    # One last look after the final sleep. This also covers timeout=0,
    # where the loop above never runs.
    if os.path.exists(path):
        return
    if logger:
        logger.error('Could not find %s after %s seconds', path, timeout)
    # Non-zero status: this is an error exit, not a clean one.
    raise SystemExit(1)
|
|
|
|
def resetUSBDevice(device):
    '''
    Resets a USB device using the de/reauthorization method: '0' and then
    '1' are written to the device's ``authorized`` attribute under
    /sys/bus/usb/devices. Crude, but effective.

    device -- name of the device's directory under /sys/bus/usb/devices

    Errors from opening or writing the attribute file propagate.
    '''
    authorized_attr = ''.join(('/sys/bus/usb/devices/', device, '/authorized'))
    # Each value is written through its own open/close of the attribute file.
    for flag in ('0', '1'):
        with open(authorized_attr, 'w') as attr:
            attr.write(flag)
|