Coleta do último ponto de deslocamento do lote não está funcionando

Olá, estou progredindo com minha migração de um fórum Drupal muito grande. Reaproveitei a função import_private_messages do script de importação do Discuz, que foi o mais próximo de funcionar com o estranho esquema de banco de dados do Drupal. Atualmente, está processando mais de 1,5 milhão de mensagens privadas, mas encontrou um erro (que já contornei) e, quando reiniciei o script de importação, ele começou do início novamente. Agora está rodando 3-4 vezes mais rápido até atingir o ponto em que falhou antes, mas definitivamente não está retomando no último valor de deslocamento em lote como o script faz ao importar postagens públicas. Alguém pode notar algo errado em meu script que está impedindo a recuperação de funcionar?


def import_private_messages
	puts '', 'criando mensagens privadas'

	pm_indexes = 'pm_index'
	pm_messages = 'pm_message'
	total_count = mysql_query("SELECT count(*) count FROM #{pm_indexes}").first['count']

	batches(BATCH_SIZE) do |offset|
		results = mysql_query("\nSELECT pi.mid id, thread_id, pi.recipient to_user_id, pi.deleted deleted, pm.author user_id, pm.subject subject, pm.body message, pm.format, pm.timestamp created_at FROM pm_index pi LEFT JOIN pm_message pm ON pi.mid=pm.mid WHERE deleted = 0
             LIMIT #{BATCH_SIZE}
            OFFSET #{offset};")

		break if results.size < 1

		# next if all_records_exist? :posts, results.map {|m| "pm:#{m['id']}"}

		create_posts(results, total: total_count, offset: offset) do |m|
			skip = false
			mapped = {}
			mapped[:id] = "pm:#{m['id']}"
			mapped[:user_id] = user_id_from_imported_user_id(m['user_id']) || -1
			mapped[:raw] = preprocess_raw(m['message'])
			mapped[:created_at] = Time.zone.at(m['created_at'])
			thread_id = "pm_#{m['thread_id']}"
			if is_first_pm(m['id'], m['thread_id'])
				# encontrar o título na tabela de lista
				#          pm_thread = mysql_query("\n
				#                SELECT thread_id, subject
				#                  FROM #{table_name 'ucenter_pm_lists'}
				#                 WHERE plid = #{m['thread_id']};").first
				mapped[:title] = m['subject']
				mapped[:archetype] = Archetype.private_message

          # Encontrar os usuários que fazem parte desta mensagem privada.
          import_user_ids = mysql_query("\n
                SELECT thread_id plid, recipient user_id
                  FROM pm_index
                 WHERE thread_id = #{m['thread_id']};
              ").map { |r| r['user_id'] }.uniq
          mapped[:target_usernames] = import_user_ids.map! do |import_user_id|
            import_user_id.to_s == m['user_id'].to_s ? nil : User.find_by(id: user_id_from_imported_user_id(import_user_id)).try(:username)
          end.compact
          if mapped[:target_usernames].empty? # pm consigo mesmo?
            skip = true
            puts "Pulando pm:#{m['id']} devido à falta de destino"
          else
            @first_post_id_by_topic_id[thread_id] = mapped[:id]
          end
        else
          parent = topic_lookup_from_imported_post_id(@first_post_id_by_topic_id[thread_id])
          if parent
            mapped[:topic_id] = parent[:topic_id]
          else
            puts "Post pai pm thread:#{thread_id} não existe. Pulando #{m["id"]}: #{m["message"][0..40]}"
            skip = true
          end
        end
        skip ? nil : mapped
      end

    end
end