# effect-kafka

## 0.9.0

### Minor Changes
- `1493848` Thanks @floydspace! - Implement methods for defining and parsing Kafka messages with effect Schema.

  How to use:
  - When you parse a raw Kafka message, you can use `MessageRouter.schemaRaw` in combination with the `ConsumerSchema` helper methods to define the schema of the message.

    ```ts
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka";

    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "test-topic",
        MessageRouter.schemaRaw(
          Schema.Struct({
            topic: Schema.Literal("test-topic"),
            partition: Schema.Number,
            offset: Schema.NumberFromString,
            key: Schema.NullOr(ConsumerSchema.Number),
            value: ConsumerSchema.String,
          }),
        ).pipe(
          Effect.flatMap(({ topic, partition, ...message }) =>
            Console.log({
              topic,
              partition,
              offset: message.offset,
              key: message.key,
              value: message.value,
            }),
          ),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    ```

  - When you expect the Kafka message `value` to be a JSON string, you can use `MessageRouter.schemaJson`, so the `value` schema property can be defined as a `Schema.Struct`.

    ```ts
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka";

    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "test-topic",
        MessageRouter.schemaJson(
          Schema.Struct({
            topic: Schema.Literal("test-topic"),
            partition: Schema.Number,
            offset: Schema.NumberFromString,
            key: Schema.NullOr(ConsumerSchema.Number),
            value: Schema.Struct({ message: Schema.String }),
          }),
        ).pipe(
          Effect.flatMap(({ topic, partition, ...message }) =>
            Console.log({
              topic,
              partition,
              offset: message.offset,
              key: message.key,
              value: message.value.message,
            }),
          ),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    ```

  - When you parse only the Kafka message `value` as a raw value, you can use `ConsumerRecord.schemaValueRaw` in combination with the `ConsumerSchema` helper methods to define the schema of the message value.

    ```ts
    import { Console, Effect } from "effect";
    import { Consumer, ConsumerRecord, ConsumerSchema, MessageRouter } from "effect-kafka";

    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "customers",
        ConsumerRecord.schemaValueRaw(ConsumerSchema.String).pipe(Effect.flatMap((value) => Console.log(value))),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    ```

  - When you parse only the Kafka message `value` as a JSON value, you can use `ConsumerRecord.schemaValueJson`, so the `value` schema can be defined as a `Schema.Struct`.

    ```ts
    import { Console, Effect, Schema } from "effect";
    import { Consumer, ConsumerRecord, MessageRouter } from "effect-kafka";

    const ConsumerLive = MessageRouter.empty.pipe(
      MessageRouter.subscribe(
        "customers",
        ConsumerRecord.schemaValueJson(Schema.Struct({ message: Schema.String })).pipe(
          Effect.flatMap((value) => Console.log(value)),
        ),
      ),
      Consumer.serve({ groupId: "group" }),
    );
    ```
### Patch Changes

- `f564387` Thanks @floydspace! - Upgrade @confluentinc/kafka-javascript to the stable v1
## 0.8.0

### Minor Changes

- #32 `3fab074` Thanks @floydspace! - Implement an MVP platformatic kafka instance
## 0.7.1

### Patch Changes

- #34 `936a300` Thanks @floydspace! - Fix hanging stream when the handler fails, closes #33
## 0.7.0

### Minor Changes
## 0.6.0

### Minor Changes

- #29 `d3c8fc0` Thanks @floydspace! - Define UnknownProducerError and use it as a fallback for all errors that happen in the producer engine
### Patch Changes

- #29 `d3c8fc0` Thanks @floydspace! - Poll and retry when a QueueFull error is raised
- #20 `4c5f113` Thanks @floydspace! - Implement the ability to catch errors raised in a MessageRouter handler
## 0.5.2

### Patch Changes

- #25 `21df29a` Thanks @floydspace! - Fix ESM distro
## 0.5.1

### Patch Changes

- `d9a8d4d` Thanks @floydspace! - Fix a bug in internal types
## 0.5.0

### Minor Changes

- #22 `42f2df9` Thanks @floydspace! - Improve the build configuration, expose sub-packages, and fix an optional peer dependencies bug, closes #21
## 0.4.3

### Patch Changes

- #18 `64bbd83` Thanks @floydspace! - Proxy more config options for rdkafka
- `7bd1b0c` Thanks @floydspace! - Implement an in-memory kafka instance, closes #9
## 0.4.2

### Patch Changes

- `0ef48b3` Thanks @floydspace! - Forward rdkafka logs; use `Logger.withMinimumLogLevel` to enable the desired log levels, closes #11
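  Since Effect hides logs below `LogLevel.Info` by default, the forwarded rdkafka logs only become visible once the minimum level is lowered. A minimal sketch of how the `Logger.withMinimumLogLevel` combinator is applied (the logged message here is a made-up stand-in, not an actual rdkafka log line):

  ```typescript
  import { Effect, Logger, LogLevel } from "effect";

  // Lowering the minimum log level to Debug makes debug-level logs
  // (such as forwarded rdkafka engine logs) appear in the output.
  const program = Effect.logDebug("librdkafka: broker connection established").pipe(
    Logger.withMinimumLogLevel(LogLevel.Debug),
  );

  Effect.runSync(program);
  ```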
## 0.4.1

### Patch Changes

- `b72be89` Thanks @floydspace! - Fix a bug where a message offered to the queue was not suspended in case of back pressure
- `b72be89` Thanks @floydspace! - Add `layerConfig` for kafka instance implementations
## 0.4.0

### Minor Changes

- `4783992` Thanks @floydspace! - Rename the producer `send` as `sendScoped`, and define a different `send` signature

### Patch Changes

- `af964dd` Thanks @floydspace! - Allow a wider range of kafka engine versions
## 0.3.2

### Patch Changes

- `dd8dea2` Thanks @floydspace! - Remove an unnecessary peer dependency on @effect/platform
- `16610c1` Thanks @floydspace! - Run the consumer scoped
## 0.3.1

### Patch Changes

- `5f5b329` Thanks @floydspace! - Fix consumer layers usage
## 0.3.0

### Minor Changes

- `c2902e0` Thanks @floydspace! -
  - Rename `MessagePayload` to `ConsumerRecord`, similar to how it is named in zio-kafka
  - Handle message batches manually
- #4 `f5d97d2` Thanks @floydspace! - Implement a stream consumer, allowing commits to be handled manually
## 0.2.0

### Minor Changes

- `ce326b9` Thanks @floydspace! - Implement a cleaner layer configuration, add Confluent Kafka examples
- `801815b` Thanks @floydspace! - Improve connection error handling
- `58a3803` Thanks @floydspace! - Implement producer
## 0.1.0

### Minor Changes

- `ea84542` Thanks @floydspace! - Release first prototype