From a4a167165098cf6901ceb718d1df16e56e07c696 Mon Sep 17 00:00:00 2001 From: petrucci4prez Date: Fri, 2 Jun 2017 00:15:05 -0400 Subject: [PATCH] add filedump functionality --- config.yaml | 2 +- sharedLogging.py | 12 +++--- stateMachine.py | 18 ++++---- stream.py | 107 ++++++++++++++++++++++++++++++++++++----------- 4 files changed, 99 insertions(+), 40 deletions(-) diff --git a/config.yaml b/config.yaml index f3295e1..94f8313 100644 --- a/config.yaml +++ b/config.yaml @@ -1,6 +1,6 @@ gmail: - username: natedwarshuis@gmail.com passwd: bwsasfxqjbookmed recipientList: - natedwarshuis@gmail.com + username: natedwarshuis@gmail.com state: disarmed diff --git a/sharedLogging.py b/sharedLogging.py index c9e7fed..3ac8c62 100644 --- a/sharedLogging.py +++ b/sharedLogging.py @@ -40,7 +40,7 @@ class GlusterFSHandler(TimedRotatingFileHandler): if not os.path.exists(mountpoint): raise FileNotFoundError - self._mountpoint = mountpoint + self.mountpoint = mountpoint self._server = server self._volume = volume self._options = options @@ -56,18 +56,20 @@ class GlusterFSHandler(TimedRotatingFileHandler): self.setFormatter(fmt) def _mount(self): - if os.path.ismount(self._mountpoint): + if os.path.ismount(self.mountpoint): # this assumes that the already-mounted device is the one intended - logger.warning('Device already mounted at {}'.format(self._mountpoint)) + logger.warning('Device already mounted at {}'.format(self.mountpoint)) else: dst = self._server + ':/' + self._volume - cmd = ['mount', '-t', 'glusterfs', dst, self._mountpoint] + cmd = ['mount', '-t', 'glusterfs', dst, self.mountpoint] if self._options: cmd[1:1] = ['-o', self._options] self._run(cmd) + self.isMounted = True def _unmount(self): - self._run(['umount', self._mountpoint]) + self._run(['umount', self.mountpoint]) + self.isMounted = False def _run(self, cmd): try: diff --git a/stateMachine.py b/stateMachine.py index ccec62e..cf61740 100644 --- a/stateMachine.py +++ b/stateMachine.py @@ -1,6 +1,5 @@ import RPi.GPIO as GPIO import time, logging -from datetime import datetime from threading import Lock from functools import partial from collections import namedtuple @@ -12,7 +11,7 @@ from listeners import KeypadListener, PipeListener from blinkenLights import Blinkenlights from soundLib import SoundLib from webInterface import initWebInterface -from stream import initCamera +from stream import initCamera, FileDump logger = logging.getLogger(__name__) @@ -142,15 +141,16 @@ class StateMachine: if self.currentState == self.states.armed: self.selectState(SIGNALS.TRIGGER) + fileDump = FileDump() + sensitiveStates = (self.states.armed, self.states.armedCountdown, self.states.triggered) + def actionVideo(pin): - if self.currentState in (self.states.armed, self.states.armedCountdown, self.states.triggered): + if self.currentState in sensitiveStates: self.selectState(SIGNALS.TRIGGER) - while GPIO.input(pin): - # TODO: check that this path exists - path = '/mnt/glusterfs/pyledriver/images/%s.jpg' - #~ with open(path % datetime.now(), 'wb') as f: - #~ f.write(camera.getFrame()) - time.sleep(0.2) + fileDump.addInitiator(pin) + while GPIO.input(pin) and self.currentState in sensitiveStates: + time.sleep(0.1) + fileDump.removeInitiator(pin) setupMotionSensor(5, 'Nate\'s room', action) setupMotionSensor(19, 'front door', action) diff --git a/stream.py b/stream.py index faab1a6..9d26604 100644 --- a/stream.py +++ b/stream.py @@ -12,10 +12,12 @@ From a logging an error handling standpoint, all 'errors' here are logged as 'critical' which will shut down the entire program and send an email. """ -from auxilary import async, waitForPath -from threading import Thread +import gi, time, os, logging +from datetime import datetime +from threading import Thread, Lock -import gi, time, logging +from auxilary import async, waitForPath, mkdirSafe +from sharedLogging import gluster logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -72,9 +74,10 @@ def eventLoop(pipeline, block=True, doProgress=False, targetState=Gst.State.PLAY pName = pipeline.get_name() bus = pipeline.get_bus() + # TODO: add killswitch to exit this thread? while 1: msg = bus.timed_pop(1e18 if block else 0) - + if not msg: return @@ -87,9 +90,8 @@ def eventLoop(pipeline, block=True, doProgress=False, targetState=Gst.State.PLAY # messages that involve manipulating the pipeline if msgType == Gst.MessageType.REQUEST_STATE: state = msg.parse_request_state() - - logger.info('Setting state to %s as requested by %s', - Gst.Element.get_state_name(state), msgSrcName) + + logger.info('Setting state to %s as requested by %s', state.value_name, msgSrcName) pipeline.set_state(state) @@ -218,7 +220,10 @@ def eventLoop(pipeline, block=True, doProgress=False, targetState=Gst.State.PLAY elif msgType == Gst.MessageType.UNKNOWN: gstPrintMsg(pName, 'Unknown message', sName=msgSrcname) -@async(daemon=True) +@async(daemon=True) +def mainLoop(pipeline): + eventLoop(pipeline, block=True, doProgress=False, targetState=Gst.State.PLAYING) + def startPipeline(pipeline, play=True): pName = pipeline.get_name() stateChange = pipeline.set_state(Gst.State.PAUSED) @@ -259,7 +264,7 @@ def startPipeline(pipeline, play=True): # ...we end up here and loop until Tool releases their next album try: - eventLoop(pipeline, block=True, doProgress=False, targetState=Gst.State.PLAYING) + mainLoop(pipeline) except: raise GstException @@ -319,47 +324,99 @@ def initCamera(video=True, audio=True): startPipeline(pipeline) class FileDump: + ''' + Pipeline that takes audio and input from two udp ports, muxes them, and + dumps the result to a file. Intended to work with the Camera above. Will + init to a paused state, then will transition to a playing state (which will + dump the file) when at least one initiator registers with the class. + + Initiators are represented by unique identifiers held in a list. The current + use case is that each identifier is for the pin of the IR sensor that + detects motion, and thus adding a pin number to the list signifies that + video/audio should be recorded + ''' def __init__(self): + self._initiators = [] + self._lock = Lock() + + if not gluster.isMounted: + logger.error('Attempting to init FileDump without gluster mounted. Aborting') + raise SystemExit + + self._savePath = os.path.join(gluster.mountpoint + '/video') + + mkdirSafe(self._savePath, logger) + self.pipeline = Gst.Pipeline.new('filedump') aSource = Gst.ElementFactory.make('udpsrc', 'audioSource') - aParse = Gst.ElementFactory.make('opusparse', 'audioParse') + aJitBuf = Gst.ElementFactory.make('rtpjitterbuffer', 'audioJitterBuffer') + aDepay = Gst.ElementFactory.make('rtpopusdepay', 'audioDepay') aQueue = Gst.ElementFactory.make('queue', 'audioQueue') + aCaps = Gst.Caps.from_string('application/x-rtp,encoding-name=OPUS,payload=96') + vSource = Gst.ElementFactory.make('udpsrc', 'videoSource') + vJitBuf = Gst.ElementFactory.make('rtpjitterbuffer', 'videoJitterBuffer') + vDepay = Gst.ElementFactory.make('rtph264depay', 'videoDepay') vParse = Gst.ElementFactory.make('h264parse', 'videoParse') vQueue = Gst.ElementFactory.make('queue', 'videoQueue') + vCaps = Gst.Caps.from_string('application/x-rtp,encoding-name=H264,payload=96') + mux = Gst.ElementFactory.make('matroskamux', 'mux') self.sink = Gst.ElementFactory.make('filesink', 'sink') self.sink.set_property('location', '/dev/null') - aSource.set_property('port', 8000) - vSource.set_property('port', 9000) + aSource.set_property('port', 8002) + vSource.set_property('port', 9002) - self.pipeline.add(aSource, aParse, aQueue, vSource, vParse, vQueue, mux, self.sink) + self.pipeline.add(aSource, aJitBuf, aDepay, aQueue, + vSource, vJitBuf, vDepay, vParse, vQueue, mux, self.sink) - linkElements(vSource, vParse) + + linkElements(aSource, aJitBuf, aCaps) + linkElements(aJitBuf, aDepay) + linkElements(aDepay, aQueue) + linkElements(aQueue, mux) + + linkElements(vSource, vJitBuf, vCaps) + linkElements(vJitBuf, vDepay) + linkElements(vDepay, vParse) linkElements(vParse, vQueue) linkElements(vQueue, mux) - linkElements(aSource, aParse) - linkElements(aParse, aQueue) - linkElements(aQueue, mux) - linkElements(mux, self.sink) - startPipeline(self.pipeline, play=False) + # TODO: there is probably a better way to init than starting up to PAUSE + # and then dropping back down to NULL + startPipeline(self.pipeline, play=False) - def setPath(self, path): - self.sink.set_property('location', path) + self.pipeline.post_message(Gst.Message.new_request_state(self.pipeline, Gst.State.NULL)) - def play(self): - self.pipeline.set_state(Gst.State.PLAYING) + def addInitiator(self, identifier): + with self._lock: + if identifier in self._initiators: + logger.warn('Identifier \'%s\' already in FileDump initiator list', identifier) + else: + self._initiators.append(identifier) + + if self.pipeline.get_state(Gst.CLOCK_TIME_NONE).state == Gst.State.NULL: + filePath = os.path.join(self._savePath, '{}.mkv'.format(datetime.now())) + self.sink.set_property('location', filePath) + # TODO: cannot post messages from null to change state, is this bad? + self.pipeline.set_state(Gst.State.PLAYING) - def pause(self): - self.pipeline.set_state(Gst.State.PAUSED) + def removeInitiator(self, identifier): + with self._lock: + try: + self._initiators.remove(identifier) + except ValueError: + logger.warn('Attempted to remove nonexistant identifier \'%s\'', identifier) + + if len(self._initiators) == 0: + self.pipeline.set_state(Gst.State.NULL) Gst.init(None)