Producer
The Producer
module provides functionality to send messages to Kafka topics.
Send messages
send
sends messages to a single Kafka topic.
Yield the module to get access to the operations and call the send
.
ts
import { Producer } from "effect-kafka"
import { Effect } from "effect"
// ┌─── Effect.Effect<Producer.Producer.RecordMetadata[], ProducerError, Producer.Producer>
// ▼
const program = Effect.gen(function* () {
const producer = yield* Producer.Producer
return yield* producer.send({
topic: "test-topic",
messages: [{ value: "Hello effect-kafka!" }],
})
})
ts
import { Producer } from "effect-kafka"
import { Effect } from "effect"
// ┌─── Effect.Effect<Producer.Producer.RecordMetadata[], ProducerError, Producer.Producer>
// ▼
const program = Producer.Producer.pipe(
Effect.flatMap((producer) =>
producer.send({
topic: "test-topic",
messages: [{ value: "Hello effect-kafka!" }],
}),
),
)
You can also use module function directly, which is a more concise way to access the producer operations:
ts
import { Producer } from "effect-kafka";
import { Effect } from "effect";
// ┌─── Effect.Effect<Producer.Producer.RecordMetadata[], ProducerError, Producer.Producer>
// ▼
const program = Producer.send({
topic: "test-topic",
messages: [{ value: "Hello, effect-kafka!" }],
})
WARNING
Be cautious when using module functions, as they could lead to requirements leakage when used in downstream services. Read more about Avoiding Requirement Leakage.
Layer
To use the Producer
module operations, you need to provide the Producer
layer. This can be done using the Producer.layer()
function.
The layer accepts ProducerOptions
object to configure the internal producer client. See the ProducerOptions (interface) API reference for more details.
ts
import { NodeRuntime } from "@effect/platform-node"
import { Effect } from "effect"
import { Producer } from "effect-kafka"
import { KafkaJS } from "effect-kafka/KafkaJS"
const program = Producer.send({
topic: "test-topic",
messages: [{ value: "Hello, effect-kafka!" }],
})
const main = program.pipe(
Effect.provide(Producer.layer({ allowAutoTopicCreation: true })),
Effect.provide(KafkaJS.layer({ brokers: ["localhost:19092"] })),
);
NodeRuntime.runMain(main)