RealtimeTrigger
yaml
type: "io.kestra.plugin.kafka.RealtimeTrigger"
Consume a message from a Kafka topic in real time.
yaml
id: kafka_realtime_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.value }}"

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    serdeProperties:
      schema.registry.url: http://localhost:8085
    # Deserializers are trigger-level properties, not serdeProperties entries.
    keyDeserializer: STRING
    valueDeserializer: AVRO
    groupId: kafkaConsumerGroupId
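The example above logs only `trigger.value`. As a minimal sketch, the task below also logs record metadata. It assumes the trigger exposes `trigger.key`, `trigger.topic`, `trigger.partition`, and `trigger.offset`; these names are not listed in this section, so check them against the trigger's outputs before relying on them. The task id `log_metadata` is hypothetical.

```yaml
tasks:
  - id: log_metadata
    type: io.kestra.plugin.core.log.Log
    # Assumed trigger outputs: key, topic, partition, offset.
    message: "key={{ trigger.key }} topic={{ trigger.topic }} partition={{ trigger.partition }} offset={{ trigger.offset }}"
```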
Use Kafka Realtime Trigger to push events into MongoDB
yaml
id: kafka_realtime_trigger
namespace: company.team

tasks:
  - id: insert_into_mongodb
    type: io.kestra.plugin.mongodb.InsertOne
    connection:
      uri: mongodb://mongoadmin:secret@localhost:27017/?authSource=admin
    database: kestra
    collection: products
    document: |
      {
        "product_id": "{{ trigger.value | jq('.product_id') | first }}",
        "product_name": "{{ trigger.value | jq('.product_name') | first }}",
        "category": "{{ trigger.value | jq('.product_category') | first }}",
        "brand": "{{ trigger.value | jq('.brand') | first }}"
      }

triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: products
    properties:
      bootstrap.servers: localhost:9092
    # The deserializer is a trigger-level property, not a serdeProperties entry.
    valueDeserializer: JSON
    groupId: kestraConsumer
Dynamic: YES

SubType: string
Dynamic: YES

Dynamic: YES
Default: STRING
Possible Values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON

Dynamic: YES
Default: STRING
Possible Values: STRING, INTEGER, FLOAT, DOUBLE, LONG, SHORT, BYTE_ARRAY, BYTE_BUFFER, BYTES, UUID, VOID, AVRO, JSON

SubType: integer
Dynamic: YES

SubType: string
Dynamic: YES
Default: {}

Dynamic: YES

SubType: string
Dynamic: NO
Possible Values: CREATED, RUNNING, PAUSED, RESTARTED, KILLING, SUCCESS, WARNING, FAILED, KILLED, CANCELLED, QUEUED, RETRYING, RETRIED, SKIPPED
List of execution states after which a trigger should be stopped (a.k.a. disabled).
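As a hedged illustration of the description above, the fragment below assumes this property is named `stopAfter` (the name is not shown in this section) and that it accepts a list of the execution states enumerated above. The topic, broker address, and group id are reused from the first example.

```yaml
triggers:
  - id: realtime_trigger
    type: io.kestra.plugin.kafka.RealtimeTrigger
    topic: test_kestra
    properties:
      bootstrap.servers: localhost:9092
    groupId: kafkaConsumerGroupId
    # Assumed property name: stopAfter.
    # Disables the trigger once an execution ends in one of these states.
    stopAfter:
      - FAILED
      - KILLED
```

With a setting like this, a failing flow would stop consuming new messages instead of starting a fresh execution for every record, and the trigger would need to be re-enabled by hand.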
Dynamic: NO

Dynamic: YES