pyledriver/listeners.py

210 lines
5.0 KiB
Python
Raw Normal View History

2017-06-05 02:08:17 -04:00
'''
Classes that listen for user input
'''
import logging, os, sys, stat
2017-06-10 02:30:28 -04:00
from threading import Timer
from exceptionThreading import ExceptionThread
2016-12-30 02:51:56 -05:00
from evdev import InputDevice, ecodes
from select import select
2017-06-10 02:30:28 -04:00
from auxilary import waitForPath
2016-12-30 02:51:56 -05:00
import stateMachine
logger = logging.getLogger(__name__)
class KeypadListener:
2017-06-05 02:08:17 -04:00
'''
Interface for standard numpad device. Capabilities include:
- accepting numeric input
- volume control
- arm/disarm the stateMachine
This launches two daemon threads:
- input listener that accepts events and reacts in fun ways
- countdown timer to reset the input buffer after 30 seconds of inactivity
'''
2017-06-17 01:43:14 -04:00
def __init__(self, stateMachine, passwd):
2016-12-30 02:51:56 -05:00
2017-06-17 01:43:14 -04:00
ctrlKeys = { 69: 'NUML', 98: '/', 55: '*', 14: 'BS', 96: 'ENTER'}
2016-12-30 02:51:56 -05:00
2017-06-17 01:43:14 -04:00
volKeys = { 74: '-', 78: '+', 83: '.'}
2016-12-30 02:51:56 -05:00
numKeys = {
71: '7', 72: '8', 73: '9',
75: '4', 76: '5', 77: '6',
79: '1', 80: '2', 81: '3',
2017-06-17 01:43:14 -04:00
82: '0'
2016-12-30 02:51:56 -05:00
}
2017-06-11 03:10:55 -04:00
soundLib = stateMachine.soundLib
2016-12-30 02:51:56 -05:00
numKeySound = soundLib.soundEffects['numKey']
ctrlKeySound = soundLib.soundEffects['ctrlKey']
wrongPassSound = soundLib.soundEffects['wrongPass']
backspaceSound = soundLib.soundEffects['backspace']
def getInput():
while 1:
2017-06-11 18:06:43 -04:00
select([self._dev], [], [])
2016-12-30 02:51:56 -05:00
for event in self._dev.read():
if event.type == 1 and event.value == 1:
# numeral input
if event.code in numKeys:
if stateMachine.currentState != stateMachine.states.disarmed:
self._buf = self._buf + numKeys[event.code]
2017-06-10 02:32:17 -04:00
self._startResetTimer()
2016-12-30 02:51:56 -05:00
numKeySound.play()
# ctrl input
elif event.code in ctrlKeys:
val = ctrlKeys[event.code]
# disarm if correct passwd
if val=='ENTER':
if stateMachine.currentState == stateMachine.states.disarmed:
ctrlKeySound.play()
else:
if self._buf == '':
ctrlKeySound.play()
elif self._buf == passwd:
self.resetBuffer()
2017-06-17 01:43:14 -04:00
stateMachine.DISARM
2016-12-30 02:51:56 -05:00
else:
self.resetBuffer()
wrongPassSound.play()
2017-06-17 01:43:14 -04:00
# lock
2016-12-30 02:51:56 -05:00
elif val == 'NUML':
self.resetBuffer()
2017-06-17 01:43:14 -04:00
stateMachine.LOCK
ctrlKeySound.play()
# instant lock
elif val == '/':
self.resetBuffer()
stateMachine.INSTANT_LOCK
ctrlKeySound.play()
# arm
elif val == '*':
self.resetBuffer()
stateMachine.ARM
2016-12-30 02:51:56 -05:00
ctrlKeySound.play()
# delete last char in buffer
elif val == 'BS':
self._buf = self._buf[:-1]
if self._buf == '':
2017-06-10 02:32:17 -04:00
self._stopResetTimer()
else:
self._startResetTimer()
2016-12-30 02:51:56 -05:00
backspaceSound.play()
# volume input
elif event.code in volKeys:
val = volKeys[event.code]
if val == '+':
soundLib.changeVolume(10)
elif val == '-':
soundLib.changeVolume(-10)
2017-06-17 01:43:14 -04:00
elif val == '.':
2016-12-30 02:51:56 -05:00
soundLib.mute()
ctrlKeySound.play()
self._dev.set_led(ecodes.LED_NUML, 0 if soundLib.volume > 0 else 1)
self._listener = ExceptionThread(target=getInput, daemon=True)
self._clearBuffer()
def start(self):
devPath = '/dev/input/by-id/usb-04d9_1203-event-kbd'
waitForPath(devPath, logger)
self._dev = InputDevice(devPath)
self._dev.grab()
self._listener.start()
logger.debug('Started keypad listener')
2017-06-10 01:52:06 -04:00
def stop(self):
try:
self._dev.ungrab()
self._dev = None
logger.debug('Released keypad device')
except IOError:
logger.error('Failed to release keypad device')
except AttributeError:
pass
def resetBuffer(self):
2017-06-10 02:32:17 -04:00
self._stopResetTimer()
self._clearBuffer()
2017-06-10 02:32:17 -04:00
def _startResetTimer(self):
self._resetTimer = t = Timer(30, self._clearBuffer)
t.daemon = True
t.start()
2016-12-30 02:51:56 -05:00
2017-06-10 02:32:17 -04:00
def _stopResetTimer(self):
2017-06-10 02:30:28 -04:00
try:
self._resetTimer.cancel()
except AttributeError:
pass
def _clearBuffer(self):
2016-12-30 02:51:56 -05:00
self._buf = ''
def __del__(self):
2017-06-10 01:52:06 -04:00
self.stop()
2016-12-30 02:51:56 -05:00
class PipeListener(ExceptionThread):
2017-06-05 02:08:17 -04:00
'''
Creates a pipe in the /tmp directory and listens for input. Primarily
meant as a receiver for ssh sessions to echo messages to the stateMachine
(aka secrets) that trigger a signal
'''
2017-06-05 01:13:46 -04:00
def __init__(self, callback, name):
2017-06-05 02:08:17 -04:00
self._path = os.path.join('/tmp', name)
pipeMode = 0o0777
2016-12-30 02:51:56 -05:00
2017-06-05 01:13:46 -04:00
if not os.path.exists(self._path):
2017-06-09 03:39:19 -04:00
os.mkfifo(self._path)
os.chmod(self._path, pipeMode)
else:
2017-06-06 01:26:18 -04:00
st_mode = os.stat(self._path).st_mode
2017-06-05 01:13:46 -04:00
if not stat.S_ISFIFO(st_mode):
os.remove(self._path)
2017-06-09 03:39:19 -04:00
os.mkfifo(self._path)
os.chmod(self._path, pipeMode)
2017-06-05 02:08:17 -04:00
elif st_mode % 0o10000 != pipeMode:
os.chmod(self._path, pipeMode)
2016-12-30 02:51:56 -05:00
2017-06-05 02:08:17 -04:00
def listen():
while 1:
with open(self._path, 'r') as f:
msg = f.readline()[:-1]
callback(msg, logger)
2017-06-05 02:08:17 -04:00
super().__init__(target=listen, daemon=True)
def start(self):
ExceptionThread.start(self)
logger.debug('Started pipe listener at path %s', self._path)
2017-06-10 01:52:06 -04:00
def stop(self):
2017-06-05 01:13:46 -04:00
try:
2016-12-30 02:51:56 -05:00
os.remove(self._path)
2017-06-10 01:52:06 -04:00
logger.debug('Cleaned up pipe listener at path %s', self._path)
2017-06-05 01:13:46 -04:00
except FileNotFoundError:
pass
2017-06-10 01:52:06 -04:00
def __del__(self):
self.stop()