Kafka Security Authentication and Authorization Configuration Guide (SASL/PLAIN and SASL/SCRAM)
This guide explains Kafka's authentication and authorization mechanisms, covering SASL/PLAIN and SASL/SCRAM setups, JAAS file creation, server property configuration, ACL management, and provides complete Java producer and consumer examples for secure communication.
This article describes Kafka authentication and authorization (tested on Kafka 2.3.0), covering the security mechanisms used for identity verification between clients, brokers, and ZooKeeper, as well as topic‑level authorization.
Kafka supports two main security layers: SASL for authentication and SSL for encryption. The supported SASL mechanisms are GSSAPI (Kerberos), PLAIN, SCRAM‑SHA‑256, SCRAM‑SHA‑512, and OAUTHBEARER.
SASL/PLAIN configuration steps:
1. Create a JAAS file (e.g., kafka-server-jaas.conf) with user definitions:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-pwd"
user_admin="admin-pwd"
user_producer="producer-secret"
user_consumer="consumer-secret"
user_alice="alice-secret";
};
2. Add SASL settings to server.properties:
listeners=SASL_PLAINTEXT://host.name:9092
advertised.listeners=SASL_PLAINTEXT://host.name:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin
3. Modify the broker start script to point to the JAAS file:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/etc/kafka/config/kafka-server-jaas.conf kafka.Kafka "$@"
4. Create ACLs for a user (example for alice) using kafka-acls.sh:
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:12181 --add --allow-principal User:alice --operation Write --topic '*'
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:12181 --add --allow-principal User:alice --operation Read --topic '*'
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:12181 --add --allow-principal User:alice --operation Describe --topic '*'
./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:12181 --add --allow-principal User:alice --operation Read --group test-group
Example Java producer using SASL/PLAIN:
package cn.xdf.xadd.xaddstarterkafkasample.sample;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducer {
private static final String TOPIC = "test-topic1";
private static final String BROKERS = "host.name:port";
// Fully qualified to avoid the clash with this class's own name
private static org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
static {
Properties props = initConfig();
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
}
public static Properties initConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"alice-secret\";");
return props;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, 0, "tian", "xdf-9901-value-" + (int) (10 * Math.random()));
producer.send(record, (metadata, e) -> {
if (e != null) System.out.println("send error: " + e.getMessage());
else System.out.println(String.format("offset:%s,partition:%s", metadata.offset(), metadata.partition()));
}).get();
producer.close();
}
}
Example Java consumer using SASL/PLAIN:
package cn.xdf.xadd.xaddstarterkafkasample.sample;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public class KafkaConsumer {
private static final String TOPIC = "test-topic1";
private static final String BROKERS = "host.name:port";
// Fully qualified to avoid the clash with this class's own name
private static org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer;
static {
Properties props = initConfig();
consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
}
public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); // must match the group granted Read in the ACLs above
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"alice-secret\";");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
return props;
}
public static void main(String[] args) {
consumer.subscribe(Collections.singleton(TOPIC));
AtomicBoolean running = new AtomicBoolean(true);
try {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic:" + record.topic() + ", partition:" + record.partition() + ", offset:" + record.offset());
System.out.println("key:" + record.key() + ", value:" + record.value());
}
}
} catch (Exception e) {
System.out.println("consumer error : " + e.getMessage());
} finally {
consumer.close();
}
}
}
For script-based clients, the JAAS file must be referenced via the JVM option -Djava.security.auth.login.config in the producer/consumer scripts, and the security.protocol and sasl.mechanism properties must be added to the respective *.properties files.
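As a sketch of that script-based setup (paths are illustrative, reusing the alice credentials from the examples above):

```shell
# Client-side JAAS file for the console tools; the KafkaClient section
# plays the role that KafkaServer plays in the broker JAAS file.
cat > /tmp/kafka-client-jaas.conf <<'EOF'
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice-secret";
};
EOF

# Client properties passed via --producer.config / --consumer.config.
cat > /tmp/client-sasl.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
EOF

# Point the scripts at the JAAS file via the JVM option:
export KAFKA_OPTS="-Djava.security.auth.login.config=/tmp/kafka-client-jaas.conf"

# Then, for example:
# ./kafka-console-producer.sh --broker-list host.name:9092 --topic test-topic1 --producer.config /tmp/client-sasl.properties
```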
SASL/SCRAM configuration steps:
1. Create SCRAM credentials in ZooKeeper using kafka-configs.sh (example for user admin):
./kafka-configs.sh --zookeeper localhost:12181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
2. Add similar commands for the producer and consumer users.
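Step 2 could look like the following sketch (ZooKeeper address and passwords are illustrative, mirroring the users defined in the JAAS file earlier):

```shell
./kafka-configs.sh --zookeeper localhost:12181 --alter --add-config 'SCRAM-SHA-256=[password=producer-secret],SCRAM-SHA-512=[password=producer-secret]' --entity-type users --entity-name producer
./kafka-configs.sh --zookeeper localhost:12181 --alter --add-config 'SCRAM-SHA-256=[password=consumer-secret],SCRAM-SHA-512=[password=consumer-secret]' --entity-type users --entity-name consumer
```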
3. Add a SCRAM JAAS entry for the broker:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
4. Pass the JAAS file location to each broker with the JVM flag:
-Djava.security.auth.login.config=/etc/kafka/kafka-server-jaas.conf
5. Update server.properties to enable SCRAM mechanisms and ACL settings:
# authentication
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
# ACL
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
6. Restart the brokers and assign ACLs to the new users using the same kafka-acls.sh commands described earlier.
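After restarting, the stored SCRAM credentials can be inspected with --describe (ZooKeeper address assumed as in the earlier commands):

```shell
./kafka-configs.sh --zookeeper localhost:12181 --describe --entity-type users --entity-name admin
```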
Client configuration for SCRAM is analogous to the SASL/PLAIN examples; only the mechanism and the JAAS login module differ (e.g., sasl.mechanism=SCRAM-SHA-256 and org.apache.kafka.common.security.scram.ScramLoginModule).
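For instance, a SCRAM client's properties file would look like this sketch (the alice credentials are assumed to have been registered in ZooKeeper as in step 1):

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret";
```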
For further details on Kafka’s security implementations, refer to the source package org.apache.kafka.common.security in the Apache Kafka repository.
New Oriental Technology