Event Sourcing Rails
Thoughts on Event Sourcing in Ruby.
The idea is to have commands which record events in an event store.
A denormalizer listens to the EventStore for new events and creates highly denormalized record(s) in a relational database. Entities are then populated by querying the relational database, since it it totally denormalized no JOIN’s are required.
The relational tables serve the same purpose as relation views, the event data might be persisted to multiple tables. For example a command might create an event from which the denormalizer will create record in tables which are used for persistance of entities and for aggregate reporting.
The relational data can be recreated using the event store.
# an event
class Event
include Virtus
attribute :event_name, String
attribute :aggregate_id, Integer
attribute :data, Hashie::Mash, default: {}
def valid?
%w(event_name, aggregate_id, data).all?(:&present?)
end
# expose data keys as attributes
def method_missing(name, *args, &block)
data.respond_to?(name) ? data.send(name) : super
end
def respond_to?(name)
data.respond_to?(name) || super
end
end
event store, e.g. MongoDB
Using Mongoid might be overkill.
class EventStore
include Mongoid::Document # make this pluggable
include Wisper.publisher
field :event_name, type: String
field :aggregate_id, type: Integer
field :data, type: Hash
field :happend_at, type: Time
include FigLeaf
hide Mongoid::Document
# stores an event
def self.put(event)
create(event_name: event.name, aggregate_id: event.aggregate_id, data: data, happend_at: Time.now)
publish("event_created", event)
end
# returns all events for aggregate
def self.get(aggregate_id)
find(aggregate_id: aggregate_id).order('created_at asc').map(&:to_event)
end
def to_event
Event.new(attributes)
end
end
form for command (populated by UI)
class AddItemToBasket
class Form
include Virtus.model
include Validations # kachick/validation
attribute :basket_id, Integer
attribute :product_id, Integer
attribute :quantity, Integer, default: 1
attr_validator :basket_id, Integer
attr_validator :item_id, Integer
attr_validator :quantity, Integer
end
end
command
class AddItemToBasket
include EventSource
include Wisper.publisher
def call(form)
if form.valid?
# enforce any business rules here, e.g. user can only have 3 products in basket...
create_event(aggregate_id: form.basket_id, data: { product_id: form.product_id, form.qty, qty }) # see EventSource module
publish(:item_added_to_basket, form.basket_id)
true # consider response object
else
publish(:item_not_added_to_basket, form.basket_id)
false # consider response object, so reason for failure can be reported
end
end
end
# helper method for commands
# allow EventStore to be injected in initializer.
module EventSource
# creates an event
#
def create_event(event_name: nil, aggregate_id: , data: {})
event_name ||= self.class.name
event = Event.new(event_name, aggregate_id, data)
EventStore.put(event)
end
end
User Interface
e.g. Rails/Sinatra/Lotus controller
def create
@form = AddItemToBasket::Form.new(params[:form])
command = AddItemToBasket.new
command.on(:item_added_to_basket) { |basket_id| redirect_to basket_path(basket_id) }
command.on(:item_not_added_to_basket) { |basket_id| render action: :new }
command.call(@form)
end
def index
# this gets the whole basket with one SQL query and no JOIN's :)
# use projection instead a command instead? RetriveBasket.call
@basket = BasketProjection.get(current_user.basket_id) # just user_id, if basket is a singleton resource?
end
denormalizer
Subscribe to all events (in an initalizer)
EventStore.subscribe(Denormalizer.new, prefix: 'on', async: true)
class Denormalizer
# call denormalizer for received event
def on_new_event(event)
"Denormalizer::#{event.name.constantize}".call(event)
end
end
# find way to elegantly handle multiple projections per event, maybe just subscribe each projection to
# event store instead of mapping event name to a class, or move each projection to a private method.
class Denormalizer
class ItemAddedToBasket
def call(event)
product = ProductRepo.find(event.product_id)
BasketProjection.put(basket_id: event.aggregate_id,
product_id: product.id,
product_name: product.name,
product_desc: product.description,
quantity: event.qty)
StatsBasketProductProjection.put(product_id: product.id, product_name: product.name)
end
end
end
projection / repo
class BasketProjection < ActiveRecord::Base
# user_id, Integer, basket_id: Integer, product_name: varchar, product_sku: varchar, quantity: Integer
table_name :basket_items
include FigLeaf
hide ActiveRecord::Base
def self.put(basket_id, product_id, product_name, product_desc, quantity)
create(basket_id: basket_id,
product_id: data.product_id,
product_name: product.name,
product_desc: product.description,
quantity: data.qty)
end
def self.get(basket_id) # user_id instead?
records = where(basket_id)
user_id = record[0].user_id # suspect...
Basket.new(id: basket_id, user_id: user_id, items: record.map(&:attributes))
end
end
# projection / repo for statistical view of products added to basket
# provides data for a report to show which products are most often added to a basket
# Could be Redis instead of Relational
class BasketProductStatProjection < ActiveRecord::Base
# schema - product_id: integer, product_name: varchar, count: integer
include FigLeaf
hide ActiveRecord::Base, expect: [:all]
def self.put(product_id, product_name)
record = find_or_create(product_id: data.product_id, product_name: product_name)
record.increment_counter!
end
def self.get(product_id)
find(product_id)
end
def increment_counter
increment!(:count)
end
end
Entities
class Basket
include Virtus.model
attribute :id # aggregate root, so has id
attribute :items, Array[Item]
attribute :user, User
def items_addable?
items.size < 3
end
end
# no id required, value object
class Item
include Virtus.model
attribute :qty, Integer
attribute :product_name, String
attribute :product_desc, String
attribute :product_sku, String
end
class User
include Virtus.model
attribute :id, Integer
end
NOTES
- since a user can only have one basket, maybe user is the aggregate root and we can have a
- Basket projection, which would allow removal of basket_id and basket becomes a value object.
- The data stored would be user_id (aggregate_id), product_name, product_desc, quantity