# initializer
$event_store = EventStore.new
event_store.subscribe(StudyDenormalizer.new, prefix: ‘on’)
# controller
# push a new event (write database)
uuid = UUID.new
event_store.push(aggregate_id: uuid, event: ‘StudyCreated’, data: { … })
# newly created record (read database)
@study = Study.find(uuid)
# Listener (which will update the canonical data store (read database))
class StudyDenormalizer
def on_study_created(aggregate_id, data) # could be a class...
study = Study.create(data.merge(uuid: aggregate_id))
end
def on_study_updated(aggregate_id, data)
study = Study.find(aggregate_id)
study.update_attributes(data)
end
# never deleted, discountinued_at is set
def on_study_discontinued
study = Study.find(aggregate_id)
study.update_attributes(data)
end
def process(event, aggregate_id, data)
send("on_#{event}", aggregate_id, data)
end
end
# Get all history
stream = $event_store.stream(aggregate_id: aggregate_id)
steam.each do |aggregate_id, event, data|
puts "event: #{event} => #{data}"
end
# We can delete our read database and reply an event steam to recreate it
Study.delete_all
stream = $event_store.stream(event: /study_*/) # regex
steam.each do |aggregate_id, event, data| # lazy enum
StudyDenormalizer.process(event, aggregate_id, data)
end