Основы Active Job

Это руководство даст вам все, что нужно, чтобы начать создавать, ставить в очередь и запускать фоновые задания.

После его прочтения, вы узнаете:

  • Как создавать задания.
  • Как ставить в очередь задания.
  • Как запускать задания в фоне.
  • Как асинхронно рассылать письма из вашего приложения.

1. Введение

Active Job - это фреймворк для объявления заданий и их запуска на разных бэкендах для очередей. Эти задания могут быть чем угодно, от регулярно запланированных чисток до списаний с карт или рассылок. В общем, всем, что может быть выделено в небольшие работающие части и запускаться параллельно.

2. Назначение Active Job

Главным является то, что он обеспечивает, что у всех приложений Rails имеется встроенная инфраструктура для заданий. Затем у нас могут появиться особенности фреймворка или других гемов, созданных на его основе, позволяющие не заботится об отличиях в API между различными исполнителями заданий, такими как Delayed Job и Resque. Подбор бэкенда для очередей станет более оперативной работой. Вы сможете переключаться между ними без необходимости переписывать свои задания.

По умолчанию, Rails поставляется с асинхронной реализацией очереди, запускающей задания с помощью пула тредов внутри процесса. Задания будут запущены асинхронно, но любые задания в очереди будут потеряны при перезагрузке.

3. Создание задания

Этот раздел предоставляет пошаговое руководство к созданию задания и добавлению его в очередь.

3.1. Создание задания

Active Job предоставляет генератор Rails для создания заданий. Следующая команда создаст задание в app/jobs (а также тестовый случай в test/jobs):

$ bin/rails generate job guests_cleanup
invoke  test_unit
create    test/jobs/guests_cleanup_job_test.rb
create  app/jobs/guests_cleanup_job.rb

Также можно создать задание, которое будет запущено в определенной очереди:

$ bin/rails generate job guests_cleanup --queue urgent

Если не хотите использовать генератор, можно создать файл очереди в app/jobs, просто убедитесь, что он наследуется от ApplicationJob.

Вот как выглядит задание:

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  def perform(*guests)
    # Do something later
  end
end

Отметьте, что можно определить perform с любым количеством аргументов.

3.2. Помещение задания в очередь

Поместить задание в очередь можно так:

# Помещенное в очередь задание выполнится, как только освободится система очередей.
GuestsCleanupJob.perform_later guest

# Помещенное в очередь задание выполнится завтра в полдень.
GuestsCleanupJob.set(wait_until: Date.tomorrow.noon).perform_later(guest)

# Помещенное в очередь задание выполнится через неделю.
GuestsCleanupJob.set(wait: 1.week).perform_later(guest)

# `perform_now` и `perform_later` вызывают `perform`, поэтому
# можно передать столько аргументов, сколько определено в последнем.
GuestsCleanupJob.perform_later(guest1, guest2, filter: 'some_filter')

Вот и все!

4. Запуск заданий

Чтобы поместить задание в очередь и выполнить его в production, необходимо настроить бэкенд для очереди, т.е. нужно решить, какую стороннюю библиотеку для очереди Rails будет использовать. Rails предоставляет только внутрипроцессную систему очереди, хранящую задания в памяти. Если процесс упадет, или машина будет перезагружена, тогда в асинхронном бэкенде по умолчанию все оставшиеся задания будут потеряны. Это может быть нормальным для маленьких приложений или некритичных заданий, но для большей части серьезных приложений нужно подобрать сохраняющий бэкенд.

4.1. Бэкенды

У Active Job есть встроенные адаптеры для различных бэкендов очередей (Sidekiq, Resque, Delayed Job и другие). Чтобы получить актуальный список адаптеров, обратитесь к документации API по ActiveJob::QueueAdapters.

4.2. Настройка бэкенда

Настроить бэкенд — это просто:

# config/application.rb
module YourApp
  class Application < Rails::Application
    # Убедитесь, что гем адаптера добавлен в Gemfile, и что выполнены
    # инструкции по установке и развертыванию адаптера.
    config.active_job.queue_adapter = :sidekiq
  end
end

Также можно настроить бэкенд для отдельного задания.

class GuestsCleanupJob < ApplicationJob
  self.queue_adapter = :resque
  #....
end

# Теперь ваше задание будет использовать `resque` в качестве адаптера бэкенда очереди,
# переопределяя тот, что был настроен в `config.active_job.queue_adapter`.

4.3. Запуск бэкенда

Поскольку задания запускаются параллельно с вашим Rails приложением, большинство библиотек для работы с очередями требуют запуска специфичного для библиотеки сервиса очередей (помимо старта Rails приложения) для обработки заданий. Обратитесь к документации по библиотеке за инструкциями по запуску бэкенда очереди.

Вот неполный список документации:

5. Очереди

Большая часть адаптеров поддерживает несколько очередей. С помощью Active Job можно запланировать, что задание будет выполнено в определенной очереди:

class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  #....
end

Можно задать префикс для имени очереди для всех заданий с помощью config.active_job.queue_name_prefix в application.rb:

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_name_prefix = Rails.env
  end
end

# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  #....
end

# Теперь ваше задание запустится в очереди production_low_priority в среде
# production и в staging_low_priority в среде staging

Разделитель префикса имени очереди по умолчанию '_'. Его можно изменить, установив config.active_job.queue_name_delimiter в application.rb:

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_name_prefix = Rails.env
    config.active_job.queue_name_delimiter = '.'
  end
end

# app/jobs/guests_cleanup_job.rb
class GuestsCleanupJob < ApplicationJob
  queue_as :low_priority
  #....
end

# Теперь ваше задание запустится в очереди production.low_priority в среде
# production и в staging.low_priority в среде staging

Если хотите больше контроля, в какой очереди задание будет запущено, можно передать опцию :queue в #set:

MyJob.set(queue: :another_queue).perform_later(record)

Чтобы контролировать очередь на уровне задания, можно передать блок в #queue_as. Блок будет выполнен в контексте задания (таким образом, у вас будет доступ к self.arguments), и он должен вернуть имя очереди:

class ProcessVideoJob < ApplicationJob
  queue_as do
    video = self.arguments.first
    if video.owner.premium?
      :premium_videojobs
    else
      :videojobs
    end
  end

  def perform(video)
    # Делаем обработку видео
  end
end

ProcessVideoJob.perform_later(Video.last)

Убедитесь, что ваш бэкенд для очередей "слушает" имя вашей очереди. Для некоторых бэкендов необходимо указать очереди, которые нужно слушать.

6. Колбэки

Active Job предоставляет хуки для включения логики на протяжение жизненного цикла задания. Подобно другим колбэкам в Rails, можно реализовывать колбэки как обычные методы и использовать макрос-метод класса, чтобы зарегистрировать их в качестве колбэков:

class GuestsCleanupJob < ApplicationJob
  queue_as :default

  around_perform :around_cleanup

  def perform
    # Отложенное задание
  end

  private
    def around_cleanup(job)
      # Делаем что-то перед perform
      yield
      # Делаем что-то после perform
    end
end

Макрос-методы класса также могут принимать блок. Рассмотрите возможность использования этого макроса, если код внутри блока настолько короток, что он помещается в одну строчку. Например, можно отправлять показатели для каждого помещенного в очередь задания.

class ApplicationJob
  before_enqueue { |job| $statsd.increment "#{job.name.underscore}.enqueue" }
end

6.1. Доступные колбэки

  • before_enqueue
  • around_enqueue
  • after_enqueue
  • before_perform
  • around_perform
  • after_perform

7. Action Mailer

Одним из обычных заданий в современном веб-приложении является рассылка писем за пределами цикла запроса-отклика, чтобы пользователь не ждал. Active Job интегрируется с Action Mailer, поэтому рассылать письма асинхронно очень просто:

# Если хотите отправить письмо сейчас, используйте #deliver_now
UserMailer.welcome(@user).deliver_now

# Если хотите отправить письмо через Active Job, используйте #deliver_later
UserMailer.welcome(@user).deliver_later

Использование асинхронной очереди из задач Rake (например, для отправки электронной почты с помощью .deliver_later), как правило, не будет работать, потому что Rake, вероятно, завершится, в результате чего пул тредов внутри процесса будет удален до того, как любой/все из .deliver_later писем будут обработаны. Чтобы избежать этой проблемы, используйте .deliver_now или запустите сохраняющую очередь в development режиме.

8. Интернационализация

Каждое задание использует настройку I18n.locale при создании. Это полезно, если вы отправляете письма асинхронно:

I18n.locale = :eo

UserMailer.welcome(@user).deliver_later # Email будет локализован в Эсперанто.

9. GlobalID

Active Job поддерживает GlobalID для параметров. Это позволяет передавать объекты Active Record в ваши задания, вместо пар класс/id, которые нужно затем десериализовать вручную. Раньше задания выглядели так:

class TrashableCleanupJob < ApplicationJob
  def perform(trashable_class, trashable_id, depth)
    trashable = trashable_class.constantize.find(trashable_id)
    trashable.cleanup(depth)
  end
end

Теперь можно просто сделать так:

class TrashableCleanupJob < ApplicationJob
  def perform(trashable, depth)
    trashable.cleanup(depth)
  end
end

Это работает с любым классом, в который подмешан GlobalID::Identification, который по умолчанию был подмешан в классы Active Record.

10. Исключения

Active Job предоставляет способ отлова исключений, возникших в течение запуска задания:


class GuestsCleanupJob < ApplicationJob
  queue_as :default

  rescue_from(ActiveRecord::RecordNotFound) do |exception|
    # Сделать что-то с этим исключением
  end

  def perform
    # Отложенное задание
  end
end

10.1. Повторная отправка или отмена неудачных заданий

Также возможно повторить отправку или отменить задание, если во время выполнения было вызвано исключение.

Например:

class RemoteServiceJob < ApplicationJob
  retry_on CustomAppException # по умолчанию, ожидание: 3 сек., попыток: 5

  discard_on ActiveJob::DeserializationError

  def perform(*args)
    # Может быть вызвано CustomAppException или ActiveJob::DeserializationError
  end
end

Более подробную информацию смотрите в документации по API для ActiveJob::Exceptions.

10.2. Десериализация

GlobalID позволяет сериализовать полностью объекты Active Record, переданные в #perform.

Если переданная запись была удалена после того, как задание было помещено в очередь, но до того, как метод #perform был вызван, Active Job вызовет исключение ActiveJob::DeserializationError.

11. Тестирование заданий

Вы можете найти подробные инструкции о том, как тестировать ваши задания в руководстве по тестированию.