add async stoppers to gst pipelines

This commit is contained in:
petrucci4prez 2017-06-03 01:28:57 -04:00
parent e4a8e6f663
commit cefcbe613d
2 changed files with 315 additions and 281 deletions

View File

@ -11,7 +11,7 @@ from listeners import KeypadListener, PipeListener
from blinkenLights import Blinkenlights from blinkenLights import Blinkenlights
from soundLib import SoundLib from soundLib import SoundLib
from webInterface import initWebInterface from webInterface import initWebInterface
from stream import initCamera, FileDump from stream import Camera, FileDump
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -135,22 +135,22 @@ class StateMachine:
self.LED = Blinkenlights(17) self.LED = Blinkenlights(17)
initCamera() self.camera = Camera()
def action(): def action():
if self.currentState == self.states.armed: if self.currentState == self.states.armed:
self.selectState(SIGNALS.TRIGGER) self.selectState(SIGNALS.TRIGGER)
fileDump = FileDump() self.fileDump = FileDump()
sensitiveStates = (self.states.armed, self.states.armedCountdown, self.states.triggered) sensitiveStates = (self.states.armed, self.states.armedCountdown, self.states.triggered)
def actionVideo(pin): def actionVideo(pin):
if self.currentState in sensitiveStates: if self.currentState in sensitiveStates:
self.selectState(SIGNALS.TRIGGER) self.selectState(SIGNALS.TRIGGER)
fileDump.addInitiator(pin) self.fileDump.addInitiator(pin)
while GPIO.input(pin) and self.currentState in sensitiveStates: while GPIO.input(pin) and self.currentState in sensitiveStates:
time.sleep(0.1) time.sleep(0.1)
fileDump.removeInitiator(pin) self.fileDump.removeInitiator(pin)
setupMotionSensor(5, 'Nate\'s room', action) setupMotionSensor(5, 'Nate\'s room', action)
setupMotionSensor(19, 'front door', action) setupMotionSensor(19, 'front door', action)
@ -210,6 +210,12 @@ class StateMachine:
if hasattr(self, 'LED'): if hasattr(self, 'LED'):
self.LED.__del__() self.LED.__del__()
if hasattr(self, 'camera'):
self.camera.__del__()
if hasattr(self, 'fileDump'):
self.fileDump.__del__()
if hasattr(self, 'soundLib'): if hasattr(self, 'soundLib'):
self.soundLib.__del__() self.soundLib.__del__()

580
stream.py
View File

@ -14,7 +14,7 @@ From a logging an error handling standpoint, all 'errors' here are logged as
import gi, time, os, logging import gi, time, os, logging
from datetime import datetime from datetime import datetime
from threading import Thread, Lock from threading import Lock, Event
from auxilary import waitForPath, mkdirSafe from auxilary import waitForPath, mkdirSafe
from exceptionThreading import async from exceptionThreading import async
@ -30,8 +30,8 @@ from gi.repository import Gst, GObject
class GstException(Exception): class GstException(Exception):
pass pass
def gstPrintMsg(pName, frmt, *args, level=logging.DEBUG, sName=None): def _gstPrintMsg(pName, frmt, *args, level=logging.DEBUG, sName=None):
if sName: if sName:
logger.log(level, '[{}] [{}] '.format(pName, sName) + frmt.format(*args)) logger.log(level, '[{}] [{}] '.format(pName, sName) + frmt.format(*args))
else: else:
@ -39,15 +39,15 @@ def gstPrintMsg(pName, frmt, *args, level=logging.DEBUG, sName=None):
if level == logging.ERROR: if level == logging.ERROR:
raise GstException raise GstException
def processErrorMessage(pName, sName, msg): def _processErrorMessage(pName, sName, msg):
error, debug = msg.parse_error() error, debug = msg.parse_error()
if debug: if debug:
gstPrintMsg(pName, '{} - Additional debug info: {}', error.message, debug, _gstPrintMsg(pName, '{} - Additional debug info: {}', error.message, debug,
level=logging.ERROR, sName=sName) level=logging.ERROR, sName=sName)
else: else:
gstPrintMsg(pName, error.message, level=logging.ERROR, sName=sName) _gstPrintMsg(pName, error.message, level=logging.ERROR, sName=sName)
def linkElements(e1, e2, caps=None): def _linkElements(e1, e2, caps=None):
if caps: if caps:
if not e1.link_filtered(e2, caps): if not e1.link_filtered(e2, caps):
logger.error('cannot link \"%s\" to \"%s\" with caps %s', logger.error('cannot link \"%s\" to \"%s\" with caps %s',
@ -58,273 +58,308 @@ def linkElements(e1, e2, caps=None):
logger.error('cannot link \%s\" to \"%s\"', e1.get_name(), e2.get_name()) logger.error('cannot link \%s\" to \"%s\"', e1.get_name(), e2.get_name())
raise SystemExit raise SystemExit
def linkTee(tee, *args): class ThreadedPipeline:
i = 0 '''
for e in args: Launches a Gst Pipeline in a separate thread. Note that the 'threaded'
teePad = tee.get_request_pad('src_{}'.format(i)) aspect is impimented via and async decorator around the mainLoop below
ePad = e.get_static_pad('sink') '''
teePad.link(ePad) def __init__(self, pName):
i += 1 self._pipeline = Gst.Pipeline.new(pName)
self._stopper = Event()
# TODO: make it clearer which pipeline everything comes from
def eventLoop(pipeline, block=True, doProgress=False, targetState=Gst.State.PLAYING):
buffering = False
inProgress = False
prerolled = targetState != Gst.State.PAUSED
pName = pipeline.get_name()
bus = pipeline.get_bus()
# TODO: add killswitch to exit this thread?
while 1:
msg = bus.timed_pop(1e18 if block else 0)
if not msg:
return
msgSrc = msg.src
msgSrcName = msgSrc.get_name()
msgType = msg.type def start(self, play=True):
msgTypeName = Gst.MessageType.get_name(msgType) self._startPipeline(play)
# messages that involve manipulating the pipeline # TODO: this might not all be necessary
if msgType == Gst.MessageType.REQUEST_STATE: def stop(self):
state = msg.parse_request_state() self._stopper.set()
self._pipeline.set_state(Gst.State.NULL)
logger.info('Setting state to %s as requested by %s', state.value_name, msgSrcName) logger.debug('Shut down gstreamer pipeline: %s', self._pipeline.get_name())
pipeline.set_state(state)
elif msgType == Gst.MessageType.CLOCK_LOST:
logger.debug('Clock lost. Getting new one.')
pipeline.set_state(Gst.State.PAUSED)
pipeline.set_state(Gst.State.PLAYING)
elif msgType == Gst.MessageType.LATENCY:
gstPrintMsg(pName, 'Redistributing latency', sName=msgSrcName)
pipeline.recalculate_latency()
# messages that do not require pipeline manipulation def __del__(self):
elif msgType == Gst.MessageType.BUFFERING: self.stop()
gstPrintMsg(pName, 'Buffering: {}', msg.parse_buffering(), sName=msgSrcName)
# TODO: make it clearer which pipeline everything comes from
elif msgType == Gst.MessageType.NEW_CLOCK: def _eventLoop(self, block=True, doProgress=False, targetState=Gst.State.PLAYING):
clock = msg.parse_new_clock() '''
clock = clock.get_name() if clock else 'NULL' This is the main loop that processes information on the bus and decides
gstPrintMsg(pName, 'New clock: {}', clock) how the pipeline should react. Sometimes this entails spitting out
messages, others it involves changing state or some other manipulation.
elif msgType == Gst.MessageType.INFO: '''
error, debug = msg.parse_info() buffering = False
inProgress = False
if debug: prerolled = targetState != Gst.State.PAUSED
gstPrintMsg(pName, debug, level=logging.INFO, sName=msgSrcName)
pName = self._pipeline.get_name()
bus = self._pipeline.get_bus()
while self._stopper:
# TODO: if we actually want to stop the pipeline asyncronously we
# need to post a message on the bus that tells it to stop. Otherwise
# it will wait here forever (or will be terminated as a daemon
# thread by its parent)
msg = bus.timed_pop(1e18 if block else 0)
if not msg:
return
elif msgType == Gst.MessageType.WARNING: msgSrc = msg.src
error, debug = msg.parse_warning() msgSrcName = msgSrc.get_name()
if debug: msgType = msg.type
gstPrintMsg(pName, '{} - Additional debug info: {}', error.message, msgTypeName = Gst.MessageType.get_name(msgType)
debug, level=logging.WARNING, sName=msgSrcName)
else: # messages that involve manipulating the pipeline
gstPrintMsg(pName, error.message, level=logging.WARNING, sName=msgSrcName) if msgType == Gst.MessageType.REQUEST_STATE:
state = msg.parse_request_state()
elif msgType == Gst.MessageType.ERROR:
processErrorMessage(pName, msgSrcName, msg) logger.info('Setting state to %s as requested by %s',
state.value_name, msgSrcName)
elif msgType == Gst.MessageType.STATE_CHANGED:
# we only care about pipeline level state changes
if msgSrc == pipeline:
old, new, pending = msg.parse_state_changed()
# we only care if we reach the final target state self._pipeline.set_state(state)
if targetState == Gst.State.PAUSED and new == Gst.State.PAUSED:
prerolled = True elif msgType == Gst.MessageType.CLOCK_LOST:
logger.debug('Clock lost. Getting new one.')
self._pipeline.set_state(Gst.State.PAUSED)
self._pipeline.set_state(Gst.State.PLAYING)
elif msgType == Gst.MessageType.LATENCY:
_gstPrintMsg(pName, 'Redistributing latency', sName=msgSrcName)
self._pipeline.recalculate_latency()
# messages that do not require pipeline manipulation
elif msgType == Gst.MessageType.BUFFERING:
_gstPrintMsg(pName, 'Buffering: {}', msg.parse_buffering(), sName=msgSrcName)
elif msgType == Gst.MessageType.NEW_CLOCK:
clock = msg.parse_new_clock()
clock = clock.get_name() if clock else 'NULL'
_gstPrintMsg(pName, 'New clock: {}', clock)
elif msgType == Gst.MessageType.INFO:
error, debug = msg.parse_info()
if debug:
_gstPrintMsg(pName, debug, level=logging.INFO, sName=msgSrcName)
if buffering: elif msgType == Gst.MessageType.WARNING:
gstPrintMsg(pName, 'Prerolled, waiting for buffering to finish', error, debug = msg.parse_warning()
level=logging.INFO)
continue if debug:
_gstPrintMsg(pName, '{} - Additional debug info: {}', error.message,
debug, level=logging.WARNING, sName=msgSrcName)
else:
_gstPrintMsg(pName, error.message, level=logging.WARNING, sName=msgSrcName)
elif msgType == Gst.MessageType.ERROR:
_processErrorMessage(pName, msgSrcName, msg)
elif msgType == Gst.MessageType.STATE_CHANGED:
# we only care about pipeline level state changes
if msgSrc == self._pipeline:
old, new, pending = msg.parse_state_changed()
# we only care if we reach the final target state
if targetState == Gst.State.PAUSED and new == Gst.State.PAUSED:
prerolled = True
if inProgress: if buffering:
gstPrintMsg(pName, 'Prerolled, waiting for progress to finish', _gstPrintMsg(pName, 'Prerolled, waiting for buffering to finish',
level=logging.INFO) level=logging.INFO)
continue continue
if inProgress:
_gstPrintMsg(pName, 'Prerolled, waiting for progress to finish',
level=logging.INFO)
continue
return
elif msgType == Gst.MessageType.PROGRESS:
progressType, code, text = msg.parse_progress()
if (progressType == Gst.ProgressType.START or
progressType == Gst.ProgressType.CONTINUE):
if doProgress:
inProgress = True
block = True
elif (progressType == Gst.ProgressType.COMPLETE or
progressType == Gst.ProgressType.CANCELLED or
progressType == Gst.ProgressType.ERROR):
inProgress = False
_gstPrintMsg(pName, 'Progress: ({}) {}', code, text, sName=msgSrcName)
if doProgress and not inProgress and not buffering and prerolled:
return
elif msgType == Gst.MessageType.HAVE_CONTEXT:
context = msg.parse_have_context()
_gstPrintMsg(
pName,
'Got context: {}={}',
context.get_context_type(),
context.get_structure().to_string(),
sName = msgSrcName
)
elif msgType == Gst.MessageType.PROPERTY_NOTIFY:
obj, propName, val = msg.parse_property_notify()
valStr = '(no value)'
if val:
if GObject.type_check_value_holds(val, GObject.TYPE_STRING):
valStr = val.dup_string()
return elif val.g_type.is_a(Gst.Caps.__gtype__):
valStr = val.get_boxed().to_string()
elif msgType == Gst.MessageType.PROGRESS:
progressType, code, text = msg.parse_progress() else:
valStr = Gst.value_serialize(val)
if (progressType == Gst.ProgressType.START or
progressType == Gst.ProgressType.CONTINUE):
if doProgress:
inProgress = True
block = True
elif (progressType == Gst.ProgressType.COMPLETE or
progressType == Gst.ProgressType.CANCELLED or
progressType == Gst.ProgressType.ERROR):
inProgress = False
gstPrintMsg(pName, 'Progress: ({}) {}', code, text, sName=msgSrcName)
if doProgress and not inProgress and not buffering and prerolled:
return
elif msgType == Gst.MessageType.HAVE_CONTEXT:
context = msg.parse_have_context()
gstPrintMsg(
pName,
'Got context: {}={}',
context.get_context_type(),
context.get_structure().to_string(),
sName = msgSrcName
)
elif msgType == Gst.MessageType.PROPERTY_NOTIFY:
obj, propName, val = msg.parse_property_notify()
valStr = '(no value)'
if val:
if GObject.type_check_value_holds(val, GObject.TYPE_STRING):
valStr = val.dup_string()
elif val.g_type.is_a(Gst.Caps.__gtype__): _gstPrintMsg(pName, '{}: {} = {}', obj.get_name(), propName,
valStr = val.get_boxed().to_string() valStr, sName=msgSrcName)
# these are things I might not need...
elif msgType == Gst.MessageType.STREAM_START:
if msgSrc == self._pipeline:
_gstPrintMsg(pName, 'Started stream', level=logging.INFO)
elif msgType == Gst.MessageType.QOS:
frmt, processed, dropped = msg.parse_qos_stats()
jitter, proportion, quality = msg.parse_qos_values()
_gstPrintMsg(
pName,
'QOS stats: jitter={} dropped={}',
jitter,
'-' if frmt == Gst.Format.UNDEFINED else dropped,
sName = msgSrcName
)
else: elif msgType == Gst.MessageType.ELEMENT:
valStr = Gst.value_serialize(val) _gstPrintMsg(pName, 'Unknown message ELEMENT', sName=msgSrcName)
gstPrintMsg(pName, '{}: {} = {}', obj.get_name(), propName, valStr, sName=msgSrcName)
# these are things I might not need...
elif msgType == Gst.MessageType.STREAM_START:
if msgSrc == pipeline:
gstPrintMsg(pName, 'Started stream', level=logging.INFO)
elif msgType == Gst.MessageType.QOS: elif msgType == Gst.MessageType.UNKNOWN:
frmt, processed, dropped = msg.parse_qos_stats() _gstPrintMsg(pName, 'Unknown message', sName=msgSrcname)
jitter, proportion, quality = msg.parse_qos_values()
gstPrintMsg(
pName,
'QOS stats: jitter={} dropped={}',
jitter,
'-' if frmt == Gst.Format.UNDEFINED else dropped,
sName = msgSrcName
)
elif msgType == Gst.MessageType.ELEMENT:
gstPrintMsg(pName, 'Unknown message ELEMENT', sName=msgSrcName)
elif msgType == Gst.MessageType.UNKNOWN:
gstPrintMsg(pName, 'Unknown message', sName=msgSrcname)
@async(daemon=True) @async(daemon=True)
def mainLoop(pipeline): def _mainLoop(self):
eventLoop(pipeline, block=True, doProgress=False, targetState=Gst.State.PLAYING) self._eventLoop(block=True, doProgress=False, targetState=Gst.State.PLAYING)
def startPipeline(pipeline, play=True): def _startPipeline(self, play):
pName = pipeline.get_name() pName = self._pipeline.get_name()
stateChange = pipeline.set_state(Gst.State.PAUSED) stateChange = self._pipeline.set_state(Gst.State.PAUSED)
gstPrintMsg(pName, 'Setting to PAUSED', level=logging.INFO) _gstPrintMsg(pName, 'Setting to PAUSED', level=logging.INFO)
if stateChange == Gst.StateChangeReturn.FAILURE: if stateChange == Gst.StateChangeReturn.FAILURE:
gstPrintMsg(pName, 'Cannot set to PAUSE', level=logging.INFO) _gstPrintMsg(pName, 'Cannot set to PAUSE', level=logging.INFO)
eventLoop(pipeline, block=False, doProgress=False, targetState=Gst.State.VOID_PENDING) self._eventLoop(block=False, doProgress=False, targetState=Gst.State.VOID_PENDING)
# we should always end up here because live # we should always end up here because live
elif stateChange == Gst.StateChangeReturn.NO_PREROLL: elif stateChange == Gst.StateChangeReturn.NO_PREROLL:
gstPrintMsg(pName, 'Live and does not need preroll') _gstPrintMsg(pName, 'Live and does not need preroll')
elif stateChange == Gst.StateChangeReturn.ASYNC: elif stateChange == Gst.StateChangeReturn.ASYNC:
gstPrintMsg(pName, 'Prerolling') _gstPrintMsg(pName, 'Prerolling')
try:
_eventLoop(block=True, doProgress=True, targetState=Gst.State.PAUSED)
except GstException:
_gstPrintMsg(pName, 'Does not want to preroll', level=logging.ERROR)
raise SystemExit
elif stateChange == Gst.StateChangeReturn.SUCCESS:
_gstPrintMsg(pName, 'Is prerolled')
# this should always succeed...
try: try:
eventLoop(pipeline, block=True, doProgress=True, targetState=Gst.State.PAUSED) self._eventLoop(block=False, doProgress=True, targetState=Gst.State.PLAYING)
except GstException: except GstException:
gstPrintMsg(pName, 'Does not want to preroll', level=logging.ERROR) _gstPrintMsg(pName, 'Does not want to preroll', level=logging.ERROR)
raise SystemExit raise SystemExit
elif stateChange == Gst.StateChangeReturn.SUCCESS: # ...and end up here
gstPrintMsg(pName, 'Is prerolled') else:
if play:
_gstPrintMsg(pName, 'Setting to PLAYING', level=logging.INFO)
# ...and since this will ALWAYS be successful...
if self._pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
_gstPrintMsg(pName, 'Cannot set to PLAYING', level=logging.ERROR)
err = self._pipeline.get_bus().pop_filtered(Gst.MessageType.Error)
_processErrorMessage(pName, msgSrcName, err)
# ...we end up here and loop until Tool releases their next album
try:
self._mainLoop()
except:
raise GstException
class Camera(ThreadedPipeline):
'''
Class for usb camera. The 'video' and 'audio' flags are meant for testing.
# this should always succeed... Makes two independent stream that share the same pipeline, and thus share a
try: clock for syncronization. Video uses the hardware-accelarated OMX extensions
eventLoop(pipeline, block=False, doProgress=True, targetState=Gst.State.PLAYING) for H264 encoding. Audio has no hardware accelaration (and thus is likely
except GstException: the most resource hungry thing in this program) and encodes using Opus. Both
gstPrintMsg(pName, 'Does not want to preroll', level=logging.ERROR) send their stream to two UDP ports (900X for video, 800X for audio, where
raise SystemExit X = 1 is used by the Janus WebRTC interface and X = 2 is used by the
# ...and end up here FileDump class below.
else: '''
if play: def __init__(self, video=True, audio=True):
gstPrintMsg(pName, 'Setting to PLAYING', level=logging.INFO) super().__init__('camera')
# ...and since this will ALWAYS be successful... vPath = '/dev/video0'
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
gstPrintMsg(pName, 'Cannot set to PLAYING', level=logging.ERROR)
err = pipeline.get_bus().pop_filtered(Gst.MessageType.Error)
processErrorMessage(pName, msgSrcName, err)
# ...we end up here and loop until Tool releases their next album if video:
try: vSource = Gst.ElementFactory.make("v4l2src", "videoSource")
mainLoop(pipeline) vConvert = Gst.ElementFactory.make("videoconvert", "videoConvert")
except: vScale = Gst.ElementFactory.make("videoscale", "videoScale")
raise GstException vClock = Gst.ElementFactory.make("clockoverlay", "videoClock")
vEncode = Gst.ElementFactory.make("omxh264enc", "videoEncoder")
def initCamera(video=True, audio=True): vRTPPay = Gst.ElementFactory.make("rtph264pay", "videoRTPPayload")
pipeline = Gst.Pipeline.new("camera") vRTPSink = Gst.ElementFactory.make("multiudpsink", "videoRTPSink")
vPath = '/dev/video0'
if video:
vSource = Gst.ElementFactory.make("v4l2src", "videoSource")
vConvert = Gst.ElementFactory.make("videoconvert", "videoConvert")
vScale = Gst.ElementFactory.make("videoscale", "videoScale")
vClock = Gst.ElementFactory.make("clockoverlay", "videoClock")
vEncode = Gst.ElementFactory.make("omxh264enc", "videoEncoder")
vRTPPay = Gst.ElementFactory.make("rtph264pay", "videoRTPPayload")
vRTPSink = Gst.ElementFactory.make("multiudpsink", "videoRTPSink")
vSource.set_property('device', vPath)
vRTPPay.set_property('config-interval', 1)
vRTPPay.set_property('pt', 96)
vRTPSink.set_property('clients', '127.0.0.1:9001,127.0.0.1:9002')
vCaps = Gst.Caps.from_string('video/x-raw,width=640,height=480,framerate=30/1')
pipeline.add(vSource, vConvert, vScale, vClock, vEncode, vRTPPay, vRTPSink) vSource.set_property('device', vPath)
vRTPPay.set_property('config-interval', 1)
vRTPPay.set_property('pt', 96)
vRTPSink.set_property('clients', '127.0.0.1:9001,127.0.0.1:9002')
linkElements(vSource, vConvert) vCaps = Gst.Caps.from_string('video/x-raw,width=640,height=480,framerate=30/1')
linkElements(vConvert, vScale)
linkElements(vScale, vClock, vCaps) self._pipeline.add(vSource, vConvert, vScale, vClock, vEncode, vRTPPay, vRTPSink)
linkElements(vClock, vEncode)
linkElements(vEncode, vRTPPay) _linkElements(vSource, vConvert)
linkElements(vRTPPay, vRTPSink) _linkElements(vConvert, vScale)
_linkElements(vScale, vClock, vCaps)
if audio: _linkElements(vClock, vEncode)
aSource = Gst.ElementFactory.make("alsasrc", "audioSource") _linkElements(vEncode, vRTPPay)
aConvert = Gst.ElementFactory.make("audioconvert", "audioConvert") _linkElements(vRTPPay, vRTPSink)
aScale = Gst.ElementFactory.make("audioresample", "audioResample")
aEncode = Gst.ElementFactory.make("opusenc", "audioEncode")
aRTPPay = Gst.ElementFactory.make("rtpopuspay", "audioRTPPayload")
aRTPSink = Gst.ElementFactory.make("multiudpsink", "audioRTPSink")
aSource.set_property('device', 'hw:1,0')
aRTPSink.set_property('clients', '127.0.0.1:8001,127.0.0.1:8002')
aCaps = Gst.Caps.from_string('audio/x-raw,rate=48000,channels=1')
pipeline.add(aSource, aConvert, aScale, aEncode, aRTPPay, aRTPSink)
linkElements(aSource, aConvert)
linkElements(aConvert, aScale)
linkElements(aScale, aEncode, aCaps)
linkElements(aEncode, aRTPPay)
linkElements(aRTPPay, aRTPSink)
waitForPath(vPath) # video is on usb, so wait until it comes back after we hard reset the bus if audio:
aSource = Gst.ElementFactory.make("alsasrc", "audioSource")
startPipeline(pipeline) aConvert = Gst.ElementFactory.make("audioconvert", "audioConvert")
aScale = Gst.ElementFactory.make("audioresample", "audioResample")
aEncode = Gst.ElementFactory.make("opusenc", "audioEncode")
aRTPPay = Gst.ElementFactory.make("rtpopuspay", "audioRTPPayload")
aRTPSink = Gst.ElementFactory.make("multiudpsink", "audioRTPSink")
class FileDump: aSource.set_property('device', 'hw:1,0')
aRTPSink.set_property('clients', '127.0.0.1:8001,127.0.0.1:8002')
aCaps = Gst.Caps.from_string('audio/x-raw,rate=48000,channels=1')
self._pipeline.add(aSource, aConvert, aScale, aEncode, aRTPPay, aRTPSink)
_linkElements(aSource, aConvert)
_linkElements(aConvert, aScale)
_linkElements(aScale, aEncode, aCaps)
_linkElements(aEncode, aRTPPay)
_linkElements(aRTPPay, aRTPSink)
waitForPath(vPath) # video is on usb, so wait until it comes back after we hard reset the bus
self.start()
class FileDump(ThreadedPipeline):
''' '''
Pipeline that takes audio and input from two udp ports, muxes them, and Pipeline that takes audio and input from two udp ports, muxes them, and
dumps the result to a file. Intended to work with the Camera above. Will dumps the result to a file. Intended to work with the Camera above. Will
@ -348,7 +383,7 @@ class FileDump:
mkdirSafe(self._savePath, logger) mkdirSafe(self._savePath, logger)
self.pipeline = Gst.Pipeline.new('filedump') super().__init__('filedump')
aSource = Gst.ElementFactory.make('udpsrc', 'audioSource') aSource = Gst.ElementFactory.make('udpsrc', 'audioSource')
aJitBuf = Gst.ElementFactory.make('rtpjitterbuffer', 'audioJitterBuffer') aJitBuf = Gst.ElementFactory.make('rtpjitterbuffer', 'audioJitterBuffer')
@ -373,28 +408,27 @@ class FileDump:
aSource.set_property('port', 8002) aSource.set_property('port', 8002)
vSource.set_property('port', 9002) vSource.set_property('port', 9002)
self.pipeline.add(aSource, aJitBuf, aDepay, aQueue, self._pipeline.add(aSource, aJitBuf, aDepay, aQueue,
vSource, vJitBuf, vDepay, vParse, vQueue, mux, self.sink) vSource, vJitBuf, vDepay, vParse, vQueue, mux, self.sink)
linkElements(aSource, aJitBuf, aCaps) _linkElements(aSource, aJitBuf, aCaps)
linkElements(aJitBuf, aDepay) _linkElements(aJitBuf, aDepay)
linkElements(aDepay, aQueue) _linkElements(aDepay, aQueue)
linkElements(aQueue, mux) _linkElements(aQueue, mux)
linkElements(vSource, vJitBuf, vCaps) _linkElements(vSource, vJitBuf, vCaps)
linkElements(vJitBuf, vDepay) _linkElements(vJitBuf, vDepay)
linkElements(vDepay, vParse) _linkElements(vDepay, vParse)
linkElements(vParse, vQueue) _linkElements(vParse, vQueue)
linkElements(vQueue, mux) _linkElements(vQueue, mux)
linkElements(mux, self.sink) _linkElements(mux, self.sink)
# TODO: there is probably a better way to init than starting up to PAUSE # TODO: there is probably a better way to init than starting up to PAUSE
# and then dropping back down to NULL # and then dropping back down to NULL
startPipeline(self.pipeline, play=False) self.start(play=False)
self.pipeline.post_message(Gst.Message.new_request_state(self.pipeline, Gst.State.NULL)) self._pipeline.post_message(Gst.Message.new_request_state(self._pipeline, Gst.State.NULL))
def addInitiator(self, identifier): def addInitiator(self, identifier):
with self._lock: with self._lock:
@ -403,11 +437,11 @@ class FileDump:
else: else:
self._initiators.append(identifier) self._initiators.append(identifier)
if self.pipeline.get_state(Gst.CLOCK_TIME_NONE).state == Gst.State.NULL: if self._pipeline.get_state(Gst.CLOCK_TIME_NONE).state == Gst.State.NULL:
filePath = os.path.join(self._savePath, '{}.mkv'.format(datetime.now())) filePath = os.path.join(self._savePath, '{}.mkv'.format(datetime.now()))
self.sink.set_property('location', filePath) self.sink.set_property('location', filePath)
# TODO: cannot post messages from null to change state, is this bad? # TODO: cannot post messages from null to change state, is this bad?
self.pipeline.set_state(Gst.State.PLAYING) self._pipeline.set_state(Gst.State.PLAYING)
def removeInitiator(self, identifier): def removeInitiator(self, identifier):
with self._lock: with self._lock:
@ -417,12 +451,6 @@ class FileDump:
logger.warn('Attempted to remove nonexistant identifier \'%s\'', identifier) logger.warn('Attempted to remove nonexistant identifier \'%s\'', identifier)
if len(self._initiators) == 0: if len(self._initiators) == 0:
self.pipeline.set_state(Gst.State.NULL) self._pipeline.set_state(Gst.State.NULL)
Gst.init(None) Gst.init(None)
# this works for file dump
# gst-launch-1.0 -v udpsrc port=8001 ! application/x-rtp,encoding-name=OPUS,payload=96 !
# rtpjitterbuffer ! rtpopusdepay ! queue ! matroskamux name=mux ! filesink location=testicle.mkv \
# udpsrc port=9001 ! application/x-rtp,encoding-name=H264,payload=96 ! rtpjitterbuffer ! \
# rtph264depay ! h264parse ! queue ! mux.