La recogida del último punto de desplazamiento por lotes no funciona

Hola, estoy progresando con mi migración de un foro muy grande de Drupal. Reutilicé la función import_private_messages del script de importación de Discuz, que fue lo más cercano a funcionar con el extraño esquema de base de datos de Drupal. Actualmente está procesando más de 1.5 millones de mensajes privados, pero encontró un error (que ya solucioné) y cuando reinicié el script de importación, comenzó de nuevo desde el principio. Ahora va 3-4 veces más rápido hasta que llega al punto donde falló antes, pero definitivamente no está continuando desde el último valor de desplazamiento del lote como lo hace el script al importar publicaciones públicas. ¿Alguien puede notar algo malo en mi script que esté impidiendo que la recuperación funcione?


def import_private_messages
	puts '', 'creando mensajes privados'

	pm_indexes = 'pm_index'
	pm_messages = 'pm_message'
	total_count = mysql_query("SELECT count(*) count FROM #{pm_indexes}").first['count']

	batches(BATCH_SIZE) do |offset|
		results = mysql_query("
SELECT pi.mid id, thread_id, pi.recipient to_user_id, pi.deleted deleted, pm.author user_id, pm.subject subject, pm.body message, pm.format, pm.timestamp created_at FROM pm_index pi LEFT JOIN pm_message pm ON pi.mid=pm.mid WHERE deleted = 0
             LIMIT #{BATCH_SIZE}
            OFFSET #{offset};")

		break if results.size < 1

		# next if all_records_exist? :posts, results.map {|m| "pm:#{m['id']}"}

		create_posts(results, total: total_count, offset: offset) do |m|
			skip = false
			mapped = {}
			mapped[:id] = "pm:#{m['id']}"
			mapped[:user_id] = user_id_from_imported_user_id(m['user_id']) || -1
			mapped[:raw] = preprocess_raw(m['message'])
			mapped[:created_at] = Time.zone.at(m['created_at'])
			thread_id = "pm_#{m['thread_id']}"
			if is_first_pm(m['id'], m['thread_id'])
				# encontrar el título de la tabla de lista
				#          pm_thread = mysql_query("
				#                SELECT thread_id, subject
				#                  FROM #{table_name 'ucenter_pm_lists'}
				#                 WHERE plid = #{m['thread_id']};").first
				mapped[:title] = m['subject']
				mapped[:archetype] = Archetype.private_message

          # Encontrar los usuarios que forman parte de este mensaje privado.
          import_user_ids = mysql_query("
                SELECT thread_id plid, recipient user_id
                  FROM pm_index
                 WHERE thread_id = #{m['thread_id']};
              ").map { |r| r['user_id'] }.uniq
          mapped[:target_usernames] = import_user_ids.map! do |import_user_id|
            import_user_id.to_s == m['user_id'].to_s ? nil : User.find_by(id: user_id_from_imported_user_id(import_user_id)).try(:username)
          end.compact
          if mapped[:target_usernames].empty? # ¿pm contigo mismo?
            skip = true
            puts "Omitiendo pm:#{m['id']} debido a que no hay destinatario"
          else
            @first_post_id_by_topic_id[thread_id] = mapped[:id]
          end
        else
          parent = topic_lookup_from_imported_post_id(@first_post_id_by_topic_id[thread_id])
          if parent
            mapped[:topic_id] = parent[:topic_id]
          else
            puts "El post padre del hilo de pm:#{thread_id} no existe. Omitiendo #{m["id"]}: #{m["message"][0..40]}"
            skip = true
          end
        end
        skip ? nil : mapped
      end

    end
end