Blueprints

Anomaly detection using DuckDB SQL query and S3 file event trigger, sending a CSV file attachment via email if anomalies are detected

About this blueprint

S3 Trigger Notifications DuckDB

This flow will be triggered any time a new file arrives in a given S3 bucket and source_prefix folder.

The flow will check for anomalies in the data from that file using a DuckDB query, and will move the file to the same S3 bucket below the destination_prefix folder.

If anomalies are detected, the flow will send an email to the recipients specified on the to property, and will send anomalous rows as a CSV file attachment in the same email.

If you use MotherDuck, use Kestra Secret to store the MotherDuck service token. Then, modify the query task as follows to point the task to your MotherDuck database:

yaml
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    description: Validate new file for anomalies
    sql: |
      SELECT *
      FROM read_csv_auto('s3://{{ vars.bucket }}/{{ vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first }}')
      WHERE price * quantity != total;
    store: true
    url: "jdbc:duckdb:md:my_db?motherduck_token={{ secret('MOTHERDUCK_TOKEN') }}"
yaml
id: s3_trigger_duckdb
namespace: company.team
variables:
  bucket: kestraio
  source_prefix: monthly_orders
  destination_prefix: stage_orders
tasks:
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    description: Validate new file for anomalies
    sql: >
      INSTALL httpfs;

      LOAD httpfs;

      SET s3_region='{{ secret('AWS_DEFAULT_REGION') }}';

      SET s3_access_key_id='{{ secret('AWS_ACCESS_KEY_ID') }}';

      SET s3_secret_access_key='{{ secret('AWS_SECRET_ACCESS_KEY') }}';

      SELECT *

      FROM read_csv_auto('s3://{{ vars.bucket }}/{{ vars.destination_prefix
      }}/{{ trigger.objects | jq('.[].key') | first }}')

      WHERE price * quantity != total;
    store: true
  - id: csv
    type: io.kestra.plugin.serdes.csv.IonToCsv
    description: Create CSV file from query results
    from: "{{ outputs.query.uri }}"
  - id: if_anomalies_detected
    type: io.kestra.plugin.core.flow.If
    condition: "{{ outputs.query.size }}"
    description: Send an email if outliers detected
    then:
      - id: send_email_alert
        type: io.kestra.plugin.notifications.mail.MailSend
        subject: Anomalies in data detected
        from: anna@kestra.io
        to: anna@kestra.io
        username: anna@kestra.io
        host: mail.privateemail.com
        port: 465
        password: "{{ secret('EMAIL_PASSWORD') }}"
        sessionTimeout: 6000
        attachments:
          - name: anomalies_in_orders.csv
            uri: "{{ outputs.csv.uri }}"
        htmlTextContent: >
          Detected anomalies in sales data in file <b>s3://{{ vars.bucket }}/{{
          vars.destination_prefix }}/{{ trigger.objects | jq('.[].key') | first
          }}'</b>. <br />

          Anomalous rows are attached in a CSV file.<br /><br />

          Best regards,<br />

          Data Team
triggers:
  - id: poll_for_new_s3_files
    type: io.kestra.plugin.aws.s3.Trigger
    bucket: "{{ vars.bucket }}"
    prefix: "{{ vars.source_prefix }}"
    maxKeys: 1
    interval: PT1S
    filter: FILES
    action: MOVE
    moveTo:
      key: "{{ vars.destination_prefix }}/{{ vars.source_prefix }}"
    region: "{{ secret('AWS_DEFAULT_REGION') }}"
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"

Query

Ion To Csv

If

Mail Send

Trigger

New to Kestra?

Use blueprints to kickstart your first workflows.

Get started with Kestra