add camera and filedump

This commit is contained in:
petrucci4prez 2017-05-29 22:03:07 -04:00
parent 82cfa64527
commit c002fc08eb
1 changed files with 349 additions and 65 deletions

414
stream.py
View File

@ -1,76 +1,360 @@
#! /bin/python #! /bin/python
# this entire module is lovingly based on the gst-launch tool kindly provided
# by the gstreamer wizards themselves...with unecessary crap cut out
# we make the following assumptions here and optimize as such
# - all streams are "live"
# - will not need EOS (no mp4s)
# - will not require SIGINT (this entire program won't understand them anyways)
# - no tags or TOCs
from auxilary import async from auxilary import async
from threading import Thread
import gi, time import gi, time
gi.require_version('Gst', '1.0') gi.require_version('Gst', '1.0')
gi.require_version('GObject', '2.0')
from gi.repository import Gst from gi.repository import Gst, GObject
Gst.init(None) class GstException(Exception):
pipe = Gst.Pipeline.new("streamer")
bus = pipe.get_bus()
vidSrc = Gst.ElementFactory.make("v4l2src", "vidSrc")
vidConv = Gst.ElementFactory.make("videoconvert", "vidConv")
vidScale = Gst.ElementFactory.make("videoscale", "vidScale")
vidClock = Gst.ElementFactory.make("clockoverlay", "vidClock")
vidEncode = Gst.ElementFactory.make("omxh264enc", "vidEncode")
vidParse = Gst.ElementFactory.make("h264parse", "vidParse")
mux = Gst.ElementFactory.make("mp4mux", "mux")
#~ sink = Gst.ElementFactory.make("tcpserversink", "sink")
sink = Gst.ElementFactory.make("filesink", "sink")
vidSrc.set_property('device', '/dev/video0')
#~ sink.set_property('host', '0.0.0.0')
#~ sink.set_property('port', 8080)
sink.set_property('location', '/home/alarm/testicle.mp4')
vidRawCaps = Gst.Caps.from_string('video/x-raw,width=320,height=240,framerate=30/1')
parseCaps = Gst.Caps.from_string('video/x-h264,stream-format=avc')
pipe.add(vidSrc, vidConv, vidScale, vidClock, vidEncode, vidParse, mux, sink)
print(vidSrc.link(vidConv))
print(vidConv.link(vidScale))
print(vidScale.link_filtered(vidClock, vidRawCaps))
print(vidClock.link(vidEncode))
print(vidEncode.link(vidParse))
print(vidParse.link_filtered(mux, parseCaps))
print(mux.link(sink))
pipe.set_state(Gst.State.PLAYING)
#~ signal.signal(signal.SIGTERM, exit())
def terminate():
pipe.set_state(Gst.State.NULL)
exit()
@async(daemon=True)
def errorHandler():
while 1:
msg = bus.timed_pop_filtered(1e18, Gst.MessageType.ERROR)
print('howdy')
print(msg.parse_error())
terminate()
@async(daemon=True)
def eosHandler():
while 1:
msg = bus.timed_pop_filtered(1e18, Gst.MessageType.EOS)
print('EOS reached')
terminate()
try:
errorHandler()
eosHandler()
while 1:
time.sleep(3600)
except KeyboardInterrupt:
pass pass
def gstPrintMsg(pName, frmt, *args, sName=None):
if sName:
print('[{}] [{}] '.format(pName, sName) + frmt.format(*args))
else:
print('[{}] '.format(pName) + frmt.format(*args))
def processErrorMessage(pName, sName, msg):
error, debug = msg.parse_error()
if debug:
gstPrintMsg(pName, '{} - Additional debug info: {}', error.message, debug, sName=sName)
else:
gstPrintMsg(pName, error.message, sName=sName)
raise GstException(error)
def linkElements(e1, e2, caps=None):
if caps:
if not e1.link_filtered(e2, caps):
raise GstException('cannot link \"{}\" to \"{}\" with caps {}'.format(e1.get_name(), e2.get_name(), caps.to_string()))
else:
if not e1.link(e2):
raise GstException('cannot link \"{}\" to \"{}\"'.format(e1.get_name(), e2.get_name()))
def linkTee(tee, *args):
i = 0
for e in args:
teePad = tee.get_request_pad('src_{}'.format(i))
ePad = e.get_static_pad('sink')
teePad.link(ePad)
i += 1
# TODO: make it clearer which pipeline everything comes from
def eventLoop(pipeline, block=True, doProgress=False, targetState=Gst.State.PLAYING):
buffering = False
inProgress = False
prerolled = targetState != Gst.State.PAUSED
pName = pipeline.get_name()
bus = pipeline.get_bus()
while 1:
msg = bus.timed_pop(1e18 if block else 0)
if not msg:
return
msgSrc = msg.src
msgSrcName = msgSrc.get_name()
msgType = msg.type
msgTypeName = Gst.MessageType.get_name(msgType)
# messages that involve manipulating the pipeline
if msgType == Gst.MessageType.REQUEST_STATE:
state = msg.parse_request_state()
print('Setting state to {} as requested by {}'.format(
Gst.Element.get_state_name(state),
msgSrcName)
)
pipeline.set_state(state)
elif msgType == Gst.MessageType.CLOCK_LOST:
print('Clock lost. Getting new one.')
pipeline.set_state(Gst.State.PAUSED)
pipeline.set_state(Gst.State.PLAYING)
elif msgType == Gst.MessageType.LATENCY:
gstPrintMsg(pName, 'Redistributing latency', sName=msgSrcName)
pipeline.recalculate_latency()
# messages that do not require pipeline manipulation
elif msgType == Gst.MessageType.BUFFERING:
gstPrintMsg(pName, 'Buffering: {}', msg.parse_buffering(), sName=msgSrcName)
elif msgType == Gst.MessageType.NEW_CLOCK:
clock = msg.parse_new_clock()
clock = clock.get_name() if clock else 'NULL'
gstPrintMsg(pName, 'New clock: {}', clock)
elif msgType == Gst.MessageType.INFO:
error, debug = msg.parse_info()
if debug:
gstPrintMsg(pName, debug, sName=msgSrcName)
elif msgType == Gst.MessageType.WARNING:
error, debug = msg.parse_warning()
if debug:
gstPrintMsg(pName, '{} - Additional debug info: {}', error.message, debug, sName=msgSrcName)
else:
gstPrintMsg(pName, error.message, sName=msgSrcName)
elif msgType == Gst.MessageType.ERROR:
processErrorMessage(pName, msgSrcName, msg)
elif msgType == Gst.MessageType.STATE_CHANGED:
# we only care about pipeline level state changes
if msgSrc == pipeline:
old, new, pending = msg.parse_state_changed()
# we only care if we reach the final target state
if targetState == Gst.State.PAUSED and new == Gst.State.PAUSED:
prerolled = True
if buffering:
gstPrintMsg(pName, 'Prerolled, waiting for buffering to finish')
continue
if inProgress:
gstPrintMsg(pName, 'Prerolled, waiting for progress to finish')
continue
return
elif msgType == Gst.MessageType.PROGRESS:
progressType, code, text = msg.parse_progress()
if (progressType == Gst.ProgressType.START or
progressType == Gst.ProgressType.CONTINUE):
if doProgress:
inProgress = True
block = True
elif (progressType == Gst.ProgressType.COMPLETE or
progressType == Gst.ProgressType.CANCELLED or
progressType == Gst.ProgressType.ERROR):
inProgress = False
gstPrintMsg(pName, 'Progress: ({}) {}', code, text, sName=msgSrcName)
if doProgress and not inProgress and not buffering and prerolled:
return
elif msgType == Gst.MessageType.HAVE_CONTEXT:
context = msg.parse_have_context()
gstPrintMsg(
pName,
'Got context: {}={}',
context.get_context_type(),
context.get_structure().to_string(),
sName = msgSrcName
)
elif msgType == Gst.MessageType.PROPERTY_NOTIFY:
obj, propName, val = msg.parse_property_notify()
valStr = '(no value)'
if val:
if GObject.type_check_value_holds(val, GObject.TYPE_STRING):
valStr = val.dup_string()
elif val.g_type.is_a(Gst.Caps.__gtype__):
valStr = val.get_boxed().to_string()
else:
valStr = Gst.value_serialize(val)
gstPrintMsg(pName, '{}: {} = {}', obj.get_name(), propName, valStr, sName=msgSrcName)
# these are things I might not need...
elif msgType == Gst.MessageType.STREAM_START:
if msgSrc == pipeline:
gstPrintMsg(pName, 'Started stream')
#~ elif msgType == Gst.MessageType.QOS:
#~ frmt, processed, dropped = msg.parse_qos_stats()
#~ jitter, proportion, quality = msg.parse_qos_values()
#~ gstPrintMsg(
#~ pName,
#~ 'QOS stats: jitter={} dropped={}',
#~ jitter,
#~ '-' if frmt == Gst.Format.UNDEFINED else dropped,
#~ sName = msgSrcName
#~ )
elif msgType == Gst.MessageType.ELEMENT:
gstPrintMsg(pName, 'Unknown message ELEMENT', sName=msgSrcName)
elif msgType == Gst.MessageType.UNKNOWN:
gstPrintMsg(pName, 'Unknown message', sName=msgSrcname)
@async(daemon=True)
def startPipeline(pipeline, play=True):
pName = pipeline.get_name()
stateChange = pipeline.set_state(Gst.State.PAUSED)
gstPrintMsg(pName, 'Setting to PAUSED')
if stateChange == Gst.StateChangeReturn.FAILURE:
gstPrintMsg(pName, 'Cannot set to PAUSE')
eventLoop(pipeline, block=False, doProgress=False, targetState=Gst.State.VOID_PENDING)
# we should always end up here because live
elif stateChange == Gst.StateChangeReturn.NO_PREROLL:
gstPrintMsg(pName, 'Live and does not need preroll')
elif stateChange == Gst.StateChangeReturn.ASYNC:
gstPrintMsg(pName, 'Prerolling')
try:
eventLoop(pipeline, block=True, doProgress=True, targetState=Gst.State.PAUSED)
except GstException:
gstPrintMsg(pName, 'Does not want to preroll')
# some cleanup here?
raise
elif stateChange == Gst.StateChangeReturn.SUCCESS:
gstPrintMsg(pName, 'Is prerolled')
# this should always succeed...
try:
eventLoop(pipeline, block=False, doProgress=True, targetState=Gst.State.PLAYING)
except GstException:
gstPrintMsg(pName, 'Does not want to preroll')
# some cleanup here?
raise
# ...and end up here
else:
if play:
gstPrintMsg(pName, 'Setting to PLAYING')
# and since this will ALWAYS be successful (maybe)...
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
gstPrintMsg(pName, 'Cannot set to PLAYING')
err = pipeline.get_bus().pop_filtered(Gst.MessageType.Error)
processErrorMessage(pName, msgSrcName, err)
# ...we and end up here and loop until Tool releases their next album
try:
eventLoop(pipeline, block=True, doProgress=False, targetState=Gst.State.PLAYING)
except:
# cleanup or recover
raise
def Camera(video=True, audio=True):
pipeline = Gst.Pipeline.new("camera")
if video:
vSource = Gst.ElementFactory.make("v4l2src", "videoSource")
vConvert = Gst.ElementFactory.make("videoconvert", "videoConvert")
vScale = Gst.ElementFactory.make("videoscale", "videoScale")
vClock = Gst.ElementFactory.make("clockoverlay", "videoClock")
vEncode = Gst.ElementFactory.make("omxh264enc", "videoEncoder")
vRTPPay = Gst.ElementFactory.make("rtph264pay", "videoRTPPayload")
vRTPSink = Gst.ElementFactory.make("multiudpsink", "videoRTPSink")
vSource.set_property('device', '/dev/video0')
vRTPPay.set_property('config-interval', 1)
vRTPPay.set_property('pt', 96)
vRTPSink.set_property('clients', '127.0.0.1:9001,127.0.0.1:9002')
vCaps = Gst.Caps.from_string('video/x-raw,width=640,height=480,framerate=30/1')
pipeline.add(vSource, vConvert, vScale, vClock, vEncode, vRTPPay, vRTPSink)
linkElements(vSource, vConvert)
linkElements(vConvert, vScale)
linkElements(vScale, vClock, vCaps)
linkElements(vClock, vEncode)
linkElements(vEncode, vRTPPay)
linkElements(vRTPPay, vRTPSink)
if audio:
aSource = Gst.ElementFactory.make("alsasrc", "audioSource")
aConvert = Gst.ElementFactory.make("audioconvert", "audioConvert")
aScale = Gst.ElementFactory.make("audioresample", "audioResample")
aEncode = Gst.ElementFactory.make("opusenc", "audioEncode")
aRTPPay = Gst.ElementFactory.make("rtpopuspay", "audioRTPPayload")
aRTPSink = Gst.ElementFactory.make("multiudpsink", "audioRTPSink")
aSource.set_property('device', 'hw:1,0')
aRTPSink.set_property('clients', '127.0.0.1:8001,127.0.0.1:8002')
aCaps = Gst.Caps.from_string('audio/x-raw,rate=48000,channels=1')
pipeline.add(aSource, aConvert, aScale, aEncode, aRTPPay, aRTPSink)
linkElements(aSource, aConvert)
linkElements(aConvert, aScale)
linkElements(aScale, aEncode, aCaps)
linkElements(aEncode, aRTPPay)
linkElements(aRTPPay, aRTPSink)
startPipeline(pipeline)
class FileDump:
def __init__(self):
self.pipeline = Gst.Pipeline.new('filedump')
aSource = Gst.ElementFactory.make('udpsrc', 'audioSource')
aParse = Gst.ElementFactory.make('opusparse', 'audioParse')
aQueue = Gst.ElementFactory.make('queue', 'audioQueue')
vSource = Gst.ElementFactory.make('udpsrc', 'videoSource')
vParse = Gst.ElementFactory.make('h264parse', 'videoParse')
vQueue = Gst.ElementFactory.make('queue', 'videoQueue')
mux = Gst.ElementFactory.make('matroskamux', 'mux')
self.sink = Gst.ElementFactory.make('filesink', 'sink')
self.sink.set_property('location', '/dev/null')
aSource.set_property('port', 8000)
vSource.set_property('port', 9000)
self.pipeline.add(aSource, aParse, aQueue, vSource, vParse, vQueue, mux, self.sink)
linkElements(vSource, vParse)
linkElements(vParse, vQueue)
linkElements(vQueue, mux)
linkElements(aSource, aParse)
linkElements(aParse, aQueue)
linkElements(aQueue, mux)
linkElements(mux, self.sink)
startPipeline(self.pipeline, play=False)
def setPath(self, path):
self.sink.set_property('location', path)
def play(self):
self.pipeline.set_state(Gst.State.PLAYING)
def pause(self):
self.pipeline.set_state(Gst.State.PAUSED)
Gst.init(None)
Camera()
while 1:
time.sleep(3600)
# this works for file dump
# gst-launch-1.0 -v udpsrc port=8001 ! application/x-rtp,encoding-name=OPUS,payload=96 !
# rtpjitterbuffer ! rtpopusdepay ! queue ! matroskamux name=mux ! filesink location=testicle.mkv \
# udpsrc port=9001 ! application/x-rtp,encoding-name=H264,payload=96 ! rtpjitterbuffer ! \
# rtph264depay ! h264parse ! queue ! mux.