Thoughts on Event Sourcing in Ruby.

The idea is to have commands which record events in an event store.

A denormalizer listens to the EventStore for new events and creates highly denormalized record(s) in a relational database. Entities are then populated by querying the relational database, since it it totally denormalized no JOIN’s are required.

The relational tables serve the same purpose as relation views, the event data might be persisted to multiple tables. For example a command might create an event from which the denormalizer will create record in tables which are used for persistance of entities and for aggregate reporting.

The relational data can be recreated using the event store.

# an event
class Event
  include Virtus
  
  attribute :event_name,   String
  attribute :aggregate_id, Integer
  attribute :data,         Hashie::Mash, default: {}
  
  def valid?
    %w(event_name, aggregate_id, data).all?(:&present?)
  end
  
  # expose data keys as attributes
  def method_missing(name, *args, &block)
    data.respond_to?(name) ? data.send(name) : super
  end
  
  def respond_to?(name)
    data.respond_to?(name) || super
  end
end

event store, e.g. MongoDB

Using Mongoid might be overkill.

class EventStore
  include Mongoid::Document # make this pluggable
  include Wisper.publisher
  
  field :event_name,   type: String
  field :aggregate_id, type: Integer
  field :data,         type: Hash
  field :happend_at,   type: Time
  
  include FigLeaf
  
  hide Mongoid::Document
  
  # stores an event
  def self.put(event)
    create(event_name: event.name, aggregate_id: event.aggregate_id, data: data, happend_at: Time.now)
    publish("event_created", event)
  end
  
  # returns all events for aggregate
  def self.get(aggregate_id)
    find(aggregate_id: aggregate_id).order('created_at asc').map(&:to_event)
  end
  
  def to_event
    Event.new(attributes)
  end
end

form for command (populated by UI)

class AddItemToBasket
  class Form
    include Virtus.model
    include Validations # kachick/validation
    
    attribute :basket_id,      Integer
    attribute :product_id,     Integer
    attribute :quantity,       Integer, default: 1
    
    attr_validator :basket_id, Integer
    attr_validator :item_id,   Integer
    attr_validator :quantity,  Integer
  end
end

command

class AddItemToBasket
  include EventSource
  include Wisper.publisher
  
  def call(form)
   if form.valid?
      # enforce any business rules here, e.g. user can only have 3 products in basket...
      create_event(aggregate_id: form.basket_id, data: { product_id: form.product_id, form.qty, qty }) # see EventSource module
      publish(:item_added_to_basket, form.basket_id)
      true # consider response object
    else
      publish(:item_not_added_to_basket, form.basket_id)
      false # consider response object, so reason for failure can be reported
    end
  end
end

# helper method for commands
# allow EventStore to be injected in initializer.
module EventSource
  # creates an event
  #
  def create_event(event_name: nil, aggregate_id: , data: {})
    event_name ||= self.class.name
    event = Event.new(event_name, aggregate_id, data)
    EventStore.put(event)
  end
end

User Interface

e.g. Rails/Sinatra/Lotus controller

def create
  @form = AddItemToBasket::Form.new(params[:form])
  
  command = AddItemToBasket.new
  command.on(:item_added_to_basket)     { |basket_id| redirect_to basket_path(basket_id) }
  command.on(:item_not_added_to_basket) { |basket_id| render action: :new }
  command.call(@form)
end

def index
  # this gets the whole basket with one SQL query and no JOIN's :)
  # use projection instead a command instead? RetriveBasket.call
  @basket = BasketProjection.get(current_user.basket_id) # just user_id, if basket is a singleton resource?
end

denormalizer

Subscribe to all events (in an initalizer)

EventStore.subscribe(Denormalizer.new, prefix: 'on', async: true)

class Denormalizer
  # call denormalizer for received event
  def on_new_event(event)
    "Denormalizer::#{event.name.constantize}".call(event)
  end
end

# find way to elegantly handle multiple projections per event, maybe just subscribe each projection to
# event store instead of mapping event name to a class, or move each projection to a private method.
class Denormalizer
  class ItemAddedToBasket
    def call(event)
      product = ProductRepo.find(event.product_id)
      
      BasketProjection.put(basket_id:    event.aggregate_id, 
                           product_id:   product.id, 
                           product_name: product.name, 
                           product_desc: product.description,
                           quantity:     event.qty)
      
      StatsBasketProductProjection.put(product_id: product.id, product_name: product.name)
    end
  end
end

projection / repo

class BasketProjection < ActiveRecord::Base

  # user_id, Integer, basket_id: Integer, product_name: varchar, product_sku: varchar, quantity: Integer
  
  table_name :basket_items
  
  include FigLeaf
  
  hide ActiveRecord::Base
  
  def self.put(basket_id, product_id, product_name, product_desc, quantity)
    create(basket_id:    basket_id, 
           product_id:   data.product_id, 
           product_name: product.name, 
           product_desc: product.description,
           quantity:     data.qty)
  end
  
  def self.get(basket_id)  # user_id instead?
    records = where(basket_id)
    user_id = record[0].user_id # suspect...
    Basket.new(id: basket_id, user_id: user_id, items: record.map(&:attributes))
  end
end

# projection / repo for statistical view of products added to basket
# provides data for a report to show which products are most often added to a basket
# Could be Redis instead of Relational
class BasketProductStatProjection < ActiveRecord::Base
  # schema - product_id: integer, product_name: varchar, count: integer
  include FigLeaf
  
  hide ActiveRecord::Base, expect: [:all]
  
  def self.put(product_id, product_name)
    record = find_or_create(product_id: data.product_id, product_name: product_name)
    record.increment_counter!       
  end
  
  def self.get(product_id)
    find(product_id)
  end
  
  def increment_counter
    increment!(:count)
  end
end

Entities

class Basket
  include Virtus.model
  
  attribute :id # aggregate root, so has id
  attribute :items, Array[Item]
  attribute :user,  User
  
  def items_addable?
    items.size < 3
  end
end

# no id required, value object
class Item
  include Virtus.model
  
  attribute :qty,          Integer
  attribute :product_name, String
  attribute :product_desc, String
  attribute :product_sku,  String
end

class User
  include Virtus.model
  
  attribute :id, Integer
end

NOTES

  • since a user can only have one basket, maybe user is the aggregate root and we can have a
  • Basket projection, which would allow removal of basket_id and basket becomes a value object.
  • The data stored would be user_id (aggregate_id), product_name, product_desc, quantity