pyledriver/listeners.py

184 lines
4.7 KiB
Python

'''
Classes that listen for user input
'''
import logging, os, sys, stat
from exceptionThreading import ExceptionThread
from evdev import InputDevice, ecodes
from select import select
from auxilary import CountdownTimer, waitForPath
import stateMachine
logger = logging.getLogger(__name__)
class KeypadListener:
    '''
    Interface for standard numpad device. Capabilities include:
    - accepting numeric input
    - volume control
    - arm/disarm the stateMachine

    This launches two daemon threads:
    - input listener that accepts events and reacts in fun ways
    - countdown timer to reset the input buffer after 30 seconds of inactivity
    '''

    def __init__(self, stateMachine, callbackDisarm, callbackArm, soundLib, passwd):
        '''
        Build the key maps and the (not yet started) input listener thread.

        stateMachine -- object exposing currentState and states.disarmed
        callbackDisarm -- called without arguments when ENTER is pressed
            while the buffer equals passwd
        callbackArm -- called without arguments when NUMLOCK is pressed
        soundLib -- object exposing soundEffects (mapping of sounds with a
            play() method), changeVolume(), mute() and volume
        passwd -- string the input buffer is compared against
        '''
        # event codes reported by the keypad, mapped to readable labels
        ctrlKeys = {69: 'NUML', 98: '/', 14: 'BS', 96: 'ENTER'}
        volKeys = {55: '*', 74: '-', 78: '+'}
        numKeys = {
            71: '7', 72: '8', 73: '9',
            75: '4', 76: '5', 77: '6',
            79: '1', 80: '2', 81: '3',
            82: '0', 83: '.',
        }

        numKeySound = soundLib.soundEffects['numKey']
        ctrlKeySound = soundLib.soundEffects['ctrlKey']
        wrongPassSound = soundLib.soundEffects['wrongPass']
        backspaceSound = soundLib.soundEffects['backspace']

        def getInput():
            while 1:
                # block until the device has events to read
                select([self._dev], [], [])
                for event in self._dev.read():
                    # only react to key events (type 1) on key-down (value 1)
                    if not (event.type == 1 and event.value == 1):
                        continue

                    # numeral input
                    if event.code in numKeys:
                        # the buffer only fills while not disarmed
                        if stateMachine.currentState != stateMachine.states.disarmed:
                            self._buf = self._buf + numKeys[event.code]
                            self._startResetCountdown()
                        # NOTE(review): sound is assumed to play in every
                        # state, like the other keys -- confirm nesting
                        numKeySound.play()

                    # ctrl input
                    elif event.code in ctrlKeys:
                        val = ctrlKeys[event.code]

                        # disarm if correct passwd
                        if val == 'ENTER':
                            if stateMachine.currentState == stateMachine.states.disarmed:
                                ctrlKeySound.play()
                            elif self._buf == '':
                                ctrlKeySound.play()
                            elif self._buf == passwd:
                                callbackDisarm()
                            else:
                                self.resetBuffer()
                                wrongPassSound.play()

                        # arm
                        elif val == 'NUML':
                            callbackArm()
                            ctrlKeySound.play()

                        # delete last char in buffer
                        elif val == 'BS':
                            self._buf = self._buf[:-1]
                            # nothing left to time out once the buffer is empty
                            if self._buf == '':
                                self._stopResetCountdown()
                            backspaceSound.play()

                        # reset buffer
                        elif val == '/':
                            self.resetBuffer()
                            backspaceSound.play()

                    # volume input
                    elif event.code in volKeys:
                        val = volKeys[event.code]

                        if val == '+':
                            soundLib.changeVolume(10)
                        elif val == '-':
                            soundLib.changeVolume(-10)
                        elif val == '*':
                            soundLib.mute()

                        ctrlKeySound.play()
                        # numlock LED is lit only while the volume is zero
                        # NOTE(review): assumed to belong to the volume
                        # branch -- confirm nesting
                        self._dev.set_led(ecodes.LED_NUML, 0 if soundLib.volume > 0 else 1)

        self._listener = ExceptionThread(target=getInput, daemon=True)
        self._resetCountdown = None
        self._clearBuffer()

    def start(self):
        '''
        Wait for the keypad device node, grab it exclusively and start the
        listener thread.
        '''
        devPath = '/dev/input/by-id/usb-04d9_1203-event-kbd'

        waitForPath(devPath, logger)

        self._dev = InputDevice(devPath)
        self._dev.grab()

        self._listener.start()
        logger.debug('Started keypad listener')

    def resetBuffer(self):
        '''
        Cancel any pending reset countdown and empty the input buffer.
        '''
        self._stopResetCountdown()
        self._clearBuffer()

    def _startResetCountdown(self):
        '''
        (Re)start the 30 second countdown that empties the input buffer.
        '''
        # cancel the previous countdown first; otherwise it would still fire
        # and wipe the buffer 30 seconds after an earlier key press
        self._stopResetCountdown()
        self._resetCountdown = CountdownTimer(30, self._clearBuffer)

    def _stopResetCountdown(self):
        '''
        Stop the reset countdown if one is currently running.
        '''
        if self._resetCountdown is not None and self._resetCountdown.is_alive():
            self._resetCountdown.stop()
            self._resetCountdown = None

    def _clearBuffer(self):
        '''
        Empty the input buffer.
        '''
        self._buf = ''

    def __del__(self):
        try:
            self._dev.ungrab()
            logger.debug('Released keypad device')
        except IOError:
            logger.error('Failed to release keypad device')
        except AttributeError:
            # start() was never called, so there is no device to release
            pass
class PipeListener(ExceptionThread):
    '''
    Creates a pipe in the /tmp directory and listens for input. Primarily
    meant as a receiver for ssh sessions to echo messages to the stateMachine
    (aka secrets) that trigger a signal
    '''

    def __init__(self, callback, name):
        '''
        Ensure a FIFO with mode 0777 exists at /tmp/<name> and prepare the
        reader thread (started later via start()).

        callback -- called as callback(msg, logger) for every line read
        name -- file name of the pipe inside /tmp
        '''
        self._path = os.path.join('/tmp', name)
        pipeMode = 0o0777

        if not os.path.exists(self._path):
            os.mkfifo(self._path, mode=pipeMode)
        elif not stat.S_ISFIFO(os.stat(self._path).st_mode):
            # something else occupies the path; replace it with a pipe
            os.remove(self._path)
            os.mkfifo(self._path, mode=pipeMode)

        # mkfifo masks the requested mode with the process umask, so check
        # the permission bits for new pipes as well as pre-existing ones
        if stat.S_IMODE(os.stat(self._path).st_mode) != pipeMode:
            os.chmod(self._path, pipeMode)

        def listen():
            while 1:
                # opening a FIFO for reading blocks until a writer opens it
                with open(self._path, 'r') as f:
                    # strip only the line terminator; a message sent without
                    # a trailing newline must keep its last character
                    msg = f.readline().rstrip('\n')
                # run the callback with the pipe closed so that a message
                # written meanwhile waits for the next open instead of being
                # discarded on close
                callback(msg, logger)

        super().__init__(target=listen, daemon=True)

    def start(self):
        '''
        Start the reader thread.
        '''
        ExceptionThread.start(self)
        logger.debug('Started pipe listener at path %s', self._path)

    def __del__(self):
        try:
            os.remove(self._path)
        except FileNotFoundError:
            # pipe already gone; nothing to clean up
            pass
        logger.debug('Cleaned up pipe listener at path %s', self._path)