Kafka
Apache Kafka is a distributed data streaming platform that allows real-time publishing, subscribing, storing, and processing of data streams.
This article explains how to add a Kafka data source in ClickPipes.
Supported Versions and Architectures
- Versions: Kafka 2.0 ~ 2.5 (built on Scala 2.12)
- Architectures: Single-node or cluster
Supported Data Types
| Category | Data Types |
| --- | --- |
| Boolean | BOOLEAN |
| Integer | SHORT, INTEGER, LONG |
| Floating Point | FLOAT, DOUBLE |
| Numeric | NUMBER |
| String | CHAR (supported as a source), VARCHAR, STRING, TEXT |
| Binary | BINARY |
| Composite | ARRAY, MAP, OBJECT (supported as a source) |
| Date/Time | TIME, DATE, DATETIME, TIMESTAMP |
| UUID | UUID (supported as a source) |
Structural Modes and Sync Details
When configuring the Kafka connection, you can select from the following two structure modes based on your business needs:
- Standard Structure (Default)
- Original Structure
**Standard Structure (Default)**
Description: Supports synchronization of complete DML operations (INSERT, UPDATE, DELETE). As a source, it parses and restores DML and DDL events for downstream processing; as a target, it stores these events in a standardized format, making them easier to parse in subsequent tasks.
Typical Use Case: In a CDC Log Queue scenario, use the Standard Structure mode to write relational data change events from MySQL into Kafka, and then consume those events to write them into other databases.
Sample Data:
{
  "ts": 1727097087513,
  "op": "DML:UPDATE",
  "opTs": 1727097087512,
  "namespaces": [],
  "table": "table_name",
  "before": {},
  "after": {}
}
- ts: The timestamp when the event was parsed, recording the time the event was processed.
- op: Event type, indicating the specific operation, such as `DML:INSERT`, `DML:UPDATE`, or `DML:DELETE`.
- opTs: Event timestamp, indicating when the data change actually occurred.
- namespaces: A collection of schema names for multi-schema scenarios.
- table: The table name indicating where the data change occurred.
- before: Data content before the change, available only for `UPDATE` and `DELETE` operations.
- after: Data content after the change, applicable to `INSERT` and `UPDATE` operations.
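For reference, below is a minimal sketch of how a downstream consumer might parse Standard Structure events and branch on the operation type, assuming the `kafka-python` client; the broker address, topic name, and consumer group are hypothetical.

```python
import json

from kafka import KafkaConsumer

# Hypothetical broker address, topic, and consumer group; replace with your own values.
consumer = KafkaConsumer(
    "cdc_events",
    bootstrap_servers="127.0.0.1:9092",
    group_id="cdc-demo",
    auto_offset_reset="earliest",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for record in consumer:
    event = record.value
    op = event.get("op")
    if op == "DML:INSERT":
        print(f'{event["table"]}: insert {event["after"]}')
    elif op == "DML:UPDATE":
        print(f'{event["table"]}: update {event["before"]} -> {event["after"]}')
    elif op == "DML:DELETE":
        print(f'{event["table"]}: delete {event["before"]}')
```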
**Original Structure**
Description: Uses Kafka's native data synchronization method, supporting append-only operations similar to `INSERT`. As a source, it handles complex, unstructured data and passes it downstream; as a target, it allows flexible control over partitions, headers, keys, and values, enabling custom data insertion.
Typical Use Case: Used for homogeneous data migration or unstructured data transformation, enabling data filtering and transformation through a Kafka -> JS Processing Node -> Kafka/MySQL data pipeline.
Sample Data:
{
  "offset": 12345,
  "timestampType": "LogAppendTime",
  "partition": 3,
  "timestamp": 1638349200000,
  "headers": {
    "headerKey1": "headerValue1"
  },
  "key": "user123",
  "value": {
    "id": 1,
    "name": "John Doe",
    "action": "login",
    "timestamp": "2021-12-01T10:00:00Z"
  }
}
- offset: Offset marking the message position, not included in the target message body.
- timestampType: Type of timestamp, used for metadata purposes, not included in the message body.
- partition: The partition number for the message; if provided, the message is written to the specified partition.
- timestamp: Message creation time; if provided, the specified time is used, otherwise the system time is used.
- headers: Message header information, written to the header if present, carrying additional metadata.
- key: Message key, used for partitioning strategies or to identify the message source.
- value: Message content, containing the actual business data.
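As an illustration of the target-side control described above, here is a minimal sketch that writes a message with an explicit partition, headers, key, value, and timestamp, assuming the `kafka-python` client; the broker address, topic name, and payload are hypothetical.

```python
import json
import time

from kafka import KafkaProducer

# Hypothetical broker address; replace with your own value.
producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

producer.send(
    "user_actions",                             # hypothetical topic
    key="user123",                              # message key; used for partitioning when no partition is given
    value={"id": 1, "name": "John Doe", "action": "login"},
    partition=3,                                # write to a specific partition
    headers=[("headerKey1", b"headerValue1")],  # extra metadata carried in the header
    timestamp_ms=int(time.time() * 1000),       # omit to fall back to the system time
)
producer.flush()
```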
Consumption Details
When configuring data replication or transformation tasks later, you can specify the data synchronization method through task settings in the upper-right corner. The corresponding consumption details are as follows:
- Full Only: Reads from the first message and stops the task after reaching the recorded incremental position.
- Full + Incremental: Reads from the first message to the recorded position and then continuously syncs incremental data.
- Incremental Only: Choose the starting point for incremental collection: Now, meaning sync starts from the current position, or Select Time, meaning sync starts from the position calculated from the specified time (see the offset-lookup sketch after this list).
Since Kafka, as a message queue, only supports append operations, take care to avoid writing duplicate data to the target system as a result of repeated consumption from the source.
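To make the Select Time option concrete, the sketch below shows how a starting timestamp can be resolved to per-partition offsets using Kafka's offsets-for-times lookup, assuming the `kafka-python` client; the broker address, topic name, and timestamp are hypothetical.

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="127.0.0.1:9092")  # hypothetical broker
topic = "cdc_events"                                          # hypothetical topic
start_ms = 1727097087000                                      # the specified time, in milliseconds

partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
offsets = consumer.offsets_for_times({tp: start_ms for tp in partitions})

consumer.assign(partitions)
for tp, found in offsets.items():
    if found is not None:
        # Seek to the first message whose timestamp is at or after the specified time.
        consumer.seek(tp, found.offset)
    else:
        # No message at or after that time; start from the end of the partition.
        consumer.seek_to_end(tp)
```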
Limitations
- Data Type Adaptation: As a source, Kafka's data types need to be adjusted according to the target data source's requirements, or corresponding table structures should be manually created on the target side to ensure compatibility.
- Message Delivery Guarantee: Due to Kafka's `At least once` delivery semantics and append-only behavior, duplicate consumption may occur. Idempotency must be ensured on the target side to avoid duplicate data resulting from repeated consumption (see the sketch after this list).
- Consumption Mode Limitation: Consumption threads use different consumer group numbers, so be aware of the impact on consumption concurrency.
- Security Authentication Limitation: Currently, only authentication-free Kafka instances are supported.
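One way to provide the target-side idempotency mentioned above is to key writes on the record's primary key so that re-delivered events overwrite rather than duplicate rows. The sketch below uses SQLite purely for illustration; the table and payload are hypothetical, and any target with upsert semantics works the same way.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, action TEXT)")

def apply_event(after: dict) -> None:
    # INSERT OR REPLACE keeps the write idempotent under at-least-once delivery.
    conn.execute(
        "INSERT OR REPLACE INTO users (id, name, action) VALUES (?, ?, ?)",
        (after["id"], after["name"], after["action"]),
    )

event_after = {"id": 1, "name": "John Doe", "action": "login"}
apply_event(event_after)
apply_event(event_after)  # duplicate delivery: still exactly one row for id = 1
print(conn.execute("SELECT COUNT(*) FROM users").fetchone()[0])  # prints 1
```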
Source Config
* **Connection Settings**
* **Name**: Enter a meaningful and unique name.
* **Type**: Supports using Kafka as a source database.
* **Connection Address**: Kafka connection address, including address and port, separated by a colon (`:`), for example, `113.222.22.***:9092`.
* **Structure Mode**: Choose based on business needs:
    * **Standard Structure (Default)**: Supports synchronization of complete DML operations (INSERT, UPDATE, DELETE). As a source, it parses and restores DML and DDL events for downstream processing; as a target, it stores these events in a standardized format, making them easier to parse in subsequent tasks.
* **Original Structure**: Uses Kafka's native data synchronization method, supporting append-only operations similar to `INSERT`. As a source, it handles complex, unstructured data and passes it downstream; as a target, it allows flexible control over partitions, headers, keys, and values, enabling custom data insertion.
* **Key Serializer**, **Value Serializer**: Choose the serialization method for keys and values, such as Binary (default).
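If you want to sanity-check the connection address before saving the configuration, a minimal sketch like the one below lists the topics visible on the broker, assuming the `kafka-python` client and an authentication-free instance; the address is hypothetical.

```python
from kafka import KafkaConsumer

# Hypothetical connection address in address:port form.
consumer = KafkaConsumer(bootstrap_servers="127.0.0.1:9092")
print(sorted(consumer.topics()))  # topics visible on the broker
consumer.close()
```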