Data Contract CLI


The datacontract CLI is an open source command-line tool for working with Data Contracts. It uses data contract YAML files to lint the data contract, connect to data sources and execute schema and quality tests, detect breaking changes, and export to different formats. The tool is written in Python. It can be used as a standalone CLI tool, in a CI/CD pipeline, or directly as a Python library.

NOTE: This project has been migrated from Go to Python. The migration makes it possible to use datacontract as a library within Python code, but it comes with some breaking changes. The Go version has been forked, in case you still rely on it.

Getting started

Let’s look at this data contract: https://datacontract.com/examples/orders-latest/datacontract.yaml

We have a servers section with endpoint details to the S3 bucket, models for the structure of the data, and quality attributes that describe the expected freshness and number of rows.

This data contract contains all the information needed to connect to S3 and check that the actual data meets the defined schema and quality requirements. We can use this information to test whether the actual data set in S3 complies with the data contract.

Let’s use pip to install the CLI (or use the Docker image, if you prefer).

$ python3 -m pip install datacontract-cli

We run the tests:

$ datacontract test https://datacontract.com/examples/orders-latest/datacontract.yaml

# returns:
Testing https://datacontract.com/examples/orders-latest/datacontract.yaml
╭────────┬─────────────────────────────────────────────────────────────────────┬───────────────────────────────┬─────────╮
│ Result │ Check                                                               │ Field                         │ Details │
├────────┼─────────────────────────────────────────────────────────────────────┼───────────────────────────────┼─────────┤
│ passed │ Check that JSON has valid schema                                    │ orders                        │         │
│ passed │ Check that JSON has valid schema                                    │ line_items                    │         │
│ passed │ Check that field order_id is present                                │ orders                        │         │
│ passed │ Check that field order_timestamp is present                         │ orders                        │         │
│ passed │ Check that field order_total is present                             │ orders                        │         │
│ passed │ Check that field customer_id is present                             │ orders                        │         │
│ passed │ Check that field customer_email_address is present                  │ orders                        │         │
│ passed │ row_count >= 5000                                                   │ orders                        │         │
│ passed │ Check that required field order_id has no null values               │ orders.order_id               │         │
│ passed │ Check that unique field order_id has no duplicate values            │ orders.order_id               │         │
│ passed │ duplicate_count(order_id) = 0                                       │ orders.order_id               │         │
│ passed │ Check that required field order_timestamp has no null values        │ orders.order_timestamp        │         │
│ passed │ freshness(order_timestamp) < 24h                                    │ orders.order_timestamp        │         │
│ passed │ Check that required field order_total has no null values            │ orders.order_total            │         │
│ passed │ Check that required field customer_email_address has no null values │ orders.customer_email_address │         │
│ passed │ Check that field lines_item_id is present                           │ line_items                    │         │
│ passed │ Check that field order_id is present                                │ line_items                    │         │
│ passed │ Check that field sku is present                                     │ line_items                    │         │
│ passed │ values in (order_id) must exist in orders (order_id)                │ line_items.order_id           │         │
│ passed │ row_count >= 5000                                                   │ line_items                    │         │
│ passed │ Check that required field lines_item_id has no null values          │ line_items.lines_item_id      │         │
│ passed │ Check that unique field lines_item_id has no duplicate values       │ line_items.lines_item_id      │         │
╰────────┴─────────────────────────────────────────────────────────────────────┴───────────────────────────────┴─────────╯
🟢 data contract is valid. Run 22 checks. Took 6.739514 seconds.

Voilà, the CLI tested that the datacontract.yaml itself is valid, all records comply with the schema, and all quality attributes are met.
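To make the check names in the table above more concrete, here is a minimal sketch of what "field is present", "no null values", "no duplicate values", and "row_count" checks mean when applied to in-memory records. This is an illustration only, not the CLI's implementation (the CLI delegates to the engines listed further below); the function `run_checks` and the sample records are hypothetical.

```python
from collections import Counter


def run_checks(records, required_fields, unique_fields, min_rows):
    """Return a list of (result, check) tuples for a list of dict records."""
    results = []

    def add(passed, check):
        results.append(("passed" if passed else "failed", check))

    for field in required_fields:
        # Presence: every record must contain the key at all.
        add(all(field in r for r in records), f"field {field} is present")
        # Null check: the value must not be None.
        add(all(r.get(field) is not None for r in records),
            f"required field {field} has no null values")

    for field in unique_fields:
        # Uniqueness: each value may occur only once.
        counts = Counter(r.get(field) for r in records)
        add(all(c == 1 for c in counts.values()),
            f"unique field {field} has no duplicate values")

    add(len(records) >= min_rows, f"row_count >= {min_rows}")
    return results


# Hypothetical sample data with one null value and one duplicate id.
orders = [
    {"order_id": "1001", "order_total": 9999},
    {"order_id": "1002", "order_total": None},
    {"order_id": "1002", "order_total": 1250},
]

results = run_checks(
    orders,
    required_fields=["order_id", "order_total"],
    unique_fields=["order_id"],
    min_rows=3,
)
for result, check in results:
    print(result, "|", check)
```

With this sample data, the null check on order_total and the uniqueness check on order_id fail, while the remaining checks pass.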

Usage

# create a new data contract from example and write it to datacontract.yaml
$ datacontract init datacontract.yaml

# lint the datacontract.yaml
$ datacontract lint datacontract.yaml

# execute schema and quality checks
$ datacontract test datacontract.yaml

# execute schema and quality checks on the examples within the contract
$ datacontract test --examples datacontract.yaml

# find differences between two data contracts (Coming Soon)
$ datacontract diff datacontract-v1.yaml datacontract-v2.yaml

# fail pipeline on breaking changes (Coming Soon)
$ datacontract breaking datacontract-v1.yaml datacontract-v2.yaml

# export model as jsonschema
$ datacontract export --format jsonschema datacontract.yaml

# export model as dbt
$ datacontract export --format dbt datacontract.yaml

# import protobuf as model (Coming Soon)
$ datacontract import --format protobuf --source my_protobuf_file.proto datacontract.yaml

Programmatic (Python)

from datacontract.data_contract import DataContract

data_contract = DataContract(data_contract_file="datacontract.yaml")
run = data_contract.test()
if not run.has_passed():
    print("Data quality validation failed.")
    # Abort pipeline, alert, or take corrective actions...
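
One way to act on a failed run in a CI/CD pipeline is to turn the result into a process exit code. The following is a minimal sketch under that assumption; it relies only on `DataContract(...).test()` and `run.has_passed()` as shown above, while the helper names `exit_code` and `main` are hypothetical.

```python
import sys


def exit_code(run) -> int:
    """Translate a test run into a process exit code (0 = passed, 1 = failed)."""
    return 0 if run.has_passed() else 1


def main(contract_file: str = "datacontract.yaml") -> int:
    # Imported lazily so this module can be loaded without datacontract-cli installed.
    from datacontract.data_contract import DataContract

    run = DataContract(data_contract_file=contract_file).test()
    if not run.has_passed():
        print("Data quality validation failed.", file=sys.stderr)
    return exit_code(run)


# In a pipeline step, call: sys.exit(main())
```

Most CI systems mark a step as failed when the process exits with a non-zero code, so no further integration is needed.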

Scenario: Integration with Data Mesh Manager

If you use Data Mesh Manager, you can use the data contract URL and append the --publish option to send and display the test results. Set an environment variable for your API key.

# Fetch current data contract, execute tests on production, and publish result to data mesh manager
$ export DATAMESH_MANAGER_API_KEY=xxx
$ datacontract test https://demo.datamesh-manager.com/demo279750347121/datacontracts/4df9d6ee-e55d-4088-9598-b635b2fdcbbc/datacontract.yaml --server production --publish
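
When this step is scripted, it can help to fail early if the API key is missing. The sketch below assembles the same command as above and checks the environment variable first. It uses only the options shown in this section; the function name `publish_command` and the example URL in the usage note are hypothetical.

```python
import os


def publish_command(contract_url, server="production", environ=None):
    """Build the `datacontract test ... --publish` command as an argument list.

    Raises RuntimeError if DATAMESH_MANAGER_API_KEY is not set, so the
    pipeline stops before any tests are executed.
    """
    environ = os.environ if environ is None else environ
    if not environ.get("DATAMESH_MANAGER_API_KEY"):
        raise RuntimeError("DATAMESH_MANAGER_API_KEY is not set")
    return ["datacontract", "test", contract_url, "--server", server, "--publish"]
```

The returned list can be passed to `subprocess.run(cmd, check=True)`, for example with `cmd = publish_command("https://example.com/datacontract.yaml")`.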

Installation

Choose the most appropriate installation method for your needs:

pip

Python 3.11 is recommended. Python 3.12 support is available in the pre-release (release candidate) for 0.9.3.

python3 -m pip install datacontract-cli

pipx

pipx installs into an isolated environment.

pipx install datacontract-cli

Docker

docker pull datacontract/cli
docker run --rm -v ${PWD}:/home/datacontract datacontract/cli

Or via an alias that automatically uses the latest version:

alias datacontract='docker run --rm -v "${PWD}:/home/datacontract" datacontract/cli:latest'

Documentation

Tests

Data Contract CLI can connect to data sources and run schema and quality tests to verify that the actual data complies with the data contract.

$ datacontract test --server production datacontract.yaml

To connect to a data source, the CLI uses the server block in the datacontract.yaml. Credentials, such as usernames and passwords, can be provided through environment variables.
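
Because credentials come from the environment, a small pre-flight check can report missing variables before a test run starts. The sketch below uses the variable names documented in the per-server sections of this README. Treating every documented variable as required is an assumption, and the names `REQUIRED_ENV` and `missing_env_vars` are hypothetical.

```python
import os

# Variable names as documented in the S3, Postgres, BigQuery and Databricks
# sections. Assumption: all documented variables are treated as required.
REQUIRED_ENV = {
    "s3": [
        "DATACONTRACT_S3_REGION",
        "DATACONTRACT_S3_ACCESS_KEY_ID",
        "DATACONTRACT_S3_SECRET_ACCESS_KEY",
    ],
    "postgres": [
        "DATACONTRACT_POSTGRES_USERNAME",
        "DATACONTRACT_POSTGRES_PASSWORD",
    ],
    "bigquery": [
        "DATACONTRACT_BIGQUERY_ACCOUNT_INFO_JSON_PATH",
    ],
    "databricks": [
        "DATACONTRACT_DATABRICKS_TOKEN",
        "DATACONTRACT_DATABRICKS_HTTP_PATH",
    ],
}


def missing_env_vars(server_type, environ=None):
    """Return the documented variables for server_type that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_ENV.get(server_type, [])
            if not environ.get(name)]
```

For example, `missing_env_vars("postgres")` returns an empty list once both Postgres variables are exported.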

The application uses different engines, based on the server type.

| Type | Format | Description | Status | Engines |
|------|--------|-------------|--------|---------|
| s3 | parquet | Works for any S3-compliant endpoint, e.g., AWS S3, GCS, MinIO, Ceph, … | | soda-core-duckdb |
| s3 | json | Support for new_line delimited JSON files and one JSON record per file. | | fastjsonschema, soda-core-duckdb |
| s3 | csv | | | soda-core-duckdb |
| s3 | delta | | Coming soon | TBD |
| postgres | n/a | | | soda-core-postgres |
| snowflake | n/a | | | soda-core-snowflake |
| bigquery | n/a | | | soda-core-bigquery |
| redshift | n/a | | Coming soon | TBD |
| databricks | n/a | Support for Databricks SQL with Unity catalog and Hive metastore. | | soda-core-spark |
| databricks | n/a | Support for Spark for programmatic use in Notebooks. | | soda-core-spark-df |
| kafka | json | | Coming soon | TBD |
| kafka | avro | | Coming soon | TBD |
| kafka | protobuf | | Coming soon | TBD |
| local | parquet | | | soda-core-duckdb |
| local | json | Support for new_line delimited JSON files and one JSON record per file. | | fastjsonschema, soda-core-duckdb |
| local | csv | | | soda-core-duckdb |

Feel free to create an issue, if you need support for an additional type.

S3

Data Contract CLI can test data that is stored in S3 buckets or any S3-compliant endpoints in various formats.

Example

datacontract.yaml

servers:
  production:
    type: s3
    endpointUrl: https://minio.example.com # not needed with AWS S3
    location: s3://bucket-name/path/*/*.json
    format: json
    delimiter: new_line # new_line, array, or none
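
The three delimiter values describe how JSON records are laid out in a file. The sketch below illustrates one plausible reading of them: `new_line` as one JSON record per line, `array` as a single JSON array of records, and `none` as one JSON record per file. The interpretation of `array` and the function `parse_records` are assumptions for illustration, not the CLI's parser.

```python
import json


def parse_records(text, delimiter):
    """Split the content of a JSON file into a list of records."""
    if delimiter == "new_line":
        # Newline-delimited JSON: one record per non-empty line.
        return [json.loads(line) for line in text.splitlines() if line.strip()]
    if delimiter == "array":
        # Assumption: the whole file is one JSON array of records.
        data = json.loads(text)
        if not isinstance(data, list):
            raise ValueError("expected a JSON array")
        return data
    if delimiter == "none":
        # The whole file is a single record.
        return [json.loads(text)]
    raise ValueError(f"unknown delimiter: {delimiter!r}")


print(parse_records('{"order_id": "1001"}\n{"order_id": "1002"}\n', "new_line"))
print(parse_records('[{"order_id": "1001"}, {"order_id": "1002"}]', "array"))
print(parse_records('{"order_id": "1001"}', "none"))
```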

Environment Variables

| Environment Variable | Example | Description |
|----------------------|---------|-------------|
| DATACONTRACT_S3_REGION | eu-central-1 | Region of S3 bucket |
| DATACONTRACT_S3_ACCESS_KEY_ID | AKIAXV5Q5QABCDEFGH | AWS Access Key ID |
| DATACONTRACT_S3_SECRET_ACCESS_KEY | 93S7LRrJcqLaaaa/XXXXXXXXXXXXX | AWS Secret Access Key |

Postgres

Data Contract CLI can test data in Postgres or Postgres-compliant databases (e.g., RisingWave).

Example

datacontract.yaml

servers:
  postgres:
    type: postgres
    host: localhost
    port: 5432
    database: postgres
    schema: public
models:
  my_table_1: # corresponds to a table
    type: table
    fields: 
      my_column_1: # corresponds to a column
        type: varchar

Environment Variables

| Environment Variable | Example | Description |
|----------------------|---------|-------------|
| DATACONTRACT_POSTGRES_USERNAME | postgres | Username |
| DATACONTRACT_POSTGRES_PASSWORD | mysecretpassword | Password |
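
The Postgres example above maps a model to a table and each field to a column. As a rough illustration of what a "field is present" check amounts to against a database, the sketch below compares the model's field names with a table's actual columns. It uses the standard-library `sqlite3` module as a stand-in for Postgres so that it runs without a server; the function `missing_columns` is hypothetical and does not reflect how the CLI or soda-core queries Postgres.

```python
import sqlite3


def missing_columns(conn, table, fields):
    """Return the model fields that have no matching column in the table."""
    # PRAGMA table_info returns one row per column; index 1 is the column name.
    rows = conn.execute(f'PRAGMA table_info("{table}")').fetchall()
    existing = {row[1] for row in rows}
    return [field for field in fields if field not in existing]


# In-memory database that mirrors the model from the example above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table_1 (my_column_1 VARCHAR)")

print(missing_columns(conn, "my_table_1", ["my_column_1"]))                 # → []
print(missing_columns(conn, "my_table_1", ["my_column_1", "my_column_2"]))  # → ['my_column_2']
```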

BigQuery

We support authentication to BigQuery using a Service Account Key. The Service Account used needs roles that allow it to run query jobs and to read the tables under test.

Example

datacontract.yaml

servers:
  production:
    type: bigquery
    project: datameshexample-product
    dataset: datacontract_cli_test_dataset
models:
  datacontract_cli_test_table: # corresponds to a BigQuery table
    type: table
    fields: ...

Environment Variables

| Environment Variable | Example | Description |
|----------------------|---------|-------------|
| DATACONTRACT_BIGQUERY_ACCOUNT_INFO_JSON_PATH | ~/service-access-key.json | Service Access key as saved on key creation by BigQuery |

Databricks

Works with Unity Catalog and Hive metastore.

Needs a running SQL warehouse or compute cluster.

Example

datacontract.yaml

servers:
  production:
    type: databricks
    host: dbc-abcdefgh-1234.cloud.databricks.com
    catalog: acme_catalog_prod
    schema: orders_latest
models:
  orders: # corresponds to a table
    type: table
    fields: ...

Environment Variables

| Environment Variable | Example | Description |
|----------------------|---------|-------------|
| DATACONTRACT_DATABRICKS_TOKEN | dapia00000000000000000000000000000 | The personal access token to authenticate |
| DATACONTRACT_DATABRICKS_HTTP_PATH | /sql/1.0/warehouses/b053a3ffffffff | The HTTP path to the SQL warehouse or compute cluster |

Databricks (programmatic)

Works with Unity Catalog and Hive metastore. When running in a notebook or pipeline, the provided Spark session can be used. No additional authentication is required.

Requires a Databricks Runtime with Python >= 3.10.

Example

datacontract.yaml

servers:
  production:
    type: databricks
    host: dbc-abcdefgh-1234.cloud.databricks.com # ignored, always use current host
    catalog: acme_catalog_prod
    schema: orders_latest
models:
  orders: # corresponds to a table
    type: table
    fields: ...

Notebook

%pip install datacontract-cli
dbutils.library.restartPython()

from datacontract.data_contract import DataContract

data_contract = DataContract(
  data_contract_file="/Volumes/acme_catalog_prod/orders_latest/datacontract/datacontract.yaml", 
  spark=spark)
run = data_contract.test()
run.result

Exports

Available export options:

| Type | Description | Status |
|------|-------------|--------|
| jsonschema | Export to JSON Schema | |
| sodacl | Export to SodaCL quality checks in YAML format | |
| dbt | Export to dbt model in YAML format | |
| avro | Export to AVRO models | TBD |
| pydantic | Export to pydantic models | TBD |
| sql | Export to SQL DDL | TBD |
| protobuf | Export to Protobuf | TBD |

Development Setup

Python base interpreter should be 3.11.x (unless working on 3.12 release candidate).

# create venv
python3 -m venv venv
source venv/bin/activate

# install requirements
pip install --upgrade pip setuptools wheel
pip install -e '.[dev]'

# run the tests
cd tests/
pytest

Release

git tag v0.9.0
git push origin v0.9.0
python3 -m pip install --upgrade build twine
rm -r dist/
python3 -m build
# for now only test.pypi.org
python3 -m twine upload --repository testpypi dist/*

Docker Build

docker build -t datacontract/cli .
docker run --rm -v ${PWD}:/home/datacontract datacontract/cli

Contribution

We are happy to receive your contributions. Propose your change in an issue or directly create a pull request with your improvements.

License

MIT License

Credits

Created by Stefan Negele and Jochen Christ.