7.7.190-stable Switch to dev

GreyCat Kafka Library

@library("kafka", "0.0.0");

Apache Kafka integration for GreyCat. Consume and produce messages with full support for consumer groups, SSL/SASL security, and all librdkafka configuration options.

Quick Start

Consumer

var reader = KafkaReader<String> {
    conf: KafkaConf {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-consumer-group",
        "auto.offset.reset": "earliest",
    },
    topics: ["events", "metrics"],
};

var msg = reader.read();
info("Topic: ${msg.topic}, Partition: ${msg.partition}, Offset: ${msg.offset}");
info("Value: ${msg.value}");

Producer

var writer = KafkaWriter<String> {
    conf: KafkaConf {
        "bootstrap.servers": "localhost:9092",
    },
    topic: "events",
};

writer.write("Hello Kafka");
writer.write("Another message");
writer.flush();  // ensure all messages are delivered

API Reference

KafkaReader<T>

Consumer that reads messages from one or more Kafka topics. Generic type T is the deserialized payload type.

Constructor:

Field Type Description
conf KafkaConf Kafka client configuration (must include bootstrap.servers and group.id)
topics Array<String> Topics to subscribe to
timeout duration? Read timeout. Defaults to 10s

Methods:

Method Returns Description
read() KafkaMsg<T> Read next message. Blocks until available or timeout. Automatically commits offsets.

KafkaWriter<T>

Producer that writes messages to a Kafka topic. Generic type T is the payload type to serialize.

Constructor:

Field Type Description
conf KafkaConf Kafka client configuration (must include bootstrap.servers)
topic String Target topic name

Methods:

Method Returns Description
write(value) void Send a message. Buffered and sent asynchronously.
flush() void Block until all buffered messages are delivered. Call before shutdown.

KafkaMsg<T>

A message received from Kafka.

Field Type Description
topic String Source topic
partition int Partition number
offset int Message offset within the partition
value T Message payload

KafkaConf

Configuration for Kafka clients. Maps directly to librdkafka configuration properties. All fields are String? except bootstrap.servers which is required.

Essential settings:

Property Required For Description
bootstrap.servers Both Comma-separated broker list (e.g., "host1:9092,host2:9092")
group.id Consumer Consumer group name
auto.offset.reset Consumer Where to start for new groups: "earliest" or "latest"
client.id Both Client identifier for logging and monitoring

Security settings:

Property Description
security.protocol "PLAINTEXT", "SSL", "SASL_PLAINTEXT", or "SASL_SSL"
sasl.mechanism "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "GSSAPI", "OAUTHBEARER"
sasl.username SASL username
sasl.password SASL password
ssl.ca.location Path to CA certificate file
ssl.certificate.location Path to client certificate
ssl.key.location Path to client private key
ssl.key.password Private key passphrase

Producer tuning:

Property Default Description
acks Acknowledgement level: "all", "0", "1"
linger.ms Batching delay in milliseconds
batch.size Maximum batch size in bytes
compression.type "none", "gzip", "snappy", "lz4", "zstd"
delivery.timeout.ms Max time for message delivery
enable.idempotence Exactly-once delivery guarantee
retries Number of send retries

Consumer tuning:

Property Default Description
auto.commit.interval.ms Offset auto-commit interval
fetch.min.bytes Minimum data per fetch request
fetch.max.bytes Maximum data per fetch request
max.poll.interval.ms Max interval between polls before rebalance
session.timeout.ms Consumer heartbeat session timeout
heartbeat.interval.ms Heartbeat frequency
isolation.level "read_uncommitted" or "read_committed"

Examples

Continuous Consumer Loop

var reader = KafkaReader<String> {
    conf: KafkaConf {
        "bootstrap.servers": "broker1:9092,broker2:9092",
        "group.id": "data-ingestion",
        "auto.offset.reset": "latest",
    },
    topics: ["sensor-data"],
    timeout: 5_s,
};

while (true) {
    var msg = reader.read();
    info("[${msg.topic}:${msg.partition}@${msg.offset}] ${msg.value}");
}

Secure Connection with SASL/SSL

var writer = KafkaWriter<String> {
    conf: KafkaConf {
        "bootstrap.servers": "kafka.example.com:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": "api-key",
        "sasl.password": "api-secret",
        "ssl.ca.location": "/etc/ssl/certs/ca-certificates.crt",
    },
    topic: "secure-topic",
};

writer.write("encrypted message");
writer.flush();