Amazon Kinesis¶
Load data from Amazon Kinesis into CrateDB.
Amazon Kinesis is a cloud-based service to collect and process large streams of data records in real time.
Prerequisites¶
Use Docker or Podman to run all components. This approach works consistently across Linux, macOS, and Windows. The tutorial uses LocalStack to spin up a local instance of Amazon Kinesis so you don’t need an AWS account to exercise the pipeline.
Install¶
Install the most recent Python packages awscli and cratedb-toolkit, or evaluate alternative installation methods.
uv tool install --upgrade 'awscli' 'cratedb-toolkit[io-ingest]'
Tutorial¶
5-minute step-by-step instructions on how to work with Amazon Kinesis and CrateDB.
Services¶
Run Kinesis from LocalStack and CrateDB using Docker or Podman.
docker run --rm --name=localstack \
--publish=4566:4566 \
docker.io/localstack/localstack:latest
docker run --rm --name=cratedb \
--publish=4200:4200 --publish=5432:5432 --env=CRATE_HEAP_SIZE=2g \
docker.io/crate:latest '-Cdiscovery.type=single-node'
Configure AWS clients¶
LocalStack’s default region is us-east-1. Let’s use it.
export AWS_DEFAULT_REGION=us-east-1
export AWS_ENDPOINT_URL="http://localhost:4566"
Populate data¶
Create the Kinesis stream or enumerate existing ones.
aws kinesis create-stream --stream-name=demo
aws kinesis list-streams
Publish two data payloads to the Kinesis stream.
aws kinesis put-record \
--stream-name=demo --partition-key=default \
--data '{"sensor_id":1,"ts":"2025-06-01 10:00","temperature":42.42,"humidity":84.84}'
aws kinesis put-record \
--stream-name=demo --partition-key=default \
--data '{"sensor_id":2,"ts":"2025-06-01 11:00","temperature":45.21,"humidity":80.82}'
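If you prefer publishing records programmatically, the same two payloads can be sent with boto3 instead of the AWS CLI. This is a minimal sketch; the `make_record` and `publish` helper names are ours, and it assumes the LocalStack endpoint and `test`/`test` credentials used throughout this tutorial.

```python
import json


def make_record(sensor_id, ts, temperature, humidity):
    """Build a Kinesis record matching the tutorial's JSON payload shape."""
    payload = {
        "sensor_id": sensor_id,
        "ts": ts,
        "temperature": temperature,
        "humidity": humidity,
    }
    return {
        "Data": json.dumps(payload).encode("utf-8"),
        "PartitionKey": "default",
    }


def publish(records, stream_name="demo"):
    """Send records to a Kinesis stream on LocalStack.

    Requires boto3 (`pip install boto3`) and a running LocalStack instance.
    """
    import boto3

    client = boto3.client(
        "kinesis",
        region_name="us-east-1",
        endpoint_url="http://localhost:4566",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
    for record in records:
        client.put_record(StreamName=stream_name, **record)
```

With LocalStack running, calling `publish([make_record(1, "2025-06-01 10:00", 42.42, 84.84)])` is equivalent to the first `put-record` command above.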
Load data¶
Use CrateDB Toolkit to load data from the Kinesis stream into a CrateDB table.
ctk load \
"kinesis:?aws_access_key_id=test&aws_secret_access_key=test&region_name=us-east-1&table=demo" \
"crate://crate:crate@localhost:4200/testdrive/kinesis"
Query data¶
Inspect the database using crash.
crash -c "SELECT count(*) FROM testdrive.kinesis"
crash -c "SELECT * FROM testdrive.kinesis"
crash -c "SHOW CREATE TABLE testdrive.kinesis"
Documentation¶
The Kinesis StreamName can be provided by using the &table= query parameter.
You can also use a full Kinesis StreamARN to address the stream in ARN
format.
By default, the Kinesis stream is processed from the beginning, but you can
also adjust its StartingPosition to start reading from a specific time.
Currently, the pipeline expects the Kinesis records to be encoded in JSON
format.
A fully qualified Kinesis URL template that uses ARNs to address the Kinesis stream looks like this.
kinesis:?aws_access_key_id=${AWS_ACCESS_KEY_ID}&aws_secret_access_key=${AWS_SECRET_ACCESS_KEY}&region_name=${AWS_REGION_NAME}&table=arn:aws:kinesis:${AWS_REGION_NAME}:${AWS_ACCOUNT_ID}:stream/${KINESIS_STREAM_NAME}&start_date=${KINESIS_AT_TIMESTAMP}
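The template above can also be assembled programmatically, which avoids quoting mistakes when the ARN contains colons and slashes. A minimal Python sketch; the `kinesis_source_url` helper is ours, not part of CrateDB Toolkit.

```python
from urllib.parse import urlencode


def kinesis_source_url(access_key, secret_key, region, account_id,
                       stream_name, start_date=None):
    """Assemble a `kinesis:` source URL that addresses the stream by ARN."""
    arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    params = {
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "region_name": region,
        "table": arn,
    }
    if start_date is not None:
        params["start_date"] = start_date
    # Keep ":" and "/" readable in the ARN instead of percent-encoding them.
    return "kinesis:?" + urlencode(params, safe=":/")
```

For example, `kinesis_source_url("test", "test", "us-east-1", "000000000000", "demo")` reproduces the LocalStack source URL used in the tutorial, addressed by ARN.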
Kinesis options
aws_access_key_id: AWS access key ID.
aws_secret_access_key: AWS secret access key.
region_name: AWS region name.
table: Kinesis stream name or ARN.
start_date: Kinesis StartingPosition’s AT_TIMESTAMP value in ISO format.
When using an ARN to address the Kinesis stream in the table parameter,
please adjust values for the AWS region, the AWS account ID, and the Kinesis
stream name, according to the layout below.
# ARN prefix Region Account ID Type Stream name
arn:aws:kinesis:us-east-1:000000000000:stream/demo
The start_date option supports various datetime formats,
a few examples are listed below.
%Y-%m-%d: 2023-01-31
%Y-%m-%dT%H:%M:%S: 2023-01-31T15:00:00
%Y-%m-%dT%H:%M:%S%z: 2023-01-31T15:00:00+00:00
%Y-%m-%dT%H:%M:%S.%f: 2023-01-31T15:00:00.000123
%Y-%m-%dT%H:%M:%S.%f%z: 2023-01-31T15:00:00.000123+00:00
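To illustrate how such a list of layouts can be handled, the sketch below tries each format in order, from most to least specific. This is an illustration only, not CrateDB Toolkit’s actual parsing code.

```python
from datetime import datetime

# The accepted start_date layouts from the list above, most specific first,
# so that fractional seconds and timezone offsets are matched before the
# shorter layouts get a chance to fail.
START_DATE_FORMATS = [
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S.%f",
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d",
]


def parse_start_date(value):
    """Parse a start_date string using the first matching layout."""
    for fmt in START_DATE_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError(f"Unrecognized start_date: {value!r}")
```

Note that `%z` accepts offsets with a colon (such as `+00:00`) on Python 3.7 and newer.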
CrateDB options
Please make sure to replace username, password, and hostname with values matching your environment.
ssl: Use the ?ssl=true query parameter to enable SSL. Also use this when connecting to CrateDB Cloud, for example: 'crate://crate:crate@cratedb.example.org:4200/schema/table?ssl=true'
Checkpointing¶
By default, the Kinesis CDC relay starts reading from the beginning of the
stream (TRIM_HORIZON) on every restart. For long-running pipelines, this
means re-ingesting records that were already written to CrateDB.
To enable persistent checkpointing, add the checkpointer query parameter
to the Kinesis URL. The relay will then record its progress and resume from
the last checkpoint on restart.
Checkpointer URL parameters
checkpointer: Backend type. Supported values: cratedb, memory. Without this parameter, no persistent checkpointing is used.
checkpointer-name: Namespace for checkpoint storage. Defaults to the stream name. Use distinct names when multiple relays consume the same stream.
checkpointer-schema: CrateDB schema for the checkpoint table. Defaults to ext.
checkpointer-interval: Seconds between checkpoint flushes. Defaults to 5. Lower values reduce duplicate records after a crash but increase write load.
Example: CrateDB checkpointer
ctk load \
"kinesis+dms://localhost:4566/demo?region=us-east-1&create=true&checkpointer=cratedb&checkpointer-interval=10" \
"crate://localhost:4200/testdrive" \
--transformation examples/cdc/aws/dms-load-schema-universal.yaml
The relay creates a kinesis_checkpoints table in the configured schema
(default ext) to store shard progress. On restart, the consumer resumes
from the last checkpointed sequence number.
Delivery guarantee
Checkpointed delivery is at-least-once. Records between the last flushed
checkpoint and a crash will be replayed on restart. Relay SQL writes should
use upsert or idempotent semantics (INSERT ... ON CONFLICT) to handle
duplicate delivery gracefully.
Maintenance
The checkpoint table stores one row per shard per namespace. For typical workloads (a handful of streams with 1-10 shards each), the table stays small indefinitely.
Rows may accumulate over time from decommissioned pipelines or Kinesis shard
splits. Use the ctk kinesis commands to inspect and clean up checkpoint state.
List checkpoints:
# All checkpoints
ctk kinesis list-checkpoints "crate://localhost/"
# Filter by namespace (stream name)
ctk kinesis list-checkpoints --namespace my-stream "crate://localhost/"
Prune stale checkpoints:
At least one of --older-than or --namespace is required. By default only
inactive rows (active=FALSE) are eligible for deletion.
# Preview what would be deleted (dry run)
ctk kinesis prune-checkpoints --older-than 30d --dry-run "crate://localhost/"
# Delete inactive checkpoints older than 30 days
ctk kinesis prune-checkpoints --older-than 30d "crate://localhost/"
# Remove all checkpoints for a decommissioned pipeline
ctk kinesis prune-checkpoints --namespace old-pipeline "crate://localhost/"
# Skip confirmation prompt
ctk kinesis prune-checkpoints --older-than 30d --yes "crate://localhost/"
# Custom schema (if not using default "ext")
ctk kinesis prune-checkpoints --older-than 7d --schema my_ext "crate://localhost/"
The --older-than option accepts durations like 7d, 24h, 2w, or
combinations like 1d12h.
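A duration grammar like this is straightforward to parse; the sketch below converts such strings to seconds. The unit table and the `parse_duration` helper are our own illustration, not ctk’s actual implementation.

```python
import re

# Seconds per unit for durations such as "7d", "24h", "2w", or "1d12h".
# Assumed unit set; the documented examples use only w, d, and h.
_UNITS = {"w": 7 * 86400, "d": 86400, "h": 3600, "m": 60, "s": 1}
_TOKEN = re.compile(r"(\d+)([wdhms])")


def parse_duration(text):
    """Return the total number of seconds for a compound duration string."""
    pos, total = 0, 0
    for match in _TOKEN.finditer(text):
        if match.start() != pos:
            raise ValueError(f"Invalid duration: {text!r}")
        total += int(match.group(1)) * _UNITS[match.group(2)]
        pos = match.end()
    if pos != len(text) or pos == 0:
        raise ValueError(f"Invalid duration: {text!r}")
    return total
```

For example, `parse_duration("1d12h")` yields 129600 seconds, i.e. a day and a half.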
Note
A healthy but idle consumer’s checkpoints can look stale by updated_at.
Stop consumers before pruning, or use --dry-run to verify what will be
deleted.
Manual SQL
You can also query and manage the checkpoint table directly:
SELECT "namespace", "shard_id", "sequence", "active", "updated_at"
FROM "ext"."kinesis_checkpoints"
ORDER BY "updated_at" DESC;
DELETE FROM "ext"."kinesis_checkpoints"
WHERE "active" = FALSE
AND "updated_at" < NOW() - INTERVAL '30 days';
See also¶
CrateDB also provides native data import capabilities and supports different ETL applications and frameworks; see load data into CrateDB. If you have additional requirements for this or other I/O adapters, for example support for advanced processing options or different data formats, or if you would like us to provide a managed variant, please let us know through any of our support channels, preferably on our community forum.