Kafka Sink

Prerequisites, SASL/TLS setup, and full configuration reference for the Nanosync Kafka sink connector.

Nanosync publishes CDC events to Apache Kafka using a pure-Go producer (franz-go). Each change event is published as a message to the configured topic. The message key is the row’s primary key, ensuring ordered delivery per row within a partition.
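
Because the message key is the primary key, every event for a given row hashes to the same partition, which is what preserves per-row order. A minimal sketch of the idea — Kafka's real default partitioner uses murmur2 hashing; crc32 stands in here purely for illustration:

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition deterministically.

    Kafka's default partitioner uses murmur2; crc32 is an
    illustrative stand-in with the same key property: equal
    keys always map to the same partition.
    """
    return zlib.crc32(key) % num_partitions

# All events for row id=1001 land on the same partition,
# so consumers see that row's changes in commit order.
assert partition_for(b"1001", 12) == partition_for(b"1001", 12)
```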

Prerequisites

  - A running Kafka cluster (version 2.6 or later) reachable from the Nanosync host
  - If the cluster enforces authorization, a principal for Nanosync with the ACLs described below
  - A configured source connection (the examples below use prod-postgres)

Setup

  1. Verify broker connectivity

    # From the Nanosync host, confirm each broker is reachable
    nc -zv kafka1.prod.internal 9092
  2. Create ACLs (if using Kafka authorization)

    # Allow nanosync to write to topics with the nanosync. prefix
    kafka-acls.sh --bootstrap-server kafka1:9092 \
      --add --allow-principal User:nanosync \
      --operation Write --topic 'nanosync.' --resource-pattern-type prefixed
    
    # Allow describe (needed for auto topic creation)
    kafka-acls.sh --bootstrap-server kafka1:9092 \
      --add --allow-principal User:nanosync \
      --operation Describe --topic 'nanosync.' --resource-pattern-type prefixed
    
    # Allow topic creation if auto_create_topics is true
    kafka-acls.sh --bootstrap-server kafka1:9092 \
      --add --allow-principal User:nanosync \
      --operation Create --cluster
  3. Create topics manually (if auto_create_topics is false)

    kafka-topics.sh --bootstrap-server kafka1:9092 \
      --create --topic nanosync.public.orders \
      --partitions 12 --replication-factor 3

Connection configuration

Plaintext (no authentication)

connections:
  - name: prod-kafka
    type: kafka
    properties:
      brokers: "kafka1.prod.internal:9092,kafka2.prod.internal:9092,kafka3.prod.internal:9092"
      client_id: nanosync-sink   # identifies this producer in broker logs

SASL/PLAIN with TLS (Confluent Cloud, MSK SASL)

connections:
  - name: confluent-kafka
    type: kafka
    properties:
      brokers:          "pkc-xxxxx.us-east-1.aws.confluent.cloud:9092"
      sasl_mechanism:   PLAIN
      sasl_username:    "${env:KAFKA_API_KEY}"
      sasl_password:    "${env:KAFKA_API_SECRET}"
      tls_enabled:      "true"

SASL/SCRAM with TLS (MSK, self-hosted)

connections:
  - name: msk-kafka
    type: kafka
    properties:
      brokers:          "b-1.msk.us-east-1.amazonaws.com:9096"
      sasl_mechanism:   SCRAM-SHA-512
      sasl_username:    "${env:KAFKA_USER}"
      sasl_password:    "${env:KAFKA_PASSWORD}"
      tls_enabled:      "true"
      tls_ca_cert:      "/etc/ssl/ca.crt"

Pipeline configuration

All tables to one topic

pipelines:
  - name: orders-to-kafka
    source:
      connection: prod-postgres
      tables:
        - public.orders
        - public.order_items
    sink:
      connection: prod-kafka
      properties:
        topic:          orders-cdc
        primary_keys:   id
        message_format: json
        compression:    snappy

Per-table topics using a prefix

pipelines:
  - name: postgres-to-kafka
    source:
      connection: prod-postgres
      tables:
        - public.orders
        - public.order_items
    sink:
      connection: prod-kafka
      properties:
        topic_prefix:   nanosync.   # creates nanosync.public_orders, nanosync.public_order_items
        primary_keys:   id
        message_format: json

Topic names are formed as <topic_prefix><schema>_<table> with dots in the table name replaced by underscores.
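
The naming rule can be expressed as a small function (illustrative sketch, not Nanosync's actual code):

```python
def topic_name(topic_prefix: str, table: str) -> str:
    """Derive the per-table topic name: the configured prefix plus
    the fully qualified table name with dots replaced by underscores."""
    return topic_prefix + table.replace(".", "_")

print(topic_name("nanosync.", "public.orders"))       # nanosync.public_orders
print(topic_name("nanosync.", "public.order_items"))  # nanosync.public_order_items
```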

Sink properties

Connection

| Property | Default | Description |
| --- | --- | --- |
| brokers | | Comma-separated list of host:port broker addresses. Required. |
| client_id | nanosync-sink | Producer client ID shown in broker logs and metrics. |
| sasl_mechanism | | SASL authentication: PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512. Leave empty for no SASL. |
| sasl_username | | SASL username. |
| sasl_password | | SASL password. Stored encrypted. |
| tls_enabled | false | Enable TLS on broker connections. Required for Confluent Cloud and MSK SASL. |
| tls_ca_cert | | Path to a PEM-encoded CA certificate for broker TLS verification. |

Per-pipeline (set in sink.properties)

| Property | Default | Description |
| --- | --- | --- |
| topic | | Fixed topic name; all tables write here. Mutually exclusive with topic_prefix. |
| topic_prefix | nanosync. | Prefix for per-table topic names. Topics are named <prefix><schema>_<table>. Mutually exclusive with topic. |
| primary_keys | | Comma-separated column(s) used as the Kafka message key. Ensures ordered delivery for the same row within a partition. |
| message_format | json | Message encoding: json, avro, protobuf, or cloudevents. |
| compression | snappy | Producer compression: none, gzip, snappy, lz4, or zstd. |
| auto_create_topics | true | Automatically create topics if they do not exist. |
| num_partitions | -1 | Partition count for auto-created topics. -1 uses the broker default. |
| replication_factor | -1 | Replication factor for auto-created topics. -1 uses the broker default. |
| max_retries | 5 | Maximum produce retries on transient errors before the pipeline errors. |
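
For example, a sink that auto-creates per-table topics with explicit sizing instead of the broker defaults (partition and replication values are illustrative):

```yaml
sink:
  connection: prod-kafka
  properties:
    topic_prefix:       nanosync.
    auto_create_topics: true
    num_partitions:     12
    replication_factor: 3
    compression:        snappy
```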

Message format

JSON (default)

{
  "_ns_op":          "UPDATE",
  "_ns_table":       "public.orders",
  "_ns_committed_at":"2026-03-14T10:22:01.456Z",
  "before": {
    "id":     1001,
    "status": "pending",
    "amount": 99.99
  },
  "after": {
    "id":     1001,
    "status": "shipped",
    "amount": 99.99
  }
}
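
A consumer can diff the before and after images to find which columns changed. A hypothetical sketch, using the message shape from the example above:

```python
import json

def changed_columns(message: bytes) -> set:
    """Return the column names whose values differ between the
    before and after images of a Nanosync JSON change event."""
    event = json.loads(message)
    before = event.get("before") or {}
    after = event.get("after") or {}
    keys = set(before) | set(after)
    return {k for k in keys if before.get(k) != after.get(k)}

event = json.dumps({
    "_ns_op": "UPDATE",
    "_ns_table": "public.orders",
    "before": {"id": 1001, "status": "pending", "amount": 99.99},
    "after":  {"id": 1001, "status": "shipped", "amount": 99.99},
})
print(changed_columns(event.encode()))  # {'status'}
```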

CloudEvents

When message_format: cloudevents, each message is a CloudEvents v1.0 envelope:

{
  "specversion":     "1.0",
  "type":            "io.nanosync.cdc.UPDATE",
  "source":          "/nanosync/public/orders",
  "id":              "01JXXXXXXXXXXXXXXXXXXXXXXX",
  "time":            "2026-03-14T10:22:01.456Z",
  "datacontenttype": "application/json",
  "data": {
    "before": { "id": 1001, "status": "pending" },
    "after":  { "id": 1001, "status": "shipped" }
  }
}

Avro / Protobuf

The avro and protobuf formats encode messages as binary with the schema embedded in each message. Schema Registry integration is not required; Nanosync generates schemas from the source table structure.

Topic routing

| Configuration | Result |
| --- | --- |
| topic: orders-cdc | All tables → orders-cdc |
| topic_prefix: nanosync. | public.orders → nanosync.public_orders |
| topic_prefix: prod.pg. | public.order_items → prod.pg.public_order_items |

topic and topic_prefix are mutually exclusive. If both are set, the pipeline will fail to start.
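
The exclusivity check amounts to the following (hypothetical helper, not Nanosync's code):

```python
def validate_topic_config(props: dict) -> str:
    """Reject configs that set both topic and topic_prefix;
    otherwise describe the routing mode that will be used."""
    if "topic" in props and "topic_prefix" in props:
        raise ValueError("set either 'topic' or 'topic_prefix', not both")
    if "topic" in props:
        return f"fixed topic {props['topic']}"
    # topic_prefix defaults to "nanosync." per the table above
    return f"per-table topics with prefix {props.get('topic_prefix', 'nanosync.')}"

print(validate_topic_config({"topic": "orders-cdc"}))  # fixed topic orders-cdc
```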

Compatible services

| Service | Notes |
| --- | --- |
| Confluent Cloud | Use sasl_mechanism: PLAIN with API key + secret, tls_enabled: true |
| Amazon MSK | SASL/SCRAM with TLS, or IAM auth (configure via AWS SDK env) |
| Redpanda | Fully Kafka-compatible, no extra config required |
| Self-hosted Kafka | Any 2.6+ cluster; configure ACLs as shown above |

Limitations

Monitoring

Inspect sink throughput and lag with the Nanosync CLI:

nanosync metrics pipeline orders-to-kafka

Key Prometheus metrics:

| Metric | Description |
| --- | --- |
| ns_pipeline_replication_lag_seconds | End-to-end source-to-sink latency |
| ns_sink_rows_written_total | Messages published to Kafka |
| ns_sink_write_errors_total | Producer errors |

Troubleshooting

TOPIC_AUTHORIZATION_FAILED
The Nanosync user lacks produce permissions on the topic. Add a Write ACL for the topic or prefix.

UNKNOWN_TOPIC_OR_PARTITION with auto_create_topics: false
The topic does not exist. Create it manually or set auto_create_topics: true with a cluster-level Create ACL.

Messages out of order
Ordering is guaranteed only within a partition for the same message key. Set primary_keys so that all events for a given row hash to the same partition.

High producer latency with compression: zstd
zstd is CPU-intensive. Switch to snappy for a better throughput/CPU trade-off.

SASL authentication failure
Double-check that sasl_mechanism matches the broker configuration. PLAIN requires TLS (tls_enabled: "true") to protect credentials in transit.
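
The last point can be checked before deploying. A hypothetical pre-flight validation, based only on the rules stated in this section:

```python
# Mechanisms accepted by the sasl_mechanism property, per the table above.
VALID_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}

def check_sasl(props: dict) -> list:
    """Return a list of SASL configuration problems (empty if none)."""
    problems = []
    mech = props.get("sasl_mechanism", "")
    if mech and mech not in VALID_MECHANISMS:
        problems.append(f"unknown sasl_mechanism: {mech}")
    # PLAIN sends credentials in the clear, so TLS is mandatory.
    if mech == "PLAIN" and props.get("tls_enabled") != "true":
        problems.append('PLAIN requires tls_enabled: "true" to protect credentials')
    return problems

print(check_sasl({"sasl_mechanism": "PLAIN"}))
# ['PLAIN requires tls_enabled: "true" to protect credentials']
```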