Building a scalable exactly-once data pipeline with Kafka and Camel
The Problem
In this post I’m looking to solve the problem of consuming events from a Kafka topic using Camel to meet the following requirements:
1. Exactly-once delivery: without exception, all events must be written into an RDBMS table only once.
2. Consumer clustering: if a node goes down, processing should continue on the other available nodes.
3. Load balancing: in order to adapt to event spikes, events should be processed by multiple consumers in parallel.
Application Stack
Kafka stores events in topics. A topic is materialized into one or more partitions. While more than one producer can write to a topic, in our scenario we’ll use only one. This article focuses on the consumer side. Here are a few Kafka concepts that are important to note:
- when a producer writes to a topic, Kafka selects a partition based on the key supplied by the producer (producer : partition cardinality is N:N).
- the key corresponds to an entity id and is not a unique event id; if speed readings from a vehicle are events then the event key could be the vehicle registration number.
- consumer groups allow multiple processes to read messages from a topic; a consumer group member can read from multiple partitions, but each partition in a topic can be read by only one consumer in the group (partition : consumer group member cardinality is N:1)
You can read more about Kafka consumers in the Kafka javadoc at https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html, specifically the Consumer Groups and Topic Subscriptions section.
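To make the consumer-group mechanics concrete, here is a minimal sketch using the raw Kafka client; the broker address, topic, and group name are assumptions that match the rest of this post, and the Camel component used later wraps this same API:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupMember {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "mygroup");           // members sharing this id form one consumer group
        props.put("enable.auto.commit", "false");   // we decide when the group's offset advances
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Kafka assigns each partition of the topic to exactly one member of the group.
            consumer.subscribe(Collections.singletonList("words2"));
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            rec.partition(), rec.offset(), rec.key(), rec.value());
                }
                consumer.commitSync();              // advance the offsets only after processing the batch
            }
        }
    }
}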
Camel has a Kafka component that can be configured to send or receive messages to/from a Kafka topic. When receiving, Camel allows applications to control:
a) maximum number of events a poll() operation fetches, and
b) if/when the offset in a partition is advanced.
Note that the consumer offset is always maintained by Kafka, and Camel does not support a consumer seeking to a specific offset.
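As a minimal sketch (using the same topic, group, and option names that appear in the consumer route later in this post), the two knobs map onto Camel Kafka endpoint options like this:

import org.apache.camel.builder.RouteBuilder;

public class PeekRoute extends RouteBuilder {
    @Override
    public void configure() {
        // (a) maxPollRecords caps how many records a single poll() fetches;
        // (b) autoCommitEnable=false plus allowManualCommit=true leave the offset where it is
        //     until the route commits it explicitly.
        from("kafka:words2?groupId=mygroup&maxPollRecords=50"
                + "&autoCommitEnable=false&allowManualCommit=true")
            .to("log:peek");
    }
}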
RDBMSs provide well-established storage capabilities to many organizations. In this example we’ll use Microsoft SQL Server and leverage atomic transactions within T-SQL stored procedures to ensure data consistency and enforce separation of concerns within our application.
Kafka Word Pipeline
To illustrate the solution, we’re using a simple data pipeline based on the Kafka cluster example from the Camel in Action book, available at https://github.com/camelinaction/camelinaction2/tree/master/chapter17/cluster-kafka.
A producer sends Word events to a Kafka topic. The topic has two partitions so that two consumers can receive events in parallel.
A Word event has the following elements:
- Unique Identifier (UID): as the name implies, no two events share the same UID, and UIDs are sequential; this is important for checking that no messages are lost during failure scenarios.
- Timestamp (producedAt): the time the event was produced; this value is used to calculate end-to-end latency across our distributed system.
- Event Key: the actual word, picked at random from a fixed list of words; it’s the same as the message payload.
The picture below depicts all the components that work together in our Kafka data pipeline.
Data Pipeline Code
You can find step-by-step instructions on how to run the code highlighted here at https://github.com/lucian-d/lab/tree/master/camel-kafka-words.
The producer uses the following WordBean method to generate an event:
public String generateWord() {
    Timestamp timestamp = new Timestamp(System.currentTimeMillis());
    String tsStr = sdf.format(timestamp);
    int ran = new Random().nextInt(words.length);
    return "#" + ++counter + "*" + words[ran] + "@" + tsStr;
}
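The method references fields (sdf, counter, words) that live on the bean but are not shown above; a minimal self-contained version might look like this (the word list and timestamp format are assumptions, not copied from the repo):

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Random;

public class WordBean {
    // Assumed values; the actual word list and timestamp format are defined in the repo.
    private final String[] words = {"alpha", "bravo", "charlie", "delta"};
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    private long counter;   // sequential UID, incremented per event

    public String generateWord() {
        Timestamp timestamp = new Timestamp(System.currentTimeMillis());
        String tsStr = sdf.format(timestamp);
        int ran = new Random().nextInt(words.length);
        // #uid*payload@timestamp, e.g. #42*bravo@2020-10-05 10:15:30.123
        return "#" + ++counter + "*" + words[ran] + "@" + tsStr;
    }
}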
You can see we use a simple #uid*payload@timestamp template. Next, this bean is embedded into a Camel route:
// use a timer to trigger every 10 milliseconds and generate an #uid*word@timestamp event,
// which is sent to Kafka
from("timer:producer?period=10") // inject 100 events per second
    .bean(new WordBean())
    .setHeader("kafka.KEY", body().regexReplaceAll("#\\d+\\*", "").regexReplaceAll("@.+", ""))
    .to("kafka:words2")
    .to("log:words?groupInterval=1000");
The two regexReplaceAll calls strip the #uid* prefix and the @timestamp suffix, so the word itself becomes the Kafka message key. The route sends events to the Kafka words2 topic, which is created as follows:
.\kafka-topics.bat --create --topic words2 --replication-factor 1 --partitions 2 --bootstrap-server localhost:9092
Consumers use the following Camel route to poll the words2 topic:
from("kafka:words2?groupId=mygroup&maxPollRecords=50&autoOffsetReset=earliest&autoCommitEnable=false&allowManualCommit=true&breakOnFirstError=true")
.routeId(_name)
.process(exchange -> {
String body = exchange.getIn().getBody().toString();
// body::= #uid*word@ts
String[] parts = body.split("[#*@]");
Message msg = exchange.getIn();
msg.setHeader("eventUID", parts[1]);
msg.setHeader("word", parts[2]);
msg.setHeader("producedAt", parts[3]);
msg.setHeader("consumer", _name);
msg.setHeader("PiB", _posInBatch);
})
.log(_name +
" got word ${body}; topic ${headers[kafka.TOPIC]}; prt# ${headers[kafka.PARTITION]};"
+ " offset ${headers[kafka.OFFSET]}; key ${headers[kafka.KEY]}; UID ${headers[eventUID]};"
+ " LRBC ${headers[kafka.LAST_RECORD_BEFORE_COMMIT]}; LPR ${headers[kafka.LAST_POLL_RECORD]}; PiB ${headers[PiB]};" )
.to("mssql-sp:[dbo].[UpsertEvent](VARCHAR ${headers[eventUID]}, VARCHAR ${headers[word]}, "
+"VARCHAR ${headers[consumer]}, VARCHAR ${headers[kafka.PARTITION]}, "
+"VARCHAR ${headers[kafka.OFFSET]}, VARCHAR ${headers[producedAt]})")
.process(exchange -> {
// manually commit offset if it is last message in batch
Boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);
if (lastOne) {
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
if (manual != null) {
//System.out.println("manually committing the offset for batch");
manual.commitSync();
}
}
});
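The route references a _name field (Foo or Bar), so it presumably lives in a RouteBuilder whose constructor takes the consumer name; a minimal sketch of how one such consumer could be bootstrapped (the WordConsumerRoute class name is an assumption, not taken from the repo) is:

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;

public class ConsumerMain {
    public static void main(String[] args) throws Exception {
        // The consumer name ("Foo" or "Bar") ends up in the route's _name field.
        String name = args.length > 0 ? args[0] : "Foo";

        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new WordConsumerRoute(name)); // WordConsumerRoute: assumed name of the RouteBuilder above
        context.start();

        // keep the JVM alive so the route keeps polling words2
        Thread.currentThread().join();
    }
}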
The consumer route invokes the following idempotent stored procedure:
CREATE PROCEDURE UpsertEvent
    @uid int,
    @word nvarchar(100),
    @consumer nvarchar(10),
    @tpPrt int,
    @offset int,
    @producedAt datetime2
AS
BEGIN TRANSACTION
    IF (SELECT count(*) FROM Words WHERE EventUID = @uid) > 0
        UPDATE Words
        SET [ConsumedAt] = GETDATE(), IsDuplicate = 1
        WHERE EventUID = @uid
    ELSE
        INSERT INTO Words
            (EventUID, Word, Consumer, TpPrt, Offset, IsDuplicate, ProducedAt)
        VALUES
            (@uid, @word, @consumer, @tpPrt, @offset, 0, @producedAt)
COMMIT
The logic above updates or inserts records into the following table:
create table Words
(
EventUID int,
Word char(100),
Consumer char(10),
TpPrt int,
Offset int,
IsDuplicate bit,
ConsumedAt datetime2 DEFAULT(getdate()),
ProducedAt datetime2
);
Consumer resilience and event processing performance
Let’s look at how events are distributed across consumers in mygroup. More specifically, we’ll measure producer-to-database throughput and latency.
All components (Kafka, Camel routes, Microsoft SQL Server) were set up and tested on a single 4-core machine. The metrics gathered serve as a baseline to help us compare normal and faulted execution conditions; they are not a performance measure of the technologies used. Naturally, tuning and changing the deployment topology would produce quite different numbers.
Once Kafka is running and the topic is created, Camel consumers can be started and we can see how load balancing works.
Foo starts first and tells Kafka that: a) it wants to read from the words2 topic, and b) it belongs to mygroup. Since it’s the only consumer in the group so far, both partitions are assigned to Foo.
Bar starts second and provides the same topic and group name, so Kafka reassigns the partitions between the two active consumers, as seen below.
At this point Foo listens to partition-0 and Bar listens to partition-1. If one of the consumers in mygroup goes down, both partitions are reassigned to the one still running. Kafka detects such events by means of a heartbeat exchanged between the brokers and connected consumers.
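The Camel component handles these reassignments for us, but with the raw Kafka client they can be observed directly by passing a ConsumerRebalanceListener to subscribe(); the sketch below is illustrative only and not part of the Camel routes above:

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Passed as the second argument to KafkaConsumer.subscribe(topics, listener).
public class LoggingRebalanceListener implements ConsumerRebalanceListener {
    private final String consumerName; // e.g. "Foo" or "Bar"

    public LoggingRebalanceListener(String consumerName) {
        this.consumerName = consumerName;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called before the group coordinator takes partitions away (e.g. when another member joins or dies)
        System.out.println(consumerName + " revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called once the coordinator has handed this member its new set of partitions
        System.out.println(consumerName + " assigned: " + partitions);
    }
}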
While producers and consumers are running, we can visualize the data pipeline by querying the Words table.
Here we can see that 7,744 events have been processed by Foo and Bar together. Bar consumed 1,925 from partition-1 only, while Foo consumed the rest from both partition-0 and partition-1.
In a test where the producer generated 100k events at a rate of 100 events/sec, the two consumers together consumed at a rate of about 80 events/sec (that’s the rate at which records were added to the table). The imbalance between producing and consuming events led to a 15-second average latency through the data pipeline (i.e. the time between an event being created and it being written to the database).
The metrics discussed here were captured using the following queries:
SELECT Consumer, TpPrt, COUNT(*) EvtsConsumed,
MIN(Offset) MinOffset, MAX(Offset) MaxOffset, MAX(ConsumedAt) LastActive,
COUNT(*)/DATEDIFF(second, MIN([ConsumedAt]), MAX([ConsumedAt])) EvtsPerSec,
AVG(DATEDIFF(second, ProducedAt, ConsumedAt)) AvgP2CLtncySec
FROM [mydb].[dbo].[Words]
GROUP BY Consumer, TpPrt
SELECT COUNT(*) Duplicates, MIN(ConsumedAt) FromT, MAX(ConsumedAt) ToT
FROM [mydb].[dbo].[Words]
WHERE IsDuplicate=1
SELECT count(EventUID) OutOfSeqUID -- <-- consumer imbalance
FROM [mydb].[dbo].[Words]
WHERE EventUID + 1 NOT IN (SELECT EventUID FROM [mydb].[dbo].[Words])
SELECT TOP (100) *
FROM [mydb].[dbo].[Words]
ORDER BY EventUID DESC
Database down
We first fault the data pipeline by taking the database down, simply by stopping the Microsoft SQL Server engine service.
As expected, both consumers report a Cannot get a JDBC connection error.
Bar failed to add event #6375, read from partition-1 at offset 11364. By default it will keep retrying every 5 seconds. Because the offset was not committed, the same event will be served again to a mygroup consumer.
After a route failure, the consumer re-connects to the words2 topic and, as a result, the partitions get reassigned. When the database is brought back up, we can see that event #6375 is now processed successfully by the other consumer, Foo.
After a little while we can see that all 25,750 events produced are successfully posted to the database. No events are lost or duplicated in this scenario.
One consumer down, the other bounced
In our second failure scenario, we’ll crash the Bar consumer and let Foo do the work.
While both consumers are up, we check the Words table to see events flowing through the data pipeline normally.
Next we use the jps and taskkill commands to terminate Bar.
At this point we have only Foo running. We could leave it running, but to increase the chance of reading events multiple times, let’s kill its Java process too. On restarting Foo, we can see that it took over partition-1 from Bar, starting from the next offset value, 13393.
Finally, we bring Bar back up and running again.
We can see that in this failure scenario 47 events were read multiple times.
Equally important, no events were lost. When we compare the Kafka offsets against the RDBMS table, all events/records are accounted for.
In addition, we can see that while most of the work was done by Foo, the data pipeline throughput is 41 events/second. Not surprisingly, that is about half the throughput measured when both consumers were running.
Over the 150k events produced, and with consumer stop/start delays during the test, the average produced-to-database latency was about 10 minutes.