
effect-kafka

0.9.0

Minor Changes

  • 1493848 Thanks @floydspace! - Implement methods for defining and parsing kafka messages with effect Schema

    How to use:

    1. To parse a raw Kafka message, use MessageRouter.schemaRaw together with the ConsumerSchema helper methods to define the schema of the message.
    ts
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "test-topic",
        MessageRouter.schemaRaw(
          Schema.Struct({
            topic: Schema.Literal("test-topic"),
            partition: Schema.Number,
            offset: Schema.NumberFromString,
            key: Schema.NullOr(ConsumerSchema.Number),
            value: ConsumerSchema.String,
          }),
        ).pipe(
          Effect.flatMap(({ topic, partition, ...message }) =>
            Console.log({
              topic,
              partition,
              offset: message.offset,
              key: message.key,
              value: message.value,
            }),
          ),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    2. To parse a raw Kafka message whose value is expected to be a JSON string, use MessageRouter.schemaJson. The value property of the schema can then be defined as a Schema.Struct.
    ts
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "test-topic",
        MessageRouter.schemaJson(
          Schema.Struct({
            topic: Schema.Literal("test-topic"),
            partition: Schema.Number,
            offset: Schema.NumberFromString,
            key: Schema.NullOr(ConsumerSchema.Number),
            value: Schema.Struct({ message: Schema.String }),
          }),
        ).pipe(
          Effect.flatMap(({ topic, partition, ...message }) =>
            Console.log({
              topic,
              partition,
              offset: message.offset,
              key: message.key,
              value: message.value.message,
            }),
          ),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    3. To parse only the raw value of a Kafka message, use ConsumerRecord.schemaValueRaw together with the ConsumerSchema helper methods to define the schema of the message value.
    ts
    import { Console, Effect } from "effect";
    import { Consumer, ConsumerRecord, ConsumerSchema, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "customers",
        ConsumerRecord.schemaValueRaw(ConsumerSchema.String).pipe(Effect.flatMap((value) => Console.log(value))),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    4. To parse only the value of a Kafka message as JSON, use ConsumerRecord.schemaValueJson. The value schema can then be defined as a Schema.Struct.
    ts
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerRecord, MessageRouter } from "effect-kafka";
    
    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "customers",
        ConsumerRecord.schemaValueJson(Schema.Struct({ message: Schema.String })).pipe(
          Effect.flatMap((value) => Console.log(value)),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
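
The difference between the "raw" and "JSON" variants above can be pictured with a small sketch in plain TypeScript. It is a conceptual illustration only and does not show how effect-kafka implements these helpers: the names `RawRecord`, `decodeRawValue`, `decodeJsonValue`, and `isGreeting` are hypothetical, and the record shape is an assumption modelled on the fields used in the examples.

```typescript
// Conceptual sketch only: plain TypeScript, not effect-kafka internals.
// RawRecord, decodeRawValue, decodeJsonValue and isGreeting are hypothetical names.

interface RawRecord {
  topic: string;
  partition: number;
  offset: string; // a string here, matching the Schema.NumberFromString used for offset above
  key: Uint8Array | null;
  value: Uint8Array;
}

const utf8 = new TextDecoder("utf-8");

// "Raw" style: the value bytes are decoded directly into the target type (a string here).
function decodeRawValue(record: RawRecord): string {
  return utf8.decode(record.value);
}

// "JSON" style: the value bytes are decoded to text, parsed as JSON,
// and then checked against the expected shape.
function decodeJsonValue<A>(record: RawRecord, isA: (u: unknown) => u is A): A {
  const parsed: unknown = JSON.parse(utf8.decode(record.value));
  if (!isA(parsed)) {
    throw new Error(`Unexpected message shape on topic ${record.topic}`);
  }
  return parsed;
}

interface Greeting {
  message: string;
}

// Runtime check playing the role that Schema.Struct({ message: Schema.String }) plays above.
function isGreeting(u: unknown): u is Greeting {
  return (
    typeof u === "object" &&
    u !== null &&
    typeof (u as { message?: unknown }).message === "string"
  );
}

const sample: RawRecord = {
  topic: "test-topic",
  partition: 0,
  offset: "42",
  key: null,
  value: new TextEncoder().encode(JSON.stringify({ message: "hello" })),
};

const rawText = decodeRawValue(sample); // the undecoded JSON text
const greeting = decodeJsonValue(sample, isGreeting); // the parsed and validated object
```

With the schema-based helpers, the hand-written type guard is replaced by the schema itself, so the decoded type and the runtime validation cannot drift apart.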

Patch Changes

0.8.0

Minor Changes

0.7.1

Patch Changes

0.7.0

Minor Changes

0.6.0

Minor Changes

  • #29 d3c8fc0 Thanks @floydspace! - define UnknownProducerError and use it as a fallback for all errors that happen in the producer engine

Patch Changes

0.5.2

Patch Changes

0.5.1

Patch Changes

0.5.0

Minor Changes

  • #22 42f2df9 Thanks @floydspace! - Improve build configuration, expose sub packages and fix optional peer dependencies bug, closes #21

0.4.3

Patch Changes

0.4.2

Patch Changes

  • 0ef48b3 Thanks @floydspace! - forward rdkafka logs, use Logger.withMinimumLogLevel to enable desired log levels, closes #11

0.4.1

Patch Changes

  • b72be89 Thanks @floydspace! - fix bug where a message offered to the queue was not suspended under back pressure

  • b72be89 Thanks @floydspace! - add layerConfig for kafka instance implementations
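
For readers unfamiliar with the back-pressure behaviour mentioned in the fix above, the following is a minimal sketch in plain TypeScript of a bounded queue whose `offer` suspends while the queue is full. It is an illustration under stated assumptions, not the library's implementation: `BoundedQueue` and `demo` are hypothetical names, and effect-kafka relies on Effect's own queue rather than anything like this class.

```typescript
// Hypothetical illustration of back pressure: offer() suspends while the queue is full.
class BoundedQueue<A> {
  private readonly capacity: number;
  private readonly items: A[] = [];
  private readonly waitingProducers: Array<() => void> = [];
  private readonly waitingConsumers: Array<(item: A) => void> = [];

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  async offer(item: A): Promise<void> {
    // If a consumer is already waiting, hand the item over directly.
    const consumer = this.waitingConsumers.shift();
    if (consumer) {
      consumer(item);
      return;
    }
    // Back pressure: suspend the producer until there is room in the queue.
    while (this.items.length >= this.capacity) {
      await new Promise<void>((resume) => this.waitingProducers.push(resume));
    }
    this.items.push(item);
  }

  async take(): Promise<A> {
    if (this.items.length > 0) {
      const item = this.items.shift() as A;
      // Room was freed, so wake one suspended producer (if any).
      const resume = this.waitingProducers.shift();
      if (resume) resume();
      return item;
    }
    // Queue is empty: suspend the consumer until an item is offered.
    return new Promise<A>((resolve) => this.waitingConsumers.push(resolve));
  }
}

async function demo() {
  const queue = new BoundedQueue<number>(1);
  await queue.offer(1); // fills the queue

  let secondAccepted = false;
  const secondOffer = queue.offer(2).then(() => {
    secondAccepted = true;
  });

  // Give the second producer time to run; it should still be suspended.
  await new Promise<void>((resolve) => setTimeout(resolve, 10));
  const suspendedWhileFull = !secondAccepted;

  const first = await queue.take(); // frees a slot and resumes the producer
  await secondOffer;
  const second = await queue.take();

  return { suspendedWhileFull, first, second };
}
```

The point of the sketch is the `while` loop in `offer`: without it, a producer would keep pushing into a full queue instead of waiting for the consumer to catch up.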

0.4.0

Minor Changes

  • 4783992 Thanks @floydspace! - rename producer send to sendScoped, and define a different send signature

Patch Changes

0.3.2

Patch Changes

0.3.1

Patch Changes

0.3.0

Minor Changes

  • c2902e0 Thanks @floydspace! -

    • rename MessagePayload to ConsumerRecord, similar to how it is named in zio-kafka
    • handle message batches manually
  • #4 f5d97d2 Thanks @floydspace! - implement stream consumer, allow handling commits manually

0.2.0

Minor Changes

0.1.0

Minor Changes

Released under the MIT License.