Create a Kamelet to enable Camel K to connect to your Kafka cluster.
To do so, log in to the OpenShift console, select your team's namespace, and click the + button in the top-right corner. Then copy and paste the following content:
# Kamelet "kafka-sink-scram": a sink that publishes incoming messages to
# Kafka topics, authenticating to the brokers with SASL/SCRAM.
#
# Required parameters: topic, bootstrapServers, user, password.
# Optional parameters: securityProtocol (default SASL_PLAINTEXT) and
# saslMechanism (default SCRAM-SHA-512).
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: kafka-sink-scram
  annotations:
    camel.apache.org/kamelet.support.level: "Stable"
    camel.apache.org/catalog.version: "main-SNAPSHOT"
    camel.apache.org/kamelet.icon: ""
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "Kafka"
  labels:
    camel.apache.org/kamelet.type: "sink"
spec:
  # JSON-schema style description of the parameters accepted by this Kamelet.
  definition:
    title: "Kafka Sink (SCRAM Authentication)"
    description: |-
      Send data to Kafka topics.
      The Kamelet is able to understand the following headers to be set:
      - `key` / `ce-key`: as message key
      - `partition-key` / `ce-partitionkey`: as message partition key
      Both the headers are optional.
    required:
      - topic
      - bootstrapServers
      - user
      - password
    type: object
    properties:
      topic:
        title: Topic Names
        description: Comma separated list of Kafka topic names
        type: string
      bootstrapServers:
        title: Bootstrap Servers
        description: Comma separated list of Kafka Broker URLs
        type: string
      securityProtocol:
        title: Security Protocol
        description: Protocol used to communicate with brokers. SASL_PLAINTEXT, PLAINTEXT, SASL_SSL and SSL are supported
        type: string
        # NOTE(review): SASL_PLAINTEXT does not encrypt broker traffic;
        # presumably SASL_SSL should be set when the cluster exposes a TLS
        # listener - confirm against the target cluster.
        default: SASL_PLAINTEXT
      saslMechanism:
        title: SASL Mechanism
        description: The Simple Authentication and Security Layer (SASL) Mechanism used.
        type: string
        default: SCRAM-SHA-512
      user:
        title: Username
        description: Username to authenticate to Kafka
        type: string
        x-descriptors:
          - urn:camel:group:credentials
      password:
        title: Password
        description: Password to authenticate to kafka
        type: string
        format: password
        x-descriptors:
          - urn:alm:descriptor:com.tectonic.ui:password
          - urn:camel:group:credentials
  dependencies:
    - "camel:core"
    - "camel:kafka"
    - "camel:kamelet"
  # Route template: consumes from the Kamelet input and produces to Kafka.
  template:
    from:
      uri: "kamelet:source"
      steps:
        # Message key: use the `key` header when set, otherwise `ce-key`.
        - choice:
            when:
              - simple: "${header[key]}"
                steps:
                  - set-header:
                      name: kafka.KEY
                      simple: "${header[key]}"
              - simple: "${header[ce-key]}"
                steps:
                  - set-header:
                      name: kafka.KEY
                      simple: "${header[ce-key]}"
        # Partition key: use `partition-key` when set, otherwise
        # `ce-partitionkey`.
        - choice:
            when:
              - simple: "${header[partition-key]}"
                steps:
                  - set-header:
                      name: kafka.PARTITION_KEY
                      simple: "${header[partition-key]}"
              - simple: "${header[ce-partitionkey]}"
                steps:
                  - set-header:
                      name: kafka.PARTITION_KEY
                      simple: "${header[ce-partitionkey]}"
        - to:
            uri: "kafka:{{topic}}"
            parameters:
              brokers: "{{bootstrapServers}}"
              securityProtocol: "{{securityProtocol}}"
              saslMechanism: "{{saslMechanism}}"
              # NOTE(review): user and password are substituted inside single
              # quotes; a value containing a single quote would presumably
              # break this JAAS string - confirm.
              saslJaasConfig: "org.apache.kafka.common.security.scram.ScramLoginModule required username='{{user}}' password='{{password}}';"