Kafka is “publish-subscribe messaging rethought as a distributed commit log”.
We are exploring Kafka for doing something which looks like Event Sourcing from Domain Driven Design.
Kafka’s performance is pretty much constant with respect to data size so retaining events forever should not present a problem.
These instructions are for macOS; for other operating systems see the Kafka documentation.
$ brew install kafka
This will also install Zookeeper, a dependency of the Kafka client and server.
The Kafka client’s dependency on Zookeeper will be removed in version 0.9.
You can inspect the default Zookeeper config at
/usr/local/etc/zookeeper/zoo.cfg. The default port is 2181.
$ zkServer start
Check Zookeeper is running. I found that I had to start Zookeeper in the foreground.
$ zkServer start-foreground
Zookeeper is a distributed system for maintaining configuration, naming and synchronisation. Even though we are only starting one instance of Kafka and not a cluster, we still need Zookeeper.
The default Kafka config can be found at
/usr/local/etc/kafka/server.properties. The default port is 9092.
Start up Kafka:
$ kafka-server-start.sh /usr/local/etc/kafka/server.properties
Kafka comes with clients for producing and consuming messages from the Kafka server.
When using the builtin tools you either need to specify the broker (Kafka) or Zookeeper.
kafka-console-producer.sh --topic test --broker localhost:9092
You can ignore the warning “WARN Property topic is not valid”; it is spurious and is fixed in the next release.
Now type in some messages each followed by enter. Type Ctrl+D when finished.
These messages have been published to Kafka. There is no imposed format for the messages; they can be free text, JSON, XML, whatever you like.
You can also pipe and dump messages to the producer command:
$ echo "hello Kafka" | kafka-console-producer.sh ...
$ kafka-console-producer.sh ... < my_file.json
In the case of the JSON file, each message must be on its own line. Therefore any newlines, and any other control characters, within the JSON must be double escaped, e.g.
\\n instead of \n.
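One way to produce such a file is to let a JSON library serialise each event onto a single line. This is a minimal sketch, assuming the events are plain Ruby hashes; the `events` data and the `to_message_lines` helper are made up for illustration. It only shows how to keep one message per line. Whether further escaping is needed depends on how the messages are decoded downstream.

```ruby
require 'json'

# Hypothetical events; the second payload contains an embedded newline.
events = [
  { "type" => "user_created", "name" => "Ada" },
  { "type" => "note_added", "text" => "line one\nline two" }
]

# JSON.generate emits compact JSON and escapes control characters inside
# strings, so each event ends up on exactly one line.
def to_message_lines(events)
  events.map { |event| JSON.generate(event) }.join("\n") + "\n"
end

lines = to_message_lines(events)
```

Writing `lines` to my_file.json gives a file that can be redirected into the producer command.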
Start a consumer:
kafka-console-consumer.sh --topic test --zookeeper localhost:2181 --from-beginning
With --from-beginning you will get all messages from offset 0 (the very first message). Without it you will only get new messages published from now onwards.
This is great for event sourcing as it allows us to replay events from any point in the past.
RabbitMQ, for example, is not suitable on its own as an event store, since it does not retain messages after consumption.
By default Kafka does not retain messages forever, but it can be configured to do so; more on this later.
Press Ctrl+C to close the consumer, but maybe just leave it running for now.
Unlike other queues Kafka does not maintain the position of each consumer in the event stream.
Kafka consumers are responsible for telling Kafka the offset from which they want messages every time they fetch a batch of messages.
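To make the idea concrete, here is a toy model in plain Ruby. It is not the Kafka API; `ToyLog` and its methods are invented for illustration, and it only shows that the log keeps no per-consumer position while each consumer supplies its own offset.

```ruby
# A toy, in-memory stand-in for a single Kafka partition.
class ToyLog
  def initialize
    @messages = []
  end

  # Appends a message and returns the offset it was stored at.
  def append(message)
    @messages << message
    @messages.length - 1
  end

  # Returns up to max messages starting at offset. Nothing is removed.
  def fetch(offset, max = 10)
    @messages[offset, max] || []
  end
end

log = ToyLog.new
%w[a b c d].each { |m| log.append(m) }

# The consumer keeps its own position; the log keeps none.
offset = 0
batch = log.fetch(offset, 3)   # the first three messages
offset += batch.length
rest = log.fetch(offset, 3)    # continues where the consumer left off
replay = log.fetch(0, 10)      # anyone can re-read from the start
```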
kafka-console-consumer uses the Java ‘high level consumer’, which maintains the consumer’s offset in Zookeeper. However, every time you start
kafka-console-consumer.sh it will register itself with Zookeeper under a different name, so you will always get messages from now onwards unless you explicitly give an offset.
Many of the configuration options for
kafka-console-consumer must be specified using a configuration file instead of as command line arguments. For example, if you want to maintain an offset between restarts, create a file,
consumer1.conf, and add the following:
group.id 1000
consumer.id 1000
And start with:
kafka-console-consumer.sh --topic test --zookeeper localhost:2181 --consumer.config consumer1.conf
consumer.id uniquely identifies a consumer. Every consumer belongs to a group, and only one consumer in each group receives a given message. Groups are used for read parallelism when distributing the load across a cluster. Since we are using a single instance we want every consumer to have a unique group.id, which is what happens by default using the built-in tools.
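The group semantics can be illustrated with a small simulation. This is only a sketch: the `deliver` function and the group and consumer names are hypothetical, and it hands messages out round-robin, whereas real Kafka assigns work by partition, which is not modelled here.

```ruby
# Toy model of group delivery: every group sees every message, but within
# a group each message goes to exactly one member.
def deliver(messages, groups)
  received = Hash.new { |hash, consumer| hash[consumer] = [] }
  groups.each_value do |members|
    messages.each_with_index do |message, index|
      received[members[index % members.length]] << message
    end
  end
  received
end

groups = { "billing" => ["billing-1", "billing-2"], "audit" => ["audit-1"] }
received = deliver(%w[m0 m1 m2 m3], groups)
```

Here the two billing consumers share the messages between them, while the lone audit consumer, being in its own group, receives all of them.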
Try stopping the consumer, publishing a few messages and then restarting the consumer. You should get all messages from the point the consumer was stopped.
Kafka always sends a batch of messages. Each batch has a maximum size; the default is 1MB. This gives you better control over memory consumption than specifying the number of messages in the batch.
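To see why a byte limit bounds memory better than a message count, here is a rough sketch that groups messages into batches by total size. It is an illustration only, not how the broker builds its batches; `batches_by_size` is a made-up helper.

```ruby
# Splits messages into batches whose total payload size stays within
# max_bytes. A single message larger than max_bytes gets a batch to itself.
def batches_by_size(messages, max_bytes)
  batches = [[]]
  current_size = 0
  messages.each do |message|
    size = message.bytesize
    if !batches.last.empty? && current_size + size > max_bytes
      batches << []
      current_size = 0
    end
    batches.last << message
    current_size += size
  end
  batches.reject(&:empty?)
end

result = batches_by_size(["aaaa", "bbbb", "cc", "dddddd"], 8)
```

However large or small the individual messages are, no batch here holds more than 8 bytes unless a single message exceeds the limit on its own.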
$ gem install poseidon
$ irb
require 'poseidon'

producer = Poseidon::Producer.new(["localhost:9092"], "my_test_producer")
messages = []
messages << Poseidon::MessageToSend.new("test", "from Ruby!!!")
producer.send_messages(messages)
There are additional options you can pass when initialising the producer, for example
sync (blocks while sending messages) or async
Poseidon.logger = Logger.new('poseidon.log')
consumer = Poseidon::PartitionConsumer.new("my_test_consumer", "localhost", 9092, "test", 0, :earliest_offset)
messages = consumer.fetch
messages.first.value
messages.map(&:value) # => ['..', '..', ... ]
The arguments for initialising the consumer are: client_id, host, port, topic, partition and offset.
fetch returns a batch of messages from the given offset to now.
The offset is any point in the Kafka message log, exactly the same as the offset in a relational database. You can think of offset as a time.
We decide to start receiving messages from the
earliest_offset, it could be replaced by
If we run
consumer.fetch again we will get all the messages again.
As messages are consumed we need to persist the offset since messages are not discarded once the client has consumed them. This is ideal for an event store since we want the event to stay around forever.
The Poseidon gem interacts directly with the Kafka API. Therefore it does not maintain the offset in Zookeeper like the built-in consumer.
The other option is to use JRuby and the Kafka gem, which wraps the high level Java consumer and behaves in the same way as the built-in consumer (they both wrap the high level Java consumer).
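If you stay with Poseidon, your own code has to persist the offset. Below is a minimal sketch, assuming a local file is an acceptable place to keep it; `FileOffsetStore` and the file name are hypothetical and not part of any gem.

```ruby
require 'tmpdir'

# Hypothetical helper: keeps the next offset to read in a small text file.
class FileOffsetStore
  def initialize(path)
    @path = path
  end

  # Returns the saved offset, or 0 when nothing has been saved yet.
  def load
    File.exist?(@path) ? Integer(File.read(@path).strip) : 0
  end

  # Writes to a temporary file first and renames it, so a crash mid-write
  # does not leave a half-written offset behind.
  def save(offset)
    tmp = "#{@path}.tmp"
    File.write(tmp, offset.to_s)
    File.rename(tmp, @path)
  end
end

store = FileOffsetStore.new(File.join(Dir.mktmpdir, "test-0.offset"))
first_run = store.load      # nothing saved yet, so start from 0
store.save(42)              # after processing messages up to offset 41
second_run = store.load     # a restarted consumer resumes from 42
```

As far as I know, the loaded value can be passed to the partition consumer as its offset argument in place of :earliest_offset, but check the gem's documentation before relying on that.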
Zookeeper and offsets
kafka-console-consumer, via the ‘high level consumer’ Java object, uses Zookeeper to store the offset.
We can inspect Zookeeper using
zookeeper-shell.sh. It does not have readline support for navigating history using the arrow keys, but rlwrap adds readline support to any command.
$ rlwrap zookeeper-shell.sh localhost:2181
ls /consumers
[console-consumer-40227, console-consumer-67157, console-consumer-99157]
ls /consumers/console-consumer-40227/offsets/test
Batch publishing events
We now have a way to produce messages. For optimal results you want to batch as many events together as possible. Batching also allows for a sort of transaction: you can collect messages as you go and only send them to Kafka, as a batch, if no exception is raised. In the case of a web application you might want to store all messages in an Array and only send them, as a batch, when the controller action has completed.
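class ApplicationController < ActionController::Base
  after_filter :send_events

  private

  # Queue an event; nothing is sent to Kafka yet.
  def fire_event(event)
    events << Poseidon::MessageToSend.new(event.fetch(:topic), event.fetch(:payload))
  end

  def events
    @events ||= []
  end

  # Send the queued events as one batch once the action has completed.
  # `producer` is assumed to be a Poseidon::Producer defined elsewhere.
  def send_events
    producer.send_messages(events)
  end
end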
A producer can be configured to wait for an acknowledgement that a message has been committed to the Kafka log. This gives us a higher level of certainty that a message was received. For example, if a producer experiences a network failure while sending a message, was the message received or not? By enabling acks we can be sure, at the cost of having to wait for the ack.
Another option is to include a UUID with every message. If an error occurs during publishing we can resend the message. The consumer must then keep track of the message UUIDs it has processed and skip any it has already received.
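The UUID approach might look something like the sketch below. `DedupingConsumer` and the message hash layout are assumptions for illustration. The set of seen ids lives in memory here, so a real consumer would need to persist it to survive restarts.

```ruby
require 'securerandom'
require 'set'

# Hypothetical consumer that remembers which message ids it has handled.
class DedupingConsumer
  attr_reader :processed

  def initialize
    @seen = Set.new
    @processed = []
  end

  # Processes the message unless its uuid was seen before.
  # Returns true when the message was processed, false when skipped.
  def on_message(message)
    return false unless @seen.add?(message[:uuid])
    @processed << message[:payload]
    true
  end
end

message = { uuid: SecureRandom.uuid, payload: "order placed" }
consumer = DedupingConsumer.new
consumer.on_message(message)  # the first delivery is processed
consumer.on_message(message)  # a resend with the same uuid is skipped
```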
Kafka has one log for all consumers; it does not prepare a stream per consumer and track which messages have been delivered. It is up to the consumer to say where it would like to start receiving messages from.
If a consumer was never stopped, or was guaranteed to be idempotent, we could just keep the offset in memory.
def on_event(event)
  process_event
  increment_offset
end
What happens if the event is processed but an error occurs during increment_offset? The message will get redelivered.
def on_event(event)
  increment_offset
  process_event
end
We could increment the offset and then process the event, but if an error occurs during processing, the message will not get redelivered.
def on_event(event)
  increment_offset
  process_event
rescue
  decrement_offset
end
This suffers from the same problem: the error can occur when decrementing the offset.
An option if you are using a relational database to store data and offset is to store your data and offset within a transaction:
def on_event(event)
  ActiveRecord::Base.transaction do
    process_event
    increment_offset
  end
end
The data and offset are stored together, or not at all. Therefore you are guaranteed to process each message only once.
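The all-or-nothing behaviour can be shown without a database. The following is a toy sketch: `ToyStore` is invented for illustration and merely imitates a transaction by restoring a snapshot when the block raises, which is what a relational database would do for us.

```ruby
# Toy store standing in for a relational database: changes made inside
# `transaction` are kept only if the block finishes without raising.
class ToyStore
  attr_reader :rows, :offset

  def initialize
    @rows = []
    @offset = 0
  end

  def transaction
    saved_rows, saved_offset = @rows.dup, @offset
    yield
  rescue StandardError
    @rows, @offset = saved_rows, saved_offset   # roll back both together
    raise
  end

  def insert(row)
    @rows << row
  end

  def increment_offset
    @offset += 1
  end
end

store = ToyStore.new

# Successful event: the row and the offset are committed together.
store.transaction do
  store.insert("event-0")
  store.increment_offset
end

# Failure after the insert: the row is rolled back along with the offset.
begin
  store.transaction do
    store.insert("event-1")
    raise "failed before the offset was stored"
  end
rescue RuntimeError
  # The event would be fetched again from the unchanged offset.
end
```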
In the end you control the guarantees and make a choice about the tradeoffs.
When using Kafka as an event store there is a very important setting you need to change.
By default log.retention.hours is set to 168, which means the events are deleted after 168 hours (7 days).
In Kafka 0.8 the highest you can set it to is
2147483647, which is 89,478,485 days or ~250,000 years. This should be plenty :)
log.retention.hours = 2147483647
log.retention.bytes = -1
In Kafka 0.9 (https://issues.apache.org/jira/browse/KAFKA-1990) you can set this to -1 to store events forever.
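The figure for 0.8 is easy to check with a few lines of Ruby. 2147483647 is the largest value a signed 32-bit integer can hold, which I assume is where the ceiling comes from.

```ruby
MAX_RETENTION_HOURS = 2**31 - 1   # 2147483647

days  = MAX_RETENTION_HOURS / 24  # integer division
years = days / 365                # ignoring leap years
```

This gives 89,478,485 days, or roughly a quarter of a million years.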
The next issues we need to investigate are:
- How to manage the schema of the event payload, e.g. Avro, a schema format in JSON which can be embedded with the payload.
- How to name topics, by aggregate?
- How to route event streams to consumers.
- How consumers can maintain their offset.
- How consumers can maintain local state for cases where they need to aggregate multiple event streams.
- Concurrency for consumers and producers
We will also look at Samza, a message processing framework.