
Getting Started

Let's write a simple Kafka producer and consumer using effect-kafka. Make a new folder for your sample program and follow these steps:

Step 1: Set up Kafka

First, we need a running Kafka instance. Save the following Docker Compose configuration as docker-compose.yml:

yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

The kafka service advertises two listeners: PLAINTEXT on kafka:9092 for traffic inside the Compose network and PLAINTEXT_HOST on localhost:29092 for clients running on the host, which is what our program will connect to. Start the containers:

sh
docker compose up -d

Step 2: Install Effect Kafka

Install effect-kafka, its peer dependencies, and a Kafka engine of your choice. Here we use KafkaJS:

sh
pnpm add effect @effect/platform-node effect-kafka kafkajs

TIP

You can use any of the supported Kafka engines, depending on your preference.
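If you want to confirm that the broker from Step 1 is reachable before wiring up effect-kafka, one option is a quick sanity check with the KafkaJS admin client you just installed. This is an optional sketch, not part of effect-kafka; the file name check-broker.ts is arbitrary:

ts
import { Kafka } from "kafkajs";

// Connect to the host listener exposed by docker-compose and list topics.
// If the broker is not reachable, connect() will eventually fail.
const kafka = new Kafka({ clientId: "broker-check", brokers: ["localhost:29092"] });
const admin = kafka.admin();

admin
  .connect()
  .then(() => admin.listTopics())
  .then((topics) => console.log("Broker is up. Topics:", topics))
  .finally(() => admin.disconnect());

Run it with the same command you will use in Step 4 (for example pnpm tsx check-broker.ts).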

Step 3: Implement a program

Here's a simple example of a Kafka producer and consumer using effect-kafka with KafkaJS. Save it as main.ts:

ts
import { NodeRuntime } from "@effect/platform-node";
import { Console, Effect, Layer, Random, Schedule, Stream } from "effect";
import { Consumer, Producer } from "effect-kafka";
import { KafkaJS } from "effect-kafka/KafkaJS";

// Every two seconds, generate a random integer and publish it to the
// "random" topic, keyed by random % 4 so records are spread across a few keys.
const producer = Stream.repeatEffect(Random.nextInt).pipe(
  Stream.schedule(Schedule.fixed("2 seconds")),
  Stream.mapEffect((random) =>
    Producer.send({
      topic: "random",
      messages: [{ key: String(random % 4), value: random.toString() }],
    }),
  ),
);

// Consume the "random" topic and log each record's value as it arrives.
const consumer = Consumer.serveStream("random").pipe(
  Stream.tap((record) => Console.log(record.value?.toString())),
);

// Run producer and consumer concurrently as a single program.
const program = Stream.merge(producer, consumer).pipe(Stream.runDrain);

// Producer and consumer layers; the KafkaJS layer supplies the underlying
// Kafka instance (pointing at the host listener from Step 1) to both.
const ProducerLive = Producer.layer({ allowAutoTopicCreation: true });
const ConsumerLive = Consumer.layer({ groupId: "group" });

const KafkaLive = KafkaJS.layer({ brokers: ["localhost:29092"] });
const MainLive = program.pipe(
  Effect.provide(Layer.merge(ProducerLive, ConsumerLive)),
  Effect.provide(KafkaLive),
);

NodeRuntime.runMain(MainLive);
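The consumer above logs raw buffer values. As a variation, you could decode the key and value before logging. This is a sketch using the imports already in main.ts, and it assumes the consumed record exposes key and value buffers the way KafkaJS does; the name decodedConsumer is arbitrary:

ts
// Variation on `consumer` above: decode key and value before logging.
// Assumes each record carries `key` and `value` buffers, as in KafkaJS.
const decodedConsumer = Consumer.serveStream("random").pipe(
  Stream.map((record) => ({
    key: record.key?.toString(),
    value: Number(record.value?.toString()),
  })),
  Stream.tap(({ key, value }) => Console.log(`key=${key} value=${value}`)),
);

Swap it in for consumer in Stream.merge to see keys alongside the decoded numbers.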

Step 4: Run the program

Now we can run our effect-kafka program (this assumes tsx is available in the project, e.g. installed with pnpm add -D tsx):

sh
pnpm tsx main.ts
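Once the consumer has joined its group, you should see the produced random integers printed roughly every two seconds (exact values and timing will vary). Stop the program with Ctrl+C and tear down the containers with docker compose down when you are done.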
