Rails. Обработка SSE-соединений без блокировки серверных потоков с использованием Rack Hijacking API

В данной статье рассматривается использование технологии Server-Sent Events (SSE) для создания постоянного соединения между приложением на Ruby on Rails и клиентом. Акцент сделан на применении Rack Hijacking API для перехвата контроля над соединениями и освобождения потоков веб-сервера приложения с целью обработки большего количества соединений используя меньшее количество серверных потоков.

Server-Sent Events

Технология SSE позволяет создать постоянное однонаправленное HTTP-соединение, при котором сервер способен отправлять поток данных клиенту. В Rails уже имеется инструмент для работы с SSE - модуль ActionController::Live. Далее приведен пример Rails-контроллера, использующего этот модуль.

# ActionController::Live example
#
class TasksController < ActionController::Base
  include ActionController::Live

  def perform
    response.headers["Content-Type"] = "text/event-stream"
    perform_task(response.stream)
  end

  private

  def perform_task(stream)
    sse = ActionController::Live::SSE.new(stream, retry: 300, event: "taskProgress")

    task_progress = 0 # progress percentage

    while task_progress < 100 do
      task_progress += 5
      sse.write(progress: task_progress)
      sleep 1
    end
  ensure
    sse.close
  end
end

В коде демонстрируется периодическая отправка с сервера на клиент информации о степени завершенности условной продолжительной задачи. Код на стороне клиента может выглядеть примерно так:

var source = new EventSource('/tasks/perform');

source.addEventListener('taskProgress', function(e) {
  var progress = JSON.parse(e.data).progress
  console.log(progress);

  if (progress >= 100) {
    source.close();
    console.log("Task Completed!");
  }
});

Для работы с SSE требуется многопоточный веб-сервер, такой как Puma, Passenger, Thin. Включенный в Rails 5 по умолчанию веб-сервер Puma отлично справится с этой задачей.

Проблема блокировки серверных потоков

Максимальное количество клиентских запросов, одновременно обрабатываемых многопоточным сервером, обычно определяется количеством серверных процессов и потоков. Например, один процесс Puma по умолчанию имеет 16 потоков и может одновременно обрабатывать до 16 клиентских запросов (один поток на каждый запрос).

В случае создания SSE-соединений тем способом, как показано в приведенном выше коде, для каждого такого соединения будет задействован отдельный серверный поток, и, следовательно, когда потоки закончатся, очередной клиент не сможет подключиться к серверу, пока они не освободятся.

Очевидным способом решения данной проблемы может быть увеличение количества потоков, но это приведет к чрезмерной затрате ресурсов, которых потребуется намного больше, чем реально необходимо для того, чтобы просто держать SSE-соединение открытым. Это особенно критично при большом количестве соединений.

Есть другой, более эффективный способ, позволяющий устанавливать и обрабатывать SSE-соединения, не блокируя при этом серверные потоки.

Перехват соединения с помощью Rack Hijacking API

Rack Hijacking API позволяет перехватывать контроль над соединением и производить с ним различные действия. Мы можем освободить серверный поток, но при этом держать соединение открытым даже после того, как цикл обработки запроса в Rails-контроллере завершится.

Алгоритм действий следующий:

  • Перехватить соединение (получить контроль над ним) с помощью Rack Hijacking API.
  • Продолжить обработку перехваченного соединения в другом программном потоке.
  • Вернуть ответ из Rails-контроллера, тем самым сообщив веб-серверу, что его работа по обработке запроса завершена и он может освободить серверный поток для обработки новых запросов.

Rack Hijacking API можно использовать двумя способами: полный перехват (Full hijacking) и частичный перехват (Partial hijacking).

  1. Full hijacking позволяет перехватывать соединение сразу после того, как оно установлено, и до того, как приложение (в нашем случае это Rails-приложение) отправляет какие-либо данные, в том числе заголовки. Данный режим обеспечивает полный контроль над соединением, но и возлагает большую ответственность.

    Так как мы перехватываем контроль над соединением, некоторая ответственность за его обработку ложится на нас. Для использования технологии SSE необходимо устанавливать и передавать необходимые заголовки ответа и, что очень важно, нужно не забывать закрывать соединение, когда в нем больше нет необходимости.

  2. Partial hijacking позволяет получить контроль над соединением после того, как приложение отправляет заголовки (headers). При использовании данного режима также требуется закрыть соединение самостоятельно.

В нашем простом примере выбор того или иного способа не сыграет большой роли, тем не менее мы рассмотрим оба из них, чтобы показать их принципиальные различия.

Full hijacking

Работа с Rack Hijacking API данным способом осуществляется через переменные запроса, устанавливаемые Rack и доступные в приложении. В Rails-контроллере доступ к этим переменным осуществляется через Hash-объект request.env. Мы можем убедиться, что наш сервер приложения поддерживает работу с Rack Hijacking API проверив значение request.env['rack.hijack?'], равное true или false.

Перехват контроля над соединением происходит вызовом request.env['rack.hijack'].call, после которого доступ к стриму перехваченного соединения осуществляется через request.env['rack.hijack_io'].

# Rack Full Hijacking API example
#
class TasksController < ActionController::Base
  def perform
    request.env['rack.hijack'].call
    stream = request.env['rack.hijack_io']

    send_headers(stream)

    Thread.new do
      perform_task(stream)
    end

    response.close
  end

  private

  def perform_task(stream)
    sse = ActionController::Live::SSE.new(stream, retry: 300, event: "taskProgress")

    task_progress = 0 # progress percentage

    while task_progress < 100 do
      task_progress += 5
      sse.write(progress: task_progress)
      sleep 1
    end
  ensure
    sse.close
  end

  def send_headers(stream)
    headers = [
      "HTTP/1.1 200 OK",
      "Content-Type: text/event-stream"
    ]
    stream.write(headers.map { |header| header + "\r\n" }.join)
    stream.write("\r\n")
    stream.flush
  rescue
    stream.close
    raise
  end
end

В данном примере мы больше не использовали ActionController::Live, так как он не требовался, но мы по-прежнему использовали класс ActionController::Live::SSE, который предоставляет удобное АПИ для отправки сообщений в требуемом для SSE формате. Но на этот раз при создании объекта этого класса использовался не стрим response.stream, который в предыдущем примере был доступен благодаря включенному модулю ActionController::Live, а стрим перехваченного соединения env["rack.hijack_io"], полученный с помощью Rack Hijacking API.

Обратите внимание, что для того, чтобы завершить работу Rails-контроллера и освободить основной поток приложения, передача данных через перехваченное соединение выполняется в отдельном программном потоке. В более сложном примере можно было бы создать какой-нибудь воркер, отвечающий за асинхронную обработку соединений, а сами соединения складывать в массив, чтобы, например, организовать очередь или иметь хоть какую-то информацию о количестве открытых соединений.

Завершение работы Rails-контроллера произведено с помощью инструкции response.close. Способ завершения здесь не имеет значения, главное - быстрее освободить серверный поток. Можно было явным образом вернуть какой-нибудь ответ используя методы head или render, или ничего не возвращать вообще, тогда бы Rails-контроллер попытался найти связанное с контроллером представление (view) и сформировать ответ самостоятельно. Но, в любом случае, это бы никак не повлияло на взаимодействие с клиентом, и дальнейшую работу приложения, так как клиент получает уже не этот ответ, а тот, который отправляется через перехваченное соединение.

Partial hijacking

Этот способ заключается в присвоении заголовку ответа response.headers["rack.hijack"] Proc-объекта, описывающего дальнейшую обработку перехваченного соединения. Proc-объект будет выполнен после того, как приложение отправит клиенту заголовки. Доступ к стриму перехваченного соединения внутри Proc-объекта осуществляется через его единственный аргумент.

# Rack Partial Hijacking API example
#
class TasksController < ActionController::Base
  def perform
    response.headers["Content-Type"] = "text/event-stream"

    response.headers["rack.hijack"] = proc do |stream|
      Thread.new do
        perform_task(stream)
      end
    end

    head :ok
  end

  private

  def perform_task(stream)
    sse = ActionController::Live::SSE.new(stream, retry: 300, event: "taskProgress")

    task_progress = 0 # progress percentage

    while task_progress < 100 do
      task_progress += 5
      sse.write(progress: task_progress)
      sleep 1
    end
  ensure
    sse.close
  end
end

В этом примере передача требуемых для установления SSE-соединения заголовков (headers) - это ответственность Rails-контроллера. Поэтому в этот раз ответ, возвращаемый Rails-контроллером, имеет значение, ведь именно в нем отправляются заголовки. Перехват соединения осуществляется уже после отправки заголовков, а дальнейшая его обработка идет по сценарию, описанному в Proc-объекте, который присваивается response.headers["rack.hijack"].

Заключение

В статье были приведены примеры использования технологии Server-Sent Events (SSE) в веб-приложении на Ruby on Rails. Рассмотрены способы обработки SSE-соединений с использованием Rails ActionController::Live и Rack Hijacking API. Было показано как Rack Hijacking API позволяет избежать блокировки серверных потоков.

Код демонстрационного Rails-приложения, включающего используемые в статье примеры, доступен по этой ссылке: https://github.com/chumakoff-blog/rails_sse_rack_hijacking_api_example.

Также стоит отметить, что Action Cable, инструмент Rails для работы с WebSockets, также использует Rack Hijacking API. Action Cable внутренне управляет соединениями и обеспечивает их многопоточную обработку. Возможно, в будущем в Rails появятся аналогичные инструменты для работы с SSE.

4
Комментарии (0)

Неизвестный пользователь Войти