Kafka Connect - Streaming Data Between Systems
Author: Venkata Sudhakar
Kafka Connect is a framework for building and running scalable, reliable data pipelines between Apache Kafka and external systems such as databases, file systems, search indexes, and cloud services. While Kafka producers and consumers require you to write custom application code, Kafka Connect provides a declarative, configuration-driven way to move data. A Source Connector reads data from an external system and publishes it to Kafka topics; a Sink Connector reads data from Kafka topics and writes it to an external system.

Kafka Connect runs as a distributed cluster of worker processes, and each connector is configured via a JSON document posted to the Kafka Connect REST API. Confluent Hub provides hundreds of pre-built connectors for JDBC databases, S3, GCS, BigQuery, Elasticsearch, Snowflake, HTTP, and more. This means you can move data from MySQL to BigQuery, from S3 to Kafka, or from Kafka to Elasticsearch purely through configuration, with zero custom code. Kafka Connect handles parallelism, fault tolerance, offset management, and schema evolution automatically.

The example below shows a JDBC Source Connector configuration that polls a MySQL table every 5 seconds and publishes new or updated rows to a Kafka topic using timestamp+incrementing mode.
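A minimal sketch of such a source configuration might look like the following. The connection URL, credentials, and database name are illustrative assumptions, not values from a real deployment; the property names are standard Confluent JDBC Source Connector settings.

```json
{
  "name": "mysql-orders-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/shop",
    "connection.user": "connect",
    "connection.password": "connect-secret",
    "table.whitelist": "orders",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "updated_at",
    "incrementing.column.name": "id",
    "poll.interval.ms": "5000",
    "topic.prefix": "mysql."
  }
}
```

The topic.prefix of "mysql." combined with the table name "orders" produces the topic mysql.orders. Saved as source.json, the config would be registered with something like: curl -X POST -H "Content-Type: application/json" --data @source.json http://kafka-connect:8083/connectors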
It gives the following output:
curl http://kafka-connect:8083/connectors/mysql-orders-source/status
{
"name": "mysql-orders-source",
"connector": {"state": "RUNNING"},
"tasks": [{"id": 0, "state": "RUNNING"}]
}
# Records appear in Kafka topic mysql.orders:
{"id":1001,"customer_id":500,"amount":649.99,"status":"CREATED","updated_at":1705312200000}
{"id":1002,"customer_id":501,"amount":89.50,"status":"SHIPPED","updated_at":1705312260000}
The example below shows a BigQuery Sink Connector that reads from Kafka topics and writes to BigQuery tables, along with the management commands for pausing, resuming, and monitoring connectors.
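A sketch of the sink registration and the day-to-day management calls, assuming the WePay/Confluent BigQuery Sink Connector; the connector name, GCP project, and keyfile path are illustrative assumptions:

```shell
# Register the BigQuery sink (sanitizeTopics replaces '.' with '_' so
# mysql.orders lands in table mysql_orders):
curl -X POST -H "Content-Type: application/json" \
  http://kafka-connect:8083/connectors \
  --data '{
    "name": "bigquery-orders-sink",
    "config": {
      "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
      "topics": "mysql.orders,mysql.customers",
      "project": "my-gcp-project",
      "defaultDataset": "analytics_dataset",
      "keyfile": "/etc/kafka-connect/bq-keyfile.json",
      "autoCreateTables": "true",
      "sanitizeTopics": "true",
      "tasks.max": "3"
    }
  }'

# Pause, resume, restart, and inspect the connector via the REST API:
curl -X PUT  http://kafka-connect:8083/connectors/bigquery-orders-sink/pause
curl -X PUT  http://kafka-connect:8083/connectors/bigquery-orders-sink/resume
curl -X POST http://kafka-connect:8083/connectors/bigquery-orders-sink/restart
curl http://kafka-connect:8083/connectors/bigquery-orders-sink/status

# Check consumer lag (sink connectors consume as group connect-<name>):
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group connect-bigquery-orders-sink
```

Pausing a connector stops its tasks without losing committed offsets, so resuming picks up exactly where the sink left off.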
It gives the following output:
# BigQuery sink creates tables automatically from schema:
BQ table: analytics_dataset.mysql_orders - schema inferred, 1002 rows loaded
BQ table: analytics_dataset.mysql_customers - schema inferred, 125 rows loaded
# Consumer lag check:
GROUP TOPIC PARTITION LAG
connect-bigquery-orders-sink mysql.orders 0 0
connect-bigquery-orders-sink mysql.orders 1 2
connect-bigquery-orders-sink mysql.orders 2 0
# LAG=2 on partition 1 is normal - within acceptable real-time bounds
Kafka Connect modes for the JDBC Source Connector:

bulk - Full table scan on every poll. Simple but expensive for large tables.

incrementing - Tracks the highest seen value of an auto-increment column (id). Captures inserts only, not updates.

timestamp - Tracks rows where updated_at is newer than the last poll. Captures updates but can miss rows if timestamps are not set on every update.

timestamp+incrementing - Combines both: uses updated_at to detect changes and id as a tiebreaker to handle rows with the same timestamp. This is the recommended mode for most production use cases.
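Conceptually, timestamp+incrementing mode issues a query like the following on each poll. This is an illustrative sketch, not the connector's exact generated SQL; :last_ts and :last_id stand for the offsets Kafka Connect committed after the previous poll.

```sql
SELECT *
FROM orders
WHERE (updated_at > :last_ts
       OR (updated_at = :last_ts AND id > :last_id))
ORDER BY updated_at, id
```

The id tiebreaker is what lets the connector resume safely in the middle of a batch of rows that all share the same updated_at value, which timestamp mode alone cannot do.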