Trigger
type: "io.kestra.plugin.amqp.Trigger"
Consume messages periodically from an AMQP queue and create one execution per batch.
Note that you don't need an extra task to consume the messages from the event trigger. The trigger consumes messages automatically, and you can retrieve their content in your flow using the {{ trigger.uri }} variable. If you would like to consume each message from an AMQP queue in real time and create one execution per message, use the io.kestra.plugin.amqp.RealtimeTrigger instead.
Examples
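id: amqp_trigger
namespace: company.team

tasks:
  # Placeholder task (an assumption, not in the original example) so the flow has something to run
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: "{{ trigger.uri }}"

# A trigger is declared under `triggers`, not `tasks`
triggers:
  - id: trigger
    type: io.kestra.plugin.amqp.Trigger
    url: amqp://guest:guest@localhost:5672/my_vhost
    maxRecords: 2
    queue: amqpTrigger.queue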
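The Properties section documents the connection as separate host, port, username, password, and virtualHost fields. A minimal sketch of the same trigger written with those fields, assuming the same local broker and credentials as the example above. All values are placeholders, and the Log task is an assumption added only so the flow has something to run.

```yaml
id: amqp_trigger_fields
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log   # assumed core task, illustration only
    message: "Batch stored at {{ trigger.uri }}"

triggers:
  - id: trigger
    type: io.kestra.plugin.amqp.Trigger
    host: localhost
    port: "5672"            # documented as a string property
    username: guest
    password: guest
    virtualHost: my_vhost
    queue: amqpTrigger.queue
    maxRecords: 2
```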
Properties
consumerTag
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default: Kestra
A client-generated consumer tag to establish context.
queue
- Type: string
- Dynamic: ✔️
- Required: ✔️
The queue to pull messages from.
serdeType
- Type: string
- Dynamic: ❓
- Required: ✔️
- Default: STRING
- Possible Values: STRING, JSON
Serializer / Deserializer used for the message.
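As an illustration, a trigger fragment that deserializes each message as JSON rather than the default STRING might look like the following sketch. The host and queue values are placeholders.

```yaml
triggers:
  - id: trigger
    type: io.kestra.plugin.amqp.Trigger
    host: localhost               # placeholder broker host
    queue: amqpTrigger.queue      # placeholder queue name
    serdeType: JSON               # one of the documented values: STRING, JSON
```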
conditions
- Type: array
- SubType: Condition
- Dynamic: ❌
- Required: ❌
List of conditions that restrict when the flow trigger fires.
host
- Type: string
- Dynamic: ✔️
- Required: ❌
The broker host.
interval
- Type: string
- Dynamic: ❌
- Required: ❌
- Default: 60.000000000
- Format: duration
Interval between polling.
This is the interval between two consecutive polls; spacing polls out avoids overloading the remote system with too many calls. For most triggers that depend on external systems, the interval should be at least PT30S. See ISO 8601 durations for the available interval values.
maxDuration
- Type: string
- Dynamic: ❓
- Required: ❌
- Format: duration
The maximum duration to wait for new messages.
It is not a hard limit and is evaluated every second.
maxRecords
- Type: integer
- Dynamic: ❓
- Required: ❌
The maximum number of messages to fetch before stopping.
It is not a hard limit and is evaluated every second.
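A sketch of how interval, maxDuration, and maxRecords might be combined, assuming a broker on localhost and purely illustrative values: poll every 30 seconds, and stop collecting a batch after about 100 messages or about 10 seconds. Because both limits are evaluated once per second, a batch can slightly overshoot either one.

```yaml
triggers:
  - id: trigger
    type: io.kestra.plugin.amqp.Trigger
    host: localhost               # placeholder broker host
    queue: amqpTrigger.queue      # placeholder queue name
    interval: PT30S               # ISO 8601 duration between polls
    maxDuration: PT10S            # soft limit on how long one poll waits
    maxRecords: 100               # soft limit on messages per batch
```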
password
- Type: string
- Dynamic: ✔️
- Required: ❌
The broker password.
port
- Type: string
- Dynamic: ✔️
- Required: ❌
The broker port.
stopAfter
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
List of execution states after which a trigger should be stopped (a.k.a. disabled).
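For example, the following fragment is a sketch of a trigger that would be disabled once an execution it started ends in the FAILED state. The host and queue values are placeholders, and the choice of FAILED is only illustrative.

```yaml
triggers:
  - id: trigger
    type: io.kestra.plugin.amqp.Trigger
    host: localhost               # placeholder broker host
    queue: amqpTrigger.queue      # placeholder queue name
    stopAfter:
      - FAILED                    # execution state after which the trigger stops
```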
username
- Type: string
- Dynamic: ✔️
- Required: ❌
The broker username.
virtualHost
- Type: string
- Dynamic: ✔️
- Required: ❌
The broker virtual host.
Outputs
count
- Type: integer
- Required: ❌
Number of messages consumed.
uri
- Type: string
- Required: ❌
- Format: uri
File URI containing consumed messages.
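Both outputs are exposed to the flow through the trigger variable. The fragment below is a sketch of a task that logs the batch size and the file content. It assumes the core Log task and Kestra's read() expression function for internal storage files are available in your version; neither is part of this plugin.

```yaml
tasks:
  - id: log_batch
    type: io.kestra.plugin.core.log.Log   # assumed core task, illustration only
    message: |
      Consumed {{ trigger.count }} messages.
      Content: {{ read(trigger.uri) }}
```

For large batches, passing {{ trigger.uri }} to a downstream task is likely a better fit than reading the whole file into a log message.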