Il recupero dal punto di offset del batch precedente non funziona

Ciao, sto facendo progressi con la mia migrazione da un forum Drupal molto grande. Ho riutilizzato la funzione import_private_messages dallo script di importazione di Discuz, che era la più vicina a funzionare con il bizzarro schema del database Drupal. Attualmente sta elaborando oltre 1,5 milioni di messaggi privati, ma ha riscontrato un errore (che ho già risolto) e quando ho riavviato lo script di importazione è ripartito dall’inizio. Ora sta funzionando 3-4 volte più velocemente fino a raggiungere il punto in cui si è verificato l’errore in precedenza, ma non sta riprendendo dall’ultimo valore di offset del batch come fa lo script quando importa i post pubblici. Qualcuno può notare qualcosa di sbagliato nel mio script che impedisce il ripristino?


def import_private_messages
	puts '', 'creazione messaggi privati'

	pm_indexes = 'pm_index'
	pm_messages = 'pm_message'
	total_count = mysql_query("SELECT count(*) count FROM #{pm_indexes}").first['count']

	batches(BATCH_SIZE) do |offset|
		results = mysql_query("\nSELECT pi.mid id, thread_id, pi.recipient to_user_id, pi.deleted deleted, pm.author user_id, pm.subject subject, pm.body message, pm.format, pm.timestamp created_at FROM pm_index pi LEFT JOIN pm_message pm ON pi.mid=pm.mid WHERE deleted = 0
             LIMIT #{BATCH_SIZE}
            OFFSET #{offset};")

		break if results.size < 1

		# next if all_records_exist? :posts, results.map {|m| "pm:#{m['id']}"}

		create_posts(results, total: total_count, offset: offset) do |m|
			skip = false
			mapped = {}
			mapped[:id] = "pm:#{m['id']}"
			mapped[:user_id] = user_id_from_imported_user_id(m['user_id']) || -1
			mapped[:raw] = preprocess_raw(m['message'])
			mapped[:created_at] = Time.zone.at(m['created_at'])
			thread_id = "pm_#{m['thread_id']}"
			if is_first_pm(m['id'], m['thread_id'])
				# trova il titolo dalla tabella dell'elenco
				#          pm_thread = mysql_query("\n
				#                SELECT thread_id, subject
				#                  FROM #{table_name 'ucenter_pm_lists'}
				#                 WHERE plid = #{m['thread_id']};").first
				mapped[:title] = m['subject']
				mapped[:archetype] = Archetype.private_message

          # Trova gli utenti che fanno parte di questo messaggio privato.
          import_user_ids = mysql_query("\n
                SELECT thread_id plid, recipient user_id
                  FROM pm_index
                 WHERE thread_id = #{m['thread_id']};
              ").map { |r| r['user_id'] }.uniq
          mapped[:target_usernames] = import_user_ids.map! do |import_user_id|
            import_user_id.to_s == m['user_id'].to_s ? nil : User.find_by(id: user_id_from_imported_user_id(import_user_id)).try(:username)
          end.compact
          if mapped[:target_usernames].empty? # pm con te stesso?
            skip = true
            puts "Saltando pm:#{m['id']} a causa di nessun destinatario"
          else
            @first_post_id_by_topic_id[thread_id] = mapped[:id]
          end
        else
          parent = topic_lookup_from_imported_post_id(@first_post_id_by_topic_id[thread_id])
          if parent
            mapped[:topic_id] = parent[:topic_id]
          else
            puts "Il post padre pm thread:#{thread_id} non esiste. Saltando #{m["id"]}: #{m["message"][0..40]}"
            skip = true
          end
        end
        skip ? nil : mapped
      end

    end
end