effect-kafka
0.9.0
Minor Changes
1493848
Thanks @floydspace! - Implement methods for defining and parsing kafka messages with effect SchemaHo to use:
- When you parse raw kafka message, you can use the
MessageRouter.schemaRaw
in combination withConsumerSchema
helper methods to define the schema of the message.
tsimport { Console, Effect, Schema } from "effect"; import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka"; const ConsumerLive = MessageRouter.empty.pipe( MessageRouter.subscribe( "test-topic", MessageRouter.schemaRaw( Schema.Struct({ topic: Schema.Literal("test-topic"), partition: Schema.Number, offset: Schema.NumberFromString, key: Schema.NullOr(ConsumerSchema.Number), value: ConsumerSchema.String, }), ).pipe( Effect.flatMap(({ topic, partition, ...message }) => Console.log({ topic, partition, offset: message.offset, key: message.key, value: message.value, }), ), ), ), Consumer.serve({ groupId: "group" }), );
- When you parse raw kafka message, you can use the
MessageRouter.schemaJson
if you expect kafka messagevalue
to be a JSON string. Sovalue
schema property can be defined asSchema.Struct
.
tsimport { Console, Effect, Schema } from "effect"; import { Consumer, ConsumerSchema, MessageRouter } from "effect-kafka"; const ConsumerLive = MessageRouter.empty.pipe( MessageRouter.subscribe( "test-topic", MessageRouter.schemaJson( Schema.Struct({ topic: Schema.Literal("test-topic"), partition: Schema.Number, offset: Schema.NumberFromString, key: Schema.NullOr(ConsumerSchema.Number), value: Schema.Struct({ message: Schema.String }), }), ).pipe( Effect.flatMap(({ topic, partition, ...message }) => Console.log({ topic, partition, offset: message.offset, key: message.key, value: message.value.message, }), ), ), ), Consumer.serve({ groupId: "group" }), );
- When you parse only kafka message raw
value
as raw value, you can use theConsumerRecord.schemaValueRaw
in combination withConsumerSchema
helper methods to define the schema of the message value.
tsimport { Console, Effect } from "effect"; import { Consumer, ConsumerRecord, ConsumerSchema, MessageRouter } from "effect-kafka"; const ConsumerLive = MessageRouter.empty.pipe( MessageRouter.subscribe( "customers", ConsumerRecord.schemaValueRaw(ConsumerSchema.String).pipe(Effect.flatMap((value) => Console.log(value))), ), Consumer.serve({ groupId: "group" }), );
- When you parse only kafka message
value
as json value, you can use theConsumerRecord.schemaValueJson
. Sovalue
schema can be defined asSchema.Struct
.
tsimport { Console, Effect } from "effect"; import { Consumer, ConsumerRecord, MessageRouter } from "effect-kafka"; const ConsumerLive = MessageRouter.empty.pipe( MessageRouter.subscribe( "customers", ConsumerRecord.schemaValueJson(Schema.Struct({ message: Schema.String })).pipe( Effect.flatMap((value) => Console.log(value)), ), ), Consumer.serve({ groupId: "group" }), );
- When you parse raw kafka message, you can use the
Patch Changes
f564387
Thanks @floydspace! - upgrade @confluentinc/kafka-javascript to the stable v1
0.8.0
Minor Changes
- #32
3fab074
Thanks @floydspace! - Implement mvp platformatic kafka instance
0.7.1
Patch Changes
#34
936a300
Thanks @floydspace! - fix hanging stream when handler failscloses #33
0.7.0
Minor Changes
0.6.0
Minor Changes
- #29
d3c8fc0
Thanks @floydspace! - define UnknownProducerError and use it as fallback for all errors happend in producer engine
Patch Changes
#29
d3c8fc0
Thanks @floydspace! - Poll and retry in case QueueFull error raised#20
4c5f113
Thanks @floydspace! - implement an ability to catch errors raised in MessageRouter handler
0.5.2
Patch Changes
- #25
21df29a
Thanks @floydspace! - fix esm distro
0.5.1
Patch Changes
d9a8d4d
Thanks @floydspace! - fix bug in internal types
0.5.0
Minor Changes
- #22
42f2df9
Thanks @floydspace! - Improve build configuration, expose sub packages and fix optional peer dependencies bug, closes #21
0.4.3
Patch Changes
#18
64bbd83
Thanks @floydspace! - proxy more config options for rdkafka7bd1b0c
Thanks @floydspace! - implement in memory kafka instance, closes #9
0.4.2
Patch Changes
0ef48b3
Thanks @floydspace! - forward rdkafka logs, useLogger.withMinimumLogLevel
to enable desired log levels, closes #11
0.4.1
Patch Changes
b72be89
Thanks @floydspace! - fix bug when message offered to the queue was not suspended in case of back pressureb72be89
Thanks @floydspace! - add layerConfig for kafka instance implementations
0.4.0
Minor Changes
4783992
Thanks @floydspace! - rename producersend
assendScoped
, and define differentsend
signature
Patch Changes
af964dd
Thanks @floydspace! - allow wider range of kafka engines versions
0.3.2
Patch Changes
dd8dea2
Thanks @floydspace! - remove unnecessary peer to @effect/platform16610c1
Thanks @floydspace! - run consumer scoped
0.3.1
Patch Changes
5f5b329
Thanks @floydspace! - fix consumer layers usage
0.3.0
Minor Changes
c2902e0
Thanks @floydspace! - - renameMessagePayload
toConsumerRecord
similar how it is named inzio-kafka
- handle message batches manually
#4
f5d97d2
Thanks @floydspace! - implement stream consumer allow handling commits manually
0.2.0
Minor Changes
ce326b9
Thanks @floydspace! - implement more clean layer configuration, add confluent kafka examples801815b
Thanks @floydspace! - improve connection error handling58a3803
Thanks @floydspace! - implement producer
0.1.0
Minor Changes
ea84542
Thanks @floydspace! - release first prototype