Submitting many ActionMailer jobs at once with Sidekiq

Posted on August 24, 2021 by wjwh


I recently came across a situation where enqueueing several thousand mails from a Rails app to Sidekiq was surprisingly slow and the fix surprisingly complex, so I decided to write a blog post about it both for myself (if I need this again) and others (if they have the same problem).

The core problem arises when you have a batch of mails to send that are similar but slightly different. In our case, this was because we wanted to include the name of the user (i.e. "Hello, #{user.first_name}"), but there are many other scenarios where this could pop up. The most natural way to write this in Rails would be something like:

user_ids = User.where(<some condition here>).pluck(:id)

user_ids.each do |user_id|
  MyMailer.some_mail_type(user_id).deliver_later
end

This loops over all the user ids and enqueues the mail defined by some_mail_type in MyMailer for each of them. ActionMailer is integrated with the ActiveJob system, so it is usually a good idea to use deliver_later instead of deliver_now and let the background workers do the actual sending without blocking the current request. Adapters connect ActiveJob to many different background worker systems, such as Sidekiq, Sqewer and Resque. However, this can be surprisingly slow: submitting “only” ten thousand mails to Sidekiq took over 40 minutes! Needless to say, 40 minutes is an eternity in computer time, and sending ten thousand messages to Redis should take a few seconds at most. What is going on here?
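To put those numbers in perspective, here is a quick back-of-the-envelope calculation. The 40 minutes and 10,000 mails come from the text; the 0.5 ms round trip to Redis is purely an assumption for illustration, roughly what you might see on a nearby server.

```ruby
# Numbers from the text: 10,000 mails took about 40 minutes to enqueue.
mails = 10_000
observed_seconds = 40 * 60

# Average time spent per enqueued mail, in milliseconds.
ms_per_mail = observed_seconds * 1000.0 / mails
puts "observed: #{ms_per_mail} ms per mail"

# Assumption for illustration only: a 0.5 ms round trip to a nearby Redis.
assumed_rtt_ms = 0.5
expected_seconds = mails * assumed_rtt_ms / 1000.0
puts "expected: #{expected_seconds} s for #{mails} sequential pushes"
```

That is about 240 ms per mail observed, against roughly 5 seconds in total under the assumed latency, so something other than raw network latency must be eating most of the time.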

Root cause

Because ActiveJob supports many different background worker systems, the adapters sometimes need to make some compromises to match their interface to the one expected by ActiveJob. This means that some of the fancier features often can’t be used, because they don’t map well onto the ActiveJob interface. In this case the offending code lives in the Sidekiq adapter file in the Rails repo:

class SidekiqAdapter
  def enqueue(job) #:nodoc:
    # Sidekiq::Client does not support symbols as keys
    job.provider_job_id = Sidekiq::Client.push \
      "class"   => JobWrapper,
      "wrapped" => job.class,
      "queue"   => job.queue_name,
      "args"    => [ job.serialize ]
  end

  # Some non-relevant code omitted for clarity

  class JobWrapper #:nodoc:
    include Sidekiq::Worker

    def perform(job_data)
      Base.execute job_data.merge("provider_job_id" => jid)
    end
  end
end

It looks very innocent, but notice that Sidekiq::Client.push is never handed an existing Redis connection. Perhaps Sidekiq maintains an internal connection (or even a connection pool) for us? Let’s inspect:

module Sidekiq
  class Client
    # This class is pretty big, so we're ignoring most of it for clarity

    class << self
      def push(item)
        new.push(item)
      end
    end

    def push(item)
      normed = normalize_item(item)
      payload = process_single(item["class"], normed)

      if payload
        raw_push([payload])
        payload["jid"]
      end
    end

    def raw_push(payloads)
      @redis_pool.with do |conn|
        conn.multi do
          atomic_push(conn, payloads)
        end
      end
      true
    end
  end
end

After looking at the code, there definitely seems to be a connection pool involved, although the fact that Sidekiq::Client.push constructs a brand new Client object on every call worries me a bit. But where does that connection pool come from? Let’s look at the initializer for Sidekiq::Client:

def initialize(redis_pool = nil)
  @redis_pool = 
    redis_pool ||
    Thread.current[] ||
    Sidekiq.redis_pool
end

Whew! If no pool is passed in as an argument (which it isn’t), and the current Thread has no pool stored in a thread-local variable (almost never the case unless you set one on purpose, and Rails definitely doesn’t do it by default), the client simply uses Sidekiq’s main Redis pool and checks out a connection from it. Since that pool is created once and cached, this usually means an existing connection gets reused. However, even with connection reuse there will still be N network roundtrips to Redis to push N jobs. If N is large and/or Redis runs on a different machine, the time spent sending jobs one by one really adds up.
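A toy model can make the roundtrip argument concrete. The sketch below uses hypothetical Fake* classes (not Sidekiq’s code) that mirror the structure shown above: a memoized global pool, a new client for every push call, and one MULTI block per job, each counted as one round trip.

```ruby
# Illustrative model only: the Fake* names are hypothetical stand-ins that
# mirror the structure of the Sidekiq code above, not Sidekiq itself.
class FakeConnection
  attr_reader :round_trips, :queue

  def initialize
    @round_trips = 0
    @queue = []
  end

  # Count each MULTI block as one network round trip.
  def multi
    @round_trips += 1
    yield
  end

  def lpush(payloads)
    @queue.concat(payloads)
  end
end

class FakePool
  attr_reader :connection

  def initialize
    @connection = FakeConnection.new
  end

  # Like ConnectionPool#with: lend a connection to the block.
  def with
    yield @connection
  end
end

module FakeSidekiq
  # Memoized, so every client gets the same pool (and the same connection).
  def self.redis_pool
    @redis_pool ||= FakePool.new
  end
end

class FakeClient
  # A fresh client per call, as in Sidekiq::Client.push.
  def self.push(item)
    new.push(item)
  end

  def initialize(redis_pool = nil)
    @redis_pool = redis_pool || FakeSidekiq.redis_pool
  end

  # One job per MULTI block, i.e. one round trip per job.
  def push(item)
    @redis_pool.with do |conn|
      conn.multi { conn.lpush([item]) }
    end
  end
end

10_000.times { |i| FakeClient.push({ "class" => "JobWrapper", "args" => [i] }) }

conn = FakeSidekiq.redis_pool.connection
puts conn.queue.size   # => 10000
puts conn.round_trips  # => 10000
```

Every push reuses the same connection, yet the counter still reaches one round trip per job.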

The push_bulk method

Sidekiq does have a built-in method to submit multiple jobs in one call, but it is not exposed by Rails. Sidekiq::Client.push_bulk takes a single job description, just like push, except that its "args" key holds an array of argument arrays, one per job. These jobs are then written to Redis together instead of one push per job, saving a lot of roundtrips. As mentioned, Rails does not make this available for ActiveJob or ActionMailer, but with some finagling we can build our own implementation of it.
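As a rough sketch of the bulk idea, again using hypothetical stand-ins rather than Sidekiq’s implementation: one item hash whose "args" holds an argument array per job is expanded into individual payloads, each with its own random job id, and all of them are written inside a single MULTI block. The Array-of-Arrays check mirrors the shape push_bulk expects; the error message is made up for this sketch.

```ruby
require "securerandom"

# Hypothetical stand-in for a Redis connection: counts MULTI blocks as round trips.
class CountingConnection
  attr_reader :round_trips, :queue

  def initialize
    @round_trips = 0
    @queue = []
  end

  def multi
    @round_trips += 1
    yield
  end

  def lpush(payloads)
    @queue.concat(payloads)
  end
end

# Sketch of the push_bulk idea (not Sidekiq's code): expand one item hash
# into one payload per job and write them all in a single MULTI block.
def fake_push_bulk(conn, item)
  args = item["args"]
  unless args.is_a?(Array) && args.all? { |a| a.is_a?(Array) }
    raise ArgumentError, "args must be an Array of Arrays, e.g. [[1], [2]]"
  end

  payloads = args.map do |job_args|
    item.merge({ "args" => job_args, "jid" => SecureRandom.hex(12) })
  end

  conn.multi { conn.lpush(payloads) }
  payloads.map { |p| p["jid"] }
end

conn = CountingConnection.new
jids = fake_push_bulk(conn, {
  "class" => "JobWrapper",
  "queue" => "mailers",
  "args"  => (1..10_000).map { |i| [i] }
})

puts jids.size         # => 10000
puts conn.round_trips  # => 1
```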

Submitting mailer jobs with push_bulk

A mailer job is an instance of ActionMailer::DeliveryJob, so we will need to submit jobs of that type to Redis. In addition, a Rails mailer class typically contains many “template” methods for different but related mail types. For example, a UserMailer might have methods for both welcome and password reset emails. This means that our enqueue_many_mails method needs to take a mailer class and a template method in addition to an array of arguments. The push_bulk method expects a hash containing the class (always the JobWrapper), the class of the wrapped job, the queue to submit the jobs into, and an array with one entry per job, where each entry is itself a one-element array holding a serialized ActionMailer::DeliveryJob. The DeliveryJob itself takes the mailer class name, the template, the magic string "deliver_now" and the arguments for the mail. We can easily convert the array of arguments into this shape with the map method from Enumerable:

def enqueue_many_mails(mail_class, template, args_array)
  job = ActionMailer::DeliveryJob

  # convert template and args array into an array of arrays containing args
  # for ActionMailer::DeliveryJob objects
  mailer_job_args = args_array.map { |args|
    [job.new(
      mail_class.name,
      template.to_s,
      "deliver_now",
      *args
    ).serialize]
  }

  # Adapted from the Activejob code for a single job at
  # https://github.com/rails/rails/blob/main/activejob/
  #    lib/active_job/queue_adapters/sidekiq_adapter.rb

  Sidekiq::Client.push_bulk(
    "class" => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
    "wrapped" => job,
    "queue" => MAILER_QUEUE,
    "args" => mailer_job_args
  )
end
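To see the shape that ends up in "args" without needing Rails or Sidekiq, here is the same mapping run against a hypothetical StubDeliveryJob (it only records and serializes its constructor arguments) and a hypothetical UserMailer. The helper name mailer_job_args_for is also made up for this sketch.

```ruby
# Hypothetical stand-in for an ActiveJob class: it records its constructor
# arguments and serializes them, which is enough to show the data shape.
class StubDeliveryJob
  def initialize(*arguments)
    @arguments = arguments
  end

  def serialize
    { "job_class" => self.class.name, "arguments" => @arguments }
  end
end

class UserMailer; end # hypothetical mailer, only its name is used

# The same mapping as in enqueue_many_mails, without the Sidekiq call.
def mailer_job_args_for(job, mail_class, template, args_array)
  args_array.map do |args|
    [job.new(mail_class.name, template.to_s, "deliver_now", *args).serialize]
  end
end

# Bare ids are fine: *1 splats to a single argument.
result = mailer_job_args_for(StubDeliveryJob, UserMailer, :welcome, [1, 2, 3])

p result.size                      # => 3
p result.first.first["arguments"]  # => ["UserMailer", "welcome", "deliver_now", 1]
```

Because each entry is splatted, a mail that needs several arguments should be passed as an Array (for example [user.id, locale]); a bare Hash would be splatted into its key/value pairs.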

Parametrized mails

If you use parametrized mailers (i.e. MyMailer.with(some_arg: "foo").some_mail_type.deliver_later), it works almost the same as before, but now we need to use ActionMailer::Parameterized::DeliveryJob as the job class. The arguments that previously went into with() now go into the constructor as a hash in the fourth position, right after "deliver_now":

def enqueue_many_parametrized_mails(mail_class, template, args_array)
  job = ActionMailer::Parameterized::DeliveryJob

  # convert template and args array into an array of arrays containing args
  # for ActionMailer::Parameterized::DeliveryJob objects
  mailer_job_args = args_array.map { |args|
    [job.new(
      mail_class.name,
      template.to_s,
      "deliver_now",
      { some_arg: "foo" }, # the params that would otherwise go into with()
      *args
    ).serialize]
  }

  Sidekiq::Client.push_bulk(
    "class" => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
    "wrapped" => job,
    "queue" => MAILER_QUEUE,
    "args" => mailer_job_args
  )
end

In this case we hardcoded the parameters for the job but they can obviously also be passed into the method as another argument.
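A minimal sketch of that variant, assuming a hypothetical stub job class and a made-up helper name: the params hash becomes an argument of the method and is placed in the fourth slot of every job’s constructor arguments.

```ruby
# Hypothetical stand-in that records constructor arguments like an ActiveJob.
class StubParameterizedJob
  def initialize(*arguments)
    @arguments = arguments
  end

  def serialize
    { "job_class" => self.class.name, "arguments" => @arguments }
  end
end

# Mapping step of enqueue_many_parametrized_mails with params passed in
# instead of hardcoded: params sits in the fourth slot, after "deliver_now"
# and before the per-mail arguments.
def parametrized_job_args_for(job, mail_class_name, template, params, args_array)
  args_array.map do |args|
    [job.new(mail_class_name, template.to_s, "deliver_now", params, *args).serialize]
  end
end

params = { some_arg: "foo" }
result = parametrized_job_args_for(
  StubParameterizedJob, "UserMailer", :password_reset, params, [[10], [11]]
)

arguments = result.first.first["arguments"]
p arguments[0..2]  # => ["UserMailer", "password_reset", "deliver_now"]
p arguments[4]     # => 10
```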

Integrating this into your app

So now that we know how to use the push_bulk method to submit many mails at once, how should you integrate this into your app? The flexibility of Ruby and Rails provides many possible points where we can make a few extra methods available to our code. For example, you can monkeypatch parts of Rails, add the methods to individual jobs as needed, or even add a new gem that abstracts these methods. Since we only had a few background jobs that actually needed this functionality, we eventually settled on a Concern module that can be included into any job that requires it.
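A possible shape for such a module, as a hypothetical sketch rather than the module we actually used: a plain Ruby mixin whose including job supplies a bulk_client. In a Rails app it might be an ActiveSupport::Concern, job_class would be ActionMailer::DeliveryJob, wrapper the adapter’s JobWrapper, and bulk_client Sidekiq::Client; here recording stand-ins (all names invented) keep it runnable without gems.

```ruby
# Hypothetical mixin sketch; see the lead-in for what the stand-ins replace.
module BulkMailEnqueueing
  MAILER_QUEUE = "mailers" # assumed queue name

  def enqueue_many_mails(mail_class, template, args_array, job_class:, wrapper:)
    mailer_job_args = args_array.map do |args|
      [job_class.new(mail_class.name, template.to_s, "deliver_now", *args).serialize]
    end

    # The including class decides which client performs the bulk push.
    bulk_client.push_bulk({
      "class"   => wrapper,
      "wrapped" => job_class,
      "queue"   => MAILER_QUEUE,
      "args"    => mailer_job_args
    })
  end
end

# Records push_bulk calls instead of talking to Redis.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def push_bulk(item)
    @calls << item
  end
end

# Hypothetical ActiveJob-like class that just serializes its arguments.
class StubJob
  def initialize(*arguments)
    @arguments = arguments
  end

  def serialize
    { "arguments" => @arguments }
  end
end

class UserMailer; end # hypothetical mailer

# A hypothetical background job that opts in by including the module.
class NewsletterJob
  include BulkMailEnqueueing

  def bulk_client
    @bulk_client ||= RecordingClient.new
  end

  def perform(user_ids)
    enqueue_many_mails(UserMailer, :newsletter, user_ids,
                       job_class: StubJob, wrapper: "JobWrapper")
  end
end

job = NewsletterJob.new
job.perform([1, 2, 3])
p job.bulk_client.calls.size # => 1
```

Making the client a method on the including class keeps the module easy to test and leaves the choice of Sidekiq client to each job.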

Conclusion

Many job queueing systems provide functionality for submitting more than one job at a time, but ActiveJob does not re-expose these methods, so we need to do it ourselves. It would be nice if this eventually became a built-in feature, but in the meantime we can use the methods in this post to access that functionality “by hand”. Though we didn’t go into detail about it in this post, you can of course write similar methods for non-mailer jobs as well. Happy mail enqueueing!
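For non-mailer jobs, one hypothetical generalization is to build the push_bulk item hash for any job class directly from its constructor arguments. The helper bulk_item_for and the stub job are invented for this sketch; nothing here talks to Redis, and in a real app the resulting hash would be handed to Sidekiq::Client.push_bulk with the adapter’s JobWrapper as the "class".

```ruby
# Hypothetical generalization for non-mailer jobs: build the item hash that
# would be handed to Sidekiq::Client.push_bulk. Nothing here talks to Redis;
# `wrapper` stands in for the adapter's JobWrapper class.
def bulk_item_for(job_class, args_array, queue:, wrapper:)
  {
    "class"   => wrapper,
    "wrapped" => job_class,
    "queue"   => queue,
    # One inner Array per job, holding that job's serialized form.
    "args"    => args_array.map { |args| [job_class.new(*args).serialize] }
  }
end

# Hypothetical ActiveJob-like class used only to make the sketch runnable.
class StubReportJob
  def initialize(*arguments)
    @arguments = arguments
  end

  def serialize
    { "job_class" => self.class.name, "arguments" => @arguments }
  end
end

item = bulk_item_for(StubReportJob, [[2021, 1], [2021, 2]],
                     queue: "default", wrapper: "JobWrapper")
p item["args"].size                # => 2
p item["args"][1][0]["arguments"]  # => [2021, 2]
```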