
Kafka

Apache Kafka is a distributed data streaming platform that allows real-time publishing, subscribing, storing, and processing of data streams.

This article explains how to add a Kafka data source in ClickPipes.

Supported Versions and Architectures

  • Versions: Kafka 2.0 ~ 2.5 (built on Scala 2.12)
  • Architectures: Single-node or cluster

Supported Data Types

| Category | Data Types |
| --- | --- |
| Boolean | BOOLEAN |
| Integer | SHORT, INTEGER, LONG |
| Floating Point | FLOAT, DOUBLE |
| Numeric | NUMBER |
| String | CHAR (supported as a source), VARCHAR, STRING, TEXT |
| Binary | BINARY |
| Composite | ARRAY, MAP, OBJECT (supported as a source) |
| Date/Time | TIME, DATE, DATETIME, TIMESTAMP |
| UUID | UUID (supported as a source) |

Structural Modes and Sync Details

When configuring the Kafka connection, you can choose between two structure modes based on your business needs: Standard Structure and Original Structure (both are also described under Source Config below). The Standard Structure mode works as follows:

Description: Supports synchronization of complete DML operations (INSERT, UPDATE, DELETE). As a source, it parses and restores DML and DDL events for downstream processing; as a target, it stores these events in a standardized format so that subsequent tasks can parse them easily.

Typical Use Case: In the CDC Log Queue, use the "Standard Structure" to write relational data change events from MySQL into Kafka, and then consume the data to write it into other databases.

Sample Data:

```json
{
  "ts": 1727097087513,
  "op": "DML:UPDATE",
  "opTs": 1727097087512,
  "namespaces": [],
  "table": "table_name",
  "before": {},
  "after": {}
}
```
  • ts: The timestamp when the event was parsed, recording the time the event was processed.
  • op: Event type, indicating the specific operation, such as DML:INSERT, DML:UPDATE, DML:DELETE.
  • opTs: Event timestamp, indicating when the data change actually occurred.
  • namespaces: A collection of schema names for multi-schema scenarios.
  • table: The table name indicating where the data change occurred.
  • before: Data content before the change, available only for UPDATE and DELETE operations.
  • after: Data content after the change, applicable to INSERT and UPDATE operations.
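
To make the event format concrete, here is a minimal sketch of a downstream consumer that reads Standard Structure events with the plain Apache Kafka Java client and Jackson, and picks out the fields described above. The broker address, topic name `cdc_log_queue`, and consumer group are placeholders, not values defined by ClickPipes.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class StandardStructureConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cdc-demo");                // placeholder consumer group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        ObjectMapper mapper = new ObjectMapper();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("cdc_log_queue")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    JsonNode event = mapper.readTree(record.value());
                    String op = event.path("op").asText();       // e.g. DML:INSERT / DML:UPDATE / DML:DELETE
                    String table = event.path("table").asText(); // table where the change occurred
                    JsonNode before = event.path("before");      // present for UPDATE and DELETE
                    JsonNode after = event.path("after");        // present for INSERT and UPDATE
                    System.out.printf("%s on %s: before=%s after=%s%n", op, table, before, after);
                }
            }
        }
    }
}
```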

Consumption Details

When configuring data replication or transformation tasks later, you can specify the data synchronization method through task settings in the upper-right corner. The corresponding consumption details are as follows:

  • Full Only: Reads from the first message and stops the task after reaching the recorded incremental position.
  • Full + Incremental: Reads from the first message to the recorded position and then continuously syncs incremental data.
  • Incremental Only: Choose the starting point for incremental collection: Now starts syncing from the current position, while Select Time starts from the offset calculated for the specified time (see the sketch after the tip below).
tip

Because Kafka, as a message queue, only supports append operations, take care to avoid duplicate data in the target system caused by repeated consumption from the source.
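
To illustrate what Select Time means in Kafka terms, the sketch below uses the standard Java consumer API to translate a timestamp into per-partition starting offsets via `offsetsForTimes`. It only mirrors the idea behind the option and is not how ClickPipes itself is configured; the topic name and time are placeholders.

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class SelectTimeOffsets {
    /** Seeks every partition of the topic to the first offset at or after the given time. */
    static void seekToTime(KafkaConsumer<?, ?> consumer, String topic, Instant startTime) {
        // Build a timestamp query for each partition of the topic.
        Map<TopicPartition, Long> query = consumer.partitionsFor(topic).stream()
                .collect(Collectors.toMap(
                        p -> new TopicPartition(topic, p.partition()),
                        p -> startTime.toEpochMilli()));

        consumer.assign(query.keySet());

        // Ask the broker for the earliest offset whose timestamp is >= startTime.
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
        offsets.forEach((tp, oat) -> {
            if (oat != null) {
                consumer.seek(tp, oat.offset());  // start incremental sync here ("Select Time")
            } else {
                consumer.seekToEnd(Set.of(tp));   // no message after that time: behave like "Now"
            }
        });
    }
}
```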

Limitations

  • Data Type Adaptation: When Kafka is the source, its data types need to be adapted to the target data source's requirements, or the corresponding table structures should be created manually on the target side to ensure compatibility.
  • Message Delivery Guarantee: Because of Kafka's at-least-once delivery semantics and append-only behavior, duplicate consumption may occur. Idempotency must be ensured on the target side to avoid duplicate data resulting from repeated consumption (see the upsert sketch after this list).
  • Consumption Mode Limitation: Consumption threads use different consumer group IDs, so be aware of the impact on consumption concurrency.
  • Security Authentication Limitation: Currently, only authentication-free Kafka instances are supported.
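
One common way to meet the idempotency requirement from the Message Delivery Guarantee note above is to write change events as keyed upserts instead of plain inserts, so replaying the same event has no additional effect. The sketch below assumes a hypothetical MySQL table `orders(id PRIMARY KEY, status, amount)` and plain JDBC; it only illustrates the pattern and is not part of ClickPipes.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;

public class IdempotentWriter {
    // Upsert keyed on the primary key: replaying the same CDC event produces the same row, not a duplicate.
    private static final String UPSERT_SQL =
            "INSERT INTO orders (id, status, amount) VALUES (?, ?, ?) "
          + "ON DUPLICATE KEY UPDATE status = VALUES(status), amount = VALUES(amount)";

    public static void write(Connection conn, long id, String status, double amount) throws Exception {
        try (PreparedStatement ps = conn.prepareStatement(UPSERT_SQL)) {
            ps.setLong(1, id);
            ps.setString(2, status);
            ps.setDouble(3, amount);
            ps.executeUpdate();
        }
    }
}
```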

Source Config

* **Connection Settings**
  * **Name**: Enter a meaningful and unique name.
  * **Type**: Supports using Kafka as a source database.
  * **Connection Address**: Kafka connection address, including address and port, separated by a colon (`:`), for example, `113.222.22.***:9092`.
  * **Structure Mode**: Choose based on business needs:
    * **Standard Structure (Default)**: Supports synchronization of complete DML operations (INSERT, UPDATE, DELETE). As a source, it parses and restores DML + DDL events for downstream processing; as a target, it stores these events in a standardized format, facilitating parsing by subsequent tasks.
    * **Original Structure**: Uses Kafka's native data synchronization method, supporting append-only operations similar to `INSERT`. As a source, it handles complex, unstructured data and passes it downstream; as a target, it allows flexible control over partitions, headers, keys, and values, enabling custom data insertion.
  * **Key Serializer**, **Value Serializer**: Choose the serialization method for keys and values, such as Binary (default).
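
For reference, the sketch below shows how the settings above map onto the standard Kafka Java producer: the connection address becomes `bootstrap.servers`, the Binary serializer choice is assumed here to correspond to byte-array serialization, and, in the spirit of the Original Structure mode as a target, the partition, headers, key, and value are set explicitly. The broker address, topic, and payload are placeholders, not values defined by ClickPipes.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;

public class OriginalStructureProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "Connection Address": host and port separated by a colon.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        // "Key Serializer" / "Value Serializer": Binary is assumed to mean byte-array serialization.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Original Structure as a target: explicit control over partition, headers, key, and value.
            List<Header> headers = List.of(
                    new RecordHeader("source", "demo".getBytes(StandardCharsets.UTF_8)));
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                    "demo_topic",                                              // placeholder topic
                    0,                                                         // explicit partition
                    "order-1001".getBytes(StandardCharsets.UTF_8),             // key
                    "{\"status\":\"paid\"}".getBytes(StandardCharsets.UTF_8),  // value
                    headers);
            producer.send(record);
            producer.flush();
        }
    }
}
```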