Trigger

yaml
type: "io.kestra.plugin.azure.eventhubs.Trigger"

Consume messages periodically from Azure Event Hubs and create one execution per batch.

To consume each message from Azure Event Hubs in real time and create one execution per message, use io.kestra.plugin.azure.eventhubs.RealtimeTrigger instead.

Examples

Trigger a flow on batches of events received from Azure Event Hubs.

yaml
id: azure_eventhubs_trigger
namespace: company.team

tasks:
  - id: log
    type: io.kestra.plugin.core.log.Log
    message: Hello there! I received {{ trigger.eventsCount }} events from Azure Event Hubs!

triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Trigger
    interval: PT30S
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    bodyDeserializer: JSON
    consumerGroup: "$Default"
    checkpointStoreProperties:
      containerName: kestra
      connectionString: "{{ secret('BLOB_CONNECTION') }}"

Properties

eventHubName

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

The event hub to read from.

namespace

  • Type: string
  • Dynamic: ✔️
  • Required: ✔️

Namespace name of the event hub to connect to.

bodyDeserializer

  • Type: string
  • Dynamic:
  • Required:
  • Default: STRING
  • Possible Values:
    • STRING
    • BINARY
    • ION
    • JSON

The deserializer to use for deserializing the event value.

bodyDeserializerProperties

  • Type: object
  • Dynamic:
  • Required:
  • Default: {}

The config properties to be passed to the Deserializer.

Configs in key/value pairs.

checkpointStoreProperties

  • Type: object
  • SubType: string
  • Dynamic:
  • Required:
  • Default: {}

The config properties to be used for configuring the BlobCheckpointStore.

Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.
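As a minimal sketch, the checkpoint store can point at a Blob Storage container by reusing the keys from the example at the top of this page. The container name `kestra` and the secret name `BLOB_CONNECTION` are taken from that example and are placeholders for your own values.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Trigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    checkpointStoreProperties:
      # Blob container that holds the checkpoints (placeholder name)
      containerName: kestra
      # Connection string for the storage that hosts the container (placeholder secret)
      connectionString: "{{ secret('BLOB_CONNECTION') }}"
```

Keeping the checkpoint connection string in a secret separate from the Event Hubs one, as the example does, lets the two credentials be rotated independently.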

clientMaxRetries

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 5

The maximum number of retry attempts before considering a client operation to have failed.

clientRetryDelay

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 500

The maximum permissible delay between retry attempts in milliseconds.
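The two retry settings can be combined as in the sketch below. The values 3 and 1000 are illustrative only, not recommendations.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Trigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Give up on a client operation after 3 retry attempts (default: 5)
    clientMaxRetries: 3
    # Allow up to 1000 ms between retry attempts (default: 500)
    clientRetryDelay: 1000
```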

conditions

  • Type: array
  • SubType: Condition
  • Dynamic:
  • Required:

List of conditions that restrict when the trigger starts the flow.

connectionString

  • Type: string
  • Dynamic: ✔️
  • Required:

Connection string used to authenticate with the Event Hubs namespace.

consumerGroup

  • Type: string
  • Dynamic:
  • Required:
  • Default: $Default

The consumer group.

customEndpointAddress

  • Type: string
  • Dynamic: ✔️
  • Required:

Custom endpoint address when connecting to the Event Hubs service.

enqueueTime

  • Type: string
  • Dynamic:
  • Required:

The ISO datetime to start reading from when partitionStartingPosition is set to INSTANT.

interval

  • Type: string
  • Dynamic:
  • Required:
  • Default: PT60S (60 seconds)
  • Format: duration

Interval between polling.

The interval between two consecutive polls. A longer interval avoids overloading the remote system with too many calls. For most triggers that depend on external systems, the interval should be at least PT30S. See ISO 8601 durations for the available interval values.
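For illustration, the sketch below polls every five minutes using an ISO 8601 duration. The value PT5M is an arbitrary choice; the example at the top of this page uses PT30S.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Trigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # ISO 8601 duration: PT30S = 30 seconds, PT5M = 5 minutes, PT1H = 1 hour
    interval: PT5M
```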

maxBatchSizePerPartition

  • Type: integer
  • Dynamic:
  • Required:
  • Default: 50

The maximum number of events to consume per event hub partition per poll.

maxDuration

  • Type: string
  • Dynamic:
  • Required:
  • Default: PT10S (10 seconds)
  • Format: duration

The max time duration to wait to receive events from all partitions.

maxWaitTimePerPartition

  • Type: string
  • Dynamic:
  • Required:
  • Default: PT5S (5 seconds)
  • Format: duration

The max time duration to wait to receive a batch of events up to the maxBatchSizePerPartition.
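The three batch settings interact, so a combined sketch may help. The values below are illustrative, and choosing a maxDuration longer than maxWaitTimePerPartition is an assumption about sensible tuning rather than a documented requirement.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Trigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Read at most 100 events from each partition per poll (default: 50)
    maxBatchSizePerPartition: 100
    # Wait up to 10 seconds for a partition batch to fill (default: 5 seconds)
    maxWaitTimePerPartition: PT10S
    # Wait up to 30 seconds overall for events from all partitions (default: 10 seconds)
    maxDuration: PT30S
```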

partitionStartingPosition

  • Type: string
  • Dynamic:
  • Required:
  • Default: EARLIEST
  • Possible Values:
    • EARLIEST
    • LATEST
    • INSTANT

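The position in each partition from which to start consuming events. INSTANT starts from the datetime given in enqueueTime.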
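As a sketch of how INSTANT pairs with enqueueTime, the fragment below starts reading from a fixed point in time. The timestamp is an arbitrary illustrative value, and the exact ISO datetime variant the plugin accepts should be checked against your plugin version.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Trigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Start from a point in time instead of EARLIEST (the default) or LATEST
    partitionStartingPosition: INSTANT
    # Illustrative timestamp; only used when the starting position is INSTANT
    enqueueTime: "2024-01-01T00:00:00Z"
```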

sasToken

  • Type: string
  • Dynamic: ✔️
  • Required:

The SAS token to use for authenticating requests.

This string should only be the query parameters (with or without a leading '?') and not a full URL.

sharedKeyAccountAccessKey

  • Type: string
  • Dynamic: ✔️
  • Required:

Shared Key access key for authenticating requests.

sharedKeyAccountName

  • Type: string
  • Dynamic: ✔️
  • Required:

Shared Key account name for authenticating requests.
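The sketch below shows the SAS token and Shared Key properties in place of a connection string. This page does not state which credential combinations are valid, so treating them as alternatives is an assumption, and the secret names are hypothetical.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Trigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    # Option 1 (assumed alternative to connectionString): SAS token,
    # query parameters only, not a full URL. Hypothetical secret name.
    sasToken: "{{ secret('EVENTHUBS_SAS_TOKEN') }}"
    # Option 2 (assumed alternative): Shared Key credentials.
    # Hypothetical secret names; uncomment and remove sasToken to try it.
    # sharedKeyAccountName: "{{ secret('AZURE_ACCOUNT_NAME') }}"
    # sharedKeyAccountAccessKey: "{{ secret('AZURE_ACCOUNT_KEY') }}"
```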

stopAfter

  • Type: array
  • SubType: string
  • Dynamic:
  • Required:

List of execution states after which a trigger should be stopped (a.k.a. disabled).
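For example, the trigger could be disabled after the first failed execution. This is a sketch that assumes FAILED is the state name you want to react to; other execution states can be listed in the same array.

```yaml
triggers:
  - id: read_from_eventhub
    type: io.kestra.plugin.azure.eventhubs.Trigger
    eventHubName: my_eventhub
    namespace: my_eventhub_namespace
    connectionString: "{{ secret('EVENTHUBS_CONNECTION') }}"
    # Disable this trigger once an execution it started ends in one of these states
    stopAfter:
      - FAILED
```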

Outputs

eventsCount

  • Type: integer
  • Required:

Number of events consumed from Azure Event Hubs.

uri

  • Type: string
  • Required:
  • Format: uri

URI of a Kestra internal storage file containing the messages.
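Both outputs are available to tasks through the trigger variable, in the same way the example at the top of this page reads eventsCount. The sketch below logs the two values; the task id log_batch is a hypothetical name.

```yaml
tasks:
  - id: log_batch
    type: io.kestra.plugin.core.log.Log
    # trigger.eventsCount: number of events in the batch
    # trigger.uri: internal storage file that holds the messages
    message: "Received {{ trigger.eventsCount }} events, stored at {{ trigger.uri }}"
```

Downstream tasks that accept an internal storage URI can take trigger.uri as input to process the batch contents.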
