Thoughts on a simple stream processor using Kafka and Ruby.

It only needs to run on one machine; there is no scaling across multiple machines. The only scaling concern is offset synchronization between multiple processes.

Derive state from event stream.

  • register handlers (object with #call(event)) to a topic
  • handler gets an isolated database for storing local state
  • app maintains offset for each handler between restarts
  • handler can reject an event
  • handler can be reset to any offset
  • topic can be reset to any offset
  • app starts a new process to connect to Kafka
  • each handler receives events concurrently (or not)
  • built-in support for JSON/Avro payloads

Off-the-top-of-my-head API:

app = Axe::App.new

app.configure do |config|
  config.error_handler = lambda { |exception| Airbrake... }
end

# route topics to handlers
#
app.processors.add(topic: "studies/study_created", 
                   id: "studies__create_study",
                   handler: CreateStudy.new)

app.processors.add(topic: "studies/study_created", 
                   id: "studies__do_stuff",
                   handler: lambda { |event| ... })

# optionally set offsets
#
app.processors["studies__do_stuff"].offset     # => 50 (already processed 50 events)
app.processors["studies__do_stuff"].offset = 0 # force starting from the first event; assumes the handler is idempotent or the derived data has been deleted

app.topics["studies"] # => [...]

# start
#
app.start # connect to Kafka and start consuming messages

Offset storage needs to be atomic, e.g. lock the file during writes if file-backed, or use INCR for a Redis counter.
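A minimal sketch of the file-backed option, using an exclusive flock around writes. The class and method names (OffsetStore#read / #write) are assumptions for illustration, not an existing Axe API:

```ruby
# Hypothetical file-backed offset store. Writes take an exclusive
# lock so two processes cannot interleave partial writes.
class OffsetStore
  def initialize(path)
    @path = path
  end

  # Read the last committed offset (0 if nothing stored yet).
  def read
    return 0 unless File.exist?(@path)
    File.open(@path, "r") do |f|
      f.flock(File::LOCK_SH)
      f.read.to_i
    end
  end

  # Replace the stored offset under an exclusive lock.
  def write(offset)
    File.open(@path, File::RDWR | File::CREAT) do |f|
      f.flock(File::LOCK_EX)
      f.truncate(0)
      f.write(offset.to_s)
      f.flush
    end
  end
end
```

A Redis-backed store would instead call `redis.incr(key)`, which is atomic server-side.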

Ideally one process with a thread per handler (the Kafka client might already handle this; otherwise we can use Celluloid).
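The thread-per-handler idea could look like the sketch below, using only stdlib threads. The queue-per-handler wiring is an assumption about how the Kafka client would feed events in; the handlers here are stand-ins that just record what they saw:

```ruby
# One thread per handler, each consuming from its own queue until a
# :stop sentinel arrives. Handlers only need to respond to #call(event).
require "thread"

results = Hash.new { |h, k| h[k] = [] }
handlers = {
  "counter" => ->(event) { results["counter"] << event },
  "logger"  => ->(event) { results["logger"]  << event }
}

queues  = {}
threads = handlers.map do |id, handler|
  queue = queues[id] = Queue.new
  Thread.new do
    while (event = queue.pop) != :stop
      handler.call(event)
    end
  end
end

# Fan one event out to every handler, then shut down cleanly.
queues.each_value { |q| q << { topic: "studies/study_created" } }
queues.each_value { |q| q << :stop }
threads.each(&:join)
```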

require 'app/models/study'

class CreateStudy
  include Axe::StreamProcessor
  
  def call(event)
    # ...
  end
end

CreateStudy gets access to a database for local state, e.g. SQLite, Redis, MySQL, a YAML file, PStore, an in-memory Hash, etc.
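For the PStore option, isolation could simply mean one store file per handler id. A hedged sketch (the `store_for` helper and the `axe_<id>.pstore` naming scheme are assumptions):

```ruby
# Give each handler an isolated PStore file keyed by handler id.
# PStore ships with Ruby's standard library and gives transactional,
# file-backed persistence with no server to run.
require "pstore"

def store_for(handler_id)
  PStore.new("axe_#{handler_id}.pstore")
end

store = store_for("studies__create_study")
store.transaction do
  store[:study_count] = (store[:study_count] || 0) + 1
end
```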

Built-in support for Avro: it is assumed the message is JSON+Avro, and it is deserialized to a typed Hash before the handler sees it.
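The JSON half of that deserialization is straightforward with the stdlib; the Avro schema-validation step would go through the avro gem and is omitted here. The `deserialize` name and the symbolized-keys convention are assumptions:

```ruby
# Parse a raw JSON payload into a Hash with symbol keys, so handlers
# can pattern-match on event[:type] etc. Avro schema decoding would
# be layered on top of (or replace) this step.
require "json"

def deserialize(raw_payload)
  JSON.parse(raw_payload, symbolize_names: true)
end

event = deserialize('{"type":"study_created","id":42}')
```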

When an event is processed (#call exits without raising an exception), the current offset is incremented atomically.
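That commit-after-success rule is the heart of the processing loop. An illustrative sketch, with an in-memory Hash standing in for the atomic offset store; rejection is modelled as the handler raising, which leaves the offset pointing at the failed event:

```ruby
# offsets maps handler_id => number of events already processed.
offsets = Hash.new(0)

def process(offsets, handler_id, handler, events)
  # Resume after the last committed offset; commit after each success.
  events.drop(offsets[handler_id]).each do |event|
    handler.call(event)        # may raise to reject the event
    offsets[handler_id] += 1   # committed only if #call succeeded
  end
rescue
  # Rejected: offset stays at the failed event, so it is retried
  # after a restart or a manual offset reset.
end

seen = []
handler = ->(event) { seen << event }
process(offsets, "studies__create_study", handler, [:a, :b, :c])
```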

Use an existing Kafka Ruby client to connect and receive events.

An error handler can be configured to report errors.
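One possible shape for that: wrap each handler invocation and forward exceptions to the configured lambda instead of crashing the consuming thread. `safe_call` is a hypothetical helper, not part of any existing API:

```ruby
# Route handler exceptions to a configurable error handler
# (e.g. an Airbrake-reporting lambda in production).
reported = []
error_handler = ->(exception) { reported << exception.message }

def safe_call(handler, event, error_handler)
  handler.call(event)
  true
rescue => e
  error_handler.call(e) # report instead of crashing the thread
  false
end

bad_handler = ->(event) { raise ArgumentError, "bad event" }
safe_call(bad_handler, {}, error_handler)
```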

Example:

Count how many studies exist:

class StudyCounter
  def call(event)
    redis.incr("study_count") # redis-rb: atomic INCR on the counter key
  end
end

Further reading: