How to build a read model with Rails Event Store Projection

Recently I faced an interesting challenge in one of our customers’ applications. Imagine that you take a test, after which you get a personalised report about your skill levels. The existing mechanism for that was time- and resource-consuming. People had to wait several hours for an e-mail with a generated PDF report, due to several constraints which I would prefer not to dive into.

The solution was obvious: let’s progressively build a read model every time someone answers a question. After the test is done, the report is available instantly in the web UI.

Let’s start with a domain event

module TestExecution
  class AnswerRegistered < ::Event
    attribute :participant_id, Types::Integer
    attribute :test_id, Types::Integer
    attribute :question_id, Types::Integer
    attribute :answer_id, Types::Integer
    attribute :skill_id, Types::Integer
    attribute :score, Types::Float
    attribute :time_elapsed, Types::Integer
  end
end

Nothing fancy: a typical domain event powered by Rails Event Store, with a defined schema. It keeps the identifiers of the involved entities and the score calculated by the domain service, which publishes this event when its job is done.

Build the read model

The next building block is an asynchronous handler. Why asynchronous? So that we don’t spend time inside the participant’s request-response cycle and lower their satisfaction with our application:

module Reporting
  class CalculateParticipantReport < ApplicationJob
    prepend RailsEventStore::AsyncHandler

    def perform(event)
      participant_id = event.data.fetch(:participant_id)
      test_id        = event.data.fetch(:test_id)

      link_to_stream(event, test_id, participant_id)

      scores = calculate_scores(test_id, participant_id)

      ParticipantReport.write(
        *prepare_data_for_read_model(scores, test_id, participant_id)
      )
    end

    private

    def prepare_data_for_read_model(scores, test_id, participant_id)
      # magic happens, querying additional info, formatting data
      [scores, participant, test]
    end

    def calculate_scores(test_id, participant_id)
      RailsEventStore::Projection
        .from_stream(stream_name(test_id, participant_id))
        .init(-> { Hash.new { |scores, skill_id| scores[skill_id] = { score: 0, number_of_scores: 0 } } })
        .when(
          TestExecution::AnswerRegistered,
          ->(state, event) do
            skill_id = event.data.fetch(:skill_id)
            state[skill_id][:score] += event.data.fetch(:score)
            state[skill_id][:number_of_scores] += 1
          end
        )
        .run(Rails.configuration.event_store)
        .reduce({}) do |scores, (skill_id, values)|
          # average = accumulated score / number of answers registered for the skill
          scores[skill_id] = values[:score] / values[:number_of_scores]
          scores
        end
    end

    def link_to_stream(event, test_id, participant_id)
      Rails.configuration.event_store.link(
        event.event_id,
        stream_name: stream_name(test_id, participant_id)
      )
    rescue RubyEventStore::EventDuplicatedInStream
      # Already linked: the handler may run more than once (at-least-once delivery).
    end

    def stream_name(test_id, participant_id)
      "ParticipantReport$#{test_id}-#{participant_id}"
    end
  end
end

What happens here:

  1. The AnswerRegistered event is linked to a dedicated report stream, ParticipantReport$123-456. By doing that, we can scope events the way we want; in our case, the stream name contains the ids of the test and the participant. RubyEventStore::EventDuplicatedInStream is rescued to support an at-least-once delivery strategy.
  2. Then a Projection reads from our dedicated stream ParticipantReport$123-456. All the scores are grouped by skill_id and accumulated together with additional info (specifically, the number of elements). After the projection is done, reduce does the math, resulting in an average score for each skill.
  3. When the scores are ready, further calculations follow, and additional info for the read model (skill names, etc.) is gathered and formatted. No other query is needed to present it to the participant.
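
The fold from step 2 can be sketched in plain Ruby, without Rails Event Store. This is only an illustration under assumptions: average_scores is a hypothetical helper, and plain hashes stand in for AnswerRegistered events.

```ruby
# Plain-Ruby sketch of the projection: accumulate per-skill totals, then average.
# `average_scores` is a hypothetical name; events are plain hashes for illustration.
def average_scores(events)
  # Same initial state as in the handler: a hash that lazily creates a counter per skill.
  initial = Hash.new { |state, skill_id| state[skill_id] = { score: 0.0, number_of_scores: 0 } }

  totals = events.each_with_object(initial) do |event, state|
    skill_id = event.fetch(:skill_id)
    state[skill_id][:score] += event.fetch(:score)
    state[skill_id][:number_of_scores] += 1
  end

  # Turn the accumulated totals into one average per skill.
  totals.each_with_object({}) do |(skill_id, values), scores|
    scores[skill_id] = values[:score] / values[:number_of_scores]
  end
end

events = [
  { skill_id: 1, score: 2.0 },
  { skill_id: 1, score: 3.0 },
  { skill_id: 2, score: 4.0 }
]

p average_scores(events)
```

For this sample input, skill 1 averages 2.5 and skill 2 averages 4.0.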

What does the read model look like?

id  | report_slug  | participant_name | test_name                     | skills
997 | cf827527c552 | Jane Doe         | Important skillz test         | [{name: 'Sleeping', average: '2.5', global: '2.2'}, #...]
998 | 6adb1fc1d201 | Ugly Joe         | Programming skills assessment | [{name: 'Ruby', average: '4.0', global: '2.0'}, #...]
999 | 4cece2d44ae0 | Mr Kobayashi     | Smartness test                | [{name: 'Whatever', average: '5.0', global: '1.0'}, #...]

Vaughn Vernon, in his book “Implementing Domain-Driven Design”, describes the read model this way:

The query model is a denormalized data model. It is not meant to deliver domain behavior, only data for display (and possibly reporting). If this data model is a SQL database, each table would hold the data for a single kind of client view (display). The table can have many columns, even a superset of those needed by any given user interface display view. Table views can be created from tables, each of which is used as a logical subset of the whole.

Denormalization is not a popular technique in the Rails world. What does it give us? Complex, and often multiple, queries are replaced with a simple lookup of a single record that contains all the data to be displayed, already pre-formatted.

How to deal with concurrency issues

Please have a look at the read model implementation:

module Reporting
  class ParticipantReport < ApplicationRecord
    def self.write(scores, participant, test)
      ApplicationRecord.transaction do
        advisory_lock(participant.id, test.id)

        report = find_or_initialize_by(participant_id: participant.id, test_id: test.id)

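        # Generate the slug only once, so the report URL stays stable across writes.
        report.report_slug ||= SecureRandom.hex(6)
        report.participant_name = participant.name
        report.test_name = test.name # assumes the test object responds to #name
        report.skills = scores
        report.save!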
      end
    end

    private_class_method def self.advisory_lock(participant_id, test_id)
      args = [participant_id, test_id].join
      ApplicationRecord.connection.execute("SELECT pg_advisory_xact_lock(hash_64('#{args}'))")
    end
  end
end

It’s mostly self-explanatory. One might think that ActiveRecord already has with_lock, or simply lock!, for this. Yes, it does. However, those won’t work for not-yet-existing records: they rely on SELECT ... FOR UPDATE, which locks an existing row, and on the first write there is no row to lock, only one to create. So, assuming a unique index on the participant and test pair, ActiveRecord::RecordNotUnique errors would often appear when two or more concurrent threads try to insert the row for the first time. Thanks to pg_advisory_xact_lock ( key bigint ) → void we can obtain an exclusive transaction-level advisory lock, waiting if necessary. The lock is released automatically when the transaction ends. Yet another reason to use PostgreSQL.
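
The "lock on a key first, then find or create" idea can be illustrated with an in-process analogy. This is only a sketch, not PostgreSQL: a Mutex per key plays the role of the advisory lock, and InMemoryReports is a hypothetical class invented for this example.

```ruby
# In-process analogy of "take a lock on the key, then find-or-create".
# A Mutex per key stands in for pg_advisory_xact_lock; InMemoryReports is hypothetical.
class InMemoryReports
  attr_reader :inserts

  def initialize
    @rows = {}
    @inserts = 0
    @locks = {}
    @locks_guard = Mutex.new
  end

  def write(participant_id, test_id, scores)
    key = [participant_id, test_id]
    lock_for(key).synchronize do
      row = @rows[key]
      if row.nil?
        sleep 0.001 # widen the window in which an unlocked version would race
        @inserts += 1
        row = @rows[key] = { participant_id: participant_id, test_id: test_id }
      end
      row[:scores] = scores
    end
  end

  def count
    @rows.size
  end

  private

  # The lock exists before the row does, which a row-level lock cannot offer.
  def lock_for(key)
    @locks_guard.synchronize { @locks[key] ||= Mutex.new }
  end
end

reports = InMemoryReports.new
threads = 10.times.map do |i|
  Thread.new { reports.write(456, 123, { 1 => i.to_f }) }
end
threads.each(&:join)

puts reports.inserts
```

Ten concurrent writers target the same key, yet only one of them performs the insert; the others wait and then update the existing row.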

We were wrong about computing the lock key

The previous implementation of the advisory_lock method in this blog post didn’t produce an identical hash across different MRI processes, so the code was prone to ActiveRecord::RecordNotUnique errors.

It took two different processes using two different connections to prove that the previous advisory_lock, shown here, didn’t work as expected and let both of them access the same resource:

private_class_method def self.advisory_lock(participant_id, test_id)
  bigint = [participant_id, test_id].join.hash
  ApplicationRecord.connection.execute("SELECT pg_advisory_xact_lock(#{bigint})")
end

Why this happens is explained in Ruby’s Object#hash docs:

The hash value for an object may not be identical across invocations or implementations of Ruby. If you need a stable identifier across Ruby invocations and implementations you will need to generate one with a custom method.

We fixed it by creating a custom hash_64() function in our PostgreSQL database:

create function hash_64(_identifier character varying) returns bigint
    language plpgsql
as
$$
DECLARE
  hash bigint;
BEGIN
  select left('x' || md5(_identifier), 16)::bit(64)::bigint into hash;
  return hash;
END;
$$;

alter function hash_64(varchar) owner to dbuser;

which was then used to fix the implementation of advisory_lock:

private_class_method def self.advisory_lock(participant_id, test_id)
  args = [participant_id, test_id].join
  ApplicationRecord.connection.execute("SELECT pg_advisory_xact_lock(hash_64('#{args}'))")
end

The hash_64() implementation was taken from the Eventide codebase.
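
As an alternative sketch, a stable key could also be derived on the Ruby side with a digest instead of Object#hash. The stable_lock_key helper is hypothetical and is not guaranteed to return the same number as hash_64(); it only shows that a digest-based key is identical in every process and fits into a bigint.

```ruby
require "digest"

# Hypothetical helper: derive a stable signed 64-bit key from the two ids.
# It reads the first 8 bytes of the MD5 digest as a big-endian signed integer,
# so the result always fits PostgreSQL's bigint.
# Assumption: this is an illustration, not the hash_64() function from the post.
def stable_lock_key(participant_id, test_id)
  # The separator keeps (45, 6123) and (456, 123) from producing the same input string.
  Digest::MD5.digest("#{participant_id}-#{test_id}").byteslice(0, 8).unpack1("q>")
end

key = stable_lock_key(456, 123)
puts key
```

Unlike Object#hash, the digest does not depend on a per-process seed, so two workers computing the key for the same participant and test agree on it.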

How to use the read model

# app/controllers/test_results_controller.rb
class TestResultsController < ApplicationController
  def show
    render locals: { report: Reporting::ParticipantReport.find_by!(report_slug: params[:slug]) }
  end
end

# app/views/test_results/show.html.haml
%h1
  = "Personalised report for #{report.participant_name}"
%h2= report.test_name
- report.skills.each do |skill|
  %div
    = "Your performance in #{skill[:name]} is"
    = "#{skill[:average]} comparing to"
    = "#{skill[:global]} earned by others"

But…

What if another field is required, or there was a bug in the calculations? Not a problem: read models can be thrown away and rebuilt with ease, because all the history behind them is known, thanks to domain events.
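
A rebuild can be sketched as replaying the whole history into fresh reports. The example is plain Ruby under assumptions: rebuild_reports is a hypothetical helper, plain hashes stand in for AnswerRegistered events, and answers_count plays the role of a newly required field.

```ruby
# Sketch of rebuilding every report from the full event history.
# `rebuild_reports` and `answers_count` are hypothetical names for illustration.
def rebuild_reports(events)
  events
    .group_by { |event| [event.fetch(:test_id), event.fetch(:participant_id)] }
    .transform_values do |answers|
      by_skill = answers.group_by { |event| event.fetch(:skill_id) }
      {
        # a "new field" that the old read model did not have
        answers_count: answers.size,
        averages: by_skill.transform_values do |skill_answers|
          skill_answers.sum { |event| event.fetch(:score) } / skill_answers.size
        end
      }
    end
end

history = [
  { test_id: 123, participant_id: 456, skill_id: 1, score: 2.0 },
  { test_id: 123, participant_id: 456, skill_id: 1, score: 3.0 },
  { test_id: 123, participant_id: 789, skill_id: 1, score: 5.0 }
]

reports = rebuild_reports(history)
p reports[[123, 456]]
```

Because the events are the source of truth, the old rows can be dropped and the result of such a replay written in their place.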

By the way, you might also be interested in other posts on read models on our blog.
