Pickup from last batch offset point not working

Hi there, I’m making progress with my migration from a very large Drupal forum. I repurposed the import_private_messages function from the Discuz importer script, which was the closest to working with the weird Drupal database schema. It’s currently crunching through over 1.5M private messages, but it hit an error (that I already worked around) and when I restarted the importer script it started at the beginning again. It’s running 3-4x faster now until it reaches the point where it failed before, but it’s definitely not picking up at the last batch offset value like the script does when importing public posts. Can anybody notice anything wrong with my script that’s preventing the recovery from working?


def import_private_messages
	puts '', 'creating private messages'

	pm_indexes = 'pm_index'
	pm_messages = 'pm_message'
	total_count = mysql_query("SELECT count(*) count FROM #{pm_indexes}").first['count']

	batches(BATCH_SIZE) do |offset|
		results = mysql_query("
SELECT pi.mid id, thread_id, pi.recipient to_user_id, pi.deleted deleted, pm.author user_id, pm.subject subject, pm.body message, pm.format, pm.timestamp created_at FROM pm_index pi LEFT JOIN pm_message pm ON pi.mid=pm.mid WHERE deleted = 0
             LIMIT #{BATCH_SIZE}
            OFFSET #{offset};")

		break if results.size < 1

		# next if all_records_exist? :posts, results.map {|m| "pm:#{m['id']}"}

		create_posts(results, total: total_count, offset: offset) do |m|
			skip = false
			mapped = {}
			mapped[:id] = "pm:#{m['id']}"
			mapped[:user_id] = user_id_from_imported_user_id(m['user_id']) || -1
			mapped[:raw] = preprocess_raw(m['message'])
			mapped[:created_at] = Time.zone.at(m['created_at'])
			thread_id = "pm_#{m['thread_id']}"
			if is_first_pm(m['id'], m['thread_id'])
				# find the title from list table
				#          pm_thread = mysql_query("
				#                SELECT thread_id, subject
				#                  FROM #{table_name 'ucenter_pm_lists'}
				#                 WHERE plid = #{m['thread_id']};").first
				mapped[:title] = m['subject']
				mapped[:archetype] = Archetype.private_message

          # Find the users who are part of this private message.
          import_user_ids = mysql_query("
                SELECT thread_id plid, recipient user_id
                  FROM pm_index
                 WHERE thread_id = #{m['thread_id']};
              ").map { |r| r['user_id'] }.uniq
          mapped[:target_usernames] = import_user_ids.map! do |import_user_id|
            import_user_id.to_s == m['user_id'].to_s ? nil : User.find_by(id: user_id_from_imported_user_id(import_user_id)).try(:username)
          end.compact
          if mapped[:target_usernames].empty? # pm with yourself?
            skip = true
            puts "Skipping pm:#{m['id']} due to no target"
          else
            @first_post_id_by_topic_id[thread_id] = mapped[:id]
          end
        else
          parent = topic_lookup_from_imported_post_id(@first_post_id_by_topic_id[thread_id])
          if parent
            mapped[:topic_id] = parent[:topic_id]
          else
            puts "Parent post pm thread:#{thread_id} doesn't exist. Skipping #{m["id"]}: #{m["message"][0..40]}"
            skip = true
          end
        end
        skip ? nil : mapped
      end

    end
end