RealtimeTrigger
RealtimeTrigger
type: "io.kestra.plugin.debezium.postgres.RealtimeTrigger"
Consume a message in real-time from a PostgreSQL database via change data capture and create one execution per row.
If you would like to consume multiple messages processed within a given time frame and process them in batch, you can use the io.kestra.plugin.debezium.postgres.Trigger instead.
Examples
Consume a message from a PostgreSQL database via change data capture in real-time.
id: debezium-postgres
namespace: company.team
tasks:
- id: send_data
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.data }}"
triggers:
- id: realtime
type: io.kestra.plugin.debezium.postgres.RealtimeTrigger
database: postgres
hostname: 127.0.0.1
port: 65432
username: postgres
password: pg_passwd
Properties
database
- Type: string
- Dynamic: ✔️
- Required: ✔️
The name of the PostgreSQL database from which to stream the changes.
deleted
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
ADD_FIELD
- Possible Values:
ADD_FIELD
NULL
DROP
Specify how to handle deleted rows.
Possible settings are:
ADD_FIELD
: Add a deleted field as boolean.NULL
: Send a row with all values as null.DROP
: Don't send deleted row.
deletedFieldName
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
deleted
The name of deleted field if deleted is ADD_FIELD
.
format
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
INLINE
- Possible Values:
RAW
INLINE
WRAP
The format of the output.
Possible settings are:
RAW
: Send raw data from debezium.INLINE
: Send a row like in the source with only data (remove after & before), all the columns will be present for each row.WRAP
: Send a row like INLINE but wrapped in arecord
field.
hostname
- Type: string
- Dynamic: ✔️
- Required: ✔️
Hostname of the remote server.
ignoreDdl
- Type: boolean
- Dynamic: ❌
- Required: ✔️
- Default:
true
Ignore DDL statement.
Ignore CREATE, ALTER, DROP and TRUNCATE operations.
key
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
ADD_FIELD
- Possible Values:
ADD_FIELD
DROP
Specify how to handle key.
Possible settings are:
ADD_FIELD
: Add key(s) merged with columns.DROP
: Drop keys.
metadata
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
ADD_FIELD
- Possible Values:
ADD_FIELD
DROP
Specify how to handle metadata.
Possible settings are:
ADD_FIELD
: Add metadata in a column namedmetadata
.DROP
: Drop metadata.
metadataFieldName
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
metadata
The name of metadata field if metadata is ADD_FIELD
.
pluginName
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
PGOUTPUT
- Possible Values:
DECODERBUFS
WAL2JSON
WAL2JSON_RDS
WAL2JSON_STREAMING
WAL2JSON_RDS_STREAMING
PGOUTPUT
The name of the PostgreSQL logical decoding plug-in installed on the PostgreSQL server.
If you are using a
wal2json
plug-in and transactions are very large, the JSON batch event that contains all transaction changes might not fit into the hard-coded memory buffer, which has a size of 1 GB. In such cases, switch to a streaming plug-in, by setting the plugin-name property towal2json_streaming
orwal2json_rds_streaming
. With a streaming plug-in, PostgreSQL sends the connector a separate message for each change in a transaction.
port
- Type: string
- Dynamic: ✔️
- Required: ✔️
Port of the remote server.
publicationName
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
kestra_publication
The name of the PostgreSQL publication created for streaming changes when using PGOUTPUT
.
This publication is created at start-up if it does not already exist and it includes all tables. Debezium then applies its own include/exclude list filtering, if configured, to limit the publication to change events for the specific tables of interest. The connector user must have superuser permissions to create this publication, so it is usually preferable to create the publication before starting the connector for the first time.
If the publication already exists, either for all tables or configured with a subset of tables, Debezium uses the publication as it is defined.
slotName
- Type: string
- Dynamic: ✔️
- Required: ✔️
- Default:
kestra
The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema.
The server uses this slot to stream events to the Debezium connector that you are configuring. Slot names must conform to PostgreSQL replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."
snapshotMode
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
INITIAL
- Possible Values:
INITIAL
ALWAYS
NEVER
INITIAL_ONLY
Specifies the criteria for running a snapshot when the connector starts.
Possible settings are:
INITIAL
: The connector performs a snapshot only when no offsets have been recorded for the logical server name.ALWAYS
: The connector performs a snapshot each time the connector starts.NEVER
: The connector never performs snapshots. When a connector is configured this way, its behavior when it starts is as follows. If there is a previously stored LSN, the connector continues streaming changes from that position. If no LSN has been stored, the connector starts streaming changes from the point in time when the PostgreSQL logical replication slot was created on the server. The never snapshot mode is useful only when you know all data of interest is still reflected in the WAL.INITIAL_ONLY
: The connector performs an initial snapshot and then stops, without processing any subsequent changes.
splitTable
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
TABLE
- Possible Values:
OFF
DATABASE
TABLE
Split table on separate output uris
.
Possible settings are:
TABLE
: This will split all rows by tables on output with namedatabase.table
DATABASE
: This will split all rows by databases on output with namedatabase
.OFF
: This will NOT split all rows resulting in a singledata
output.
stateName
- Type: string
- Dynamic: ❌
- Required: ✔️
- Default:
debezium-state
The name of the Debezium state file stored in the KV Store for that namespace.
conditions
- Type: array
- SubType: Condition
- Dynamic: ❌
- Required: ❌
List of conditions in order to limit the flow trigger.
excludedColumns
- Type: object
- Dynamic: ✔️
- Required: ❌
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to include in change event record values.
Fully-qualified names for columns are of the form databaseName.tableName.columnName. Do not also specify the
includedColumns
connector configuration property."
excludedDatabases
- Type: object
- Dynamic: ✔️
- Required: ❌
An optional, comma-separated list of regular expressions that match the names of databases for which you do not want to capture changes.
The connector captures changes in any database whose name is not in the
excludedDatabases
. Do not also set theincludedDatabases
connector configuration property.
excludedTables
- Type: object
- Dynamic: ✔️
- Required: ❌
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you do not want to capture.
The connector captures changes in any table not included in
excludedTables
. Each identifier is of the form databaseName.tableName. Do not also specify theincludedTables
connector configuration property.
includedColumns
- Type: object
- Dynamic: ✔️
- Required: ❌
An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values.
Fully-qualified names for columns are of the form databaseName.tableName.columnName. Do not also specify the
excludedColumns
connector configuration property.
includedDatabases
- Type: object
- Dynamic: ✔️
- Required: ❌
An optional, comma-separated list of regular expressions that match the names of the databases for which to capture changes.
The connector does not capture changes in any database whose name is not in
includedDatabases
. By default, the connector captures changes in all databases. Do not also set theexcludedDatabases
connector configuration property.
includedTables
- Type: object
- Dynamic: ✔️
- Required: ❌
An optional, comma-separated list of regular expressions that match fully-qualified table identifiers of tables whose changes you want to capture.
The connector does not capture changes in any table not included in
includedTables
. Each identifier is of the form databaseName.tableName. By default, the connector captures changes in every non-system table in each database whose changes are being captured. Do not also specify theexcludedTables
connector configuration property.
offsetsCommitMode
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
ON_EACH_BATCH
- Possible Values:
ON_EACH_BATCH
ON_STOP
How to commit the offsets to the KV Store.
Possible values are:
- ON_EACH_BATCH: after each batch of records consumed by this trigger, the offsets will be stored in the KV Store. This avoids any duplicated records being consumed but can be costly if many events are produced.
- ON_STOP: when this trigger is stopped or killed, the offsets will be stored in the KV Store. This avoid any un-necessary writes to the KV Store, but if the trigger is not stopped gracefully, the KV Store value may not be updated leading to duplicated records consumption.
password
- Type: string
- Dynamic: ✔️
- Required: ❌
Password on the remote server.
properties
- Type: object
- SubType: string
- Dynamic: ✔️
- Required: ❌
Additional configuration properties.
Any additional configuration properties that is valid for the current driver.
sslCert
- Type: string
- Dynamic: ✔️
- Required: ❌
The SSL certificate for the client.
Must be a PEM encoded certificate.
sslKey
- Type: string
- Dynamic: ✔️
- Required: ❌
The SSL private key of the client.
Must be a PEM encoded key.
sslKeyPassword
- Type: string
- Dynamic: ✔️
- Required: ❌
The password to access the client private key sslKey
.
sslMode
- Type: string
- Dynamic: ❌
- Required: ❌
- Default:
DISABLE
- Possible Values:
DISABLE
REQUIRE
VERIFY_CA
VERIFY_FULL
**Whether to use an encrypted connection to the PostgreSQL server. Options include:
DISABLE
uses an unencrypted connection.REQUIRE
uses a secure (encrypted) connection, and fails if one cannot be established.VERIFY_CA
behaves like require but also verifies the server TLS certificate against the configured Certificate Authority (CA) certificates, or fails if no valid matching CA certificates are found.VERIFY_FULL
behaves like verify-ca but also verifies that the server certificate matches the host to which the connector is trying to connect.
See the PostgreSQL documentation for more information.**
sslRootCert
- Type: string
- Dynamic: ✔️
- Required: ❌
The root certificate(s) against which the server is validated.
Must be a PEM encoded certificate.
stopAfter
- Type: array
- SubType: string
- Dynamic: ❌
- Required: ❌
List of execution states after which a trigger should be stopped (a.k.a. disabled).
username
- Type: string
- Dynamic: ✔️
- Required: ❌
Username on the remote server.
Outputs
data
- Type: object
- Required: ❌
stream
- Type: string
- Required: ❌
Definitions
Was this page helpful?