Use SASL/SCRAM authentication in Confluent Platform

Salted Challenge Response Authentication Mechanism (SCRAM), or SASL/SCRAM, is a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username/password authentication, like PLAIN and DIGEST-MD5. SCRAM provides the following features:

  • The challenge-response mechanism of SASL/SCRAM protects against password sniffing on the network and against dictionary attacks on the password file. SCRAM allows the server to authenticate the client without ever transmitting or storing the client’s password in plain text.
  • Authentication information stored in the authentication database is not sufficient by itself to impersonate the client. The information is salted to prevent a pre-stored dictionary attack if the database is compromised.

For details on how SASL/SCRAM works, see RFC 5802.

Confluent Platform clusters support SCRAM-SHA-256 and SCRAM-SHA-512, which can be used with TLS to perform secure authentication. The examples below use SCRAM-SHA-256, but you can substitute the configuration for SCRAM-SHA-512 as needed.

The SCRAM implementation in a Confluent Platform cluster stores SCRAM credentials in KRaft and is suitable for use in Confluent Platform installations where KRaft is on a private network. Because of this, you must create SCRAM credentials for users in KRaft.

KRaft-based Confluent Platform clusters

KRaft-backed Confluent clusters can’t use SCRAM for controller-to-controller authentication. However, Confluent Server brokers can use SCRAM to authenticate to controllers and other brokers. You must create the SCRAM credentials before the brokers are up and running.

kafka-storage.sh tool

You must explicitly create a cluster ID for a KRaft cluster with the kafka-storage.sh tool, for example:

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

The tool generates a cluster UUID, initializes storage space, and formats the log directory on a controller. Before running the tool, make sure you have the following minimum configuration in the controller’s server.properties file:

# SAMPLE server.properties for kafka-storage.sh tool
process.roles=controller
node.id=10
controller.quorum.voters=10@kraftcontroller:9090
log.dirs=/tmp/kraftLogs
listeners=CONTROLLER://:9090
controller.listener.names=CONTROLLER

A broker may authenticate with any of the KRaft controllers, so you must run kafka-storage.sh on each controller in your cluster. Running the tool ensures each controller has what it needs to write to the metadata log.
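
For example, a minimal formatting run on one controller might look like the following sketch. It assumes the configuration above is saved as server.properties and that KAFKA_CLUSTER_ID holds the ID generated earlier:

bin/kafka-storage.sh format \
  --config server.properties \
  --cluster-id "$KAFKA_CLUSTER_ID"

If your brokers will authenticate with SCRAM, add the credentials during this same formatting step with the --add-scram option, as described in the next section.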

Create SCRAM credentials

When the kafka-storage.sh tool generates a cluster UUID, it does not add SCRAM credentials. You must create them in an additional procedure detailed in this section. You must create SCRAM credentials on each controller node. Failure to create the credentials can result in a situation in which one controller knows of the SCRAM credentials while the others do not.

When it runs, the kafka-storage.sh tool creates a bootstrap.checkpoint file. This file contains the records used to bootstrap the cluster. The tool’s --add-scram option adds a new ApiMessageAndVersion record to the bootstrap.checkpoint file; the record contains a UserScramCredentialsRecord that stores the SCRAM credentials for the specified user. A Confluent Server broker uses these records to authenticate other brokers that connect to it using SCRAM. The server side of each connection uses the UserScramCredentialsRecord; the client side of each connection still needs to know the password.

To create SCRAM credentials for users in KRaft, use the --add-scram option of the kafka-storage.sh tool, like this:

kafka-storage format [-h] --config CONFIG \
  --cluster-id CLUSTER_ID \
  --add-scram SCRAM_CREDENTIAL \
  --release-version RELEASE_VERSION \
  --ignore-formatted

where SCRAM_CREDENTIAL looks like one of the following:

  • 'SCRAM-SHA-256=[name=alice,password=alice-secret]'
  • 'SCRAM-SHA-512=[name=alice,iterations=8192,salt="MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=",saltedpassword="mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE="]'

The SCRAM_CREDENTIAL argument is a key-value pair. The key specifies the supported SCRAM mechanism, and the value contains another set of key-value pairs for populating the UserScramCredentialsRecord.

The SCRAM_CREDENTIAL subarguments require a name key and either a password key or a saltedpassword key. If you use a saltedpassword key, you must also supply an iterations key and a salt key. The values for salt and saltedpassword are Base64 encodings of binary data. The iterations and salt keys are otherwise optional; when they aren’t supplied, the iteration count defaults to 4096 and the salt is randomly generated.
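
Putting it together, a complete format command that also bootstraps a credential for user alice might look like the following sketch (the properties file path and credential values are illustrative):

bin/kafka-storage.sh format \
  --config server.properties \
  --cluster-id "$KAFKA_CLUSTER_ID" \
  --add-scram 'SCRAM-SHA-256=[name=alice,password=alice-secret]'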

To add a user to an existing SCRAM setup, use a kafka-configs command:

kafka-configs --bootstrap-server localhost:9092 --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' \
  --entity-type users \
  --entity-name alice
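
You can verify the stored credentials with the same tool. For example, the following lists the SCRAM mechanisms and iteration counts (but not the secrets) configured for alice:

kafka-configs --bootstrap-server localhost:9092 --describe \
  --entity-type users \
  --entity-name alice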

Security considerations for SASL/SCRAM

  • If you require Confluent Server brokers to authenticate each other using SCRAM, you must create the SCRAM credentials before the brokers are up and running.
  • Confluent Platform only supports the strong hash functions SHA-256 and SHA-512 with a minimum iteration count of 4096. Strong hash functions combined with strong passwords and high iteration counts protect against brute force attacks if KRaft security is compromised.
  • SCRAM should be used only with TLS encryption to prevent interception of SCRAM exchanges. This protects against dictionary and brute-force attacks, and against impersonation if KRaft is compromised.
  • The default SASL/SCRAM credential store may be overridden using custom callback handlers by configuring sasl.server.callback.handler.class in installations where KRaft is not secure (see the sketch after this list).
  • For more details on security considerations, refer to RFC 5802.
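
As a sketch of the callback handler override mentioned above, a custom credential store for SCRAM-SHA-256 on a SASL_SSL listener could be wired in with a listener-prefixed broker property. The handler class name here is hypothetical:

listener.name.sasl_ssl.scram-sha-256.sasl.server.callback.handler.class=com.example.CustomScramCallbackHandler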

The remainder of this page will show you how to configure SASL/SCRAM for each component in the Confluent Platform.

Configure Confluent Server brokers

Configure all brokers in the Kafka cluster to accept secure connections from clients. Any configuration changes made to the broker will require a rolling restart.

Enable security for Kafka brokers as described in the section below. Additionally, if you are using Confluent Control Center or Auto Data Balancer, configure your brokers for Confluent Metrics Reporter, as described later on this page.

JAAS

Note

Use of separate JAAS files is supported, but is not recommended. Instead, use the listener configuration specified in step 5 of Configuration to replace the steps below.

  1. First create the broker’s JAAS configuration file in each Confluent Server broker’s configuration directory. For this example, it is named kafka_server_jaas.conf.

  2. In each broker’s JAAS file, configure a KafkaServer section. This configuration defines one user (admin). The properties username and password are used by the broker to initiate connections to other brokers. In this example, admin is the user for interbroker communication.

    KafkaServer {
       org.apache.kafka.common.security.scram.ScramLoginModule required
       username="admin"
       password="admin-secret";
    };
    
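  3. If you use a separate JAAS file, pass its location to each broker JVM through the java.security.auth.login.config system property before starting the broker. The file path below is illustrative:

    export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"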

Configuration

  1. Enable SASL/SCRAM mechanism in the server.properties file of every broker.

    # List of enabled mechanisms, can be more than one
    sasl.enabled.mechanisms=SCRAM-SHA-256
    
    # Specify one of the SASL mechanisms
    sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
    
  2. If you want to enable SASL for interbroker communication, add the following to the broker properties file (it defaults to PLAINTEXT). Set the protocol to:

    • SASL_SSL: if TLS/SSL encryption is enabled (TLS/SSL encryption should always be used if SASL mechanism is PLAIN)
    • SASL_PLAINTEXT: if TLS/SSL encryption is not enabled
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    security.inter.broker.protocol=SASL_SSL
    
  3. Tell the Kafka brokers on which ports to listen for client and interbroker SASL connections. You must configure listeners, and optionally advertised.listeners if the value is different from listeners. Set the listener to:

    • SASL_SSL: if TLS/SSL encryption is enabled (TLS/SSL encryption should always be used if SASL mechanism is PLAIN)
    • SASL_PLAINTEXT: if TLS/SSL encryption is not enabled
    # With TLS/SSL encryption
    listeners=SASL_SSL://kafka1:9093
    advertised.listeners=SASL_SSL://localhost:9093
    
    # Without TLS/SSL encryption
    listeners=SASL_PLAINTEXT://kafka1:9093
    advertised.listeners=SASL_PLAINTEXT://localhost:9093
    
  4. Configure both SASL_SSL and PLAINTEXT ports if:

    • SASL is not enabled for interbroker communication
    • Some clients connecting to the cluster do not use SASL

    Example SASL listeners with TLS/SSL encryption, mixed with PLAINTEXT listeners

    # With TLS/SSL encryption
    listeners=PLAINTEXT://kafka1:9092,SASL_SSL://kafka1:9093
    advertised.listeners=PLAINTEXT://localhost:9092,SASL_SSL://localhost:9093
    
    # Without TLS/SSL encryption
    listeners=PLAINTEXT://kafka1:9092,SASL_PLAINTEXT://kafka1:9093
    advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093
    
  5. If you are not using a separate JAAS configuration file to configure JAAS, then configure JAAS for the Kafka broker listener as follows:

    listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required
       username="admin"
       password="admin-secret";
    
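  6. If SASL/SCRAM is enabled for interbroker communication, make sure the credential for the interbroker user exists before the brokers need to authenticate to each other. On a running cluster you can create it with kafka-configs; the following is a sketch for the admin user used above. For a brand-new KRaft cluster, create the credential with the kafka-storage.sh --add-scram step described earlier.

    kafka-configs --bootstrap-server localhost:9092 --alter \
      --add-config 'SCRAM-SHA-256=[password=admin-secret]' \
      --entity-type users \
      --entity-name admin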

Configure Kafka clients

Important

If you are configuring this for Schema Registry or REST Proxy, you must prefix each parameter with confluent.license. For example, sasl.mechanism becomes confluent.license.sasl.mechanism. For additional information, see Configure license clients to authenticate to Kafka.
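
For example, applying the prefix to the parameters used in this section gives the following sketch:

confluent.license.sasl.mechanism=SCRAM-SHA-256
# Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
confluent.license.security.protocol=SASL_SSL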

The new Producer and Consumer clients support security for Kafka versions 0.9.0 and higher.

If you are using the Kafka Streams API, refer to the Kafka Streams security documentation for how to configure equivalent TLS/SSL and SASL parameters.

  1. Configure the following properties in a client properties file client.properties.

    sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    security.protocol=SASL_SSL
    
  2. Configure the JAAS configuration property to describe how clients, such as producers and consumers, can connect to the Kafka Brokers. The properties username and password are used by clients to configure the user for client connections. In this example, clients connect to the broker as user kafkaclient1.

    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="kafkaclient1" \
      password="kafkaclient1-secret";
    
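To verify the client setup, you can point a console client at the secured listener with this properties file. For example, assuming a topic named test-topic exists:

kafka-console-producer --bootstrap-server kafka1:9093 \
  --topic test-topic \
  --producer.config client.properties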

Configure Kafka Connect

This section describes how to enable security for Kafka Connect. Securing Kafka Connect requires that you configure security for:

  1. Kafka Connect workers: part of the Kafka Connect API, a worker is really just an advanced client, underneath the covers
  2. Kafka Connect connectors: connectors may have embedded producers or consumers, so you must override the default configurations for Connect producers used with source connectors and Connect consumers used with sink connectors
  3. Kafka Connect REST: Kafka Connect exposes a REST API that can be configured to use TLS/SSL using additional properties

Configure security for Kafka Connect as described in the section below. Additionally, if you are using Confluent Control Center streams monitoring for Kafka Connect, configure security for Confluent Monitoring Interceptors.

Configure all the following properties in connect-distributed.properties.

  1. Configure the Connect workers to use SASL/SCRAM.

    sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    security.protocol=SASL_SSL
    
  2. Configure the JAAS configuration property to describe how Connect’s producers and consumers can connect to the Kafka Brokers. The properties username and password are used by Connect to configure the user for connections. In this example, Connect workers connect to the broker as user connect.

    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="connect" \
      password="connect-secret";
    
  3. For the connectors to leverage security, you also have to override the default producer/consumer configuration that the worker uses. Depending on whether the connector is a source or sink connector:

    • Source connector: configure the same properties adding the producer prefix.

      producer.sasl.mechanism=SCRAM-SHA-256
      # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
      producer.security.protocol=SASL_SSL
      producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="connect" \
        password="connect-secret";
      
    • Sink connector: configure the same properties adding the consumer prefix.

      consumer.sasl.mechanism=SCRAM-SHA-256
      # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
      consumer.security.protocol=SASL_SSL
      consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="connect" \
        password="connect-secret";
      

Configure Confluent Replicator

Confluent Replicator is a type of Kafka source connector that replicates data from a source to destination Kafka cluster. An embedded consumer inside Replicator consumes data from the source cluster, and an embedded producer inside the Kafka Connect worker produces data to the destination cluster.

Replicator version 4.0 and earlier requires a connection to ZooKeeper in the origin and destination Kafka clusters. If ZooKeeper is configured for authentication, the client configures the ZooKeeper security credentials via the global JAAS configuration setting -Djava.security.auth.login.config on the Connect workers, and the ZooKeeper security credentials in the origin and destination clusters must be the same.
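
For example, the Connect workers could be started with the ZooKeeper JAAS file supplied through that system property (the file name is illustrative):

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.conf"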

To configure Confluent Replicator security, you must configure the Replicator connector as shown below, and additionally configure the Connect worker properties shown at the end of this section.

Configure Confluent Replicator to use SASL/SCRAM by adding these properties in the Replicator’s JSON configuration file. The JAAS configuration property defines username and password used by Replicator to configure the user for connections. In this example, Replicator connects to the broker as user replicator.

{
  "name": "replicator",
  "config": {
    ....
    "src.kafka.security.protocol": "SASL_SSL",
    "src.kafka.sasl.mechanism": "SCRAM-SHA-256",
    "src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"replicator\" password=\"replicator-secret\";",
    ....
  }
}

To configure Confluent Replicator for a destination cluster with SASL/SCRAM authentication, modify the Replicator JSON configuration to include the following:

{
  "name": "replicator",
  "config": {
    ....
    "dest.kafka.security.protocol": "SASL_SSL",
    "dest.kafka.sasl.mechanism": "SCRAM-SHA-256",
    "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"replicator\" password=\"replicator-secret\";",
    ....
  }
}

Additionally, the following properties are required in the Connect worker:

sasl.mechanism=SCRAM-SHA-256
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="replicator" password="replicator-secret";
producer.sasl.mechanism=SCRAM-SHA-256
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="replicator" password="replicator-secret";

For more information, see the general security configuration for Connect workers in the Configure Kafka Connect section above.

Configure Confluent Control Center

Confluent Control Center uses Kafka Streams as a state store, so if all the Kafka brokers in the cluster backing Control Center are secured, then the Control Center application also needs to be secured.

Note

When RBAC is enabled, Control Center cannot be used in conjunction with Kerberos because Control Center cannot support any SASL mechanism other than OAUTHBEARER.

Enable security for the Control Center application as described in the section below. Additionally, configure security for Confluent Metrics Reporter, which Control Center relies on (see Configure Confluent Metrics Reporter later on this page).

  1. Enable SASL/SCRAM and the security protocol for Control Center in the etc/confluent-control-center/control-center.properties file.

    confluent.controlcenter.streams.sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    confluent.controlcenter.streams.security.protocol=SASL_SSL
    
  2. Configure the JAAS configuration property to describe how Control Center can connect to the Kafka Brokers. The properties username and password are used by Control Center to configure connections.

    confluent.controlcenter.streams.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="confluent" \
      password="confluent-secret";
    

Configure Confluent Metrics Reporter

This section describes how to enable SASL/SCRAM for Confluent Metrics Reporter, which is used for Confluent Control Center and Auto Data Balancer.

To configure the Confluent Metrics Reporter for SASL/SCRAM, make the following configuration changes in the server.properties file in every broker in the production cluster being monitored.

  1. Verify that the Confluent Metrics Reporter is enabled.

    metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    confluent.metrics.reporter.bootstrap.servers=kafka1:9093
    
  2. Enable the SASL/SCRAM mechanism for Confluent Metrics Reporter.

    confluent.metrics.reporter.sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    confluent.metrics.reporter.security.protocol=SASL_SSL
    
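  3. Configure the JAAS configuration property so that the metrics reporter can authenticate to the brokers. This uses the same confluent.metrics.reporter prefix as the properties above; the user metricsreporter and its password are examples:

    confluent.metrics.reporter.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="metricsreporter" \
      password="metricsreporter-secret";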

Configure Schema Registry

Important

If you are configuring this for Schema Registry or REST Proxy, you must prefix each parameter with confluent.license. For example, sasl.mechanism becomes confluent.license.sasl.mechanism.

Schema Registry uses Kafka to persist schemas, and so it acts as a client to write data to the Kafka cluster. Therefore, if the Kafka brokers are configured for security, you should also configure Schema Registry to use security. You may also refer to the complete list of Schema Registry configuration options.

  1. Here is an example subset of schema-registry.properties configuration parameters to add for SASL authentication:

    kafkastore.bootstrap.servers=kafka1:9093
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    kafkastore.security.protocol=SASL_SSL
    kafkastore.sasl.mechanism=SCRAM-SHA-256
    
  2. Configure the JAAS configuration property to describe how Schema Registry can connect to the Kafka Brokers. The properties username and password are used by Schema Registry to configure the user for connections. In this example, Schema Registry connects to the broker as user schemaregistry.

    kafkastore.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="schemaregistry" \
      password="schemaregistry-secret";
    

Configure REST Proxy

Important

If you are configuring this for Schema Registry or REST Proxy, you must prefix each parameter with confluent.license. For example, sasl.mechanism becomes confluent.license.sasl.mechanism.

Securing Confluent REST Proxy for SASL requires that you configure security between the REST Proxy and the Kafka cluster.

For a complete list of all configuration options, refer to SASL Authentication.

  1. Configure the SASL/SCRAM mechanism in kafka-rest.properties.

    Note

    Make sure the bootstrap.servers configuration is set with SASL_PLAINTEXT://host:port (or SASL_SSL://host:port) endpoints, or you might accidentally open a SASL connection to a non-SASL port. For more details, see bootstrap.servers in Standalone REST Proxy Configuration Options.

    bootstrap.servers=SASL_SSL://kafka1:9093
    client.sasl.mechanism=SCRAM-SHA-256
    # Configure SASL_SSL if TLS/SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
    client.security.protocol=SASL_SSL
    
  2. Configure the JAAS configuration property to describe how the REST Proxy can connect to the Kafka Brokers. The properties username and password are used by the REST Proxy to configure the user for connections. In this example, the REST Proxy connects to the broker as user restproxy.

    client.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
      username="restproxy" \
      password="restproxy-secret";
    