Rails. Processing SSE connections without blocking server threads using Rack Hijacking API

This article discusses the use of Server-Sent Events (SSE) technology for establishing a persistent connection between a Ruby on Rails application and a client. Emphasis is placed on using the Rack Hijacking API to take over connections and free the application's web server threads, so that more connections can be handled with fewer server threads.

Server-Sent Events

The SSE technology makes it possible to create a persistent one-way HTTP connection allowing the server to send a stream of data to the client. Rails already has a feature for working with SSE: the ActionController::Live module. Here is an example of a Rails controller using this module.

# ActionController::Live usage example
#
class TasksController < ActionController::Base
  include ActionController::Live

  def perform
    response.headers["Content-Type"] = "text/event-stream"
    perform_task(response.stream)
  end

  private

  def perform_task(stream)
    sse = ActionController::Live::SSE.new(stream, retry: 300, event: "taskProgress")

    task_progress = 0 # progress percentage

    while task_progress < 100 do
      task_progress += 5
      sse.write(progress: task_progress)
      sleep 1
    end
  ensure
    sse.close
  end
end

The above code example demonstrates how a server can periodically send a long-running task completion status to a client. The client implementation code might look like this:

var source = new EventSource('/tasks/perform');

source.addEventListener('taskProgress', function(e) {
  var progress = JSON.parse(e.data).progress;
  console.log(progress);

  if (progress >= 100) {
    source.close();
    console.log("Task Completed!");
  }
});

Working with SSE requires a threaded web server, such as Puma, Passenger, or Thin. Puma, which is the default server for Rails 5, is perfect for this purpose.

Blocking server threads problem

The number of requests that a threaded server can handle concurrently depends on the size of the thread pool. For instance, a single Puma process with a maximum of 16 threads (the exact default depends on the Puma version and configuration) is able to handle up to 16 client requests at the same time (thread per request mode).

In the case of SSE connections, if they are created the same way as shown in the above code, a separate server thread will be used for each connection. Therefore, when all server threads are busy, the next client won't be able to connect to the server until there is a free thread.
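The effect can be reproduced with a toy model of a thread-per-request server. This is only a sketch under simplifying assumptions: the pool size of 2, the queues, and all variable names are hypothetical and have nothing to do with Puma's internals. Two workers stand in for server threads, and each "connection" holds its worker until it is released.

```ruby
# Toy model: a fixed pool of worker threads serving long-lived "connections".
pool_size = 2

jobs     = Queue.new   # clients waiting for a free server thread
started  = Queue.new   # ids of clients a worker has picked up
release  = Queue.new   # tokens that let an open connection finish
finished = Queue.new   # ids of clients whose connection has been closed

workers = Array.new(pool_size) do
  Thread.new do
    while (id = jobs.pop) != :stop
      started << id
      release.pop      # the connection stays open and keeps the thread busy
      finished << id
    end
  end
end

3.times { |id| jobs << id }                        # three clients connect

in_progress = Array.new(pool_size) { started.pop } # wait until the pool is saturated
waiting     = jobs.size                            # the third client is still queued

3.times { release << :done }                       # connections close, threads free up
completed = Array.new(3) { finished.pop }

pool_size.times { jobs << :stop }
workers.each(&:join)

puts "served first: #{in_progress.sort.inspect}, left waiting: #{waiting}"
```

The third client is only served after one of the first two connections ends, which is exactly what happens to an SSE client when every server thread is occupied.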

The obvious way to solve this problem is to increase the number of threads. However, this would result in unnecessary overhead, as maintaining a thread is much more expensive than just keeping an SSE connection alive. This is especially critical when a large number of connections are being established.

Alternatively, a more efficient method that allows SSE connections to be established and processed without blocking web server threads would be to use the Rack Hijacking API.

Taking over the connection using the Rack Hijacking API

The Rack Hijacking API allows you to take over control of the connection and do with it what you please. You can free the server thread, while keeping the connection open even after the request processing cycle in the Rails controller has been completed.

The algorithm is the following:

  • Take control of the connection via the Rack Hijacking API.
  • Continue processing the hijacked connection in another program thread.
  • Return from the Rails controller, thus informing the web server that its request processing work has been completed and it can free up the server thread for processing new requests.
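The three steps above can be sketched without Rails, as a plain Rack application. This is a minimal illustration rather than production code: the `background` array exists only so the demo can wait for the spawned thread, and the `env` hash is a hand-made stand-in for what a hijack-capable server would provide, with a StringIO playing the role of the client socket.

```ruby
require "stringio"

background = []  # spawned threads, collected only so the demo can join them

app = lambda do |env|
  unless env["rack.hijack?"]
    return [500, { "content-type" => "text/plain" }, ["hijacking is not supported"]]
  end

  # Step 1: take control of the connection.
  env["rack.hijack"].call
  io = env["rack.hijack_io"]

  # Step 2: keep serving the connection from another thread.
  background << Thread.new do
    begin
      io.write("HTTP/1.1 200 OK\r\nContent-Type: text/event-stream\r\n\r\n")
      3.times { |i| io.write("data: tick #{i}\n\n") }
    ensure
      io.close
    end
  end

  # Step 3: return right away so the server thread is released.
  [200, {}, []]
end

# Stand-in environment (assumption): a real server supplies these keys itself.
socket = StringIO.new
env = { "rack.hijack?" => true }
env["rack.hijack"] = proc { env["rack.hijack_io"] = socket }

status, = app.call(env)
background.each(&:join)
puts socket.string
```

The call to `app` returns before any event is written; everything the client sees comes from the background thread.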

The Rack Hijacking API can be used in two modes:

  1. Full hijacking allows you to take over the connection immediately after it is established and before the application (in our case, the Rails application) sends any data, including headers. This mode provides full control over the connection, but also requires greater responsibility.

    Since you take control of the connection, some responsibility for handling it falls on you. In order to work with SSE you have to assign and send out the necessary response headers and you should close the connection when it is no longer needed.

  2. Partial hijacking gives control over the connection after the application sends headers. When using this mode, you are also responsible for closing the connection.

In our simple case, the choice of which method to use doesn't play a significant role. However, we will look at both of them to show their fundamental differences.

Full hijacking

Full hijacking is performed through request env variables provided by Rack and available in the application. In a Rails controller you can access those variables through the request.env hash. In order to make sure that your application server supports the Rack Hijacking API, you can check the request.env['rack.hijack?'] value, which can be true or false.

Taking over the connection is performed by calling request.env['rack.hijack'].call, after which the hijacked connection stream is accessible through request.env['rack.hijack_io'].

# Rack Full Hijacking API usage example
#
class TasksController < ActionController::Base
  def perform
    request.env['rack.hijack'].call
    stream = request.env['rack.hijack_io']

    send_headers(stream)

    Thread.new do
      perform_task(stream)
    end

    response.close
  end

  private

  def perform_task(stream)
    sse = ActionController::Live::SSE.new(stream, retry: 300, event: "taskProgress")

    task_progress = 0 # progress percentage

    while task_progress < 100 do
      task_progress += 5
      sse.write(progress: task_progress)
      sleep 1
    end
  ensure
    sse.close
  end

  def send_headers(stream)
    headers = [
      "HTTP/1.1 200 OK",
      "Content-Type: text/event-stream"
    ]
    stream.write(headers.map { |header| header + "\r\n" }.join)
    stream.write("\r\n")
    stream.flush
  rescue
    stream.close
    raise
  end
end

In this example, the ActionController::Live module is no longer needed and hasn't been used. However, you can still use the ActionController::Live::SSE class, which provides a convenient API for sending messages via SSE in the required format. The difference is that this time when instantiating an object of that class, the stream of the hijacked connection env["rack.hijack_io"] is used, instead of response.stream that was provided by ActionController::Live in the previous example.
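To make "the required format" concrete, here is a small standalone formatter for SSE messages. It is an approximation written for illustration, not the source of ActionController::Live::SSE: the method name `format_sse` and its keyword arguments are hypothetical. Each message is a set of `field: value` lines terminated by a blank line, and multi-line data is split into several `data:` lines.

```ruby
require "json"

# Builds one SSE message as it travels over the wire.
def format_sse(payload, event: nil, retry_ms: nil, id: nil)
  data = payload.is_a?(String) ? payload : JSON.generate(payload)

  lines = []
  lines << "retry: #{retry_ms}" if retry_ms   # reconnection delay in milliseconds
  lines << "event: #{event}" if event         # name the client listens for
  lines << "id: #{id}" if id
  data.split("\n", -1).each { |line| lines << "data: #{line}" }

  lines.join("\n") + "\n\n"                   # a blank line ends the message
end

frame = format_sse({ progress: 5 }, event: "taskProgress", retry_ms: 300)
puts frame
```

Any object that responds to `write` can receive such a string, which is why the hijacked socket can be used in place of response.stream.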

Notice that in order to return from the Rails controller and free the main application thread, the work with the hijacked connection is continued in a separate program thread. In a more complex example, you could create a worker responsible for handling connections asynchronously. You could also put the connections in an array to organize a queue or, at least, track the number of open connections.
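One possible shape for such a worker is sketched below. The `ConnectionPool` class and its methods are hypothetical names, and StringIO objects stand in for hijacked sockets. A single thread writes to every registered stream, and streams that fail on write are closed and dropped, which also keeps the count of open connections accurate.

```ruby
require "stringio"

# Hypothetical registry of hijacked connections served by one shared thread.
class ConnectionPool
  def initialize
    @streams = []
    @lock = Mutex.new
  end

  def add(stream)
    @lock.synchronize { @streams << stream }
  end

  def size
    @lock.synchronize { @streams.size }
  end

  # Writes a message to every stream and forgets the ones that fail.
  def broadcast(message)
    @lock.synchronize do
      @streams.reject! do |stream|
        begin
          stream.write(message)
          false
        rescue IOError, SystemCallError
          stream.close unless stream.closed?
          true
        end
      end
    end
  end
end

pool  = ConnectionPool.new
alive = StringIO.new
gone  = StringIO.new
gone.close                       # simulates a client that has disconnected
pool.add(alive)
pool.add(gone)

worker = Thread.new do
  (5..15).step(5) { |progress| pool.broadcast("data: #{progress}\n\n") }
end
worker.join

puts "open connections: #{pool.size}"
```

Holding the lock while writing keeps the sketch short, but a slow client would delay everyone else; a real worker would probably use non-blocking writes or per-connection queues.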

In the example, the Rails controller finishes its work by calling response.close. However, how it finishes doesn't actually matter in this case. The goal is to free the server thread as soon as possible. You could explicitly return any response using, for example, the head or render methods, or you could do nothing, in which case the Rails controller would try to find and render an associated view and build the response by itself. In any case, that would have no effect on the communication with the client or on the application's further functioning, since the client receives the response sent through the hijacked connection, not the one returned from the Rails controller.

Partial hijacking

Partial hijacking is performed by assigning a Proc object to the "rack.hijack" response header (in a Rails controller it is accessible through response.headers["rack.hijack"]). The Proc object should describe further processing of the hijacked connection and is called after the application has sent headers to the client. You can access the hijacked connection stream through the single argument passed to the Proc.

# Rack Partial Hijacking API usage example
#
class TasksController < ActionController::Base
  def perform
    response.headers["Content-Type"] = "text/event-stream"

    response.headers["rack.hijack"] = proc do |stream|
      Thread.new do
        perform_task(stream)
      end
    end

    head :ok
  end

  private

  def perform_task(stream)
    sse = ActionController::Live::SSE.new(stream, retry: 300, event: "taskProgress")

    task_progress = 0 # progress percentage

    while task_progress < 100 do
      task_progress += 5
      sse.write(progress: task_progress)
      sleep 1
    end
  ensure
    sse.close
  end
end

In this approach, the Rails controller is responsible for sending out the headers required for establishing an SSE connection. Therefore, this time the response returned by the Rails controller matters, since it includes those headers. The connection is taken over after the headers have been sent. Further processing of the hijacked connection follows the scenario described in the Proc assigned to response.headers["rack.hijack"].
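The server's side of a partial hijack can be modeled in a few lines. The `deliver` helper below is a hypothetical simplification and not Puma's actual code: it writes the status line and the regular headers, then hands the socket to the callable found under "rack.hijack" instead of iterating over the body. A StringIO again stands in for the client socket.

```ruby
require "stringio"

# Simplified model of a server finishing a response (hypothetical helper).
def deliver(response, socket)
  status, headers, body = response
  hijack = headers["rack.hijack"]

  socket.write("HTTP/1.1 #{status} OK\r\n")
  headers.each do |name, value|
    next if name.start_with?("rack.")   # server-only keys are not sent to the client
    socket.write("#{name}: #{value}\r\n")
  end
  socket.write("\r\n")

  if hijack
    hijack.call(socket)                 # the body is ignored; the callable owns the socket
  else
    body.each { |chunk| socket.write(chunk) }
    socket.close
  end
end

threads = []  # collected only so the demo can join them

app = lambda do |_env|
  streamer = proc do |io|
    threads << Thread.new do
      begin
        io.write("data: hello\n\n")
      ensure
        io.close                        # closing is still the application's job
      end
    end
  end

  [200, { "content-type" => "text/event-stream", "rack.hijack" => streamer }, []]
end

socket = StringIO.new
deliver(app.call({}), socket)
threads.each(&:join)
puts socket.string
```

In this model the headers come from the application's normal response, while the event data is written later by the thread started inside the callable.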

Conclusion

In this article, you saw examples of using Server-Sent Events (SSE) technology in a Ruby on Rails application. We discussed ways of processing SSE connections using Rails's ActionController::Live module and the Rack Hijacking API, and showed how the Rack Hijacking API makes it possible to avoid blocking web server threads.

A demo Rails application including examples used in this article is available at the following link: https://github.com/chumakoff-blog/rails_sse_rack_hijacking_api_example.

It is worth noting that Action Cable, the Rails feature for working with WebSockets, also uses the Rack Hijacking API under the hood. Action Cable internally manages connections and provides their multi-threaded processing. Perhaps in the future Rails will have similar tools for working with SSE.

Comments (4)
Andrey Khataev
Hi! Thanks for your post! There is one thing about Rails' SSE that I don't understand: why does it "eat" server threads, even though it also uses a parallel thread for streaming? Here is a quote from an [article](https://tenderlovemaking.com/2012/07/30/is-it-live.html) by the author of the ActionController::Live module: "In order to make this feature work, I had to work around the Rack API and I did this by executing Live Stream actions in a new thread". It can also be seen in the [sources](https://github.com/rails/rails/blob/master/actionpack/lib/action_controller/metal/live.rb#L291). Maybe you could explain? Thanks in advance!
Anton Chumakov (in reply to Andrey Khataev)
Hi, Andrey! Thank you for your interest in this post! The responsibility of a *Rails* `controller` is to build and return a *Rack*-compatible `response`, which includes the HTTP response code, headers and the response body. The reason for executing a Live `action` in a new thread is that the `Rack response` must be returned immediately, while the body object that it includes is still being processed (receiving new portions of data). If the `action` weren't executed in a new thread, it would be impossible to return the `Rack response` until all the streaming logic is done. That's how the *Rails-Rack* stack works internally. "*executing Live Stream actions in a new thread*" means that only the developer-provided `action` is executed in a new thread, not everything. But what happens in the `action` is only a part of the work; a lot goes on behind the scenes. Furthermore, the code in the `action` uses the `response stream` from the main thread. The new thread only solves the *Rails-Rack* response transition problem. That's why you cannot shut down the controller and free the web server thread in that case. On the other hand, when you take over the connection using the **Rack Hijacking API**, you get full control over the client socket, which is no longer dependent on the *Rails* controller's `response`.
Andrey Khataev (in reply to Anton Chumakov)
Thank you for your complete answer! So, to summarize as I've understood it: the new thread in the ActionController::Live module is the price we pay for replacing a simple iterable object (such as an Array or Enumerator), which we could pass to Rack, with some long-running code (such as the while loop in perform_task in your examples here). But this still requires a thread from the app server's thread pool. And by introducing the hijacking API we don't stick to this thread, releasing it quickly?
Anton Chumakov (in reply to Andrey Khataev)
Exactly! I couldn't have said it better myself.