jam-cloud/ruby/lib/jam_ruby/resque/audiomixer.rb

304 lines
9.0 KiB
Ruby

require 'digest/md5'
require 'json'
require 'net/http'
require 'tmpdir'
require 'uri'
require 'resque'
require 'resque-retry'
module JamRuby
# executes a mix of tracks, creating a final output mix
class AudioMixer
# name of the Resque queue this job class is enqueued on
@queue = :audiomixer
# logger shared by every AudioMixer instance
@@log = Logging.logger[AudioMixer]
# mix_id and postback_output_url are assigned by .perform before #run is called; the other
# attributes are populated while the job runs. The generated manifest= writer is replaced
# later in this class by one that symbolizes the keys of the assigned value.
attr_accessor :mix_id, :manifest, :manifest_file, :output_filename, :error_out_filename, :postback_output_url,
:error_reason, :error_detail
# Job entry point: builds a fresh mixer, hands it its two inputs and runs it.
#   mix_id              - id used by #run to look up the Mix record
#   postback_output_url - URL that the finished mix is PUT to by #postback
# Returns whatever #run returns.
def self.perform(mix_id, postback_output_url)
  job = AudioMixer.new
  job.mix_id = mix_id
  job.postback_output_url = postback_output_url
  job.run
end
# Intentionally does no setup: callers assign mix_id / postback_output_url (or manifest) through the
# accessors after construction.
def initialize
# NOTE(review): S3Manager construction is commented out and nothing in the visible code reads
# @s3_manager; the finished mix is delivered by #postback with an HTTP PUT instead.
#@s3_manager = S3Manager.new(APP_CONFIG.aws_bucket, APP_CONFIG.aws_access_key_id, APP_CONFIG.aws_secret_access_key)
end
# Sanity-checks @manifest before any work is done.
# Raises a RuntimeError naming the first missing piece. Checks run in this order:
# manifest, files, then codec/offset/filename for each file, output, output codec,
# timeline, recording_id, mix_id. Returns nil when everything is present.
def validate
  raise "no manifest specified" unless @manifest

  files = @manifest[:files]
  raise "no files specified" unless files && files.length > 0

  # every input file must carry all three fields
  files.each do |entry|
    [:codec, :offset, :filename].each do |field|
      raise "no #{field} specified" unless entry[field]
    end
  end

  output = @manifest[:output]
  raise "no output specified" unless output
  raise "no output codec specified" unless output[:codec]

  [:timeline, :recording_id, :mix_id].each do |field|
    raise "no #{field} specified" unless @manifest[field]
  end
  nil
end
# Makes sure every file listed in @manifest[:files] is available on local disk.
# A filename starting with "http" is downloaded to a temporary .ogg file and the manifest entry is
# rewritten to point at that local copy; any other filename is used as-is.
# Raises if a download fails (after setting @error_reason / @error_detail) or if a file is not
# present on disk once the entry has been processed.
def fetch_audio_files
  @manifest[:files].each do |file|
    filename = file[:filename]
    if filename.start_with? "http"
      # fetch it from wherever, put it somewhere on disk, and replace filename in the file parameter with the local disk one
      # Dir::Tmpname.make_tmpname no longer exists in Ruby >= 2.5; create is available on old and new rubies.
      download_filename = Dir::Tmpname.create(['audiomixer-file', '.ogg']) do |candidate|
        # raising EEXIST makes create pick another name
        raise Errno::EEXIST, candidate if File.exist?(candidate)
      end
      uri = URI(filename)
      begin
        File.open(download_filename, 'wb') do |io|
          # https urls also start with "http", so TLS has to be switched on explicitly for them
          Net::HTTP.start(uri.host, uri.port, :use_ssl => uri.scheme == 'https') do |http|
            request = Net::HTTP::Get.new uri
            http.request request do |response|
              response_code = response.code.to_i
              unless response_code >= 200 && response_code <= 299
                raise "bad status code: #{response_code}. body: #{response.body}"
              end
              # stream the body to disk chunk by chunk instead of buffering it in memory
              response.read_body do |chunk|
                io.write chunk
              end
            end
          end
        end
      rescue StandardError => e
        @error_reason = "unable to download"
        @error_detail = "url #{filename}, error=#{e}"
        # don't leave a truncated download behind in the tmp dir
        File.delete(download_filename) if File.exist?(download_filename)
        raise
      end
      @@log.debug("downloaded #{download_filename}")
      filename = download_filename
      file[:filename] = download_filename
    end
    raise "no file located at: #{filename}" unless File.exist? filename
  end
end
# Gets everything audiomixer needs onto disk, in a fixed order:
#   1. a place to write the .ogg mix
#   2. a place to write the error_out json file (needed if audiomixer fails)
#   3. the manifest file, written last so that it already contains both of the paths above
def prepare
  [:prepare_output, :prepare_error_out].each { |step| send(step) }
  prepare_manifest
end
# write the manifest object to file, to pass into audiomixer
# Picks a temp file named after the recording id, stores its path in @manifest_file and writes
# @manifest there as JSON.
def prepare_manifest
  # Dir::Tmpname.make_tmpname no longer exists in Ruby >= 2.5; create is available on old and new rubies.
  @manifest_file = Dir::Tmpname.create(["audiomixer-manifest-#{@manifest[:recording_id]}", '.json']) do |candidate|
    # raising EEXIST makes create pick another name
    raise Errno::EEXIST, candidate if File.exist?(candidate)
  end
  File.open(@manifest_file, "w") do |f|
    f.write(@manifest.to_json)
  end
  @@log.debug("manifest: #{@manifest}")
end
# make a suitable location to store the output mix, and pass the chosen filepath into the manifest
# The path is kept in @output_filename and copied to @manifest[:output][:filename].
def prepare_output
  # Dir::Tmpname.make_tmpname no longer exists in Ruby >= 2.5; create is available on old and new rubies.
  @output_filename = Dir::Tmpname.create(["audiomixer-output-#{@manifest[:recording_id]}", '.ogg']) do |candidate|
    # raising EEXIST makes create pick another name
    raise Errno::EEXIST, candidate if File.exist?(candidate)
  end
  # update manifest so that audiomixer writes here
  @manifest[:output][:filename] = @output_filename
  @@log.debug("output ogg file: #{@output_filename}")
end
# make a suitable location to store an output error file, which will be populated on failure to help diagnose problems.
# The path is kept in @error_out_filename and copied to @manifest[:error_out].
def prepare_error_out
  # Dir::Tmpname.make_tmpname no longer exists in Ruby >= 2.5; create is available on old and new rubies.
  # The file is read back as JSON by parse_error_out, so it gets a .json suffix rather than .ogg.
  @error_out_filename = Dir::Tmpname.create(["audiomixer-error-out-#{@manifest[:recording_id]}", '.json']) do |candidate|
    # raising EEXIST makes create pick another name
    raise Errno::EEXIST, candidate if File.exist?(candidate)
  end
  # update manifest so that audiomixer writes here
  @manifest[:error_out] = @error_out_filename
  @@log.debug("error_out: #{@error_out_filename}")
end
# read in and parse the error file that audiomixer pops out
# Sets @error_reason and @error_detail from the "reason" / "detail" entries of the JSON object in
# @error_out_filename. A missing reason becomes "unspecified-reason". If the contents are not a
# JSON object, @error_reason is "unable-parse-error-out" and @error_detail holds the raw contents.
# Raises whatever File.read raises if the file cannot be read.
def parse_error_out
  error_out_data = File.read(@error_out_filename)
  begin
    # symbolize_names so the symbol lookups below actually find the keys
    @error_out = JSON.parse(error_out_data, :symbolize_names => true)
    raise "error_out is not a JSON object" unless @error_out.is_a?(Hash)
  rescue
    @error_out = nil
    @error_reason = "unable-parse-error-out"
    @error_detail = error_out_data
    @@log.error("unable to parse error_out_data: #{error_out_data} from error_out: #{@error_out_filename}")
    # nothing further can be extracted from an unparsable file
    return
  end
  @error_reason = @error_out[:reason] || "unspecified-reason"
  @error_detail = @error_out[:detail]
end
# Uploads the finished mix: PUTs the contents of @output_filename to @postback_output_url with
# Content-Type audio/ogg.
# Raises if the output file does not exist, or if the response status is outside 200-299 (in which
# case @error_reason is set to "postback-mix-to-s3" first).
def postback
  raise "no output file after mix" unless File.exist? @output_filename
  @@log.debug("posting mix to #{@postback_output_url}")
  uri = URI.parse(@postback_output_url)
  http = Net::HTTP.new(uri.host, uri.port)
  # without this an https url is spoken to in plain HTTP and the upload fails
  http.use_ssl = (uri.scheme == 'https')
  request = Net::HTTP::Put.new(uri.request_uri)
  request["Content-Type"] = "audio/ogg"
  # body_stream requires an explicit Content-Length (or chunked transfer encoding)
  request["Content-Length"] = File.size(@output_filename).to_s
  # binary mode: the ogg must be sent byte-for-byte
  response = File.open(@output_filename, "rb") do |f|
    request.body_stream = f
    http.request(request)
  end
  response_code = response.code.to_i
  unless response_code >= 200 && response_code <= 299
    @error_reason = "postback-mix-to-s3"
    raise "unable to put to url: #{@postback_output_url}, status: #{response.code}, body: #{response.body}"
  end
end
# Reports a successful mix: calls mix.finish with the byte size of @output_filename and the
# hex MD5 digest of its contents.
# Raises if the output file does not exist.
def post_success(mix)
  raise "no output file after mix" unless File.exist? @output_filename
  length = File.size(@output_filename)
  # Digest::MD5.file reads the file in binary chunks and closes it again, so no handle is leaked
  md5 = Digest::MD5.file(@output_filename).hexdigest
  mix.finish(length, md5)
end
# Records a failed mix by calling mix.errored(error_reason, error_detail).
#   mix - object that receives #errored
#   e   - the exception that aborted the job; only used when no reason has been set yet
# Never raises a StandardError: if recording the error fails, that failure is logged instead.
def post_error(mix, e)
  # if error_reason is null, assume this is an unhandled error
  unless @error_reason
    @error_reason = "unhandled-job-exception"
    @error_detail = e.to_s
  end
  mix.errored(error_reason, error_detail)
rescue => post_failure
  # include the cause, otherwise there is no way to tell why the error could not be recorded
  @@log.error "unable to post back to the database the error: #{post_failure}"
end
# Runs the whole job for mix_id: loads the Mix, validates its manifest, brings remote files local,
# writes the manifest to disk, runs audiomixer, then uploads the result and marks the mix finished.
# Returns early (doing nothing) if the mix is already completed.
# Any exception raised after the Mix has been loaded is reported through post_error and re-raised.
def run
  @@log.info("audiomixer job starting. mix_id #{mix_id}")
  mix = Mix.find(mix_id)
  begin
    # bailout check
    if mix.completed
      @@log.debug("mix is already completed. bailing")
      return
    end
    @manifest = symbolize_keys(mix.manifest)
    # slip in the mix_id so that the job can add it to the ogg comments.
    # skipped when there is no manifest, so that validate reports "no manifest specified"
    # instead of this line failing on nil.
    @manifest[:mix_id] = mix_id if @manifest
    # sanity check the manifest
    validate
    # if http files are specified, bring them local
    fetch_audio_files
    # write the manifest to file, so that it can be passed to audiomixer as an filepath argument
    prepare
    execute(@manifest_file)
    # capture the exit status right away so nothing run afterwards can replace it
    status = $?
    if status == 0
      postback
      post_success(mix)
      @@log.info("audiomixer job successful. mix_id #{mix_id}")
    else
      parse_error_out
      error_msg = "audiomixer job failed status=#{status} error_reason=#{error_reason} error_detail=#{error_detail}"
      @@log.info(error_msg)
      raise error_msg
    end
  rescue Exception => e
    # deliberately broad: the failure is recorded on the mix and then re-raised unchanged
    post_error(mix, e)
    raise
  end
end
# Custom writer for manifest: the assigned value is passed through symbolize_keys first, so the
# rest of the class can index the manifest with symbols.
def manifest=(value)
  normalized = symbolize_keys(value)
  @manifest = normalized
end
private
# Runs the audiomixer binary at APP_CONFIG.audiomixer_path with manifest_file as its only argument.
# Returns the result of Kernel#system; the exit status is left in $? for the caller.
# Raises (after setting @error_reason / @error_detail) if the binary does not exist.
def execute(manifest_file)
  unless File.exist? APP_CONFIG.audiomixer_path
    @@log.error("unable to find audiomixer")
    # set reason/detail before building the message so the message actually contains them
    @error_reason = "unable-find-appmixer"
    @error_detail = APP_CONFIG.audiomixer_path
    # no status= here: nothing has been executed yet, so $? would be nil or left over from earlier
    error_msg = "audiomixer job failed error_reason=#{error_reason} error_detail=#{error_detail}"
    @@log.info(error_msg)
    raise error_msg
  end
  audiomixer_cmd = "#{APP_CONFIG.audiomixer_path} #{manifest_file}"
  @@log.debug("executing #{audiomixer_cmd}")
  # argument-list form: no shell is involved, so spaces or metacharacters in either path are harmless
  system(APP_CONFIG.audiomixer_path, manifest_file)
end
# Returns a copy of obj in which every String hash key has been converted to a Symbol.
# Hashes and Arrays are walked recursively; non-String keys and all other values are kept as they
# are. The argument itself is not modified.
def symbolize_keys(obj)
  case obj
  when Array
    obj.map { |element| symbolize_keys(element) }
  when Hash
    obj.each_with_object({}) do |(key, value), converted|
      converted[key.is_a?(String) ? key.to_sym : key] = symbolize_keys(value)
    end
  else
    obj
  end
end
end
end