34 How Should Authorization Be Handled in Cloud Environments #

Hello, I am Hu Xi. Today, the topic I want to share is: Authorization Mechanism in Kafka.

What Is an Authorization Mechanism? #

In the previous lecture, we spent a lot of time discussing Kafka’s authentication mechanism. Today, let’s take a look at Kafka’s authorization mechanism. In information security, authorization generally refers to granting access permissions to protected resources, most commonly in the form of access control.

Specifically, there are four common permission models.

  • ACL: Access-Control List, access control list.
  • RBAC: Role-Based Access Control, role-based permission control.
  • ABAC: Attribute-Based Access Control, attribute-based permission control.
  • PBAC: Policy-Based Access Control, policy-based permission control.

In typical internet scenarios, the first two models are commonly used, while the latter two are less commonly used.

The ACL model is very simple: it is a direct mapping between users and permissions.

The RBAC model introduces the concept of roles: users are grouped into roles, and permissions are assigned to roles rather than to individual users.

Kafka does not use the RBAC model; it uses the ACL model. Simply put, this model specifies which users have what kind of access permissions to which resources. We can use a sentence from the official website to describe this model: “Principal P is [Allowed/Denied] Operation O From Host H On Resource R.” This sentence mentions several entities; let me explain each of them, followed by a concrete example after the list.

  • Principal: Represents the user accessing the Kafka cluster.
  • Operation: Represents a specific type of access, such as reading and writing messages or creating topics.
  • Host: Represents the IP address of the client application connecting to the Kafka cluster. A wildcard (*) can be used to match all IP addresses.
  • Resource: Represents the type of Kafka resource. Taking the latest version 2.3 as an example, there are 5 types of resources: TOPIC, CLUSTER, GROUP, TRANSACTIONAL_ID, and DELEGATION_TOKEN.
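For instance, here is a hypothetical instantiation of this model (the user, host, and topic are made-up names purely for illustration):

Principal User:Alice is Allowed Operation Read From Host 192.168.1.10 On Resource Topic:test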

Currently, Kafka provides a pluggable authorization implementation mechanism. This mechanism stores all the ACL entries you configure under the /kafka-acl node in ZooKeeper. You can dynamically add, remove, and list ACL entries using the kafka-acls script provided by Kafka, and the changes take effect immediately.
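If you are curious about where these entries live, you can peek at the ZooKeeper node directly. Here is a minimal sketch using the zookeeper-shell script that ships with Kafka, assuming ZooKeeper is running on localhost:2181; each child node under /kafka-acl corresponds to a resource type that has ACLs configured:

$ bin/zookeeper-shell.sh localhost:2181 ls /kafka-acl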

How to enable ACL? #

Enabling ACL in Kafka is very simple. You just need to add one line to the Broker’s configuration file, server.properties:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

The authorizer.class.name parameter specifies the implementation class of the ACL authorization mechanism. Kafka provides the Authorizer interface, which allows you to implement your own authorization logic, but it is more common to use the SimpleAclAuthorizer implementation class that ships with Kafka. Once this parameter is set, ACL authorization is enabled when the Broker starts. In a production environment, you need to apply this configuration to every Broker in the cluster.

Super User #

After enabling ACL authorization, you must explicitly grant different users permissions on a resource; otherwise, by default, resources without any ACL configured cannot be accessed at all. There is one exception: super users can access all resources even if no ACL is set for them.

So, how do we set up super users in a Kafka cluster? The method is simple: just set the super.users parameter in the Broker-side server.properties file, for example:

super.users=User:superuser1;User:superuser2

Note that when specifying multiple super users, the delimiter is a semicolon rather than a comma. This avoids problems with usernames (such as SSL Distinguished Names) that themselves contain commas.

In addition to setting the super.users parameter, Kafka also supports configuring all users as super users. If we set allow.everyone.if.no.acl.found=true in the server.properties file, then all users can access resources without any ACL set. However, I personally do not recommend this setting. After all, in a production environment, especially in environments with high security requirements, using a whitelist mechanism is more reassuring than a blacklist mechanism.
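Putting these authorization-related settings together, the relevant fragment of a Broker-side server.properties might look like this (the usernames are placeholders):

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:superuser1;User:superuser2
# Deliberately left unset (defaults to false) to keep the whitelist behavior:
# allow.everyone.if.no.acl.found=true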

kafka-acls Script #

After understanding the concept of ACLs in Kafka, let’s take a look at how to set them. Currently, the way to configure authorization in Kafka is the kafka-acls script. For example, if we want to grant user Alice all permissions on all topics as well as at the cluster level, we can use the following command:

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Alice --operation All --topic '*' --cluster

In this command, All represents all operations, the asterisk after --topic represents all topics, and --cluster indicates that we are also granting Alice cluster-level permissions.

This script has many parameters, let’s look at another common usage:

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadUser --deny-host 10.205.96.119 --operation Read --topic test-topic

The asterisk after User: matches all users, and the asterisk after --allow-host matches all IP addresses. This command allows all users to read from the topic test-topic from any IP address, while denying the user BadUser when connecting from the IP address 10.205.96.119.

The kafka-acls script also has other functions, such as removing ACLs and listing existing ACLs. Their usage is similar to the commands above, so I won’t walk through them one by one here; you can run kafka-acls.sh without arguments to see all of its options.
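For reference, here is a sketch of the list and remove variants, reusing the topic and user from the earlier examples (--force skips the interactive confirmation when removing):

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic test-topic

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Alice --operation Read --topic test-topic --force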

ACL Permission List #

The two commands mentioned earlier involve cluster permissions and read permissions for topics. You may wonder: how many types of ACL permissions does Kafka actually provide? Quite a few. Taking version 2.3 as an example, the operation types include READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, and ALL, and each resource type supports its own subset of these operations. The official documentation provides a large table mapping every operation to the resources and request types it protects.

Seeing such a long list, are you surprised? In fact, this just shows how granular Kafka’s current authorization mechanism is. Now, let me share with you how to use this mapping.

For example, if you want to grant write permission to your Producer program, first locate the TOPIC resource type in the Resource column, and then find the WRITE operation in the Operation column. This WRITE permission determines whether the Producer program can send messages to the corresponding topic. In most cases, the Producer program also needs permission to create topics and to query topic metadata, so Kafka provides a shortcut for these common Producer-side permissions: --producer. When executing the kafka-acls command, you can simply specify --producer to obtain these three permissions at once. --consumer works in a similar way: specifying --consumer grants the permissions needed by Consumer-side applications.
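As a quick sketch, a single command like the following (reusing Alice and test-topic from the earlier examples) would grant all the Producer-side permissions at once:

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Alice --producer --topic test-topic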

Can the authorization mechanism be used separately? #

Regarding authorization, a common question is whether Kafka’s authorization mechanism can be used without configuring an authentication mechanism. The answer is yes, but you can then only set permissions based on IP addresses, because without authentication every client is treated as the same anonymous principal. For example, the following command denies the Producer application running on the IP address 127.0.0.1 permission to send data to the test topic:

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --deny-principal User:'*' --deny-host 127.0.0.1 --operation Write --topic test

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
[2019-07-16 10:10:57,283] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-07-16 10:10:57,284] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test] (org.apache.kafka.clients.Metadata)
[2019-07-16 10:10:57,284] ERROR Error when sending message to topic test with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]

Please note the error messages in the output above. Even though no authentication mechanism is configured, we can still deny clients on specific IP addresses access to Kafka resources by setting IP-based ACLs. However, although the authorization mechanism can be used on its own to some extent, I recommend using it in conjunction with the authentication mechanism discussed in the previous lecture.
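Incidentally, if you want to lift the deny rule from this experiment later, a mirror-image --remove command should do it (--force skips the confirmation prompt):

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --deny-principal User:'*' --deny-host 127.0.0.1 --operation Write --topic test --force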

Next, let me give an example of configuring SSL + ACL to demonstrate how ACL authorization should be done in a cloud environment.

Configuration Example #

Before demonstrating ACLs, let me briefly explain how to configure SSL. I provide a shell script that makes it easy for you to set up SSL. Here is the code:

#!/bin/bash

# Set environment variables
BASE_DIR=/Users/huxi/testenv # You need to modify this
CERT_OUTPUT_PATH="$BASE_DIR/certificates"
PASSWORD=test1234
KEY_STORE="$CERT_OUTPUT_PATH/server.keystore.jks"
TRUST_STORE="$CERT_OUTPUT_PATH/server.truststore.jks"
CLIENT_KEY_STORE="$CERT_OUTPUT_PATH/client.keystore.jks"
CLIENT_TRUST_STORE="$CERT_OUTPUT_PATH/client.truststore.jks"
KEY_PASSWORD=$PASSWORD
STORE_PASSWORD=$PASSWORD
TRUST_KEY_PASSWORD=$PASSWORD
TRUST_STORE_PASSWORD=$PASSWORD
CERT_AUTH_FILE="$CERT_OUTPUT_PATH/ca-cert"
DAYS_VALID=365
DNAME="CN=Xi Hu, OU=YourDept, O=YourCompany, L=Beijing, ST=Beijing, C=CN"


mkdir -p $CERT_OUTPUT_PATH

echo "1. Generating keys and certificates......"
keytool -keystore $KEY_STORE -alias kafka-server -validity $DAYS_VALID -genkey -keyalg RSA \
-storepass $STORE_PASSWORD -keypass $KEY_PASSWORD -dname "$DNAME"

keytool -keystore $CLIENT_KEY_STORE -alias kafka-client -validity $DAYS_VALID -genkey -keyalg RSA \
-storepass $STORE_PASSWORD -keypass $KEY_PASSWORD -dname "$DNAME"

echo "2. Creating CA......"
openssl req -new -x509 -keyout $CERT_OUTPUT_PATH/ca-key -out "$CERT_AUTH_FILE" -days "$DAYS_VALID" \
-passin pass:"$PASSWORD" -passout pass:"$PASSWORD" \
-subj "/C=CN/ST=Beijing/L=Beijing/O=YourCompany/OU=YourDept/CN=Xi Hu"

echo "3. Adding CA file to broker truststore......"
keytool -keystore "$TRUST_STORE" -alias CARoot \
-importcert -file "$CERT_AUTH_FILE" -storepass "$TRUST_STORE_PASSWORD" -keypass "$TRUST_KEY_PASSWORD" -noprompt

echo "4. Adding CA file to client truststore......"
keytool -keystore "$CLIENT_TRUST_STORE" -alias CARoot \
-importcert -file "$CERT_AUTH_FILE" -storepass "$TRUST_STORE_PASSWORD" -keypass "$TRUST_KEY_PASSWORD" -noprompt

echo "5. Exporting cluster certificate from keystore......"
keytool -keystore "$KEY_STORE" -alias kafka-server -certreq -file "$CERT_OUTPUT_PATH/server-cert-file" \
-storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

keytool -keystore "$CLIENT_KEY_STORE" -alias kafka-client -certreq -file "$CERT_OUTPUT_PATH/client-cert-file" \
-storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

echo "6. Issuing certificates using CA......"
openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CERT_OUTPUT_PATH/server-cert-file" \
-out "$CERT_OUTPUT_PATH/server-cert-signed" -days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"

openssl x509 -req -CA "$CERT_AUTH_FILE" -CAkey $CERT_OUTPUT_PATH/ca-key -in "$CERT_OUTPUT_PATH/client-cert-file" \
-out "$CERT_OUTPUT_PATH/client-cert-signed" -days "$DAYS_VALID" -CAcreateserial -passin pass:"$PASSWORD"

echo "7. Importing CA file into keystore......"
keytool -keystore "$KEY_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt

keytool -keystore "$CLIENT_KEY_STORE" -alias CARoot -import -file "$CERT_AUTH_FILE" -storepass "$STORE_PASSWORD" \
-keypass "$KEY_PASSWORD" -noprompt

echo "8. Importing issued certificates into keystore......"
keytool -keystore "$KEY_STORE" -alias kafka-server -import -file "$CERT_OUTPUT_PATH/server-cert-signed" \
-storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

keytool -keystore "$CLIENT_KEY_STORE" -alias kafka-client -import -file "$CERT_OUTPUT_PATH/client-cert-signed" \
-storepass "$STORE_PASSWORD" -keypass "$KEY_PASSWORD" -noprompt

echo "9. Removing temporary files......"
rm "$CERT_OUTPUT_PATH/ca-cert.srl"
rm "$CERT_OUTPUT_PATH/server-cert-signed"
rm "$CERT_OUTPUT_PATH/client-cert-signed"
rm "$CERT_OUTPUT_PATH/server-cert-file"
rm "$CERT_OUTPUT_PATH/client-cert-file"

Note: You need to modify the BASE_DIR variable according to your own environment.

You can save the above code as a shell script and run it on one of the Broker machines. The script produces four key files: server.keystore.jks, server.truststore.jks, client.keystore.jks, and client.truststore.jks.

You need to copy the two files starting with “server” to all Broker machines in the cluster, and the two files starting with “client” to all client application machines that will connect to the Kafka cluster.
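For example, assuming passwordless SSH to a Broker host named broker1 and a target directory of /etc/kafka/certs (both of which are placeholders), the copy might look like this:

$ scp /Users/huxi/testenv/certificates/server.keystore.jks /Users/huxi/testenv/certificates/server.truststore.jks broker1:/etc/kafka/certs/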

Next, you need to configure the server.properties file for each Broker by adding the following content:

listeners=SSL://localhost:9093
ssl.truststore.location=/Users/huxi/testenv/certificates/server.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/Users/huxi/testenv/certificates/server.keystore.jks
ssl.keystore.password=test1234
security.inter.broker.protocol=SSL
ssl.client.auth=required
ssl.key.password=test1234

Now we can start the Broker process. If the Broker fails to start, check the error messages and adjust the configuration accordingly.
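At this point, you can quickly verify that the Broker’s SSL listener is up, for example with openssl’s s_client (9093 is the listener port we configured above). If SSL is working, this prints the Broker’s certificate chain, including the CN=Xi Hu subject we generated earlier:

$ openssl s_client -connect localhost:9093

Next, let’s configure SSL on the client side.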

First, we need to create a file named client-ssl.config with the following content:

security.protocol=SSL
ssl.truststore.location=/Users/huxi/testenv/certificates/client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/Users/huxi/testenv/certificates/client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.endpoint.identification.algorithm=

Note that the last line is required. Since Kafka 2.0, the client by default verifies that the server’s hostname matches the hostname in the Broker’s certificate. If you want to disable this check, set this parameter to an empty string.

With these configurations in place, you can use the console Producer and Consumer to test whether everything works correctly. For example, the following command passes --producer.config pointing to the client-ssl.config file we just created; the Consumer-side check follows right after:

$ bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.config
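The Consumer side is tested in the same way, except that the configuration file is passed via --consumer.config:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.config --from-beginning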

Now let’s talk about ACL configuration.

If you are running a Kafka cluster in the cloud, you will inevitably face the multi-tenancy problem. In addition to setting up a reasonable authentication mechanism, granting appropriate permissions to each client connecting to the Kafka cluster is also crucial. Here are some best practices to consider.

First, as mentioned earlier, to enable ACL, you need to set authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer.

Second, I recommend using a whitelist mechanism so that users who are not explicitly granted permissions have no access to any resources. In other words, do not set allow.everyone.if.no.acl.found=true in Kafka’s server.properties file.

Third, you can use the kafka-acls script to grant cluster permissions to SSL users. Let’s explain with the previous example.

When we configured SSL, we specified the user’s Distinguished Name as “CN=Xi Hu, OU=YourDept, O=YourCompany, L=Beijing, ST=Beijing, C=CN”. Previously, when setting parameters for the Broker, we specified security.inter.broker.protocol=SSL, which means SSL encryption is enforced for communication between Brokers.

If you do not grant the proper permissions to this Distinguished Name, you will not be able to start the Brokers successfully. Therefore, execute the following command before starting the Brokers:

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --operation All --cluster

Fourth, you need to grant the corresponding permissions to client applications. For example, grant the producer permission to the producer and the consumer permission to the consumer. Assuming the client wants to access the topic named “test”, the commands are as follows:

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --producer --topic 'test'


$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=Xi Hu,OU=YourDept,O=YourCompany,L=Beijing,ST=Beijing,C=CN" --consumer --topic 'test' --group '*'

Note the --producer and --consumer flags in these two commands. They are shortcuts that grant the common producer and consumer permissions at once.
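After granting permissions, it is a good habit to verify them. Listing all the ACLs currently stored is a quick way to double-check:

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list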

As a PaaS administrator in a cloud environment, you should avoid granting clients any permissions beyond the necessary ones listed above, such as the permission to create topics. In general, the fewer permissions you grant, the more secure your Kafka cluster will be.

Summary #

In summary, we have completed a comprehensive review of Kafka’s authorization mechanism, along with a walkthrough of SSL configuration. I hope you can combine the content of this lecture and the previous one to build a highly secure Kafka cluster.


Open discussion #

Kafka provides various types of permissions. The content we are discussing today only covers a few of the most important permissions. If you want to grant a client the ability to query consumer group’s committed offset data, what permissions do you think should be granted?

Feel free to write down your thoughts and answers, and let’s discuss together. If you find it helpful, you are also welcome to share this article with your friends.