Managed ETL¶
CrateDB Cloud offers a managed ETL subsystem.
Using the Import API or the Export API of CrateDB Cloud, you can import data from files and export data into files, or load data from databases and streaming sources, optionally on a continuous basis using change data capture (CDC).
- File formats: CSV, JSON, Parquet
- Databases: DynamoDB and MongoDB, including CDC
This walkthrough describes how to orchestrate managed ETL jobs using either the command line or the Python API. Alternatively, if you prefer an interactive user interface, use the web-based CrateDB Cloud Console.
Installation¶
CrateDB Toolkit uses the Python programming language with native performance extensions. Python is preinstalled on most machines today; otherwise, we recommend downloading and installing it from the official source. For installing additional Python packages, we recommend the uv package manager.
uv tool install --upgrade 'cratedb-toolkit'
An alternative way to install Python packages is to use pipx or pip install --user.
pipx install 'cratedb-toolkit'
Another way to invoke CrateDB Toolkit without installing it is to use its container image with Docker, Podman, Kubernetes, and friends.
docker run --rm ghcr.io/crate/cratedb-toolkit ctk
Prerequisites¶
Before using CrateDB Cloud services, authenticate and select your target database cluster.
Authenticate¶
When working with CrateDB Cloud, you can select between two authentication variants.
Either interactively authorize your terminal session using croud login,
# Replace YOUR_IDP with one of: cognito, azuread, github, google.
croud login --idp YOUR_IDP
or provide API access credentials via environment variables for headless/unattended
operation, after creating them using the CrateDB Cloud Console or
croud api-keys create.
# Provide CrateDB Cloud API authentication tokens.
export CRATEDB_CLOUD_API_KEY='<YOUR_API_KEY>'
export CRATEDB_CLOUD_API_SECRET='<YOUR_API_SECRET>'
Select cluster¶
Discover the list of available database clusters.
croud clusters list
Select the designated target database cluster using one of three address options, provided either as CLI options or as environment variables.
All address options are mutually exclusive.
CLI options take precedence over environment variables.
Environment variables can be stored in an .env file in your working directory.
- CLI options: --cluster-id, --cluster-name, --cluster-url
- Environment variables: CRATEDB_CLUSTER_ID, CRATEDB_CLUSTER_NAME, CRATEDB_CLUSTER_URL
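As a sketch, an .env file in the working directory combining the cluster address with the API credentials from above might look like this (all values are placeholders):

```shell
CRATEDB_CLUSTER_ID=<YOUR_CLUSTER_ID>
CRATEDB_CLOUD_API_KEY=<YOUR_API_KEY>
CRATEDB_CLOUD_API_SECRET=<YOUR_API_SECRET>
```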
Before invoking any of the next steps, specify the CrateDB Cloud cluster
you want to connect to, for example by defining its cluster ID
using the CRATEDB_CLUSTER_ID environment variable.
export CRATEDB_CLUSTER_ID='<YOUR_CLUSTER_ID>'
Usage¶
You can run jobs from the command-line or by using the Python API.
CLI¶
Use the ctk load command to load data into database tables.
By default, the table name is derived from the filename.
Use the --table option to specify a custom name.
ctk load 'https://cdn.crate.io/downloads/datasets/cratedb-datasets/cloud-tutorials/data_weather.csv.gz'
ctk load 'https://cdn.crate.io/downloads/datasets/cratedb-datasets/cloud-tutorials/data_marketing.json.gz'
ctk load 'https://cdn.crate.io/downloads/datasets/cratedb-datasets/timeseries/yc.2019.07-tiny.parquet.gz' --table='yc-2019-07-tiny'
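To illustrate the default naming rule, the helper below sketches how a table name could be derived from the last path segment of the source URL, presumably with compression and format suffixes stripped. It is an illustrative assumption, not CTK's actual implementation:

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse


def default_table_name(url: str) -> str:
    """Illustrative sketch: derive a table name from a source URL by
    taking the last path segment and stripping compression and format
    suffixes. Not the actual CTK implementation."""
    name = PurePosixPath(urlparse(url).path).name
    for suffix in (".gz", ".csv", ".json", ".parquet"):
        if name.endswith(suffix):
            name = name[: -len(suffix)]
    return name


print(default_table_name(
    "https://cdn.crate.io/downloads/datasets/cratedb-datasets/cloud-tutorials/data_weather.csv.gz"
))  # prints: data_weather
```

Under this assumption, data_weather.csv.gz maps to the table data_weather, matching the query examples below.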
Use the ctk shell command to query and aggregate data using SQL.
ctk shell --command="SELECT * FROM data_weather LIMIT 10;"
ctk shell --command="SELECT * FROM data_weather LIMIT 10;" --format=csv
ctk shell --command="SELECT * FROM data_weather LIMIT 10;" --format=json
Note
ctk shell effectively just invokes the CrateDB Shell, with the additional
benefit of transparently authenticating users from the existing session
established by CTK/croud. Otherwise, you would need to provide authentication
credentials separately.
Python API¶
Use the Python API to import and query data.
# Import API classes.
from cratedb_toolkit import InputOutputResource, ManagedCluster

# Define data source.
url = "https://cdn.crate.io/downloads/datasets/cratedb-datasets/machine-learning/timeseries/nab-machine-failure.csv"
source = InputOutputResource(url=url)

# Connect to CrateDB Cloud.
with ManagedCluster.from_env() as cluster:
    # Invoke the import job.
    cluster.load_table(source=source)

    # Query imported data.
    results = cluster.query('SELECT * FROM "nab-machine-failure" LIMIT 10;')

# Display data.
from pprint import pprint
pprint(results)