184 lines
4.7 KiB
Python
184 lines
4.7 KiB
Python
'''
|
|
Classes that listen for user input
|
|
'''
|
|
|
|
import logging, os, sys, stat
|
|
from exceptionThreading import ExceptionThread
|
|
from evdev import InputDevice, ecodes
|
|
from select import select
|
|
from auxilary import CountdownTimer, waitForPath
|
|
import stateMachine
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class KeypadListener:
|
|
'''
|
|
Interface for standard numpad device. Capabilities include:
|
|
- accepting numeric input
|
|
- volume control
|
|
- arm/disarm the stateMachine
|
|
|
|
This launches two daemon threads:
|
|
- input listener that accepts events and reacts in fun ways
|
|
- countdown timer to reset the input buffer after 30 seconds of inactivity
|
|
'''
|
|
def __init__(self, stateMachine, callbackDisarm, callbackArm, soundLib, passwd):
|
|
|
|
ctrlKeys = { 69: 'NUML', 98: '/', 14: 'BS', 96: 'ENTER'}
|
|
|
|
volKeys = { 55: '*', 74: '-', 78: '+'}
|
|
|
|
numKeys = {
|
|
71: '7', 72: '8', 73: '9',
|
|
75: '4', 76: '5', 77: '6',
|
|
79: '1', 80: '2', 81: '3',
|
|
82: '0', 83: '.'
|
|
}
|
|
|
|
numKeySound = soundLib.soundEffects['numKey']
|
|
ctrlKeySound = soundLib.soundEffects['ctrlKey']
|
|
wrongPassSound = soundLib.soundEffects['wrongPass']
|
|
backspaceSound = soundLib.soundEffects['backspace']
|
|
|
|
def getInput():
|
|
while 1:
|
|
r, w, x = select([self._dev], [], [])
|
|
for event in self._dev.read():
|
|
if event.type == 1 and event.value == 1:
|
|
|
|
# numeral input
|
|
if event.code in numKeys:
|
|
if stateMachine.currentState != stateMachine.states.disarmed:
|
|
self._buf = self._buf + numKeys[event.code]
|
|
self._startResetCountdown()
|
|
numKeySound.play()
|
|
|
|
# ctrl input
|
|
elif event.code in ctrlKeys:
|
|
val = ctrlKeys[event.code]
|
|
|
|
# disarm if correct passwd
|
|
if val=='ENTER':
|
|
if stateMachine.currentState == stateMachine.states.disarmed:
|
|
ctrlKeySound.play()
|
|
else:
|
|
if self._buf == '':
|
|
ctrlKeySound.play()
|
|
elif self._buf == passwd:
|
|
callbackDisarm()
|
|
else:
|
|
self.resetBuffer()
|
|
wrongPassSound.play()
|
|
|
|
# arm
|
|
elif val == 'NUML':
|
|
callbackArm()
|
|
ctrlKeySound.play()
|
|
|
|
# delete last char in buffer
|
|
elif val == 'BS':
|
|
self._buf = self._buf[:-1]
|
|
if self._buf == '':
|
|
self._stopResetCountdown()
|
|
backspaceSound.play()
|
|
|
|
# reset buffer
|
|
elif val == '/':
|
|
self.resetBuffer()
|
|
backspaceSound.play()
|
|
|
|
# volume input
|
|
elif event.code in volKeys:
|
|
val = volKeys[event.code]
|
|
|
|
if val == '+':
|
|
soundLib.changeVolume(10)
|
|
|
|
elif val == '-':
|
|
soundLib.changeVolume(-10)
|
|
|
|
elif val == '*':
|
|
soundLib.mute()
|
|
|
|
ctrlKeySound.play()
|
|
self._dev.set_led(ecodes.LED_NUML, 0 if soundLib.volume > 0 else 1)
|
|
|
|
self._listener = ExceptionThread(target=getInput, daemon=True)
|
|
self._resetCountdown = None
|
|
self._clearBuffer()
|
|
|
|
def start(self):
|
|
devPath = '/dev/input/by-id/usb-04d9_1203-event-kbd'
|
|
|
|
waitForPath(devPath, logger)
|
|
|
|
self._dev = InputDevice(devPath)
|
|
self._dev.grab()
|
|
|
|
self._listener.start()
|
|
logger.debug('Started keypad listener')
|
|
|
|
def resetBuffer(self):
|
|
self._stopResetCountdown
|
|
self._clearBuffer()
|
|
|
|
def _startResetCountdown(self):
|
|
self._resetCountdown = CountdownTimer(30, self._clearBuffer)
|
|
|
|
def _stopResetCountdown(self):
|
|
if self._resetCountdown is not None and self._resetCountdown.is_alive():
|
|
self._resetCountdown.stop()
|
|
self._resetCountdown = None
|
|
|
|
def _clearBuffer(self):
|
|
self._buf = ''
|
|
|
|
def __del__(self):
|
|
try:
|
|
self._dev.ungrab()
|
|
logger.debug('Released keypad device')
|
|
except IOError:
|
|
logger.error('Failed to release keypad device')
|
|
except AttributeError:
|
|
pass
|
|
|
|
class PipeListener(ExceptionThread):
|
|
'''
|
|
Creates a pipe in the /tmp directory and listens for input. Primarily
|
|
meant as a receiver for ssh sessions to echo messages to the stateMachine
|
|
(aka secrets) that trigger a signal
|
|
'''
|
|
def __init__(self, callback, name):
|
|
self._path = os.path.join('/tmp', name)
|
|
|
|
pipeMode = 0o0777
|
|
|
|
if not os.path.exists(self._path):
|
|
os.mkfifo(self._path, mode=pipeMode)
|
|
else:
|
|
st_mode = os.stat(self._path).st_mode
|
|
if not stat.S_ISFIFO(st_mode):
|
|
os.remove(self._path)
|
|
os.mkfifo(self._path, mode=pipeMode)
|
|
elif st_mode % 0o10000 != pipeMode:
|
|
os.chmod(self._path, pipeMode)
|
|
|
|
def listen():
|
|
while 1:
|
|
with open(self._path, 'r') as f:
|
|
msg = f.readline()[:-1]
|
|
callback(msg, logger)
|
|
|
|
super().__init__(target=listen, daemon=True)
|
|
|
|
def start(self):
|
|
ExceptionThread.start(self)
|
|
logger.debug('Started pipe listener at path %s', self._path)
|
|
|
|
def __del__(self):
|
|
try:
|
|
os.remove(self._path)
|
|
except FileNotFoundError:
|
|
pass
|
|
logger.debug('Cleaned up pipe listener at path %s', self._path)
|