Cloudera Enterprise 6.0.x

Kafka Cluster Sizing

Cluster Sizing - Network and Disk Message Throughput

There are many variables that go into determining the correct hardware footprint for a Kafka cluster. The most accurate way to model your use case is to simulate the load you expect on your own hardware, and you can do this using the load generation tools that ship with Kafka, kafka-producer-perf-test and kafka-consumer-perf-test. You can find more details in Kafka Administration using Command Line Tools.

However, if you want to size a cluster without simulation, a very simple rule is to size the cluster based on the amount of disk space required (which can be computed from the estimated rate at which you receive data multiplied by the required data retention period).
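For instance, a minimal sketch of this back-of-the-envelope estimate (the incoming rate, retention period, and replication factor below are assumed values; replication is discussed in the next section):

# Retention-based disk estimate (all figures are illustrative assumptions).
write_rate_mb_s = 50      # estimated rate at which data arrives, MB/sec
retention_days = 7        # required data retention period
replication_factor = 3    # each message is stored this many times across the cluster

seconds_retained = retention_days * 24 * 60 * 60
disk_needed_tb = write_rate_mb_s * seconds_retained * replication_factor / 1024 / 1024
print(f"Raw disk needed across the cluster: ~{disk_needed_tb:.0f} TB")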

A slightly more sophisticated estimation can be done based on network and disk throughput requirements. To make this estimation, let's plan for a use case with the following characteristics:

  • W - MB/sec of data that will be written
  • R - Replication factor
  • C - Number of consumer groups, that is the number of readers for each write

Kafka is mostly limited by the disk and network throughput: let’s first describe the disk and network requirements for the cluster.

The volume of writes expected is W * R (that is, each replica writes each message). Data is read by replicas as part of internal cluster replication, and also by consumers. Because every replica other than the leader reads each write, replication accounts for a read volume of (R - 1) * W. In addition, each of the C consumers reads each write, adding a read volume of C * W. This gives the following:

  • Writes: W * R
  • Reads: (R + C - 1) * W

However, note that reads may actually be served from cache, in which case no actual disk I/O happens. We can model the effect of caching fairly easily. If the cluster has M MB of memory, then a write rate of W MB/second allows M/(W * R) seconds of writes to be cached. So a server with 32 GB of memory taking writes at 50 MB/second serves roughly the last 10 minutes of data from cache. Readers may fall out of cache for a variety of reasons, such as a slow consumer or a failed server that recovers and needs to catch up. An easy way to model this is to assume a number of lagging readers that you budget for. To model this, let's call the number of lagging readers L. A very pessimistic assumption would be that L = R + C - 1, that is, that all consumers are lagging all the time. A more realistic assumption might be to assume no more than two consumers are lagging at any given time.
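As a quick sanity check on the 10-minute figure, a minimal sketch (the memory and write rate are the values assumed in the text above):

# Page-cache window: M / (W * R) seconds of writes can be served from memory.
M_mb = 32 * 1024       # M: broker memory available for caching, MB (32 GB)
WR_mb_s = 50           # W * R: total write rate hitting the broker, MB/sec

cache_window_s = M_mb / WR_mb_s
print(f"Roughly {cache_window_s / 60:.0f} minutes of recent writes served from cache")
# Prints roughly 11 minutes, in line with the "last 10 minutes" estimate above.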

Based on this, we can calculate our cluster-wide I/O requirements:

  • Disk Throughput (Read + Write): W * R + L * W
  • Network Read Throughput: (R + C - 1) * W
  • Network Write Throughput: W * R

A single server provides a given disk throughput as well as network throughput. For example, if you have a 1 Gigabit Ethernet card with full duplex, then that would give 125 MB/sec read and 125 MB/sec write; likewise, six 7200 RPM SATA drives might give roughly 300 MB/sec of combined read and write throughput. Once you know the total requirements, as well as what one machine provides, you can divide to get the total number of machines needed. This gives a machine count running at maximum capacity, assuming no overhead for network protocols and perfect balance of data and load. Since there is protocol overhead as well as imbalance, you want to provision at least 2x this ideal machine count to ensure sufficient capacity.
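Putting the pieces together, the following is a minimal sketch of the whole estimate. Every input (write rate, replication factor, consumer groups, lagging readers, and the per-machine throughput figures) is an assumed value chosen only to illustrate the arithmetic:

# Cluster-wide I/O requirements and a rough machine count (illustrative figures only).
import math

W = 50    # MB/sec of data written by producers
R = 3     # replication factor
C = 2     # number of consumer groups
L = 2     # lagging readers budgeted for (reads that miss the page cache)

disk_throughput = W * R + L * W      # MB/sec of disk read + write
net_read = (R + C - 1) * W           # MB/sec read over the network
net_write = W * R                    # MB/sec written over the network

# Per-machine capability, e.g. 1 GbE full duplex and six 7200 RPM SATA drives.
machine_net_mb_s = 125               # ~125 MB/sec each way on 1 GbE
machine_disk_mb_s = 300              # ~300 MB/sec combined read + write

machines_for_disk = math.ceil(disk_throughput / machine_disk_mb_s)
machines_for_net = math.ceil(max(net_read, net_write) / machine_net_mb_s)
ideal_machines = max(machines_for_disk, machines_for_net)

# Double the ideal count to allow for protocol overhead and imbalance.
print(f"Disk: {disk_throughput} MB/sec, network read: {net_read} MB/sec, network write: {net_write} MB/sec")
print(f"Ideal machine count: {ideal_machines}; provision at least: {2 * ideal_machines}")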

Choosing the Number of Partitions for a Topic

Choosing the proper number of partitions for a topic is key to achieving a high degree of parallelism for both writes and reads, and to distributing load across the cluster. Evenly distributed load over partitions is a key factor for good throughput (it avoids hot spots). Making a good decision requires estimation based on the desired throughput of producers and consumers per partition.



For example, if you want to be able to read 1 GB/sec, but your consumer is only able to process 50 MB/sec, then you need at least 20 partitions and 20 consumers in the consumer group. Similarly, if you want to achieve the same for producers, and 1 producer can only write at 100 MB/sec, you need 10 partitions. In this case, if you have 20 partitions, you can maintain 1 GB/sec for producing and consuming messages. You should adjust the exact number of partitions to the number of consumers or producers, so that each consumer and producer achieves its target throughput.

So a simple formula could be:

#Partitions = max(NP, NC)

where:

  • NP is the number of required producers determined by calculating: TT/TP
  • NC is the number of required consumers determined by calculating: TT/TC
  • TT is the total expected throughput for our system
  • TP is the max throughput of a single producer to a single partition
  • TC is the max throughput of a single consumer from a single partition
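Applied to the earlier example, a minimal sketch of this formula (the throughput figures are the assumed values from that example):

# Partition count estimate: #Partitions = max(NP, NC) = max(TT/TP, TT/TC).
import math

TT = 1000   # total expected throughput, MB/sec (the 1 GB/sec from the example)
TP = 100    # max throughput of a single producer to a single partition, MB/sec
TC = 50     # max throughput of a single consumer from a single partition, MB/sec

NP = math.ceil(TT / TP)   # partitions needed on the producer side -> 10
NC = math.ceil(TT / TC)   # partitions needed on the consumer side -> 20
print(f"#Partitions = max({NP}, {NC}) = {max(NP, NC)}")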

This calculation gives you a rough indication of the number of partitions. It's a good place to start. Keep in mind the following considerations for revisiting the number of partitions after you have your system in place:

  • The number of partitions can be specified at topic creation time or later.
  • Increasing the number of partitions also affects the number of open file descriptors, so make sure you set the file descriptor limit properly.
  • Reassigning partitions can be very expensive, and therefore it's better to over- than under-provision.
  • Changing the number of partitions that are based on keys is challenging and involves manual copying (see Kafka Administration).
  • Reducing the number of partitions is not currently supported. Instead, create a new topic with a lower number of partitions and copy over existing data.
  • Metadata about partitions is stored in ZooKeeper in the form of znodes. Having a large number of partitions has effects on ZooKeeper and on client resources:
    • Unneeded partitions put extra pressure on ZooKeeper (more network requests), and might introduce delay in the controller and/or partition leader election if a broker goes down.
    • Producer and consumer clients need more memory, because they need to keep track of more partitions and also buffer data for all partitions.
  • As a guideline for optimal performance, you should have no more than 3000 partitions per broker and no more than 30,000 partitions in a cluster.
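A minimal sketch of checking a planned layout against these guideline limits (the broker count, topic layout, and replication factor are assumptions, and the per-broker figure assumes the limit counts the partition replicas a broker hosts):

# Check a planned partition layout against the guideline limits above.
brokers = 12
replication_factor = 3
topic_partitions = [100, 250, 50]   # planned partitions per topic

cluster_partitions = sum(topic_partitions)
# Each partition has replication_factor replicas spread across the brokers.
replicas_per_broker = cluster_partitions * replication_factor / brokers

print(f"Partitions in the cluster: {cluster_partitions} (guideline: at most 30,000)")
print(f"Partition replicas per broker: {replicas_per_broker:.0f} (guideline: at most 3000)")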

Make sure consumers don’t lag behind producers by monitoring consumer lag. To check consumers' position in a consumer group (that is, how far behind the end of the log they are), use the following command:

$ kafka-consumer-groups --bootstrap-server BROKER_ADDRESS --describe --group CONSUMER_GROUP --new-consumer