Trigger
type: "io.kestra.plugin.pulsar.Trigger"
Consume messages periodically from Pulsar topics and create one execution per batch.
You don't need an extra task to consume messages. The trigger consumes them automatically and stores them in a Kestra internal storage file, whose location is available in your flow through the {{ trigger.uri }} variable.
If you would like to consume each message from a Pulsar topic in real time and create one execution per message, use io.kestra.plugin.pulsar.RealtimeTrigger instead.
Examples
id: pulsar_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    # The batch trigger exposes `uri` and `messagesCount` (see Outputs)
    message: "{{ trigger.uri }}"

triggers:
  - id: trigger
    type: io.kestra.plugin.pulsar.Trigger
    interval: PT30S
    topic: kestra_trigger
    uri: pulsar://localhost:26650
    deserializer: JSON
    subscriptionName: kestra_trigger_sub
Properties
deserializer
- Type: object
- Dynamic: ❓
- Required: ✔️
initialPosition
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
Earliest
- Possible Values:
Latest
Earliest
The position of a subscription to the topic.
pollDuration
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
PT2S
- Format:
duration
Maximum time to wait for a record on each poll.
If no records are available, this is the longest the trigger waits for a new record.
subscriptionName
- Type: string
- Dynamic: ✔️
- Required: ✔️
The subscription name.
Only records that haven't been consumed yet under this subscription name are fetched.
subscriptionType
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
Exclusive
- Possible Values:
Exclusive
Shared
Failover
Key_Shared
The subscription type.
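As an illustration of how initialPosition and subscriptionType combine, the fragment below is a minimal sketch of a trigger that shares a subscription with other consumers and starts from the newest messages. The topic, subscription name, and broker address are reused from the example above and are assumptions about your environment.

```yaml
triggers:
  - id: shared_latest
    type: io.kestra.plugin.pulsar.Trigger
    uri: pulsar://localhost:6650          # assumed local broker
    topic: kestra_trigger
    deserializer: JSON
    subscriptionName: kestra_trigger_sub
    # Both values come from the "Possible Values" lists above
    subscriptionType: Shared
    initialPosition: Latest
```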
topic
- Type: object
- Dynamic: ✔️
- Required: ✔️
Pulsar topic(s) to consume messages from.
Can be a string or a list of strings to consume from multiple topics.
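For instance, the list form could look like the sketch below. The topic names `orders` and `payments` are hypothetical placeholders, and the other values are reused from the example above.

```yaml
triggers:
  - id: multi_topic
    type: io.kestra.plugin.pulsar.Trigger
    uri: pulsar://localhost:6650          # assumed local broker
    deserializer: JSON
    subscriptionName: kestra_trigger_sub
    # A list of topics; a single string is also accepted
    topic:
      - orders      # hypothetical topic name
      - payments    # hypothetical topic name
```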
uri
- Type: string
- Dynamic: ✔️
- Required: ✔️
Connection URLs.
You need to specify a Pulsar protocol URL.
- Example of localhost:
pulsar://localhost:6650
- If you have multiple brokers:
pulsar://localhost:6650,localhost:6651,localhost:6652
- If you use TLS authentication:
pulsar+ssl://pulsar.us-west.example.com:6651
authenticationToken
- Type: string
- Dynamic: ✔️
- Required: ❌
Authentication token.
Authentication token that can be required by some providers such as Clever Cloud.
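Because authenticationToken is a dynamic property, the token does not have to be written in the flow in clear text. The sketch below assumes a secret named `PULSAR_TOKEN` (a hypothetical name) has been configured in your Kestra instance and is read with the `secret()` expression function. The host name is reused from the TLS example above.

```yaml
triggers:
  - id: with_token
    type: io.kestra.plugin.pulsar.Trigger
    uri: pulsar+ssl://pulsar.us-west.example.com:6651
    topic: kestra_trigger
    deserializer: JSON
    subscriptionName: kestra_trigger_sub
    # PULSAR_TOKEN is a hypothetical secret name
    authenticationToken: "{{ secret('PULSAR_TOKEN') }}"
```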
conditions
- Type: array
- SubType: Condition
- Dynamic: ❌
- Required: ❌
List of conditions that limit when the flow is triggered.
consumerName
- Type: string
- Dynamic: ✔️
- Required: ❌
The consumer name.
consumerProperties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Add all the properties in the provided map to the consumer.
encryptionKey
- Type: string
- Dynamic: ✔️
- Required: ❌
Add a public encryption key to the producer/consumer.
interval
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
PT60S
- Format:
duration
Interval between polling.
The interval between two consecutive polls. A longer interval avoids overloading the remote system with too many calls. For most triggers that depend on external systems, the interval should be at least PT30S. See ISO 8601 durations for more information on the available interval values.
maxDuration
- Type: string
- Dynamic: ❌
- Required: ❌
- Format:
duration
The maximum duration to wait for new records.
It's not a hard limit and is evaluated every second.
maxRecords
- Type: integer
- Dynamic: ❌
- Required: ❌
The maximum number of records to fetch before stopping.
It's not a hard limit and is evaluated every second.
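To show how pollDuration, maxDuration, and maxRecords bound a batch together, here is a minimal sketch. The numeric values are arbitrary illustrations, not recommendations, and since both limits are soft a batch may slightly exceed them.

```yaml
triggers:
  - id: bounded_batch
    type: io.kestra.plugin.pulsar.Trigger
    interval: PT30S
    uri: pulsar://localhost:6650          # assumed local broker
    topic: kestra_trigger
    deserializer: JSON
    subscriptionName: kestra_trigger_sub
    pollDuration: PT2S    # wait up to 2 seconds for a record on each poll
    maxRecords: 500       # stop once roughly 500 records have been fetched
    maxDuration: PT20S    # or once roughly 20 seconds have elapsed
```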
schemaString
- Type: string
- Dynamic: ✔️
- Required: ❌
JSON string of the topic's schema.
Required when connecting to topics that have a defined schema and strict schema checking.
schemaType
- Type: string
- Dynamic: ✔️
- Required: ❌
- Default:
NONE
- Possible Values:
NONE
AVRO
JSON
The schema type of the topic.
Can be one of NONE, AVRO or JSON. NONE means that no schema is enforced.
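A sketch of how schemaType and schemaString might be set together is shown below. The `Order` record and its single field are hypothetical, and the Avro-style record definition used as the schema string is an assumption about what your topic expects. Check the schema actually registered on the topic before relying on it.

```yaml
triggers:
  - id: with_schema
    type: io.kestra.plugin.pulsar.Trigger
    uri: pulsar://localhost:6650          # assumed local broker
    topic: kestra_trigger
    deserializer: JSON
    subscriptionName: kestra_trigger_sub
    schemaType: JSON
    # Hypothetical schema, passed as a JSON string
    schemaString: |
      {"type": "record", "name": "Order", "fields": [{"name": "id", "type": "string"}]}
```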
stopAfter
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
List of execution states after which a trigger should be stopped (a.k.a. disabled).
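For example, to disable the trigger after an execution it created has failed, the property could be set as in this sketch. `FAILED` is a Kestra execution state, and the rest of the trigger is reused from the example above.

```yaml
triggers:
  - id: stop_on_failure
    type: io.kestra.plugin.pulsar.Trigger
    uri: pulsar://localhost:6650          # assumed local broker
    topic: kestra_trigger
    deserializer: JSON
    subscriptionName: kestra_trigger_sub
    # The trigger is disabled once an execution ends in one of these states
    stopAfter:
      - FAILED
```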
tlsOptions
- Type: AbstractPulsarConnection-TlsOptions
- Dynamic: ❌
- Required: ❌
TLS authentication options.
Use the "pulsar+ssl://" scheme in the uri property to enable TLS support.
Outputs
messagesCount
- Type: integer
- Required: ❌
Number of messages consumed.
uri
- Type: string
- Required: ❌
- Format:
uri
URI of a Kestra internal storage file containing the consumed messages.
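The two outputs can be used together in a flow. The sketch below logs the batch size and then the stored file's content. It assumes the `read()` expression function is available in your Kestra version and that the batch is small enough to log in full. The flow id is hypothetical.

```yaml
id: pulsar_batch_summary   # hypothetical flow id
namespace: company.team

tasks:
  - id: count
    type: io.kestra.plugin.core.log.Log
    message: "Consumed {{ trigger.messagesCount }} message(s)"
  - id: content
    type: io.kestra.plugin.core.log.Log
    # read() loads the internal storage file referenced by the URI
    message: "{{ read(trigger.uri) }}"

triggers:
  - id: trigger
    type: io.kestra.plugin.pulsar.Trigger
    interval: PT30S
    topic: kestra_trigger
    uri: pulsar://localhost:6650          # assumed local broker
    deserializer: JSON
    subscriptionName: kestra_trigger_sub
```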
Definitions
io.kestra.plugin.pulsar.AbstractPulsarConnection-TlsOptions
Properties
ca
- Type: string
- Dynamic: ❓
- Required: ❌
The CA certificate.
Must be a base64-encoded PEM file.
cert
- Type: string
- Dynamic: ❓
- Required: ❌
The client certificate.
Must be a base64-encoded PEM file.
key
- Type: string
- Dynamic: ❓
- Required: ❌
The key certificate.
Must be a base64-encoded PEM file.
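Putting the TLS options together, a trigger might be configured as in the sketch below. The three values are placeholders to be replaced with your own base64-encoded PEM content, and the host name is reused from the uri examples above.

```yaml
triggers:
  - id: with_tls
    type: io.kestra.plugin.pulsar.Trigger
    # The pulsar+ssl:// scheme is required for TLS
    uri: pulsar+ssl://pulsar.us-west.example.com:6651
    topic: kestra_trigger
    deserializer: JSON
    subscriptionName: kestra_trigger_sub
    tlsOptions:
      ca: "<base64-encoded CA certificate PEM>"        # placeholder
      cert: "<base64-encoded client certificate PEM>"  # placeholder
      key: "<base64-encoded client key PEM>"           # placeholder
```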