371 lines
17 KiB
Ruby
371 lines
17 KiB
Ruby
module JamRuby
|
|
class IcecastMount < ActiveRecord::Base
|
|
|
|
# Class-level logger; the instance methods of this model log through it.
@@log = Logging.logger[IcecastMount]

# Attributes that may be mass-assigned, but only under the :admin role.
attr_accessible :authentication_id, :name, :source_username, :source_pass,
                :max_listeners, :max_listener_duration, :dump_file, :intro,
                :fallback_mount, :fallback_override, :fallback_when_full,
                :charset, :is_public, :stream_name, :stream_description,
                :stream_url, :genre, :bitrate, :mime_type, :subtype,
                :burst_size, :mp3_metadata_interval, :hidden, :on_connect,
                :on_disconnect, :music_session_id, :icecast_server_id,
                :icecast_mount_template_id, :listeners, :sourced,
                :sourced_needs_changing_at,
                as: :admin

# In-memory flag: when truthy, the after_save hook does not mark the server's config as changed.
attr_accessor :no_config_changed

# --- associations ---
belongs_to :authentication,
           class_name: "JamRuby::IcecastUserAuthentication",
           inverse_of: :mount,
           foreign_key: 'authentication_id'
belongs_to :music_session,
           class_name: "JamRuby::ActiveMusicSession",
           inverse_of: :mount,
           foreign_key: 'music_session_id'

belongs_to :server,
           class_name: "JamRuby::IcecastServer",
           inverse_of: :mounts,
           foreign_key: 'icecast_server_id'
belongs_to :mount_template,
           class_name: "JamRuby::IcecastMountTemplate",
           inverse_of: :mounts,
           foreign_key: 'icecast_mount_template_id'

# Newest source change first; #state relies on `source_changes.first` being the latest one.
has_many :source_changes,
         class_name: "JamRuby::IcecastSourceChange",
         inverse_of: :mount,
         foreign_key: 'icecast_mount_id',
         order: 'created_at DESC'

# --- validations ---
validates :name, presence: true, uniqueness: true
validates :source_username, length: {minimum: 5}, if: ->(m) { m.source_username.present? }
validates :source_pass, length: {minimum: 5}, if: ->(m) { m.source_pass.present? }
# NOTE(review): the next two use a `length` validator on what look like integer attributes;
# presumably a numeric range was intended (numericality) -- confirm before changing.
validates :max_listeners, length: {in: 1..15000}, if: ->(m) { m.max_listeners.present? }
validates :max_listener_duration, length: {in: 1..(3600 * 48)}, if: ->(m) { m.max_listener_duration.present? }
validates :fallback_override, inclusion: {in: [0, 1]}, if: ->(m) { m.fallback_mount.present? }
validates :fallback_when_full, inclusion: {in: [0, 1]}, if: ->(m) { m.fallback_mount.present? }
validates :is_public, presence: true, inclusion: {in: [-1, 0, 1]}
validates :bitrate, numericality: {only_integer: true}, if: ->(m) { m.bitrate.present? }
validates :burst_size, numericality: {only_integer: true}, if: ->(m) { m.burst_size.present? }
validates :mp3_metadata_interval, numericality: {only_integer: true}, if: ->(m) { m.mp3_metadata_interval.present? }
validates :hidden, inclusion: {in: [0, 1]}
validates :server, presence: true
validate :name_has_correct_format

# --- callbacks ---
before_save :sanitize_active_admin
after_save :after_save
#after_save :poke_config
before_destroy :poke_config
|
|
|
|
# Custom validation: a mount name must be present and begin with '/'.
# Adds an error on :name otherwise.
def name_has_correct_format
  return if name && name.start_with?('/')

  errors.add(:name, "must start with /")
end
|
|
|
|
# Flags the associated server's configuration as changed (config_changed = 1).
# Does nothing, and returns nil, when the mount has no server.
def poke_config
  return unless server

  server.update_attribute(:config_changed, 1)
end
|
|
|
|
# after_save hook (registered via `after_save :after_save`).
#
# 1. Marks the server's config as changed, unless no_config_changed is set on this instance.
# 2. Sends a source up/down notification when `sourced` flipped during this save.
#
# Always returns nil.
def after_save
  server.update_attribute(:config_changed, 1) unless no_config_changed

  previously_sourced = sourced_was
  currently_sourced = sourced

  if currently_sourced && !previously_sourced
    # went from NOT SOURCED to SOURCED
    notify_source_up
  elsif previously_sourced && !currently_sourced
    # went from SOURCED to NOT SOURCED
    notify_source_down
  end

  # A change of source_direction is deliberately not acted upon here:
  #  * deleting the mount's diagnostic rows was temporarily removed; it seems better to leave
  #    all the data in for now. It should never be that much
  #    (#IcecastSourceChange.delete_all(["icecast_mount_id = ?", self.id]) if source_direction)
  #  * telling listeners that the direction changed is disabled as well
  #    (# SubscriptionMessage.mount_source_direction(self))

  # Note:
  # Notification.send_source_down_requested does not occur here.
  # we set up a cron that checks for streams that have not been successfully sourced up/down
  # (after timeout) in IcecastSourceCheck
  nil
end
|
|
|
|
# before_save hook: turns empty-string foreign keys into nil.
# Returns nil.
def sanitize_active_admin
  [:authentication_id, :music_session_id, :icecast_server_id].each do |key|
    public_send("#{key}=", nil) if public_send(key) == ''
  end
  nil
end
|
|
|
|
# Creates a templated mount for a session.
#
# Returns nil when the session has no fan access (only public sessions get mounts currently),
# or when there is no server with an associated mount template. Otherwise returns the mount
# built by the server's mount template, with its server set to icecast_server.
def self.build_session_mount(music_session, active_music_session, icecast_server)
  return nil unless music_session.fan_access
  return nil unless icecast_server && icecast_server.mount_template_id

  # we have a server with an associated mount_template; we can create a mount automatically
  session_mount = icecast_server.mount_template.build_session_mount(music_session, active_music_session)
  session_mount.server = icecast_server
  session_mount
end
|
|
|
|
# Builds a failure result hash: success is false, plus the given reason and optional detail.
def fail_state(reason, detail = nil)
  {
    success: false,
    reason: reason,
    detail: detail
  }
end

# Builds a success result hash: success is true, plus the given reason and optional detail.
def success_state(reason, detail = nil)
  {
    success: true,
    reason: reason,
    detail: detail
  }
end
|
|
|
|
# Summarises how the mount's actual source state compares with the desired one.
# Returns a hash built by success_state / fail_state: {success:, reason:, detail:}.
#
# success messages:
# * source_up - we are broadcasting - detail is the source user or empty. if empty you should refresh state in a few seconds
# * source_down - we are not broadcasting (and no source is wanted) - detail is the last source user, or empty. if empty you should refresh state in a few seconds
# * transition_up/down - a source change occurred recently, and we don't yet have any info from the client. you should refresh state in a few seconds
#
# failure messages:
# * multiple_clients (currently never reported; the check is disabled)
# * unknown - represents a code error
# * transition_timeout_up - a source change for up has occurred, but no clients have reported any info
# * transition_timeout_down - a source change for down has occurred, but no clients have reported any info
# * source_wrong_up - the source should be up, but it could not succeed according to the most recent effort.
# * source_wrong_down - the source should be down, but it could not succeed according to the most recent effort.
#
# for both source_wrong_up and source_wrong_down, valid detail values:
# * 'no_client' if no client has said anything to a request after some amount of time, or otherwise it's client defined
# client-defined reason values
# * 'initialize_singleton' - code error in the client
# * 'initialize_thread' - code error in the client
# * 'initialize_ogg' - could not initialize ogg encoder... likely code error
# * 'initialize_mp3' - could not initialize mp3 encoder... likely code error
# * 'initialize_socket' - could not initialize socket... likely code error
# * 'icecast_response' - icecast was not accessible or returned an error
#
# * TODO client defined
#
# Any StandardError raised while computing the state is logged, and the 'unknown' failure
# (or whatever result had been computed up to that point) is returned.
def state
  begin
    result = fail_state('unknown')

    # 'up' / 'down' suffix for every reason that describes the requested direction
    direction = source_direction ? 'up' : 'down'

    if sourced == should_source?
      # actual source state mirrors desired source state... just say we are good, and pass
      # down the relevant sourcing user ID if present
      latest_change = source_changes.first
      result = success_state('source_' + (sourced ? 'up' : 'down'),
                             latest_change.nil? ? nil : latest_change.user_id)

    elsif source_changes.count > 0
      # the source is not in the desired state... let's try and find out why.
      # (a 'multiple_clients' failure used to be considered here; that check is disabled)
      latest_change = source_changes.first

      if latest_change.source_direction == should_source?
        if latest_change.success?
          # the last message from any client indicated we had the right source.
          # shortly after the change this is not strange; past the configured window it is
          if sourced_needs_changing_at.nil? || (Time.now - latest_change.created_at < APP_CONFIG.source_changes_missing_secs)
            result = success_state('transition_' + direction)
          else
            result = fail_state('transition_timeout_' + direction)
          end
        else
          # the last state indicated by the client agrees that our source info is wrong;
          # we can use its data to augment the frontend
          result = fail_state('source_wrong_' + direction, latest_change.reason)
        end
      elsif sourced_needs_changing_at.nil? || (Time.now - sourced_needs_changing_at < APP_CONFIG.source_changes_missing_secs)
        # the last message from the client is for the wrong source direction (meaning no
        # attempt to change source state by the client to the correct state), but we are
        # still inside the grace window.
        # NOTE(review): 'transition_*' is documented above as a success message, yet it is
        # reported as a failure here -- behaviour left as found; confirm intent.
        result = fail_state('transition_' + direction)
      else
        # no client implies no client has attempted to source
        result = fail_state('source_wrong_' + direction, 'no_client')
      end

    else
      # we have the wrong source direction, but no source change info.
      if sourced_needs_changing_at.nil?
        #result = fail_state('db_error', 'sourced_needs_changing_at is nil')
        result = success_state('transition_' + direction)
      elsif Time.now - sourced_needs_changing_at < APP_CONFIG.source_changes_missing_secs
        result = success_state('transition_' + direction, 'initial')
      else
        result = fail_state('transition_timeout_' + direction, 'initial')
      end
    end

  rescue StandardError => e
    # StandardError only: signals and exit requests must not be swallowed here
    @@log.error("exception in IcecastMount.state #{e}")
  end

  result
end
|
|
|
|
# Records that the mount now has a source. Saved without validations, under a row lock,
# and without flagging the server config as changed.
def source_up
  mark_sourced(true)
end

# Records that the mount no longer has a source. Saved without validations, under a row lock,
# and without flagging the server config as changed.
def source_down
  mark_sourced(false)
end

# Shared body of source_up / source_down: stores the new `sourced` value and stamps
# sourced_needs_changing_at with the current time. Returns whatever with_lock returns
# for the block (the result of save).
def mark_sourced(is_sourced)
  with_lock do
    self.sourced = is_sourced
    self.sourced_needs_changing_at = Time.now
    self.no_config_changed = true
    save(validate: false)
  end
end
private :mark_sourced
|
|
|
|
# True when at least one listener is attached, i.e. the mount ought to have a source.
def should_source?
  listeners > 0
end
|
|
|
|
# Registers one more listener on this mount.
#
# When this is the first listener, the mount is not sourced, and the last requested source
# change is either unknown or older than APP_CONFIG.source_changes_missing_secs, a source-up
# request is sent first. The counter is then incremented and saved without validations.
def listener_add
  with_lock do
    if listeners == 0 && !sourced
      last_change_at = sourced_needs_changing_at
      # enough time has elapsed since the last time the source direction changed to
      # instantaneously request a source up
      if last_change_at.nil? || Time.now - last_change_at > APP_CONFIG.source_changes_missing_secs
        # listener count goes above 0 and there is no source. ask the musician clients to source
        notify_source_up_requested
      end
    end

    # this read-modify-write is completely unsafe without the enclosing with_lock
    self.listeners += 1
    self.no_config_changed = true
    save(validate: false)
  end
end
|
|
|
|
# Unregisters one listener from this mount.
#
# When the counter is already 0 a warning is logged and nothing is changed (returns nil).
# Otherwise the counter is decremented under a row lock; when the last listener leaves,
# sourced_needs_changing_at is stamped with the current time. The record is saved without
# validations and without flagging the server config as changed.
def listener_remove
  if listeners == 0
    @@log.warn("listeners is at 0, but we are being asked to remove a listener. maybe we missed a listener_add request earlier")
    return
  end

  with_lock do
    # the last listener is leaving, so the source state will need to change
    self.sourced_needs_changing_at = Time.now if listeners == 1

    # this read-modify-write is completely unsafe without the enclosing with_lock
    self.listeners = self.listeners - 1

    self.no_config_changed = true
    # `validate: false` is the option key save understands (as used by listener_add,
    # source_up and source_down); the former `validations: false` did not skip validations
    save(validate: false)
  end
end
|
|
|
|
|
|
# Asks the musician clients of the associated music session to start sourcing this mount.
# Does nothing (returns nil) when the mount has no music_session_id.
#
# Steps, in order: persist the requested direction on the mount (save!), record an
# IcecastSourceChange of type CHANGE_TYPE_MOUNT_UP_REQUEST, send the notification carrying the
# connection details, then publish the subscription message (whose result is returned).
def notify_source_up_requested
  return unless music_session_id

  self.sourced_needs_changing_at = Time.now
  self.source_direction = true
  self.no_config_changed = true
  save!

  IcecastSourceChange.new.tap do |request|
    request.source_direction = true
    request.success = true
    request.mount = self
    request.change_type = IcecastSourceChange::CHANGE_TYPE_MOUNT_UP_REQUEST
    request.save!
  end

  connection_details = [
    server.hostname,
    server.pick_listen_socket(:port),
    name,
    resolve_string(:source_username),
    resolve_string(:source_pass),
    resolve_int(:bitrate)
  ]
  Notification.send_source_up_requested(music_session, *connection_details)
  SubscriptionMessage.mount_source_up_requested(self)
end
|
|
|
|
# Asks the musician clients of the associated music session to stop sourcing this mount.
# Does nothing (returns nil) when the mount has no music_session_id.
#
# Steps, in order: persist the requested direction on the mount (save!), record an
# IcecastSourceChange of type CHANGE_TYPE_MOUNT_DOWN_REQUEST, send the notification, then
# publish the subscription message (whose result is returned).
def notify_source_down_requested
  return unless music_session_id

  self.sourced_needs_changing_at = Time.now
  self.source_direction = false
  self.no_config_changed = true
  save!

  IcecastSourceChange.new.tap do |request|
    request.source_direction = false
    request.success = true
    request.mount = self
    request.change_type = IcecastSourceChange::CHANGE_TYPE_MOUNT_DOWN_REQUEST
    request.save!
  end

  Notification.send_source_down_requested(music_session, name)
  SubscriptionMessage.mount_source_down_requested(self)
end
|
|
|
|
# Notifies the associated music session that the source came up.
# Returns nil without notifying when the mount has no music_session_id.
def notify_source_up
  Notification.send_source_up(music_session) if music_session_id
end

# Notifies the associated music session that the source went down.
# Returns nil without notifying when the mount has no music_session_id.
def notify_source_down
  Notification.send_source_down(music_session) if music_session_id
end
|
|
|
|
# Writes this mount as a <mount> element on the given builder.
#
# For every setting, the mount's own value is used when it specifies one; otherwise the
# mount_template's value takes effect (see resolve_string / resolve_int). A setting that
# resolves to nothing is omitted. The authentication, if any, dumps itself last.
def dumpXml(builder)
  as_string = [:string_present?, :resolve_string]
  as_int = [:int_present?, :resolve_int]

  # [xml tag, attribute, [presence check, value resolver]] -- emitted in exactly this order
  settings = [
    ['username', :source_username, as_string],
    ['password', :source_pass, as_string],
    ['max-listeners', :max_listeners, as_int],
    # checked as an int but resolved through resolve_string
    ['max-listener-duration', :max_listener_duration, [:int_present?, :resolve_string]],
    ['dump-file', :dump_file, as_string],
    ['intro', :intro, as_string],
    ['fallback-mount', :fallback_mount, as_string],
    ['fallback-override', :fallback_override, as_int],
    ['fallback-when-full', :fallback_when_full, as_int],
    ['charset', :charset, as_string],
    ['public', :is_public, as_int],
    ['stream-name', :stream_name, as_string],
    ['stream-description', :stream_description, as_string],
    ['stream-url', :stream_url, as_string],
    ['genre', :genre, as_string],
    ['bitrate', :bitrate, as_int],
    ['type', :mime_type, as_string],
    ['subtype', :subtype, as_string],
    ['burst-size', :burst_size, as_int],
    ['mp3-metadata-interval', :mp3_metadata_interval, as_int],
    ['hidden', :hidden, as_int],
    ['on-connect', :on_connect, as_string],
    ['on-disconnect', :on_disconnect, as_string]
  ]

  builder.tag! 'mount' do |mount|
    mount.tag! 'mount-name', name

    settings.each do |tag_name, field, (presence_check, resolver)|
      mount.tag!(tag_name, send(resolver, field)) if send(presence_check, field)
    end

    authentication.dumpXml(builder) if authentication
  end
end
|
|
|
|
|
|
# Public HTTP URL of this mount: http://<server hostname>:<listen port><mount name>.
# Raises a RuntimeError when the mount has no server.
def url
  host = server
  raise "Unassociated server to mount" if host.nil?

  "http://#{host.hostname}:#{host.pick_listen_socket(:port)}#{name}"
end
|
|
|
|
|
|
# Returns this mount's own value for field when it is present (non-blank); otherwise the
# mount_template's value for field, or the falsy mount_template itself when there is none.
def resolve_string(field)
  own_value = self[field]
  return own_value if own_value.present?

  template = mount_template
  template && template[field]
end

# True when the resolved string value for field is present (non-blank), false otherwise.
def string_present?(field)
  resolved = resolve_string(field)
  return false unless resolved

  resolved.present?
end

# Returns this mount's own value for field unless it is nil; otherwise the mount_template's
# value for field, or the falsy mount_template itself when there is none.
def resolve_int(field)
  own_value = self[field]
  return own_value unless own_value.nil?

  template = mount_template
  template && template[field]
end

# Truthy when an integer value can be resolved for field.
# Returns the resolved value itself rather than a strict boolean.
def int_present?(field)
  resolve_int(field)
end
|
|
end
|
|
end |