Data Contract CLI

Test Workflow Stars Slack Status

The datacontract CLI is an open source command-line tool for working with Data Contracts. It uses data contract YAML files to lint the data contract, connect to data sources and execute schema and quality tests, detect breaking changes, and export to different formats. The tool is written in Python. It can be used as a standalone CLI tool, in a CI/CD pipeline, or directly as a Python library.

Main features of the Data Contract CLI

Demo of Data Contract CLI

Getting started

Let’s look at this data contract: https://datacontract.com/examples/orders-latest/datacontract.yaml

We have a servers section with endpoint details to the S3 bucket, models for the structure of the data, and quality attributes that describe the expected freshness and number of rows.

This data contract contains all information to connect to S3 and check that the actual data meets the defined schema and quality requirements. We can use this information to test if the actual data set in S3 is compliant to the data contract.

Let’s use pip to install the CLI (or use the Docker image, if you prefer).

$ python3 -m pip install datacontract-cli

We run the tests:

$ datacontract test https://datacontract.com/examples/orders-latest/datacontract.yaml

# returns:
Testing https://datacontract.com/examples/orders-latest/datacontract.yaml
╭────────┬─────────────────────────────────────────────────────────────────────┬───────────────────────────────┬─────────╮
│ Result │ Check                                                               │ Field                         │ Details │
├────────┼─────────────────────────────────────────────────────────────────────┼───────────────────────────────┼─────────┤
│ passed │ Check that JSON has valid schema                                    │ orders                        │         │
│ passed │ Check that JSON has valid schema                                    │ line_items                    │         │
│ passed │ Check that field order_id is present                                │ orders                        │         │
│ passed │ Check that field order_timestamp is present                         │ orders                        │         │
│ passed │ Check that field order_total is present                             │ orders                        │         │
│ passed │ Check that field customer_id is present                             │ orders                        │         │
│ passed │ Check that field customer_email_address is present                  │ orders                        │         │
│ passed │ row_count >= 5000                                                   │ orders                        │         │
│ passed │ Check that required field order_id has no null values               │ orders.order_id               │         │
│ passed │ Check that unique field order_id has no duplicate values            │ orders.order_id               │         │
│ passed │ duplicate_count(order_id) = 0                                       │ orders.order_id               │         │
│ passed │ Check that required field order_timestamp has no null values        │ orders.order_timestamp        │         │
│ passed │ freshness(order_timestamp) < 24h                                    │ orders.order_timestamp        │         │
│ passed │ Check that required field order_total has no null values            │ orders.order_total            │         │
│ passed │ Check that required field customer_email_address has no null values │ orders.customer_email_address │         │
│ passed │ Check that field lines_item_id is present                           │ line_items                    │         │
│ passed │ Check that field order_id is present                                │ line_items                    │         │
│ passed │ Check that field sku is present                                     │ line_items                    │         │
│ passed │ values in (order_id) must exist in orders (order_id)                │ line_items.order_id           │         │
│ passed │ row_count >= 5000                                                   │ line_items                    │         │
│ passed │ Check that required field lines_item_id has no null values          │ line_items.lines_item_id      │         │
│ passed │ Check that unique field lines_item_id has no duplicate values       │ line_items.lines_item_id      │         │
╰────────┴─────────────────────────────────────────────────────────────────────┴───────────────────────────────┴─────────╯
🟢 data contract is valid. Run 22 checks. Took 6.739514 seconds.

Voilà, the CLI tested that the datacontract.yaml itself is valid, all records comply with the schema, and all quality attributes are met.

Usage

# create a new data contract from example and write it to datacontract.yaml
$ datacontract init datacontract.yaml

# lint the datacontract.yaml
$ datacontract lint datacontract.yaml

# execute schema and quality checks
$ datacontract test datacontract.yaml

# execute schema and quality checks on the examples within the contract
$ datacontract test --examples datacontract.yaml

# find differences between to data contracts (Coming Soon)
$ datacontract diff datacontract-v1.yaml datacontract-v2.yaml

# find differences between to data contracts categorized into error, warning, and info.
$ datacontract changelog datacontract-v1.yaml datacontract-v2.yaml

# fail pipeline on breaking changes. Uses changelog internally and showing only error and warning.
$ datacontract breaking datacontract-v1.yaml datacontract-v2.yaml

# export model as jsonschema (other formats: avro, dbt, dbt-sources, dbt-staging-sql, jsonschema, odcs, rdf, sql (coming soon), sodacl, terraform)
$ datacontract export --format jsonschema datacontract.yaml

# import sql
$ datacontract import --format sql --source my_ddl.sql

# import avro
$ datacontract import --format avro --source avro_schema.avsc

Programmatic (Python)

from datacontract.data_contract import DataContract

data_contract = DataContract(data_contract_file="datacontract.yaml")
run = data_contract.test()
if not run.has_passed():
    print("Data quality validation failed.")
    # Abort pipeline, alert, or take corrective actions...

Integrations

Integration Option Description
Data Mesh Manager --publish Push full results to the Data Mesh Manager API
OpenTelemetry --publish-to-opentelemetry Push result as gauge metrics (logs are planned)

Integration with Data Mesh Manager

If you use Data Mesh Manager, you can use the data contract URL and append the --publish option to send and display the test results. Set an environment variable for your API key.

# Fetch current data contract, execute tests on production, and publish result to data mesh manager
$ EXPORT DATAMESH_MANAGER_API_KEY=xxx
$ datacontract test https://demo.datamesh-manager.com/demo279750347121/datacontracts/4df9d6ee-e55d-4088-9598-b635b2fdcbbc/datacontract.yaml --server production --publish

Integration with OpenTelemetry

If you use OpenTelemetry, you can use the data contract URL and append the --publish-to-opentelemetry option to send the test results to your OLTP-compatible instance, e.g., Prometheus.

The metric name is “datacontract.cli.test.result” and it uses the following encoding for the result:

datacontract.cli.test.result Description
0 test run passed, no warnings
1 test run has warnings
2 test run failed
3 test run not possible due to an error
4 test status unknown
# Fetch current data contract, execute tests on production, and publish result to open telemetry
$ EXPORT OTEL_SERVICE_NAME=datacontract-cli
$ EXPORT OTEL_EXPORTER_OTLP_ENDPOINT=https://YOUR_ID.apm.westeurope.azure.elastic-cloud.com:443
$ EXPORT OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer%20secret # Optional, when using SaaS Products
$ EXPORT OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf # Optional, default is http/protobuf - use value grpc to use the gRPC protocol instead
# Send to OpenTelemetry
$ datacontract test https://demo.datamesh-manager.com/demo279750347121/datacontracts/4df9d6ee-e55d-4088-9598-b635b2fdcbbc/datacontract.yaml --server production --publish-to-opentelemetry

Current limitations:

Installation

Choose the most appropriate installation method for your needs:

pip

Python 3.11 recommended. Python 3.12 available as pre-release release candidate for 0.9.3

python3 -m pip install datacontract-cli

pipx

pipx installs into an isolated environment.

pipx install datacontract-cli

Docker

docker pull datacontract/cli
docker run --rm -v ${PWD}:/home/datacontract datacontract/cli

Or via an alias that automatically uses the latest version:

alias datacontract='docker run --rm -v "${PWD}:/home/datacontract" datacontract/cli:latest'

Documentation

Tests

Data Contract CLI can connect to data sources and run schema and quality tests to verify that the data contract is valid.

$ datacontract test --server production datacontract.yaml

To connect to the databases the server block in the datacontract.yaml is used to set up the connection. In addition, credentials, such as username and passwords, may be defined with environment variables.

The application uses different engines, based on the server type.

Type Format Description Status Engines
s3 parquet Works for any S3-compliant endpoint., e.g., AWS S3, GCS, MinIO, Ceph, … soda-core-duckdb
s3 json Support for new_line delimited JSON files and one JSON record per file. fastjsonschema
soda-core-duckdb
s3 csv   soda-core-duckdb
s3 delta   Coming soon TBD
postgres n/a   soda-core-postgres
snowflake n/a   soda-core-snowflake
bigquery n/a   soda-core-bigquery
redshift n/a   Coming soon TBD
databricks n/a Support for Databricks SQL with Unity catalog and Hive metastore. soda-core-spark
databricks n/a Support for Spark for programmatic use in Notebooks. soda-core-spark-df
kafka json Experimental. pyspark
soda-core-spark-df
kafka avro   Coming soon TBD
kafka protobuf   Coming soon TBD
local parquet   soda-core-duckdb
local json Support for new_line delimited JSON files and one JSON record per file. fastjsonschema
soda-core-duckdb
local csv   soda-core-duckdb

Feel free to create an issue, if you need support for an additional type.

S3

Data Contract CLI can test data that is stored in S3 buckets or any S3-compliant endpoints in various formats.

Example

datacontract.yaml

servers:
  production:
    type: s3
    endpointUrl: https://minio.example.com # not needed with AWS S3
    location: s3://bucket-name/path/*/*.json
    format: json
    delimiter: new_line # new_line, array, or none

Environment Variables

Environment Variable Example Description
DATACONTRACT_S3_REGION eu-central-1 Region of S3 bucket
DATACONTRACT_S3_ACCESS_KEY_ID AKIAXV5Q5QABCDEFGH AWS Access Key ID
DATACONTRACT_S3_SECRET_ACCESS_KEY 93S7LRrJcqLaaaa/XXXXXXXXXXXXX AWS Secret Access Key

Postgres

Data Contract CLI can test data in Postgres or Postgres-compliant databases (e.g., RisingWave).

Example

datacontract.yaml

servers:
  postgres:
    type: postgres
    host: localhost
    port: 5432
    database: postgres
    schema: public
models:
  my_table_1: # corresponds to a table
    type: table
    fields: 
      my_column_1: # corresponds to a column
        type: varchar

Environment Variables

Environment Variable Example Description
DATACONTRACT_POSTGRES_USERNAME postgres Username
DATACONTRACT_POSTGRES_PASSWORD mysecretpassword Password

Snowflake

Data Contract CLI can test data in Snowflake.

Example

datacontract.yaml


servers:
  snowflake:
    type: snowflake
    account: abcdefg-xn12345
    database: ORDER_DB
    schema: ORDERS_PII_V2
models:
  my_table_1: # corresponds to a table
    type: table
    fields: 
      my_column_1: # corresponds to a column
        type: varchar

Environment Variables

Environment Variable Example Description
DATACONTRACT_SNOWFLAKE_USERNAME datacontract Username
DATACONTRACT_SNOWFLAKE_PASSWORD mysecretpassword Password
DATACONTRACT_SNOWFLAKE_ROLE DATAVALIDATION The snowflake role to use.
DATACONTRACT_SNOWFLAKE_WAREHOUSE COMPUTE_WH The Snowflake Warehouse to use executing the tests.

BigQuery

We support authentication to BigQuery using Service Account Key. The used Service Account should include the roles:

Example

datacontract.yaml

servers:
  production:
    type: bigquery
    project: datameshexample-product
    dataset: datacontract_cli_test_dataset
models:
  datacontract_cli_test_table: # corresponds to a BigQuery table
    type: table
    fields: ...

Environment Variables

Environment Variable Example Description
DATACONTRACT_BIGQUERY_ACCOUNT_INFO_JSON_PATH ~/service-access-key.json Service Access key as saved on key creation by BigQuery

Databricks

Works with Unity Catalog and Hive metastore.

Needs a running SQL warehouse or compute cluster.

Example

datacontract.yaml

servers:
  production:
    type: databricks
    host: dbc-abcdefgh-1234.cloud.databricks.com
    catalog: acme_catalog_prod
    schema: orders_latest
models:
  orders: # corresponds to a table
    type: table
    fields: ...

Environment Variables

Environment Variable Example Description
DATACONTRACT_DATABRICKS_TOKEN dapia00000000000000000000000000000 The personal access token to authenticate
DATACONTRACT_DATABRICKS_HTTP_PATH /sql/1.0/warehouses/b053a3ffffffff The HTTP path to the SQL warehouse or compute cluster

Databricks (programmatic)

Works with Unity Catalog and Hive metastore. When running in a notebook or pipeline, the provided spark session can be used. An additional authentication is not required.

Requires a Databricks Runtime with Python >= 3.10.

Example

datacontract.yaml

servers:
  production:
    type: databricks
    host: dbc-abcdefgh-1234.cloud.databricks.com # ignored, always use current host
    catalog: acme_catalog_prod
    schema: orders_latest
models:
  orders: # corresponds to a table
    type: table
    fields: ...

Notebook

%pip install datacontract-cli
dbutils.library.restartPython()

from datacontract.data_contract import DataContract

data_contract = DataContract(
  data_contract_file="/Volumes/acme_catalog_prod/orders_latest/datacontract/datacontract.yaml", 
  spark=spark)
run = data_contract.test()
run.result

Kafka

Kafka support is currently considered experimental.

Example

datacontract.yaml

servers:
  production:
    type: kafka
    host: abc-12345.eu-central-1.aws.confluent.cloud:9092
    topic: my-topic-name
    format: json

Environment Variables

Environment Variable Example Description
DATACONTRACT_KAFKA_SASL_USERNAME xxx The SASL username (key).
DATACONTRACT_KAFKA_SASL_PASSWORD xxx The SASL password (secret).

Exports

# Example export to dbt model
datacontract export --format dbt

Available export options:

Type Description Status
jsonschema Export to JSON Schema
odcs Export to Open Data Contract Standard (ODCS)
sodacl Export to SodaCL quality checks in YAML format
dbt Export to dbt models in YAML format
dbt-sources Export to dbt sources in YAML format
dbt-staging-sql Export to dbt staging SQL models
rdf Export data contract to RDF representation in N3 format
avro Export to AVRO models
protobuf Export to Protobuf
terraform Export to terraform resources
sql Export to SQL DDL
sql-query Export to SQL Query
great-expectations Export to Great Expectations Suites in JSON Format
bigquery Export to BigQuery Schemas TBD
pydantic Export to pydantic models TBD
html Export to HTML page TBD
Missing something? Please create an issue on GitHub TBD

Great Expectations

The export function transforms a specified data contract into a comprehensive Great Expectations JSON suite. If the contract includes multiple models, you need to specify the names of the model you wish to export.

datacontract  export datacontract.yaml --format great-expectations --model orders 

The export creates a list of expectations by utilizing:

RDF

The export function converts a given data contract into a RDF representation. You have the option to add a base_url which will be used as the default prefix to resolve relative IRIs inside the document.

datacontract export --format rdf --rdf-base https://www.example.com/ datacontract.yaml

The data contract is mapped onto the following concepts of a yet to be defined Data Contract Ontology named https://datacontract.com/DataContractSpecification/ :

Having the data contract inside an RDF Graph gives us access the following use cases:

Imports

# Example import from SQL DDL
datacontract import --format sql --source my_ddl.sql

Available import options:

Type Description Status
sql Import from SQL DDL
avro Import from AVRO schemas
protobuf Import from Protobuf schemas TBD
jsonschema Import from JSON Schemas TBD
bigquery Import from BigQuery Schemas TBD
dbt Import from dbt models TBD
odcs Import from Open Data Contract Standard (ODCS) TBD
Missing something? Please create an issue on GitHub TBD

Best Practices

We share best practices in using the Data Contract CLI.

Data-first Approach

Create a data contract based on the actual data. This is the fastest way to get started and to get feedback from the data consumers.

  1. Use an existing physical schema (e.g., SQL DDL) as a starting point to define your logical data model in the contract. Double check right after the import whether the actual data meets the imported logical data model. Just to be sure.
     $ datacontract import --format sql ddl.sql
     $ datacontract test
    
  2. Add examples to the datacontract.yaml. If you can, use actual data and anonymize. Make sure that the examples match the imported logical data model.
    $ datacontract test --examples
    
  3. Add quality checks and additional type constraints one by one to the contract and make sure the examples and the actual data still adheres to the contract. Check against examples for a very fast feedback loop.
    $ datacontract test --examples
    $ datacontract test
    
  4. Make sure that all the best practices for a datacontract.yaml are met using the linter. You probably forgot to document some fields and add the terms and conditions.
    $ datacontract lint
    
  5. Set up a CI pipeline that executes daily and reports the results to the Data Mesh Manager. Or to some place else. You can even publish to any opentelemetry compatible system.
    $ datacontract test --publish https://api.datamesh-manager.com/api/runs
    

Contract-First

Create a data contract based on the requirements from use cases.

  1. Start with a datacontract.yaml template.
    $ datacontract init
    
  2. Add examples to the datacontract.yaml. Do not start with the data model, although you are probably tempted to do that. Examples are the fastest way to get feedback from everybody and not loose someone in the discussion.

  3. Create the model based on the examples. Test the model against the examples to double-check whether the model matches the examples.
     $ datacontract test --examples
    
  4. Add quality checks and additional type constraints one by one to the contract and make sure the examples and the actual data still adheres to the contract. Check against examples for a very fast feedback loop.
     $ datacontract test --examples
    
  5. Fill in the terms, descriptions, etc. Make sure you follow all best practices for a datacontract.yaml using the linter.
     $ datacontract lint
    
  6. Set up a CI pipeline that lints and tests the examples so you make sure that any changes later do not decrease the quality of the contract.
     $ datacontract lint
     $ datacontract test --examples
    
  7. Use the export function to start building the providing data product as well as the integration into the consuming data products.
     # data provider
     $ datacontract export --format dbt
     # data consumer
     $ datacontract export --format dbt-sources
     $ datacontract export --format dbt-staging-sql
    

Schema Evolution

Non-breaking Changes

Examples: adding models or fields

Development Setup

Python base interpreter should be 3.11.x (unless working on 3.12 release candidate).

# create venv
python3 -m venv venv
source venv/bin/activate

# Install Requirements
pip install --upgrade pip setuptools wheel
pip install -e '.[dev]'
ruff check --fix
ruff format --check
pytest

Release

git tag v0.9.0
git push origin v0.9.0
python3 -m pip install --upgrade build twine
rm -r dist/
python3 -m build
# for now only test.pypi.org
python3 -m twine upload --repository testpypi dist/*

Docker Build

docker build -t datacontract/cli .
docker run --rm -v ${PWD}:/home/datacontract datacontract/cli

Release Steps

  1. Update the version in pyproject.toml
  2. Have a look at the CHANGELOG.md
  3. Create release commit manually
  4. Execute ./release
  5. Wait until GitHub Release is created
  6. Add the release notes to the GitHub Release

Contribution

We are happy to receive your contributions. Propose your change in an issue or directly create a pull request with your improvements.

License

MIT License

Credits

Created by Stefan Negele and Jochen Christ.