Kafka is “publish-subscribe messaging rethought as a distributed commit log”.

We are exploring Kafka for something that looks like Event Sourcing, as found in Domain-Driven Design.

Kafka’s performance is pretty much constant with respect to data size so retaining events forever should not present a problem.

Install Kafka

These instructions are for macOS; for other operating systems see the Kafka documentation.

$ brew install kafka

This will also install Zookeeper, a dependency of the Kafka client and server.

The Kafka client's dependency on Zookeeper is due to be removed in version 0.9.

You can inspect the default Zookeeper config at /usr/local/etc/zookeeper/zoo.cfg. The default port is 2181.

$ zkServer start

Check that Zookeeper is running. I found that I had to start Zookeeper in the foreground:

$ zkServer start-foreground
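
To double-check that it is up you can send Zookeeper one of its four letter word commands, e.g. ruok, to which a healthy server replies imok:

$ echo ruok | nc localhost 2181
imok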

Zookeeper is a distributed coordination service for maintaining configuration, naming and synchronisation. Even though we are only starting a single Kafka instance rather than a cluster, we still need Zookeeper.

The default Kafka config can be found at /usr/local/etc/kafka/server.properties. The default port is 9092.

Start up Kafka:

$ kafka-server-start.sh /usr/local/etc/kafka/server.properties

Built-in tools

Kafka comes with command line clients for producing messages to and consuming messages from the Kafka server.

Kafka demo

Producer

When using the built-in tools you need to specify either the broker (Kafka) or Zookeeper.

kafka-console-producer.sh --topic test --broker-list localhost:9092

You can ignore the warning “WARN Property topic is not valid”; it is spurious and fixed in the next release.

Now type in some messages each followed by enter. Type Ctrl+D when finished.

These messages have now been published to Kafka. There is no imposed message format: they can be free text, JSON, XML, whatever you like.

You can also pipe and dump messages to the producer command:

$ echo "hello Kafka" | kafka-console-producer.sh ...
$ kafka-console-producer.sh ... < my_file.json

In the case of the JSON file each message must be on its own line. Therefore any newlines, and any other control characters, within the JSON must be double escaped, e.g. \\n instead of \n.
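
For example, if you are generating such a file from Ruby, JSON.generate emits each event as a single line (the event structure below is purely illustrative):

require 'json'

events = [
  { type: "account_created", account_id: 1 },
  { type: "account_credited", account_id: 1, amount: 100 }
]

# One single-line JSON document per line, ready to pipe into
# kafka-console-producer.sh as one message per line.
File.open("my_file.json", "w") do |file|
  events.each { |event| file.puts JSON.generate(event) }
end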

Consumer

Start a consumer:

kafka-console-consumer.sh --topic test --zookeeper localhost:2181 --from-beginning

Using --from-beginning you will get all messages from offset 0 (the very first message). Without it you will only get messages published from now onwards. This is great for event sourcing as it allows us to replay events from any point in the past. RabbitMQ, for example, is not suitable on its own as an event store, since it does not retain messages after consumption. By default Kafka does not retain messages forever either, but it can be configured to do so; more on that later.

Press Ctrl+C to close the consumer, but maybe just leave it running for now.

Unlike other queues, Kafka does not maintain the position of each consumer in the event stream. Kafka consumers are responsible for telling Kafka the offset from which they want messages every time they fetch a batch. Internally kafka-console-consumer uses the Java ‘high level consumer’, which maintains the consumer's offset in Zookeeper. However, every time you start kafka-console-consumer.sh it registers itself with Zookeeper under a different name, so you will always get messages from now onwards unless you explicitly give an offset.

Many of the configuration options for kafka-console-consumer must be specified using a configuration file instead of as command line arguments. For example, if you want to maintain an offset between restarts, create a file, consumer1.conf, and add the following:

group.id 1000
consumer.id 1000

And start with:

kafka-console-consumer.sh --topic test --zookeeper localhost:2181 --consumer.config consumer1.conf

The consumer.id uniquely identifies a consumer. Every consumer belongs to a group, and only one consumer in each group receives a given message. Groups are used for read parallelism when distributing load across a cluster. Since we are using a single instance we want every consumer to have a unique group.id, which is what happens by default with the built-in tools.

Try stopping the consumer, publishing a few messages and then restarting the consumer. You should get all messages from the point the consumer was stopped.

Kafka always sends a batch of messages. Each batch has a maximum size; the default is 1MB. This gives you better control over memory consumption than specifying the number of messages in the batch.
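
For example, to allow larger batches you could raise the fetch size in the consumer config file (assuming the 0.8 high level consumer's fetch.message.max.bytes property; here doubled to 2MB):

fetch.message.max.bytes 2097152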

Ruby

$ gem install poseidon
$ irb

Producer

require 'poseidon'

producer = Poseidon::Producer.new(["localhost:9092"], "my_test_producer")
messages = []
messages << Poseidon::MessageToSend.new("test", "from Ruby!!!")
producer.send_messages(messages)

There are additional options you can pass when initialising the producer, for example sync (which blocks while sending messages) or async. You can also set a logger:

Poseidon.logger = Logger.new('poseidon.log')

Consumer

consumer = Poseidon::PartitionConsumer.new("my_test_consumer", "localhost", 9092, "test", 0, :earliest_offset)
messages = consumer.fetch
messages[0].value

messages.map(&:value) # => ['..', '..', ... ]

The arguments for initialising the consumer are: client_id, host, port, topic, partition and offset.

fetch returns a batch of messages from the given offset to now.

The offset is a position in the Kafka message log, much like a row offset in a relational database. You can think of the offset as a point in time.

We chose to start receiving messages from :earliest_offset; it could equally be replaced by 0.

If we run consumer.fetch again we will get all the messages again.

As messages are consumed we need to persist the offset since messages are not discarded once the client has consumed them. This is ideal for an event store since we want the event to stay around forever.
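
A minimal sketch of doing that with Poseidon: each fetched message carries its offset, so after processing a batch we can persist the next offset (to a file here, purely for illustration) and resume from it later:

messages = consumer.fetch
messages.each { |message| puts message.value }

# Persist the offset of the next message we have not yet seen.
File.write("consumer.offset", messages.last.offset + 1)

# Later, resume from the persisted offset instead of :earliest_offset.
consumer = Poseidon::PartitionConsumer.new(
  "my_test_consumer", "localhost", 9092, "test", 0,
  File.read("consumer.offset").to_i
)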

The Poseidon gem interacts directly with the Kafka API. Therefore it does not maintain the offset in Zookeeper like the built-in consumer.

The other option is to use JRuby and the Kafka gem, which wraps the high level Java consumer and so behaves in the same way as the built-in console consumer (they both wrap the same high level Java consumer).

Zookeeper and offsets

The kafka-console-consumer, via the ‘high level consumer’ Java object, uses Zookeeper to store the offset.

We can inspect Zookeeper using zookeeper-shell.sh.

zookeeper-shell.sh does not have readline support for navigating history using the arrow keys. rlwrap adds readline support to any command.

$ rlwrap zookeeper-shell.sh localhost:2181
  ls /consumers
  [console-consumer-40227, console-consumer-67157, console-consumer-99157]
  ls /consumers/console-consumer-40227/offsets/test
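
The offsets/test node has one child per partition; get on a partition node shows the stored offset (the consumer names here are from my session, yours will differ):

  get /consumers/console-consumer-40227/offsets/test/0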

Batch publishing events

We now have a way to produce messages. For best throughput you want to batch as many events together as possible. Batching also allows for a sort of transaction: you can accumulate messages and only actually send them to Kafka, as a batch, if no exception is raised. In the case of a web application you might want to store all messages in an Array and only send them, as a batch, when the controller action has completed.

Simplistically:

class ApplicationController
  after_filter :send_events

  private

  def fire_event(event)
    events << Poseidon::MessageToSend.new(event.fetch(:topic), event.fetch(:payload))
  end

  def events
    @events ||= []
  end

  def send_events
    # producer is assumed to be a Poseidon::Producer configured elsewhere.
    producer.send_messages(events) unless events.empty?
  end
end
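
A controller action then just calls fire_event and relies on the after_filter to flush the batch. A sketch, where AccountsController, Account and the accounts topic are hypothetical:

class AccountsController < ApplicationController
  def create
    account = Account.create!(name: params[:name])
    # Queued here, sent to Kafka in a single batch once the action completes.
    fire_event(topic: "accounts", payload: account.to_json)
    render json: account
  end
end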

Guarantees

Producer

A producer can be configured to wait for an acknowledgement that a message has been committed to the Kafka log. This gives us a higher level of certainty that a message was received. For example, if a producer experiences a network failure while sending a message, was it received or not? By enabling acks we can be sure, at the cost of having to wait for the acknowledgement.
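
With Poseidon this can be set when creating the producer. A sketch, assuming Poseidon's required_acks option (0 does not wait, 1 waits for the leader to commit the message):

producer = Poseidon::Producer.new(
  ["localhost:9092"], "my_test_producer", required_acks: 1
)
producer.send_messages([Poseidon::MessageToSend.new("test", "acked message")])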

Another option is to include a uuid with every message. If an error occurs during publishing we can resend the message. The consumer must then keep track of the message uuids it has processed and skip any it has already received.
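
A sketch of that bookkeeping, reusing the consumer from earlier and a placeholder process_event method; in practice the set of seen uuids would itself need to be persisted:

require 'json'
require 'set'

processed_uuids = Set.new

consumer.fetch.each do |message|
  event = JSON.parse(message.value)
  uuid  = event.fetch("uuid")
  next if processed_uuids.include?(uuid) # duplicate delivery, skip it

  process_event(event)
  processed_uuids << uuid
end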

Consumer

Kafka has one log for all consumers; it does not prepare a stream per consumer and track which messages have been delivered. It is up to each consumer to say where it would like to start receiving messages from.

If a consumer never stopped we could simply keep the offset in memory; the same is true if the consumer is guaranteed to be idempotent. Otherwise the order in which we process an event and record its offset matters:

def on_event(event)
  process_event(event)
  increment_offset
end

What happens if the event is processed but an error occurs during increment_offset? The message will get redelivered.

def on_event(event)
  increment_offset
  process_event(event)
end

We could increment the offset and then process the event, but if an error occurs during processing the message will not get redelivered.

def on_event(event)
  increment_offset
  process_event(event)
rescue
  decrement_offset
end

This suffers from the same problem: an error can also occur while decrementing the offset.

If you are using a relational database, one option is to store your data and the offset within the same transaction:

def on_event(event)
  ActiveRecord::Base.transaction do
    process_event(event)
    increment_offset
  end
end

The data and offset are stored together, or not at all. Therefore you are guaranteed to process each message only once.
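
increment_offset could, for instance, update a row in the same database. A sketch, where ConsumerOffset is a hypothetical ActiveRecord model with topic, partition and next_offset columns:

def increment_offset
  offset = ConsumerOffset.find_by!(topic: "test", partition: 0)
  offset.update!(next_offset: offset.next_offset + 1)
end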

In the end you control the guarantees and make a choice about the tradeoffs.

Retention

When using Kafka as an event store there is a very important setting you need to change.

log.retention.hours=168

With this default, events are deleted after 168 hours (7 days).

In Kafka 0.8 the highest you can set it to is 2147483647, which is 89,478,485 days or ~250,000 years. This should be plenty :)

log.retention.hours=2147483647
log.retention.bytes=-1

In Kafka 0.9 (https://issues.apache.org/jira/browse/KAFKA-1990) you can set this to -1 to store events forever.

Next

The next issues we need to investigate are:

  • How to manage the schema of the event payload, e.g. Avro, a schema format in JSON which can be embedded with the payload.
  • How to name topics, by aggregate?
  • How to route event streams to consumers.
  • How consumers can maintain their offset.
  • How consumers can maintain local state for cases where they need to aggregate multiple event streams (more examples here)
  • Concurrency for consumers and producers

We will also look at Samza, a stream processing framework.