Skip to content

Commit

Permalink
Streaming stability improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Sightler committed Jul 1, 2024
1 parent 5abf9b3 commit 2f8bb43
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 79 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Auto detect text files and perform LF normalization
* text=auto
*.sh text
29 changes: 22 additions & 7 deletions devices/camera-livestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ import { StreamingSession } from '../lib/streaming/streaming-session.js'
const deviceName = workerData.deviceName
const doorbotId = workerData.doorbotId
let liveStream = false
let streamStopping = false

parentPort.on("message", async(data) => {
const streamData = data.streamData
switch (data.command) {
case 'start':
if (!liveStream) {
if (streamStopping) {
parentPort.postMessage({type: 'log_error', data: "Live stream could not be started because it is in stopping state"})
parentPort.postMessage({type: 'state', data: 'failed'})
} else if (!liveStream) {
startLiveStream(streamData)
} else {
parentPort.postMessage({type: 'log_error', data: "Live stream could not be started because there is already an active stream"})
parentPort.postMessage({type: 'state', data: 'active'})
}
break;
case 'stop':
Expand Down Expand Up @@ -87,11 +94,19 @@ async function startLiveStream(streamData) {
}

async function stopLiveStream() {
liveStream.stop()
await new Promise(res => setTimeout(res, 2000))
if (liveStream) {
parentPort.postMessage({type: 'log_info', data: 'Live stream failed to stop on request, deleting anyway...'})
parentPort.postMessage({type: 'state', data: 'inactive'})
liveStream = false
if (!streamStopping) {
streamStopping = true
let stopTimeout = 10
liveStream.stop()
do {
await new Promise(res => setTimeout(res, 200))
if (liveStream) {
parentPort.postMessage({type: 'log_info', data: 'Live stream failed to stop on request, deleting anyway...'})
parentPort.postMessage({type: 'state', data: 'inactive'})
liveStream = false
}
stopTimeout--
} while (liveStream && stopTimeout)
streamStopping = false
}
}
8 changes: 4 additions & 4 deletions lib/go2rtc.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ export default new class Go2RTC {
config.streams = {}
for (const camera of cameras) {
config.streams[`${camera.deviceId}_live`] =
`exec:${dirname(fileURLToPath(new URL('.', import.meta.url)))}/scripts/start-stream.sh ${camera.deviceId} live ${camera.deviceTopic} {output}#killsignal=15#killtimeout=5`
`exec:${dirname(fileURLToPath(new URL('.', import.meta.url)))}/scripts/start-stream.sh ${camera.deviceId} live ${camera.deviceTopic} {output}#killsignal=15`
config.streams[`${camera.deviceId}_event`] =
`exec:${dirname(fileURLToPath(new URL('.', import.meta.url)))}/scripts/start-stream.sh ${camera.deviceId} event ${camera.deviceTopic} {output}#killsignal=15#killtimeout=5`
`exec:${dirname(fileURLToPath(new URL('.', import.meta.url)))}/scripts/start-stream.sh ${camera.deviceId} event ${camera.deviceTopic} {output}#killsignal=15`
}
try {
await writeFileAtomic(configFile, yaml.dump(config, { lineWidth: -1 }))
Expand Down Expand Up @@ -91,13 +91,13 @@ export default new class Go2RTC {
const stdoutLine = readline.createInterface({ input: this.go2rtcProcess.stdout })
stdoutLine.on('line', (line) => {
// Replace time in go2rtc log messages with tag
debug(line.replace(/^.*\d{2}:\d{2}:\d{2}\.\d{3}([^\s]+) /, chalk.green('[go2rtc] ')))
debug(line.replace(/^.*\d{2}:\d{2}:\d{2}\.\d{3} /, chalk.green('[go2rtc] ')))
})

const stderrLine = readline.createInterface({ input: this.go2rtcProcess.stderr })
stderrLine.on('line', (line) => {
// Replace time in go2rtc log messages with tag
debug(line.replace(/^.*\d{2}:\d{2}:\d{2}\.\d{3}([^\s]+) /, '[go2rtc] '))
debug(line.replace(/^.*\d{2}:\d{2}:\d{2}\.\d{3} /, chalk.green('[go2rtc] ')))
})
}

Expand Down
103 changes: 103 additions & 0 deletions scripts/monitor-stream.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#!/bin/bash
# Activate video stream on Ring cameras via ring-mqtt
# Intended only for use as on-demand script for rtsp-simple-server
# Requires mosquitto MQTT clients package to be installed
# Uses ring-mqtt internal IPC broker for communications with main process
# Provides status updates and termintates stream on script exit

# Required command line arguments
device_id=${1} # Camera device Id
type=${2} # Stream type ("live" or "event")
base_topic=${3} # Command topic for Camera entity
rtsp_pub_url=${4} # URL for publishing RTSP stream
client_id="${device_id}_${type}" # Id used to connect to the MQTT broker, camera Id + event type
activated="false"
reason="none"

[[ ${type} = "live" ]] && base_topic="${base_topic}/stream" || base_topic="${base_topic}/event_stream"
json_attribute_topic="${base_topic}/attributes"
command_topic="${base_topic}/command"
debug_topic="${base_topic}/debug"

# Set some colors for debug output
red='\e[0;31m'
yellow='\e[0;33m'
green='\e[0;32m'
blue='\e[0;34m'
reset='\e[0m'

cleanup() {
local ffpids=$(pgrep -f "ffmpeg.*${rtsp_pub_url}" | grep -v ^$$\$)
[ -n "$ffpids" ] && kill -9 $ffpids
local pids=$(pgrep -f "mosquitto_sub.*${client_id}_sub" | grep -v ^$$\$)
[ -n "$pids" ] && kill $pids
for fd in $(ls /proc/$$/fd); do
eval "exec $fd>&-"
done
exit 0
}

# go2rtc does not pass stdout through from child processes so send debug logs
# via main process using MQTT messages
logger() {
mosquitto_pub -i "${client_id}_pub" -L "mqtt://127.0.0.1:51883/${debug_topic}" -m "${1}"
}

# Trap signals so that the MQTT command to stop the stream can be published on exit
trap cleanup INT TERM

# This loop starts mosquitto_sub with a subscription on the camera stream topic that sends all received
# messages via file descriptor to the read process. On initial startup the script publishes the message
# 'ON-DEMAND' to the stream command topic which lets ring-mqtt know that an RTSP client has requested
# the stream. Stream state is determined via the the detailed stream state messages received via the
# json_attributes_topic:
#
# "inactive" = There is no active video stream and none currently requested
# "activating" = A video stream has been requested and is initializing but has not yet started
# "active" = The stream was requested successfully and an active stream is currently in progress
# "failed" = A live stream was requested but failed to start
mosquitto_sub -q 1 -i "${client_id}_sub" -L "mqtt://127.0.0.1:51883/${json_attribute_topic}" |
while read message; do
# Otherwise it should be a JSON message from the stream state attribute topic so extract the detailed stream state
stream_state=`echo ${message} | jq -r '.status'`
case ${stream_state,,} in
activating)
if [ ${activated} = "false" ]; then
logger "State indicates ${type} stream is activating"
fi
;;
active)
if [ ${activated} = "false" ]; then
logger "State indicates ${type} stream is active"
activated="true"
fi
;;
deactivate)
if [ ${activated} = "true" ]; then
reason='deactivate'
fi
;;
inactive)
if [ ${reason} = "deactivate" ] ; then
logmsg="State indicates ${type} stream is inactive"
else
logmsg=$(echo -en "${yellow}State indicates ${type} stream has gone unexpectedly inactive${reset}")
fi
logger "${logmsg}"
reason='inactive'
cleanup
;;
failed)
logmsg=$(echo -en "${red}ERROR - State indicates ${type} stream failed to activate${reset}")
logger "${logmsg}"
reason='failed'
cleanup
;;
*)
logmsg=$(echo -en "${red}ERROR - Received unknown ${type} stream state on topic ${blue}${json_attribute_topic}${reset}")
logger "${logmsg}"
;;
esac
done

cleanup
119 changes: 51 additions & 68 deletions scripts/start-stream.sh
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
#!/bin/bash
# Activate video stream on Ring cameras via ring-mqtt
# Intended only for use as on-demand script for rtsp-simple-server
# Activate Ring camera video stream via ring-mqtt
#
# This script is intended for use only with ring-mqtt
# and go2rtc.
#
# Requires mosquitto MQTT clients package to be installed
# Uses ring-mqtt internal IPC broker for communications with main process
# Provides status updates and termintates stream on script exit
# Uses ring-mqtt internal IPC broker for communication with
# ring-mqtt process
#
# Spawns stream control in background due to issues with
# process exit hanging go2rtc. Script then just monitors
# for control script to exit or, if script is killed,
# sends commands to control script prior to exiting

# Required command line arguments
device_id=${1} # Camera device Id
type=${2} # Stream type ("live" or "event")
base_topic=${3} # Command topic for Camera entity
rtsp_pub_url=${4} # URL for publishing RTSP stream
client_id="${device_id}_${type}" # Id used to connect to the MQTT broker, camera Id + event type
activated="false"

[[ ${type} = "live" ]] && base_topic="${base_topic}/stream" || base_topic="${base_topic}/event_stream"
# If previous run hasn't exited yet, just perform a short wait and exit with error
if test -f /tmp/ring-mqtt-${device_id}.lock; then
sleep .1
exit 1
else
touch /tmp/ring-mqtt-${device_id}.lock
fi

script_dir=$(dirname "$0")
${script_dir}/monitor-stream.sh ${1} ${2} ${3} ${4} &

# Build the MQTT topics
[[ ${type} = "live" ]] && base_topic="${base_topic}/stream" || base_topic="${base_topic}/event_stream"
json_attribute_topic="${base_topic}/attributes"
command_topic="${base_topic}/command"
debug_topic="${base_topic}/debug"
Expand All @@ -26,76 +44,41 @@ green='\e[0;32m'
blue='\e[0;34m'
reset='\e[0m'

stop() {
# Interrupted by signal so send command to stop stream
# Send message to monitor script that stream was requested to stop so that it doesn't log a warning
mosquitto_pub -i "${client_id}_pub" -L "mqtt://127.0.0.1:51883/${json_attribute_topic}" -m {\"status\":\"deactivate\"}

# Send ring-mqtt the command to stop the stream
mosquitto_pub -i "${client_id}_pub" -L "mqtt://127.0.0.1:51883/${debug_topic}" -m "Deactivating ${type} stream due to signal from RTSP server (no more active clients or publisher ended stream)"
mosquitto_pub -i "${client_id}_pub" -L "mqtt://127.0.0.1:51883/${command_topic}" -m "OFF"

# Send kill signal to monitor script and wait for it to exit
local pids=$(jobs -pr)
[ -n "$pids" ] && kill $pids
wait
cleanup
}

# If control script is still runnning send kill signal and exit
cleanup() {
if [ -z ${reason} ]; then
# If no reason defined, that means we were interrupted by a signal, send the command to stop the live stream
mosquitto_pub -i "${client_id}_pub" -L "mqtt://127.0.0.1:51883/${debug_topic}" -m "Deactivating ${type} stream due to signal from RTSP server (no more active clients or publisher ended stream)"
mosquitto_pub -i "${client_id}_pub" -L "mqtt://127.0.0.1:51883/${command_topic}" -m "OFF"
fi
# Kill the spawed mosquitto_sub process or it will stay listening forever
kill $(pgrep -f "mosquitto_sub.*${client_id}_sub" | grep -v ^$$\$)
rm -f /tmp/ring-mqtt-${device_id}.lock
for fd in $(ls /proc/$$/fd); do
eval "exec $fd>&-"
done
exit 0
}

# go2rtc does not pass stdout through from child processes so send debug logs
# via main process using MQTT messages
# Send debug logs via main process using MQTT messages
logger() {
mosquitto_pub -i "${client_id}_pub" -L "mqtt://127.0.0.1:51883/${debug_topic}" -m "${1}"
}

# Trap signals so that the MQTT command to stop the stream can be published on exit
trap cleanup INT TERM QUIT
trap stop INT TERM

# This loop starts mosquitto_sub with a subscription on the camera stream topic that sends all received
# messages via file descriptor to the read process. On initial startup the script publishes the message
# 'ON-DEMAND' to the stream command topic which lets ring-mqtt know that an RTSP client has requested
# the stream. Stream state is determined via the the detailed stream state messages received via the
# json_attributes_topic:
#
# "inactive" = There is no active video stream and none currently requested
# "activating" = A video stream has been requested and is initializing but has not yet started
# "active" = The stream was requested successfully and an active stream is currently in progress
# "failed" = A live stream was requested but failed to start
while read -u 10 message
do
# If start message received, publish the command to start stream
if [ ${message} = "START" ]; then
logger "Sending command to activate ${type} stream ON-DEMAND"
mosquitto_pub -i "${client_id}_pub" -L "mqtt://127.0.0.1:51883/${command_topic}" -m "ON-DEMAND ${rtsp_pub_url}"
else
# Otherwise it should be a JSON message from the stream state attribute topic so extract the detailed stream state
stream_state=`echo ${message} | jq -r '.status'`
case ${stream_state,,} in
activating)
if [ ${activated} = "false" ]; then
logger "State indicates ${type} stream is activating"
fi
;;
active)
if [ ${activated} = "false" ]; then
logger "State indicates ${type} stream is active"
activated="true"
fi
;;
inactive)
logmsg=$(echo -en "${yellow}State indicates ${type} stream has gone inactive${reset}")
logger "${logmsg}"
reason='inactive'
cleanup
;;
failed)
logmsg=$(echo -en "${red}ERROR - State indicates ${type} stream failed to activate${reset}")
logger "${logmsg}"
reason='failed'
cleanup
;;
*)
logmsg=$(echo -en "${red}ERROR - Received unknown ${type} stream state on topic ${blue}${json_attribute_topic}${reset}")
logger "${logmsg}"
;;
esac
fi
done 10< <(mosquitto_sub -q 1 -i "${client_id}_sub" -L "mqtt://127.0.0.1:51883/${json_attribute_topic}" & echo "START")
logger "Sending command to activate ${type} stream ON-DEMAND"
mosquitto_pub -i "${client_id}_pub" -L "mqtt://127.0.0.1:51883/${command_topic}" -m "ON-DEMAND ${rtsp_pub_url}"

cleanup
exit 0
wait
cleanup

0 comments on commit 2f8bb43

Please sign in to comment.