Multi-Tenancy Data Models in Kafka

Data modeling for relational databases is a well-researched field. NoSQL databases, however, are more specialized and require different approaches to deriving an appropriate data model. My current project, Ameto, uses the distributed streaming platform Apache Kafka. Ameto is a multi-tenant application and requires an appropriate data model. While there is some advice on the internet about how to organize your data in Kafka,[1] little can be found about how to approach multi-tenancy. In this article, I would like to share my thoughts and findings on structuring your data in Kafka for a system with multiple users.

Introduction to Apache Kafka

This section will give you a refresher on how Apache Kafka works. It will explain the concept of topics and partitions without going into too much detail. If you already know all of this, continue with the next section.

Apache Kafka is a distributed streaming platform. It allows a producer to publish messages to so-called topics in a durable way. Consumers can then subscribe to the stream of messages in a topic. For example, the jobs topic may contain all status updates of jobs (job created, job status changed, job deleted).

A topic consists of one or more partitions. Messages sent to the same partition are guaranteed to be persisted in the exact order they were sent. For our jobs topic, this means that we want all status updates of a specific job to go into the same partition. Otherwise, if the job_created event and the job_updated event are sent to different partitions, we cannot guarantee that a subscriber receives the events in the correct order. By default, the producer picks the partition of a message by hashing its key, so messages with the same key end up in the same partition.
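
To make the key-to-partition mapping concrete, here is a minimal sketch of how Kafka's default partitioner treats messages that have a key: it hashes the serialized key with the 32-bit murmur2 variant used by Kafka's Java client and takes the positive part of the hash modulo the number of partitions. The function names `murmur2` and `default_partition` are our own, and the sketch is meant as an illustration rather than a drop-in replacement for a real client. Messages without a key are handled differently and are not covered here.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash with Kafka's seed, returned as an unsigned int."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    tail = length - length % 4  # start of the 1-3 trailing bytes
    h = (0x9747B28C ^ length) & mask

    # Mix the input four bytes at a time, read as little-endian words.
    for i in range(0, tail, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = ((h * m) & mask) ^ k

    # Mix in the trailing bytes (the fall-through cases of the Java switch).
    extra = length % 4
    if extra == 3:
        h ^= data[tail + 2] << 16
    if extra >= 2:
        h ^= data[tail + 1] << 8
    if extra >= 1:
        h ^= data[tail]
        h = (h * m) & mask

    # Final avalanche step.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def default_partition(key: bytes, num_partitions: int) -> int:
    """Partition of a keyed message: positive part of the hash modulo the partition count."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


# Every status update of job5 carries the key b"job5" and thus lands in the same partition.
print(default_partition(b"job5", 3))
```

Because the partition depends only on the key and the partition count, the ordering guarantee for a job holds only as long as the number of partitions of the topic stays the same.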

Single tenant layout

Let's have a look at a data model for a single tenant, using a subset of Ameto's data model. Ameto has three core concepts: assets, pipelines, and jobs. An asset describes binary data that the user uploads, a pipeline defines specific steps for processing an asset, and a job is created when an asset is processed in a pipeline. In the early days of Ameto, these entities were stored in three different Kafka topics, so that each topic only ever contained a single entity type.

Per-tenant topics

When introducing multi-tenancy to the system, Ameto initially used one topic per user and entity type. The jobs topic, for example, was split into user1.jobs, user2.jobs, …, each of which contains only the messages relevant to the respective user. The message keys are based on the entity IDs, so that all events concerning a job instance are routed to the same partition.
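
A minimal sketch of the per-tenant routing described above, assuming topic names of the form `<tenant>.<entity type>` as in user1.jobs. The helper `per_tenant_route` and the `ENTITY_TYPES` tuple are hypothetical; the function only computes where a message would go and does not talk to Kafka.

```python
from typing import Tuple

ENTITY_TYPES = ("assets", "pipelines", "jobs")


def per_tenant_route(tenant: str, entity_type: str, entity_id: str) -> Tuple[str, bytes]:
    """Return (topic, key) for a message in the per-tenant-topic layout.

    The tenant is encoded in the topic name, e.g. 'user1.jobs'. The key is
    just the entity ID, so all events of one entity still go to the same
    partition within that topic.
    """
    if entity_type not in ENTITY_TYPES:
        raise ValueError(f"unknown entity type: {entity_type!r}")
    if "." in tenant:
        # A '.' inside the tenant ID would make topic names ambiguous.
        raise ValueError(f"tenant ID must not contain '.': {tenant!r}")
    return f"{tenant}.{entity_type}", entity_id.encode("utf-8")


topic, key = per_tenant_route("user1", "jobs", "job5")  # ('user1.jobs', b'job5')
```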

The benefit of this approach is that user data is inherently stored separately, making it harder to accidentally access data from a different user. Also, deleting a user simply involves deleting the corresponding user-specific topics. Most importantly, though, an Ameto service instance does not need to know about multi-tenancy at all: it can easily be assigned to work for a single user by consuming from or producing to a single user-specific topic.
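
Deleting a tenant then boils down to finding its topics. A small sketch, assuming the same `<tenant>.<entity type>` naming; the helper `topics_of_tenant` is hypothetical. Matching on the prefix including the dot keeps user1 from also matching the topics of user10. The actual deletion call depends on the client library and is left out here.

```python
from typing import Iterable, List


def topics_of_tenant(all_topics: Iterable[str], tenant: str) -> List[str]:
    """Return the per-tenant topics owned by `tenant`, e.g. to delete them.

    The prefix includes the '.' separator, so 'user1' does not also match
    the topics of 'user10'.
    """
    prefix = f"{tenant}."
    return sorted(t for t in all_topics if t.startswith(prefix))


existing = ["user1.jobs", "user1.assets", "user10.jobs", "user2.jobs"]
print(topics_of_tenant(existing, "user1"))  # ['user1.assets', 'user1.jobs']
```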

On the downside, Kafka has to do a fair amount of bookkeeping to create or delete a topic: new topics have to be registered with ZooKeeper, which involves remote access. For each partition in a topic, a leader has to be elected and new log files have to be created on disk. Tests in Ameto that relied on topics being created or deleted on the fly tended to be flaky, especially on machines with slow disks (read: HDDs).

This approach also causes problems if you expect a large number of tenants, because the number of partitions grows with the number of tenants. For three topics with at least three partitions each and a replication factor of three, this approach creates at least 3 × 3 × 3 = 27 partition replicas (nine partitions, each stored three times) for a single user. This adds up quickly. A single Kafka broker is designed to handle hundreds or maybe thousands of partitions.[1] [2] This means that you either have to keep the number of tenants low, or you have to scale up your Kafka cluster so that you do not end up with high latencies, high memory consumption or long unavailability in case of an outage.[2] Overall, it feels like Kafka is not designed to handle many dynamic topics. So what other options do we have to organize our data in Kafka?
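
The arithmetic behind this is simple enough to write down. A sketch that counts partitions and partition replicas for the per-tenant-topic layout, using the figures from the text (three topics, three partitions per topic, replication factor three). The function name and the tenant and broker counts in the usage lines are illustrative, and the per-broker figure assumes replicas are spread evenly across brokers.

```python
import math


def per_tenant_topic_load(tenants: int, brokers: int, topics_per_tenant: int = 3,
                          partitions_per_topic: int = 3, replication_factor: int = 3):
    """Estimate (partitions, partition replicas, replicas per broker).

    Assumes an even spread of replicas across brokers, which is an
    assumption of this sketch, not a guarantee.
    """
    partitions = tenants * topics_per_tenant * partitions_per_topic
    replicas = partitions * replication_factor
    return partitions, replicas, math.ceil(replicas / brokers)


print(per_tenant_topic_load(tenants=1, brokers=3))     # (9, 27, 9)
print(per_tenant_topic_load(tenants=1000, brokers=3))  # (9000, 27000, 9000)
```

With a thousand tenants, each of three brokers would already carry thousands of partition replicas, which is where the limits mentioned above come into play.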

Per-tenant partitions

The current solution in Ameto is to create one topic per entity type (i.e. assets, pipelines, jobs) and send all messages of a single user to the same partition. For example, all jobs of user1 go to partition 1 of the jobs topic, and all jobs of user2 go to partition 2. We do this by prefixing the message key with the relevant user identifier, e.g. user1:job5. A custom Partitioner can ensure that messages with the same user prefix are sent to the same partition with very little code. The following example uses the default hash partitioner, but only hashes the prefix instead of the whole message key (written in Python with kafka-python 1.4):

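from kafka import KafkaProducer
from kafka.partitioner.hashed import Murmur2Partitioner

class PrefixPartitioner(Murmur2Partitioner):
    """Hash only the tenant prefix of keys like b'user1:job5'."""

    def partition(self, key, partitions=None):
        # The producer passes the serialized key, so it is bytes here.
        # Every key must contain the separator (and must not be None),
        # otherwise the split below fails.
        prefix_separator = b':'
        prefix, _ = key.split(prefix_separator, maxsplit=1)
        return super().partition(key=prefix, partitions=partitions)

kafka_brokers = ['localhost:9092']  # placeholder, adjust to your cluster

producer = KafkaProducer(bootstrap_servers=kafka_brokers,
                         partitioner=PrefixPartitioner())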
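
The partitioner above relies on every key containing the : separator. A small pair of helpers can keep key construction and parsing in one place; `make_tenant_key` and `tenant_of` are hypothetical and not part of kafka-python. They also reject tenant IDs that contain the separator, since such a key would be split inside the tenant ID and grouped under the wrong prefix.

```python
SEPARATOR = b":"


def make_tenant_key(tenant: str, entity_id: str) -> bytes:
    """Build a message key like b'user1:job5' from a tenant and an entity ID."""
    tenant_bytes = tenant.encode("utf-8")
    if SEPARATOR in tenant_bytes:
        # The partitioner splits at the first ':', so the tenant must not contain one.
        raise ValueError(f"tenant ID must not contain ':': {tenant!r}")
    return tenant_bytes + SEPARATOR + entity_id.encode("utf-8")


def tenant_of(key: bytes) -> bytes:
    """Return the prefix the partitioner hashes (b'user1' for b'user1:job5')."""
    prefix, sep, _ = key.partition(SEPARATOR)
    if not sep:
        raise ValueError(f"key has no tenant prefix: {key!r}")
    return prefix


key = make_tenant_key("user1", "job5")  # b'user1:job5'
```

With kafka-python, such a key would then be passed as producer.send('jobs', key=key, value=payload), where payload stands for the serialized message.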

Note that this approach does not imply that each user has a dedicated partition: two tenants might share the same partition. Therefore, we can choose the number of partitions independently of the number of tenants. Topics with a high message volume may be assigned a larger number of partitions, for example, to increase consumer throughput. The performance of the Kafka cluster is now decoupled from the number of tenants in the system and can be tuned independently. As an added benefit, this layout provides stronger ordering guarantees: not only are all updates to the same entity (e.g. job updates) stored and delivered in order, but so are all job events of a tenant.[3]
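
To see why the partition count is independent of the tenant count, and why a tenant's events stay in order, here is a small in-memory simulation. It is a sketch under two assumptions: `zlib.crc32` stands in for Kafka's murmur2 hash (so the concrete partition numbers differ from what a real producer would pick), and a Python list per partition stands in for a partition log. The function `prefix_partition` is hypothetical and mirrors the prefix-hashing idea of the PrefixPartitioner.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3  # chosen per topic, independent of the number of tenants


def prefix_partition(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Hash only the tenant prefix of b'tenant:entity' (crc32 as a stand-in for murmur2)."""
    prefix, _, _ = key.partition(b":")
    return zlib.crc32(prefix) % num_partitions


# Simulated partition logs of the jobs topic: append-only lists.
logs = defaultdict(list)
sent = [b"user1:job1", b"user2:job7", b"user1:job2", b"user3:job4", b"user1:job1"]
for key in sent:
    logs[prefix_partition(key)].append(key)

# Every event of user1 sits in one partition, in the order it was sent,
# even though the events concern different jobs.
p = prefix_partition(b"user1:anything")
user1_events = [k for k in logs[p] if k.startswith(b"user1:")]
assert user1_events == [b"user1:job1", b"user1:job2", b"user1:job1"]

# With more tenants than partitions, some tenants necessarily share a partition.
tenants = [f"user{i}".encode() for i in range(10)]
used = {prefix_partition(t + b":x") for t in tenants}
assert len(used) <= NUM_PARTITIONS < len(tenants)
```

Raising NUM_PARTITIONS changes how tenants are spread, but not how many tenants the topic can hold.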

Conclusion

For a small number of tenants, or if tenants produce a high volume of messages that are costly to consume, create a topic per tenant (and per message type). This approach couples the number of partitions to the number of tenants, but nicely separates the data by tenant. You will be able to subscribe to or produce to a tenant-specific topic and can make use of the topic's partitions to parallelize stream processing.

If the number of tenants is large, try to partition your data by tenant. You will lose the ability to throw multiple consumers per tenant at a message stream, but the number of partitions in a topic and the number of tenants in the system are decoupled.

This article assumes that a topic contains only a single type of message. Although I recommend this, there are certainly legitimate use cases where multiple message types need to be stored in one topic. In any case, it is worth thinking about how your data will be consumed in order to come up with an organizational structure for Kafka that fits your application.


  1. Martin Kleppmann (2018) - Should You Put Several Event Types in the Same Kafka Topic?

  2. Jun Rao (2015) - How to choose the number of topics/partitions in a Kafka cluster?

  3. Another interesting property of per-tenant partitions is that we only need to read from a single partition (and therefore a single broker) to consume all messages of one tenant for a given entity type. Say we have a consumer that creates monthly billing data from a stream of job messages. Previously, with per-tenant topics, the consumer was assigned multiple partitions, possibly located on different Kafka brokers. Each broker then performs a disk seek to the respective offset in the partition's data log, assembles a batch of messages and sends it over the network to the consumer. When all job messages of a user are stored in a single partition, only a single disk seek is triggered across the Kafka cluster and the message batch is larger. This property may be seen as a benefit when messages are small and batches hardly fill the send and receive buffers of the brokers and consumers. By the same token, it may be seen as a drawback: when messages are large and the send and receive buffers are generally full, we lose the option to add another consumer. Depending on your specific use case, this aspect may be worth a benchmark.