* VRFS-3435 allow multiple onboarders to execute

This commit is contained in:
Seth Call 2015-08-15 06:22:46 -05:00
parent c4981376ac
commit e62d9e126f
2 changed files with 48 additions and 5 deletions

View File

@ -355,14 +355,54 @@ module JamRuby
found[0] # 3 letter code
end
#http://stackoverflow.com/questions/22740252/how-to-generate-javas-string-hashcode-using-ruby
def jhash(str)
result = 0
mul = 1
max_mod = 2**31 - 1
str.chars.reverse_each do |c|
result += mul * c.ord
result %= max_mod
mul *= 31
end
result
end
def prevent_concurrent_processing(metalocation)
# use a PG advisory lock to see if someone else is doing this same unit of work right now
track_code = jhash(metalocation)
locked = ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(#{track_code})").values[0][0]
if locked == 'f'
finish("other_processing", "")
raise ActiveRecord::Rollback
end
end
def synchronize_metadata(jam_track, metadata, metalocation, original_artist, name, options)
metadata ||= {}
self.name = metadata["name"] || name
prevent_concurrent_processing(metalocation)
if jam_track.new_record?
latest_jamtrack = JamTrack.order('created_at desc').first
id = latest_jamtrack.nil? ? 1 : latest_jamtrack.id.to_i + 1
if ENV['NODE_NUMBER']
# complicated goofy code to support parallel processing of importers
node_number = ENV['NODE_NUMBER'].to_i
node_count = ENV['NODE_COUNT'].to_i
raise "NO NODE_COUNT" if node_count == 0
r = id % node_count
id = r + id # get to the same base number if both are working at the same time
id = id + node_number # offset by your node number
@@log.debug("JAM TRACK ID: #{id}")
end
jam_track.id = "#{id}" # default is UUID, but the initial import was based on auto-increment ID, so we'll maintain that
jam_track.status = 'Staging'
jam_track.metalocation = metalocation
@ -399,6 +439,7 @@ module JamRuby
end
@@log.debug("about to save")
saved = jam_track.save
if !saved
@ -1566,7 +1607,7 @@ module JamRuby
@@log.info("-------")
importers.each do |importer|
if importer
if importer.reason == "success" || importer.reason == "jam_track_exists"
if importer.reason == "success" || importer.reason == "jam_track_exists" || importer.reason == "other_processing"
@@log.info("#{importer.name} #{importer.reason}")
else
@@log.error("#{importer.name} failed to import.")
@ -1613,7 +1654,7 @@ module JamRuby
@@log.info("-------")
importers.each do |importer|
if importer
if importer.reason == "success" || importer.reason == "jam_track_exists"
if importer.reason == "success" || importer.reason == "jam_track_exists" || importer.reason == "other_processing"
@@log.info("#{importer.name} #{importer.reason}")
else
@@log.error("#{importer.name} failed to import.")
@ -1686,7 +1727,7 @@ module JamRuby
importer = synchronize_from_meta(metalocation, options)
importers << importer
if importer.reason != 'jam_track_exists'
if importer.reason != 'jam_track_exists' && importer.reason != "other_processing"
count+=1
end
@ -1700,7 +1741,7 @@ module JamRuby
@@log.info("-------")
importers.each do |importer|
if importer
if importer.reason == "success" || importer.reason == "jam_track_exists"
if importer.reason == "success" || importer.reason == "jam_track_exists" || importer.reason == "other_processing"
@@log.info("#{importer.name} #{importer.reason}")
else
@@log.error("#{importer.name} failed to import.")
@ -1851,7 +1892,10 @@ module JamRuby
def sync_from_metadata(jam_track, meta, metalocation, options)
jam_track_importer = JamTrackImporter.new(@storage_format)
JamTrack.connection.execute('SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED')
JamTrack.transaction do
#begin
jam_track_importer.synchronize(jam_track, meta, metalocation, options)
#rescue Exception => e

View File

@ -31,6 +31,5 @@ module JamRuby
GenericState.find('default')
end
end
end