Skip to content

Commit

Permalink
New Alpha streamer added. Requires much testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
Wessie committed Oct 23, 2012
1 parent 23e1601 commit ca03ebd
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 0 deletions.
83 changes: 83 additions & 0 deletions audio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import encoder
import files
import icecast
import logging


logger = logging.getLogger('audio')


class Manager(object):
def __init__(self, icecast_config={}, next_file=lambda self: None):
super(Manager, self).__init__()

self.next_file = next_file

self.source = UnendingSource(self.give_source)

self.encoder = encoder.Encoder(self.source)
self.encoder.start()

self.icecast = icecast.Icecast(self.encoder, icecast_config)
self.icecast.connect()

def give_source(self):
try:
return files.AudioFile(self.next_file())
except (files.AudioError) as err:
logger.exception("Unsupported file.")
return self.give_source()

class UnendingSource(object):
def __init__(self, source_function):
super(UnendingSource, self).__init__()
self.source_function = source_function
self.source = source_function()

self.eof = False

def read(self, size=4096, timeout=10.0):
if self.eof:
return b''
try:
data = self.source.read(size, timeout)
except (ValueError) as err:
if err.message == 'MD5 mismatch at end of stream':
pass
if data == b'':
self.source = self.source_function()
if self.source == None:
self.eof = True
return b''
return data

def close(self):
self.eof = True

def __getattr__(self, key):
return getattr(self.source, key)

import os
def test_dir(directory='/media/F/Music'):
files = set()
for base, dir, filenames in os.walk(directory):
for name in filenames:
files.add(os.path.join(base, name))

def pop_file():
filename = files.pop()
if (filename.endswith('.flac') or
filename.endswith('.mp3') or
filename.endswith('.ogg')):
return filename
else:
return pop_file()
return pop_file

def test_config(password=None):
return {'host': 'stream.r-a-d.io',
'port': 1130,
'password': password,
'format': 1,
'protocol': 0,
'mount': 'test.mp3'}
77 changes: 77 additions & 0 deletions audio/encoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import subprocess
import threading
import decimal
import time
import select


LAME_BIN = 'lame'


class EncodingError(Exception):
pass


class Encoder(object):
def __init__(self, source):
super(Encoder, self).__init__()
self.source = source
self.compression = ['--cbr', '-b', '192', '--resample', '44.1']
self.mode = 'j'

self.out_file = '-'

self.running = threading.Event()

def run(self):
while not self.running.is_set():
data = self.source.read()
if data == b'':
# EOF we just sleep and wait for a new source
time.sleep(0.3)
self.write(data)

def start(self):
arguments = [LAME_BIN, '--quiet',
'--flush',
'-r',
'-s', str(decimal.Decimal(self.source.sample_rate) / 1000),
'--bitwidth', str(self.source.bits_per_sample),
'--signed', '--little-endian',
'-m', self.mode] + self.compression + ['-', self.out_file]

self.process = subprocess.Popen(args=arguments,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)

self.feed_thread = threading.Thread(target=self.run,
name='Encoder Feeder')
self.feed_thread.daemon = True
self.feed_thread.start()

def switch_source(self, new_source):
self.source = new_source

def write(self, data):
try:
self.process.stdin.write(data)
except (IOError, ValueError) as err:
self.process.stdin.close()
self.process.stdout.close()
self.process.wait()
raise EncodingError(str(err))
except (Exception) as err:
self.process.stdin.close()
self.process.stdout.close()
self.process.wait()
raise err

def read(self, size=4096, timeout=10.0):
reader, writer, error = select.select([self.process.stdout],
[], [], timeout)
if not reader:
return b''
return reader[0].read(size)

def close(self):
self.running.set()
50 changes: 50 additions & 0 deletions audio/files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import audiotools

class AudioError(Exception):
pass


class AudioFile(object):
def __init__(self, filename):
super(AudioFile, self).__init__()
self._reader = self._open_file(filename)

def read(self, size=4096, timeout=0.0):
return self._reader.read(size).to_bytes(False, True)

def __getattr__(self, key):
try:
return getattr(self._reader, key)
except (AttributeError):
return getattr(self.file, key)

def progress(self, current, total):
"""Dummy progress function"""
pass

def _open_file(self, filename):
"""Open a file for reading and wrap it in several helpers."""
try:
reader = audiotools.open(filename)
except (audiotools.UnsupportedFile) as err:
raise AudioError("Unsupported file")

self.file = reader
total_frames = reader.total_frames()

# Wrap in a PCMReader because we want PCM
reader = reader.to_pcm()


# Wrap in a converter
reader = audiotools.PCMConverter(reader, sample_rate=44100,
channels=2,
channel_mask=audiotools.ChannelMask(0x1 | 0x2),
bits_per_sample=24)

# And for file progress!
reader = audiotools.PCMReaderProgress(reader, total_frames,
self.progress)

return reader

151 changes: 151 additions & 0 deletions audio/icecast.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import threading
import time
import pylibshout
import logging


logger = logging.getLogger('audio.icecast')

class Icecast(object):
connecting_timeout = 5.0
def __init__(self, source, config):
super(Icecast, self).__init__()
self.config = (config if isinstance(config, IcecastConfig)
else IcecastConfig(config))
self.source = source

self._shout = self.setup_libshout()
self.start()

def connect(self):
"""Connect the libshout object to the configured server."""
try:
self._shout.open()
except (pylibshout.ShoutException) as err:
logger.exception("Failed to connect to Icecast server.")
raise IcecastError("Failed to connect to icecast server.")

def connected(self):
"""Returns True if the libshout object is currently connected to
an icecast server."""
try:
return True if self._shout.connected() == -7 else False
except AttributeError:
return False

def read(self, size, timeout=None):
raise NotImplementedError("Icecast does not support reading.")

def nonblocking(self, state):
pass

def close(self):
"""Closes the libshout object and tries to join the thread if we are
not calling this from our own thread."""
self._should_run.set()
try:
self._shout.close()
except (pylibshout.ShoutException) as err:
if err[0] == pylibshout.SHOUTERR_UNCONNECTED:
pass
else:
logger.exception("Exception in pylibshout close call.")
raise IcecastError("Exception in pylibshout close.")
try:
self._thread.join(5.0)
except (RuntimeError) as err:
pass

def run(self):
while not self._should_run.is_set():
while self.connected():
buff = self.source.read(4096)
if buff == b'':
# EOF
self.close()
logger.exception("Source EOF, closing ourself.")
break
try:
self._shout.send(buff)
self._shout.sync()
except (pylibshout.ShoutException) as err:
logger.exception("Failed sending stream data.")
self.reboot_libshout()

if not self._should_run.is_set():
time.sleep(self.connecting_timeout)

def start(self):
"""Starts the thread that reads from source and feeds it to icecast."""
self._should_run = threading.Event()

self._thread = threading.Thread(target=self.run)
self._thread.name = "Icecast"
self._thread.daemon = True
self._thread.start()

def switch_source(self, new_source):
"""Tries to change the source without disconnect from icecast."""
self._should_run.set() # Gracefully try to get rid of the thread
try:
self._thread.join(5.0)
except RuntimeError as err:
logger.exception("Got called from my own thread.")
self.source = new_source # Swap out our source
self.start() # Start a new thread (so roundabout)

def set_metadata(self, metadata):
metadata = (metadata.encode('utf-8', 'replace') if
isinstance(meta, unicode) else metadata)
try:
self._shout.metadata = {'song': metadata} # Stupid library
except (pylibshout.ShoutException) as err:
logger.exception("Failed sending metadata. No action taken.")

def setup_libshout(self):
"""Internal method
Creates a libshout object and puts the configuration to use.
"""
shout = pylibshout.Shout()
self.config.setup(shout)
return shout

def reboot_libshout(self):
"""Internal method
Tries to recreate the libshout object.
"""
try:
self._shout = self.setup_libshout()
except (IcecastError) as err:
logger.exception("Configuration failed.")
self.close()
try:
self.connect()
except (IcecastError) as err:
logger.exception("Connection failure.")
self.close()

class IcecastConfig(dict):
"""Simple dict subclass that knows how to apply the keys to a
libshout object.
"""
def __init__(self, attributes=None):
super(IcecastConfig, self).__init__(attributes or {})

def setup(self, shout):
"""Setup 'shout' configuration by setting attributes on the object.
'shout' is a pylibshout.Shout object.
"""
for key, value in self.iteritems():
try:
setattr(shout, key, value)
except pylibshout.ShoutException as err:
raise IcecastError(("Incorrect configuration option '{:s}' or "
" value '{:s}' used.").format(key, value))


class IcecastError(Exception):
pass

0 comments on commit ca03ebd

Please sign in to comment.