Rails. Обработка SSE-соединений без блокировки серверных потоков с использованием Rack Hijacking API
В данной статье рассматривается использование технологии Server-Sent Events (SSE) для создания постоянного соединения между приложением на Ruby on Rails и клиентом. Акцент сделан на применении Rack Hijacking API для перехвата контроля над соединениями и освобождения потоков веб-сервера приложения с целью обработки большего количества соединений используя меньшее количество серверных потоков.
Server-Sent Events
Технология SSE позволяет создать постоянное однонаправленное HTTP-соединение, при котором сервер способен отправлять поток данных клиенту. В Rails уже имеется инструмент для работы с SSE - модуль ActionController::Live
. Далее приведен пример Rails-контроллера, использующего этот модуль.
# ActionController::Live example
#
class TasksController < ActionController::Base
include ActionController::Live
def perform
response.headers["Content-Type"] = "text/event-stream"
perform_task(response.stream)
end
private
def perform_task(stream)
sse = ActionController::Live::SSE.new(stream, retry: 300, event: "taskProgress")
task_progress = 0 # progress percentage
while task_progress < 100 do
task_progress += 5
sse.write(progress: task_progress)
sleep 1
end
ensure
sse.close
end
end
В коде демонстрируется периодическая отправка с сервера на клиент информации о степени завершенности условной продолжительной задачи. Код на стороне клиента может выглядеть примерно так:
var source = new EventSource('/tasks/perform');
source.addEventListener('taskProgress', function(e) {
var progress = JSON.parse(e.data).progress
console.log(progress);
if (progress >= 100) {
source.close();
console.log("Task Completed!");
}
});
Для работы с SSE требуется многопоточный веб-сервер, такой как Puma, Passenger, Thin. Включенный в Rails 5 по умолчанию веб-сервер Puma отлично справится с этой задачей.
Проблема блокировки серверных потоков
Максимальное количество клиентских запросов, одновременно обрабатываемых многопоточным сервером, обычно определяется количеством серверных процессов и потоков. Например, один процесс Puma по умолчанию имеет 16 потоков и может одновременно обрабатывать до 16 клиентских запросов (один поток на каждый запрос).
В случае создания SSE-соединений тем способом, как показано в приведенном выше коде, для каждого такого соединения будет задействован отдельный серверный поток, и, следовательно, когда потоки закончатся, очередной клиент не сможет подключиться к серверу, пока они не освободятся.
Очевидным способом решения данной проблемы может быть увеличение количества потоков, но это приведет к чрезмерной затрате ресурсов, которых потребуется намного больше, чем реально необходимо для того, чтобы просто держать SSE-соединение открытым. Это особенно критично при большом количестве соединений.
Есть другой, более эффективный способ, позволяющий устанавливать и обрабатывать SSE-соединения, не блокируя при этом серверные потоки.
Перехват соединения с помощью Rack Hijacking API
Rack Hijacking API позволяет перехватывать контроль над соединением и производить с ним различные действия. Мы можем освободить серверный поток, но при этом держать соединение открытым даже после того, как цикл обработки запроса в Rails-контроллере завершится.
Алгоритм действий следующий:
- Перехватить соединение (получить контроль над ним) с помощью Rack Hijacking API.
- Продолжить обработку перехваченного соединения в другом программном потоке.
- Вернуть ответ из Rails-контроллера, тем самым сообщив веб-серверу, что его работа по обработке запроса завершена и он может освободить серверный поток для обработки новых запросов.
Rack Hijacking API можно использовать двумя способами: полный перехват (Full hijacking) и частичный перехват (Partial hijacking).
Full hijacking позволяет перехватывать соединение сразу после того, как оно установлено, и до того, как приложение (в нашем случае это Rails-приложение) отправляет какие-либо данные, в том числе заголовки. Данный режим обеспечивает полный контроль над соединением, но и возлагает большую ответственность.
Так как мы перехватываем контроль над соединением, некоторая ответственность за его обработку ложится на нас. Для использования технологии SSE необходимо устанавливать и передавать необходимые заголовки ответа и, что очень важно, нужно не забывать закрывать соединение, когда в нем больше нет необходимости.
Partial hijacking позволяет получить контроль над соединением после того, как приложение отправляет заголовки (headers). При использовании данного режима также требуется закрыть соединение самостоятельно.
В нашем простом примере выбор того или иного способа не сыграет большой роли, тем не менее мы рассмотрим оба из них, чтобы показать их принципиальные различия.
Full hijacking
Работа с Rack Hijacking API данным способом осуществляется через переменные запроса, устанавливаемые Rack и доступные в приложении. В Rails-контроллере доступ к этим переменным осуществляется через Hash-объект request.env
. Мы можем убедиться, что наш сервер приложения поддерживает работу с Rack Hijacking API проверив значение request.env['rack.hijack?']
, равное true
или false
.
Перехват контроля над соединением происходит вызовом request.env['rack.hijack'].call
, после которого доступ к стриму перехваченного соединения осуществляется через request.env['rack.hijack_io']
.
# Rack Full Hijacking API example
#
class TasksController < ActionController::Base
def perform
request.env['rack.hijack'].call
stream = request.env['rack.hijack_io']
send_headers(stream)
Thread.new do
perform_task(stream)
end
response.close
end
private
def perform_task(stream)
sse = ActionController::Live::SSE.new(stream, retry: 300, event: "taskProgress")
task_progress = 0 # progress percentage
while task_progress < 100 do
task_progress += 5
sse.write(progress: task_progress)
sleep 1
end
ensure
sse.close
end
def send_headers(stream)
headers = [
"HTTP/1.1 200 OK",
"Content-Type: text/event-stream"
]
stream.write(headers.map { |header| header + "\r\n" }.join)
stream.write("\r\n")
stream.flush
rescue
stream.close
raise
end
end
В данном примере мы больше не использовали ActionController::Live
, так как он не требовался, но мы по-прежнему использовали класс ActionController::Live::SSE
, который предоставляет удобное АПИ для отправки сообщений в требуемом для SSE формате. Но на этот раз при создании объекта этого класса использовался не стрим response.stream
, который в предыдущем примере был доступен благодаря включенному модулю ActionController::Live
, а стрим перехваченного соединения env["rack.hijack_io"]
, полученный с помощью Rack Hijacking API.
Обратите внимание, что для того, чтобы завершить работу Rails-контроллера и освободить основной поток приложения, передача данных через перехваченное соединение выполняется в отдельном программном потоке. В более сложном примере можно было бы создать какой-нибудь воркер, отвечающий за асинхронную обработку соединений, а сами соединения складывать в массив, чтобы, например, организовать очередь или иметь хоть какую-то информацию о количестве открытых соединений.
Завершение работы Rails-контроллера произведено с помощью инструкции response.close
. Способ завершения здесь не имеет значения, главное - быстрее освободить серверный поток. Можно было явным образом вернуть какой-нибудь ответ используя методы head
или render
, или ничего не возвращать вообще, тогда бы Rails-контроллер попытался найти связанное с контроллером представление (view) и сформировать ответ самостоятельно. Но, в любом случае, это бы никак не повлияло на взаимодействие с клиентом, и дальнейшую работу приложения, так как клиент получает уже не этот ответ, а тот, который отправляется через перехваченное соединение.
Partial hijacking
Этот способ заключается в присвоении заголовку ответа response.headers["rack.hijack"]
Proc-объекта, описывающего дальнейшую обработку перехваченного соединения. Proc-объект будет выполнен после того, как приложение отправит клиенту заголовки. Доступ к стриму перехваченного соединения внутри Proc-объекта осуществляется через его единственный аргумент.
# Rack Partial Hijacking API example
#
class TasksController < ActionController::Base
def perform
response.headers["Content-Type"] = "text/event-stream"
response.headers["rack.hijack"] = proc do |stream|
Thread.new do
perform_task(stream)
end
end
head :ok
end
private
def perform_task(stream)
sse = ActionController::Live::SSE.new(stream, retry: 300, event: "taskProgress")
task_progress = 0 # progress percentage
while task_progress < 100 do
task_progress += 5
sse.write(progress: task_progress)
sleep 1
end
ensure
sse.close
end
end
В этом примере передача требуемых для установления SSE-соединения заголовков (headers) - это ответственность Rails-контроллера. Поэтому в этот раз ответ, возвращаемый Rails-контроллером, имеет значение, ведь именно в нем отправляются заголовки. Перехват соединения осуществляется уже после отправки заголовков, а дальнейшая его обработка идет по сценарию, описанному в Proc-объекте, который присваивается response.headers["rack.hijack"]
.
Заключение
В статье были приведены примеры использования технологии Server-Sent Events (SSE) в веб-приложении на Ruby on Rails. Рассмотрены способы обработки SSE-соединений с использованием Rails ActionController::Live и Rack Hijacking API. Было показано как Rack Hijacking API позволяет избежать блокировки серверных потоков.
Код демонстрационного Rails-приложения, включающего используемые в статье примеры, доступен по этой ссылке: https://github.com/chumakoff-blog/rails_sse_rack_hijacking_api_example.
Также стоит отметить, что Action Cable, инструмент Rails для работы с WebSockets, также использует Rack Hijacking API. Action Cable внутренне управляет соединениями и обеспечивает их многопоточную обработку. Возможно, в будущем в Rails появятся аналогичные инструменты для работы с SSE.
Комментарии (0)