Operations
Why Would You Use Operations APIs?
The Operations APIs allow you to report operational changes that were made to a given Dataset or Table using the 'Operation' concept. These operations may be viewed on the Dataset Profile (e.g. as last modified time), accessed via the DataHub GraphQL API, or used as inputs to DataHub Cloud Freshness Assertions.
Goal Of This Guide
This guide will show you how to report and query Operations for a Dataset.
Supported Sources
Some ingestion sources can automatically capture operations from native audit logs, query history, table history, or object
timestamps. The table below lists sources that emit dataset-level DataHub operation aspects during ingestion.
| Source | Notes |
|---|---|
| ABS Data Lake | Enabled by default as UPDATE operations from blob timestamps. |
| BigQuery | Enabled by default via usage extraction, can be disabled via usage.include_operational_stats. |
ClickHouse clickhouse | Optionally enabled via include_query_log_operations. |
| Databricks | Enabled by default via usage extraction, can be disabled via include_operational_stats. |
| Delta Lake | Enabled by default from Delta table history. |
| Dremio | Optionally enabled via include_query_lineage; generated from Dremio job history. |
| Fabric OneLake | Optionally enabled via usage.include_usage_statistics and usage.include_operational_stats. |
| Glue | Enabled by default from Glue table created and last modified timestamps. |
| Oracle | Optionally enabled via include_query_usage and include_operational_stats. |
| Redshift | Optionally enabled via include_usage_statistics; controlled by include_operational_stats. |
| S3 / Local Files | Enabled by default as UPDATE operations from object timestamps. |
| Salesforce | Enabled by default from Salesforce object created and last modified timestamps. |
| Snowflake | Enabled by default, can be disabled via configuration include_operational_stats. |
| SQL Queries | Parsed from non-SELECT SQL queries. |
| Teradata | Optionally enabled via include_usage_statistics; controlled by usage.include_operational_stats. |
Prerequisites
For this tutorial, you need to deploy DataHub Quickstart and ingest sample data. For detailed steps, please refer to DataHub Quickstart Guide.
Before reporting operations for a dataset, you need to ensure the targeted dataset is already present in DataHub.
Report Operations
You can use report dataset operations to DataHub using the following APIs.
- GraphQL
- Python
mutation reportOperation {
reportOperation(
input: {
urn: "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)"
operationType: INSERT
sourceType: DATA_PROCESS
}
)
}
Where supported operation types include
INSERTUPDATEDELETECREATEALTERDROPCUSTOM
If you want to report an operation that happened at a specific time, you can also optionally provide
the timestampMillis field. If not provided, the current server time will be used as the operation time.
If you see the following response, the operation was successful:
{
"data": {
"reportOperation": true
},
"extensions": {}
}
# Inlined from /metadata-ingestion/examples/library/dataset_report_operation.py
from datahub.api.graphql import Operation
DATAHUB_HOST = "https//:org.acryl.io/gms"
DATAHUB_TOKEN = "<your-datahub-access-token"
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)"
operation_client = Operation(
datahub_host=DATAHUB_HOST,
datahub_token=DATAHUB_TOKEN,
)
operation_type = "INSERT"
source_type = "DATA_PROCESS" # Source of the operation (data platform or DAG task)
# Report a change operation for the Dataset.
operation_client.report_operation(
urn=dataset_urn, operation_type=operation_type, source_type=source_type
)
Read Operations
You can use read dataset operations to DataHub using the following APIs.
- GraphQL
- Python
query dataset {
dataset(urn: "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)") {
operations(
limit: 10, filter: [], startTimeMillis: <start-timestamp-ms>, endTimeMillis: <end-timestamp-ms>
) {
timestampMillis
operationType
sourceType
}
}
}
Where startTimeMillis and endTimeMillis are optional. By default, operations are sorted by time descending.
If you see the following response, the operation was successful:
{
"data": {
"dataset": {
"operations": [
{
"timestampMillis": 1231232332,
"operationType": "INSERT",
"sourceType": "DATA_PROCESS"
}
]
}
},
"extensions": {}
}
# Inlined from /metadata-ingestion/examples/library/dataset_read_operations.py
from datahub.api.graphql import Operation
DATAHUB_HOST = "https//:org.acryl.io/gms"
DATAHUB_TOKEN = "<your-datahub-access-token"
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)"
operation_client = Operation(
datahub_host=DATAHUB_HOST,
datahub_token=DATAHUB_TOKEN,
)
# Query for changes to the Dataset.
operations = operation_client.query_operations(
urn=dataset_urn,
# limit=5,
# start_time_millis=<timestamp>,
# end_time_millis=<timestamo>
)
Expected Outcomes of Reporting Operations
Reported Operations will appear when displaying the Last Updated time for a Dataset on their DataHub Profile.
They will also be used when selecting the DataHub Operation source type under the Advanced settings of a Freshness
Assertion.