Send multiple records to AWS Kinesis Data Streams in a simple list of maps or using a JSON API payload

About this blueprint

AWS API

This flow sends multiple records to AWS Kinesis Data Streams. The PutRecords task accepts either a simple list of maps (i.e., a list of dictionaries) or the URI of an internal storage file in ION format. When the data comes from an HTTP API, you must first convert the JSON payload to ION format using the JsonToIon task, and then pass the resulting file to the PutRecords task.

The Outputs tab will display all records sent to AWS Kinesis Data Streams, including the shard ID and sequence number. You can use these values to retrieve the records from the stream using the GetRecords AWS API call.
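
As a rough illustration of that retrieval step, the sketch below reads records back with the AWS CLI, which calls the GetShardIterator and GetRecords APIs. It assumes the `io.kestra.plugin.aws.cli.AwsCLI` task is available in your Kestra installation. The flow ID, the region, the secret names, and the shard ID `shardId-000000000000` are placeholders that do not come from this blueprint; replace the shard ID with the value shown in the Outputs tab.

```yaml
id: aws_kinesis_get_records   # hypothetical flow ID
namespace: company.team
tasks:
  - id: get_records
    type: io.kestra.plugin.aws.cli.AwsCLI
    # Assumed secret names and region; adjust to your environment
    accessKeyId: "{{ secret('AWS_ACCESS_KEY_ID') }}"
    secretKeyId: "{{ secret('AWS_SECRET_ACCESS_KEY') }}"
    region: us-east-1
    commands:
      - |
        # Placeholder shard ID; use the one reported for your records
        ITERATOR=$(aws kinesis get-shard-iterator \
          --stream-name kestra \
          --shard-id shardId-000000000000 \
          --shard-iterator-type TRIM_HORIZON \
          --query 'ShardIterator' --output text)
        # The Data field of each returned record is base64-encoded
        aws kinesis get-records --shard-iterator "$ITERATOR" --limit 10
```

`TRIM_HORIZON` starts reading from the oldest record in the shard. To start at a specific record instead, the iterator type `AT_SEQUENCE_NUMBER` combined with `--starting-sequence-number` takes the sequence number from the Outputs tab.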

Note that the data payload must be a string value. If you want to send a JSON object to your Kinesis Data Stream, wrap it in a string; passing the object itself as the payload will not work.
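
The string requirement above can be sketched as follows. This is a minimal example, not the original Gist: the flow ID and the JSON fields `user` and `event` are invented for illustration, and the commented-out variant is only my reading of the kind of payload the note warns against.

```yaml
id: aws_kinesis_json_string   # hypothetical flow ID
namespace: company.team
tasks:
  - id: put_json_record
    type: io.kestra.plugin.aws.kinesis.PutRecords
    streamName: kestra
    records:
      # data is a single string that contains serialized JSON
      - data: '{"user": "user1", "event": "sign-in"}'
        partitionKey: user1
      # Presumably not accepted: a nested mapping is an object, not a string
      # - data:
      #     user: user1
      #     event: sign-in
      #   partitionKey: user1
```

The single quotes keep YAML from parsing the braces as a mapping, so the task receives the JSON text as one string.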

```yaml
id: aws_kinesis_json
namespace: company.team
tasks:
  # Option 1: send records as an inline list of maps
  - id: put_records_simple_map
    type: io.kestra.plugin.aws.kinesis.PutRecords
    streamName: kestra
    records:
      - data: sign-in
        partitionKey: user1
      - data: sign-out
        partitionKey: user1
  # Option 2: download a JSON payload from an HTTP API
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://huggingface.co/datasets/kestra/datasets/resolve/main/json/user_events.json
  # Convert the downloaded JSON file to ION in internal storage
  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: "{{ outputs.extract.uri }}"
    newLine: false
  # Send the records from the ION file to the stream
  - id: put_records
    type: io.kestra.plugin.aws.kinesis.PutRecords
    streamName: kestra
    records: "{{ outputs.json_to_ion.uri }}"
```
