# GreyCat Kafka Library

```greycat
@library("kafka", "0.0.0");
```

Apache Kafka integration for GreyCat. Consume and produce messages with full support for consumer groups, SSL/SASL security, and all librdkafka configuration options.
## Quick Start

### Consumer

```greycat
var reader = KafkaReader<String> {
    conf: KafkaConf {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-consumer-group",
        "auto.offset.reset": "earliest",
    },
    topics: ["events", "metrics"],
};

var msg = reader.read();
info("Topic: ${msg.topic}, Partition: ${msg.partition}, Offset: ${msg.offset}");
info("Value: ${msg.value}");
```
### Producer

```greycat
var writer = KafkaWriter<String> {
    conf: KafkaConf {
        "bootstrap.servers": "localhost:9092",
    },
    topic: "events",
};

writer.write("Hello Kafka");
writer.write("Another message");
writer.flush(); // ensure all messages are delivered
```
## API Reference

### `KafkaReader<T>`

Consumer that reads messages from one or more Kafka topics. The generic type `T` is the deserialized payload type.

Constructor:

| Field | Type | Description |
|---|---|---|
| `conf` | `KafkaConf` | Kafka client configuration (must include `bootstrap.servers` and `group.id`) |
| `topics` | `Array<String>` | Topics to subscribe to |
| `timeout` | `duration?` | Read timeout; defaults to 10s |
Methods:

| Method | Returns | Description |
|---|---|---|
| `read()` | `KafkaMsg<T>` | Read the next message. Blocks until a message is available or the timeout elapses. Automatically commits offsets. |
### `KafkaWriter<T>`

Producer that writes messages to a Kafka topic. The generic type `T` is the payload type to serialize.

Constructor:

| Field | Type | Description |
|---|---|---|
| `conf` | `KafkaConf` | Kafka client configuration (must include `bootstrap.servers`) |
| `topic` | `String` | Target topic name |
Methods:

| Method | Returns | Description |
|---|---|---|
| `write(value)` | `void` | Send a message. Buffered and sent asynchronously. |
| `flush()` | `void` | Block until all buffered messages are delivered. Call before shutdown. |
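Because `write()` only buffers, a producer that exits without calling `flush()` can silently drop unsent messages. A minimal sketch following the constructor table above (the topic name and payloads are illustrative):

```greycat
var writer = KafkaWriter<String> {
    conf: KafkaConf { "bootstrap.servers": "localhost:9092" },
    topic: "audit-log", // hypothetical topic name
};
writer.write("first");  // buffered, sent in the background
writer.write("second"); // buffered, sent in the background
writer.flush();         // blocks until both messages are delivered
```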
### `KafkaMsg<T>`

A message received from Kafka.

| Field | Type | Description |
|---|---|---|
| `topic` | `String` | Source topic |
| `partition` | `int` | Partition number |
| `offset` | `int` | Message offset within the partition |
| `value` | `T` | Message payload |
### `KafkaConf`

Configuration for Kafka clients. Keys map directly to librdkafka configuration properties. All values are optional strings (`String?`); `bootstrap.servers` is always required.

Essential settings:

| Property | Required For | Description |
|---|---|---|
| `bootstrap.servers` | Both | Comma-separated broker list (e.g. `"host1:9092,host2:9092"`) |
| `group.id` | Consumer | Consumer group name |
| `auto.offset.reset` | Consumer | Where to start for new groups: `"earliest"` or `"latest"` |
| `client.id` | Both | Client identifier for logging and monitoring |
Security settings:

| Property | Description |
|---|---|
| `security.protocol` | `"PLAINTEXT"`, `"SSL"`, `"SASL_PLAINTEXT"`, or `"SASL_SSL"` |
| `sasl.mechanism` | `"PLAIN"`, `"SCRAM-SHA-256"`, `"SCRAM-SHA-512"`, `"GSSAPI"`, or `"OAUTHBEARER"` |
| `sasl.username` | SASL username |
| `sasl.password` | SASL password |
| `ssl.ca.location` | Path to CA certificate file |
| `ssl.certificate.location` | Path to client certificate |
| `ssl.key.location` | Path to client private key |
| `ssl.key.password` | Private key passphrase |
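For mutual TLS (client-certificate authentication), the SSL properties above combine as in this sketch; the broker address and file paths are illustrative:

```greycat
var writer = KafkaWriter<String> {
    conf: KafkaConf {
        "bootstrap.servers": "kafka.example.com:9093",
        "security.protocol": "SSL",
        // hypothetical paths; point these at your own PKI material
        "ssl.ca.location": "/etc/kafka/ca.pem",
        "ssl.certificate.location": "/etc/kafka/client.pem",
        "ssl.key.location": "/etc/kafka/client.key",
        "ssl.key.password": "changeit",
    },
    topic: "secure-topic",
};
```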
Producer tuning:

| Property | Description |
|---|---|
| `acks` | Acknowledgement level: `"all"`, `"0"`, or `"1"` |
| `linger.ms` | Batching delay in milliseconds |
| `batch.size` | Maximum batch size in bytes |
| `compression.type` | `"none"`, `"gzip"`, `"snappy"`, `"lz4"`, or `"zstd"` |
| `delivery.timeout.ms` | Maximum time allowed for message delivery |
| `enable.idempotence` | Enables exactly-once delivery guarantees |
| `retries` | Number of send retries |
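These tuning properties plug into `KafkaConf` like any other setting. A sketch of a durability-oriented producer; the specific values are illustrative, not recommendations:

```greycat
var writer = KafkaWriter<String> {
    conf: KafkaConf {
        "bootstrap.servers": "localhost:9092",
        "acks": "all",                // wait for all in-sync replicas
        "enable.idempotence": "true", // avoid duplicates on retry
        "linger.ms": "20",            // batch writes for up to 20 ms
        "compression.type": "lz4",
    },
    topic: "events",
};
```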
Consumer tuning:

| Property | Description |
|---|---|
| `auto.commit.interval.ms` | Offset auto-commit interval |
| `fetch.min.bytes` | Minimum data per fetch request |
| `fetch.max.bytes` | Maximum data per fetch request |
| `max.poll.interval.ms` | Maximum interval between polls before a rebalance is triggered |
| `session.timeout.ms` | Consumer heartbeat session timeout |
| `heartbeat.interval.ms` | Heartbeat frequency |
| `isolation.level` | `"read_uncommitted"` or `"read_committed"` |
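Consumer tuning properties are passed the same way. A sketch for a consumer doing slow per-message processing; the group name, topic, and values are illustrative:

```greycat
var reader = KafkaReader<String> {
    conf: KafkaConf {
        "bootstrap.servers": "localhost:9092",
        "group.id": "analytics",
        "max.poll.interval.ms": "600000",    // allow 10 min between reads
        "isolation.level": "read_committed", // skip aborted transactions
    },
    topics: ["events"],
};
```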
## Examples

### Continuous Consumer Loop

```greycat
var reader = KafkaReader<String> {
    conf: KafkaConf {
        "bootstrap.servers": "broker1:9092,broker2:9092",
        "group.id": "data-ingestion",
        "auto.offset.reset": "latest",
    },
    topics: ["sensor-data"],
    timeout: 5_s,
};

while (true) {
    var msg = reader.read();
    info("[${msg.topic}:${msg.partition}@${msg.offset}] ${msg.value}");
}
```
### Secure Connection with SASL/SSL

```greycat
var writer = KafkaWriter<String> {
    conf: KafkaConf {
        "bootstrap.servers": "kafka.example.com:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": "api-key",
        "sasl.password": "api-secret",
        "ssl.ca.location": "/etc/ssl/certs/ca-certificates.crt",
    },
    topic: "secure-topic",
};

writer.write("encrypted message");
writer.flush();
```