Kafka Sink
Prerequisites, SASL/TLS setup, and full configuration reference for the Nanosync Kafka sink connector.
Nanosync publishes CDC events to Apache Kafka using a pure-Go producer (franz-go). Each change event is published as a message to the configured topic. The message key is the row’s primary key, ensuring ordered delivery per row within a partition.
Prerequisites
- A running Kafka cluster (2.6+) or a compatible managed service (Confluent Cloud, MSK, Redpanda)
- Network connectivity from Nanosync to all broker addresses
- If SASL is required: credentials with produce permissions on the target topics
Setup
- Verify broker connectivity

  ```shell
  # From the Nanosync host, confirm each broker is reachable
  nc -zv kafka1.prod.internal 9092
  ```

- Create ACLs (if using Kafka authorization)

  ```shell
  # Allow nanosync to write to topics with the nanosync. prefix
  kafka-acls.sh --bootstrap-server kafka1:9092 \
    --add --allow-principal User:nanosync \
    --operation Write --topic 'nanosync.' --resource-pattern-type prefixed

  # Allow describe (needed for auto topic creation)
  kafka-acls.sh --bootstrap-server kafka1:9092 \
    --add --allow-principal User:nanosync \
    --operation Describe --topic 'nanosync.' --resource-pattern-type prefixed

  # Allow topic creation if auto_create_topics is true
  kafka-acls.sh --bootstrap-server kafka1:9092 \
    --add --allow-principal User:nanosync \
    --operation Create --cluster
  ```

- Create topics manually (if `auto_create_topics` is false)

  ```shell
  kafka-topics.sh --bootstrap-server kafka1:9092 \
    --create --topic nanosync.public.orders \
    --partitions 12 --replication-factor 3
  ```
Connection configuration
Plaintext (no authentication)
```yaml
connections:
  - name: prod-kafka
    type: kafka
    properties:
      brokers: "kafka1.prod.internal:9092,kafka2.prod.internal:9092,kafka3.prod.internal:9092"
      client_id: nanosync-sink # identifies this producer in broker logs
```
SASL/PLAIN with TLS (Confluent Cloud, MSK SASL)
```yaml
connections:
  - name: confluent-kafka
    type: kafka
    properties:
      brokers: "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092"
      sasl_mechanism: PLAIN
      sasl_username: "${env:KAFKA_API_KEY}"
      sasl_password: "${env:KAFKA_API_SECRET}"
      tls_enabled: "true"
```
SASL/SCRAM with TLS (MSK, self-hosted)
```yaml
connections:
  - name: msk-kafka
    type: kafka
    properties:
      brokers: "b-1.msk.us-east-1.amazonaws.com:9096"
      sasl_mechanism: SCRAM-SHA-512
      sasl_username: "${env:KAFKA_USER}"
      sasl_password: "${env:KAFKA_PASSWORD}"
      tls_enabled: "true"
      tls_ca_cert: "/etc/ssl/ca.crt"
```
Pipeline configuration
All tables to one topic
```yaml
pipelines:
  - name: orders-to-kafka
    source:
      connection: prod-postgres
      tables:
        - public.orders
        - public.order_items
    sink:
      connection: prod-kafka
      properties:
        topic: orders-cdc
        primary_keys: id
        message_format: json
        compression: snappy
```
Per-table topics using a prefix
```yaml
pipelines:
  - name: postgres-to-kafka
    source:
      connection: prod-postgres
      tables:
        - public.orders
        - public.order_items
    sink:
      connection: prod-kafka
      properties:
        topic_prefix: nanosync. # creates nanosync.public_orders, nanosync.public_order_items
        primary_keys: id
        message_format: json
```
Topic names are formed as `<topic_prefix><schema>_<table>`, with the dot in the fully qualified table name replaced by an underscore.
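The naming rule is easy to express as code. A minimal sketch (the `TopicFor` function is illustrative, not part of Nanosync):

```go
package main

import (
	"fmt"
	"strings"
)

// TopicFor derives the sink topic for a source table, following the
// documented rule: <topic_prefix><schema>_<table>, with dots in the
// fully qualified table name replaced by underscores.
func TopicFor(prefix, table string) string {
	return prefix + strings.ReplaceAll(table, ".", "_")
}

func main() {
	fmt.Println(TopicFor("nanosync.", "public.orders"))     // nanosync.public_orders
	fmt.Println(TopicFor("prod.pg.", "public.order_items")) // prod.pg.public_order_items
}
```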
Sink properties
Connection
| Property | Default | Description |
|---|---|---|
| `brokers` | — | Comma-separated list of `host:port` broker addresses. Required. |
| `client_id` | `nanosync-sink` | Producer client ID shown in broker logs and metrics. |
| `sasl_mechanism` | — | SASL authentication: `PLAIN`, `SCRAM-SHA-256`, or `SCRAM-SHA-512`. Leave empty for no SASL. |
| `sasl_username` | — | SASL username. |
| `sasl_password` | — | SASL password. Stored encrypted. |
| `tls_enabled` | `false` | Enable TLS on broker connections. Required for Confluent Cloud and MSK SASL. |
| `tls_ca_cert` | — | Path to a PEM-encoded CA certificate for broker TLS verification. |
Per-pipeline (set in sink.properties)
| Property | Default | Description |
|---|---|---|
| `topic` | — | Fixed topic name; all tables write here. Mutually exclusive with `topic_prefix`. |
| `topic_prefix` | `nanosync.` | Prefix for per-table topic names. Topics are named `<prefix><schema>_<table>`. Mutually exclusive with `topic`. |
| `primary_keys` | — | Comma-separated column(s) used as the Kafka message key. Ensures ordered delivery for the same row within a partition. |
| `message_format` | `json` | Message encoding: `json`, `avro`, `protobuf`, or `cloudevents`. |
| `compression` | `snappy` | Producer compression: `none`, `gzip`, `snappy`, `lz4`, or `zstd`. |
| `auto_create_topics` | `true` | Automatically create topics if they do not exist. |
| `num_partitions` | `-1` | Partition count for auto-created topics. `-1` uses the broker default. |
| `replication_factor` | `-1` | Replication factor for auto-created topics. `-1` uses the broker default. |
| `max_retries` | `5` | Maximum produce retries on transient errors before the pipeline errors. |
Message format
JSON (default)
```json
{
  "_ns_op": "UPDATE",
  "_ns_table": "public.orders",
  "_ns_committed_at": "2026-03-14T10:22:01.456Z",
  "before": {
    "id": 1001,
    "status": "pending",
    "amount": 99.99
  },
  "after": {
    "id": 1001,
    "status": "shipped",
    "amount": 99.99
  }
}
```
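On the consumer side, this envelope can be decoded with ordinary JSON tooling. A minimal Go sketch, with field names taken from the example above (the `Event` struct and `Decode` helper are illustrative, not a Nanosync API):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event mirrors the JSON envelope shown above. Before or After may be
// absent depending on the operation type.
type Event struct {
	Op          string                 `json:"_ns_op"`
	Table       string                 `json:"_ns_table"`
	CommittedAt string                 `json:"_ns_committed_at"`
	Before      map[string]interface{} `json:"before"`
	After       map[string]interface{} `json:"after"`
}

// Decode parses one message value into an Event.
func Decode(data []byte) (Event, error) {
	var ev Event
	err := json.Unmarshal(data, &ev)
	return ev, err
}

func main() {
	msg := []byte(`{"_ns_op":"UPDATE","_ns_table":"public.orders",
		"_ns_committed_at":"2026-03-14T10:22:01.456Z",
		"before":{"id":1001,"status":"pending"},
		"after":{"id":1001,"status":"shipped"}}`)

	ev, err := Decode(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s on %s: status %v -> %v\n",
		ev.Op, ev.Table, ev.Before["status"], ev.After["status"])
	// prints: UPDATE on public.orders: status pending -> shipped
}
```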
CloudEvents
When `message_format: cloudevents`, each message is a CloudEvents v1.0 envelope:
```json
{
  "specversion": "1.0",
  "type": "io.nanosync.cdc.UPDATE",
  "source": "/nanosync/public/orders",
  "id": "01JXXXXXXXXXXXXXXXXXXXXXXX",
  "time": "2026-03-14T10:22:01.456Z",
  "datacontenttype": "application/json",
  "data": {
    "before": { "id": 1001, "status": "pending" },
    "after": { "id": 1001, "status": "shipped" }
  }
}
```
Avro / Protobuf
`avro` and `protobuf` encode messages as binary with the schema embedded in each message. Schema Registry integration is not required — Nanosync generates schemas from the source table structure.
Topic routing
| Configuration | Result |
|---|---|
| `topic: orders-cdc` | All tables → `orders-cdc` |
| `topic_prefix: nanosync.` | `public.orders` → `nanosync.public_orders` |
| `topic_prefix: prod.pg.` | `public.order_items` → `prod.pg.public_order_items` |
`topic` and `topic_prefix` are mutually exclusive. If both are set, the pipeline will fail to start.
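The mutual-exclusion rule is simple to express. A hypothetical validation sketch (not Nanosync's actual code; an empty `topic_prefix` falls back to the documented default):

```go
package main

import (
	"errors"
	"fmt"
)

// validateTopicConfig enforces the rule above: topic and topic_prefix
// must not both be set. If neither is set, the default prefix applies,
// so that case is allowed.
func validateTopicConfig(topic, topicPrefix string) error {
	if topic != "" && topicPrefix != "" {
		return errors.New("topic and topic_prefix are mutually exclusive")
	}
	return nil
}

func main() {
	fmt.Println(validateTopicConfig("orders-cdc", ""))          // <nil>
	fmt.Println(validateTopicConfig("orders-cdc", "nanosync.")) // topic and topic_prefix are mutually exclusive
}
```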
Compatible services
| Service | Notes |
|---|---|
| Confluent Cloud | Use `sasl_mechanism: PLAIN` with API key + secret, `tls_enabled: true` |
| Amazon MSK | SASL/SCRAM with TLS, or IAM auth (configure via AWS SDK env) |
| Redpanda | Fully Kafka-compatible; no extra config required |
| Self-hosted Kafka | Any 2.6+ cluster; configure ACLs as shown above |
Limitations
- No exactly-once guarantees. Nanosync uses at-least-once delivery — on retry after a transient error, duplicate messages may be produced. Design consumers to be idempotent (e.g., use the message key to deduplicate).
- Message size is bounded by the broker’s `max.message.bytes` setting (default 1 MiB). Rows larger than this limit will fail to produce. Increase the broker limit or use a more compact encoding (`avro`, `protobuf`).
- Avro and Protobuf encode the schema in each message. Confluent Schema Registry is not supported — schemas are not registered or retrieved from a registry.
- Ordering is guaranteed only within a partition for the same message key. Set `primary_keys` so all events for a given row hash to the same partition. Cross-partition ordering is not guaranteed.
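Because delivery is at-least-once, consumers should be written to tolerate replays. One common pattern is to track the highest offset already applied per message key and skip anything older; a minimal in-memory Go sketch (the `Applier` type is illustrative, and a real consumer would persist this state alongside its output):

```go
package main

import "fmt"

// Applier skips duplicate deliveries by remembering the highest Kafka
// offset already applied for each message key.
type Applier struct {
	applied map[string]int64 // key -> highest offset applied
}

func NewApplier() *Applier {
	return &Applier{applied: make(map[string]int64)}
}

// Apply returns true if the event should be applied, false if it is a
// duplicate redelivered after a producer retry.
func (a *Applier) Apply(key string, offset int64) bool {
	if last, ok := a.applied[key]; ok && offset <= last {
		return false
	}
	a.applied[key] = offset
	return true
}

func main() {
	a := NewApplier()
	fmt.Println(a.Apply("orders:1001", 42)) // true  (first delivery)
	fmt.Println(a.Apply("orders:1001", 42)) // false (duplicate)
	fmt.Println(a.Apply("orders:1001", 43)) // true  (newer event)
}
```

This comparison is safe because, with `primary_keys` set, all events for a key land on the same partition, so their offsets are monotonically increasing.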
Monitoring
```shell
nanosync metrics pipeline orders-to-kafka
```
Key Prometheus metrics:
| Metric | Description |
|---|---|
| `ns_pipeline_replication_lag_seconds` | End-to-end source-to-sink latency |
| `ns_sink_rows_written_total` | Messages published to Kafka |
| `ns_sink_write_errors_total` | Producer errors |
Troubleshooting
`TOPIC_AUTHORIZATION_FAILED`
The Nanosync user lacks produce permissions on the topic. Add a `Write` ACL for the topic or prefix.
`UNKNOWN_TOPIC_OR_PARTITION` with `auto_create_topics: false`
The topic does not exist. Create it manually, or set `auto_create_topics: true` together with a cluster-level `Create` ACL.
Messages out of order
Ordering is guaranteed only within a partition for the same message key. Set `primary_keys` so that all events for a given row hash to the same partition.
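Key-based partitioning is what makes this work: the producer hashes the message key modulo the partition count, so equal keys always select the same partition. A sketch of the effect (the Java client's default partitioner uses murmur2; FNV-1a here only demonstrates the determinism, not Kafka's exact partition assignment):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor illustrates key-based partition selection: a deterministic
// hash of the key, reduced modulo the partition count.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key always maps to the same partition, so all events for
	// row id=1001 are consumed in order.
	fmt.Println(partitionFor("1001", 12) == partitionFor("1001", 12)) // true
	// Different keys may map to different partitions; ordering across
	// them is not guaranteed.
	fmt.Println(partitionFor("1001", 12), partitionFor("2002", 12))
}
```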
High producer latency with `compression: zstd`
`zstd` is CPU-intensive. Switch to `snappy` for a better throughput/CPU trade-off.
SASL authentication failure
Double-check that `sasl_mechanism` matches the broker configuration. `PLAIN` requires TLS (`tls_enabled: true`) to protect credentials in transit.