Managed ETL

CrateDB Cloud offers a managed ETL subsystem.

Using the Import API or the Export API of CrateDB Cloud, you can import data from files or export data to files, and you can load data from databases and streaming sources, optionally continuously using change data capture (CDC).

  • File formats: CSV, JSON, Parquet

  • Databases: DynamoDB and MongoDB, including CDC

This walkthrough describes how to orchestrate managed ETL jobs using either the command line or the Python API. If you prefer an interactive user interface, use the web-based CrateDB Cloud Console instead.

Installation

CrateDB Toolkit uses the Python programming language with native performance extensions. Python is preinstalled on most machines today; otherwise, we recommend downloading and installing it from python.org. For installing additional Python packages, we recommend the uv package manager.

uv tool install --upgrade 'cratedb-toolkit'

An alternative way to install Python packages is to use pipx or pip install --user.

pipx install 'cratedb-toolkit'

Another way to invoke CrateDB Toolkit without installing it is to use its container image with Docker, Podman, Kubernetes, and friends.

docker run --rm ghcr.io/crate/cratedb-toolkit ctk

Prerequisites

Before using CrateDB Cloud services, authenticate and select your target database cluster.

Authenticate

When working with CrateDB Cloud, you can choose between two authentication variants: either interactively authorize your terminal session using croud login,

# Replace YOUR_IDP with one of: cognito, azuread, github, google.
croud login --idp YOUR_IDP

or, for headless/unattended operation, provide API access credentials via environment variables after creating them in the CrateDB Cloud Console or with croud api-keys create.

# Provide CrateDB Cloud API authentication tokens.
export CRATEDB_CLOUD_API_KEY='<YOUR_API_KEY>'
export CRATEDB_CLOUD_API_SECRET='<YOUR_API_SECRET>'

Select cluster

Discover the list of available database clusters.

croud clusters list

Select the designated target database cluster using one of three address variants, provided either as CLI options or as environment variables.

  • All address options are mutually exclusive.

  • CLI options take precedence over environment variables.

  • Environment variables can be stored in an .env file in your working directory.

CLI options:

--cluster-id, --cluster-name, --cluster-url

Environment variables:

CRATEDB_CLUSTER_ID, CRATEDB_CLUSTER_NAME, CRATEDB_CLUSTER_URL
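For example, instead of exporting variables in each terminal session, you can keep the cluster address in an .env file in your working directory; a minimal sketch, assuming you address the cluster by its identifier (placeholder value):

```shell
# .env -- picked up from the current working directory.
CRATEDB_CLUSTER_ID='<YOUR_CLUSTER_ID>'
# Alternatively, address the cluster by name or URL instead;
# remember that all address options are mutually exclusive.
# CRATEDB_CLUSTER_NAME='<YOUR_CLUSTER_NAME>'
# CRATEDB_CLUSTER_URL='<YOUR_CLUSTER_URL>'
```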

Before invoking any of the next steps, address the CrateDB Cloud cluster you want to connect to, for example by defining the cluster identifier using the CRATEDB_CLUSTER_ID environment variable.

export CRATEDB_CLUSTER_ID='<YOUR_CLUSTER_ID>'
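If you invoke CrateDB Toolkit through its container image, forward the credentials and the cluster address into the container. A sketch using Docker's -e flag to pass through variables already exported in your shell (same variable names as above); the trailing ctk command is only a connectivity check:

```shell
# Pass the exported CrateDB Cloud settings through to the container.
docker run --rm \
  -e CRATEDB_CLOUD_API_KEY \
  -e CRATEDB_CLOUD_API_SECRET \
  -e CRATEDB_CLUSTER_ID \
  ghcr.io/crate/cratedb-toolkit \
  ctk shell --command="SELECT 1;"
```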

Usage

You can run jobs from the command-line or by using the Python API.

CLI

Use the ctk load table command to load data into database tables. By default, the table name is derived from the filename; use the --table option to specify a custom name.

ctk load table 'https://cdn.crate.io/downloads/datasets/cratedb-datasets/cloud-tutorials/data_weather.csv.gz'
ctk load table 'https://cdn.crate.io/downloads/datasets/cratedb-datasets/cloud-tutorials/data_marketing.json.gz'
ctk load table 'https://cdn.crate.io/downloads/datasets/cratedb-datasets/timeseries/yc.2019.07-tiny.parquet.gz' --table='yc-2019-07-tiny'

Use the ctk shell command to query and aggregate data using SQL.

ctk shell --command="SELECT * FROM data_weather LIMIT 10;"
ctk shell --command="SELECT * FROM data_weather LIMIT 10;" --format=csv
ctk shell --command="SELECT * FROM data_weather LIMIT 10;" --format=json

Note

ctk shell effectively just invokes the CrateDB Shell, with the additional benefit of transparently authenticating users from the existing session established by CTK/croud. Otherwise, you would need to provide authentication credentials separately.

Python API

Use the Python API to import and query data.

# Import API classes.
from cratedb_toolkit import InputOutputResource, ManagedCluster

# Define data source.
url = "https://cdn.crate.io/downloads/datasets/cratedb-datasets/machine-learning/timeseries/nab-machine-failure.csv"
source = InputOutputResource(url=url)

# Connect to CrateDB Cloud.
with ManagedCluster.from_env() as cluster:

    # Invoke the import job.
    cluster.load_table(source=source)

    # Query imported data.
    results = cluster.query('SELECT * FROM "nab-machine-failure" LIMIT 10;')
    
    # Display data.
    from pprint import pprint
    pprint(results)