DataHubGc
DataHub Garbage Collection Source Documentation
Overview
The DataHub Garbage Collection (GC) source is a maintenance component responsible for cleaning up various types of metadata to maintain system performance and data quality. It performs multiple cleanup tasks, each focusing on different aspects of DataHub's metadata.
Cleanup Tasks
1. Index Cleanup
Manages Elasticsearch indices in DataHub, particularly focusing on time-series data.
Configuration
source:
  type: datahub-gc
  config:
    truncate_indices: true
    truncate_index_older_than_days: 30
    truncation_watch_until: 10000
    truncation_sleep_between_seconds: 30
Features
- Truncates old Elasticsearch indices for the following timeseries aspects:
  - DatasetOperations
  - DatasetUsageStatistics
  - ChartUsageStatistics
  - DashboardUsageStatistics
  - QueryUsageStatistics
  - Timeseries Aspects
- Monitors truncation progress
- Implements safe deletion with monitoring thresholds
- Supports gradual truncation with sleep intervals
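The monitoring behaviour described above can be pictured as a simple polling loop. The following is a minimal sketch, not the source's actual implementation: `count_docs` is a hypothetical callable standing in for a document-count lookup on an index, and `max_polls` is an added safety cap that does not correspond to any documented option. Only `watch_until` and `sleep_seconds` mirror the `truncation_watch_until` and `truncation_sleep_between_seconds` settings.

```python
import time
from typing import Callable


def watch_truncation(
    count_docs: Callable[[], int],          # hypothetical: returns remaining documents
    watch_until: int = 10000,               # mirrors truncation_watch_until
    sleep_seconds: float = 30,              # mirrors truncation_sleep_between_seconds
    max_polls: int = 100,                   # assumption: safety cap, not a real option
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll until the document count is at or below `watch_until`.

    Returns the last observed document count.
    """
    remaining = count_docs()
    polls = 0
    while remaining > watch_until and polls < max_polls:
        sleep(sleep_seconds)  # pause between monitoring checks
        remaining = count_docs()
        polls += 1
    return remaining
```

With a count that drops from 50000 to 20000 to 8000 across successive checks, the loop would stop on the third reading because 8000 is below the threshold of 10000.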
2. Expired Token Cleanup
Manages access tokens in DataHub to maintain security and prevent token accumulation.
Configuration
source:
  type: datahub-gc
  config:
    cleanup_expired_tokens: true
Features
- Automatically identifies and revokes expired access tokens
- Processes tokens in batches for efficiency
- Maintains system security by removing outdated credentials
- Reports number of tokens revoked
- Uses GraphQL API for token management
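As an illustration of batch-wise revocation, the sketch below filters expired tokens and revokes them in fixed-size batches while counting how many were revoked. It is a simplified model under stated assumptions: the `id` and `expiresAt` field names, the millisecond timestamps, and the `revoke` callable are hypothetical and do not reproduce DataHub's GraphQL schema or client.

```python
from typing import Callable, Dict, Iterator, List, Optional

Token = Dict[str, Optional[int]]


def batched(items: List[dict], size: int) -> Iterator[List[dict]]:
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def revoke_expired_tokens(
    tokens: List[dict],
    now_ms: int,
    revoke: Callable[[str], None],  # hypothetical stand-in for the revoke call
    batch_size: int = 10,
) -> int:
    """Revoke every token whose (assumed) `expiresAt` is before `now_ms`.

    Tokens without an expiry are treated as non-expiring and are kept.
    Returns the number of tokens revoked.
    """
    expired = [
        t for t in tokens
        if t.get("expiresAt") is not None and t["expiresAt"] < now_ms
    ]
    revoked = 0
    for batch in batched(expired, batch_size):
        for token in batch:
            revoke(token["id"])
            revoked += 1
    return revoked
```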
3. Data Process Cleanup
Manages the lifecycle of data processes, jobs, and their instances (DPIs) within DataHub.
Features
- Cleans up Data Process Instances (DPIs) based on age and count
- Can remove empty DataJobs and DataFlows
- Supports both soft and hard deletion
- Uses parallel processing for efficient cleanup
- Maintains configurable retention policies
Configuration
source:
  type: datahub-gc
  config:
    dataprocess_cleanup:
      enabled: true
      retention_days: 10
      keep_last_n: 5
      delete_empty_data_jobs: false
      delete_empty_data_flows: false
      hard_delete_entities: false
      batch_size: 500
      max_workers: 10
      delay: 0.25
Limitations
- At most 9000 DPIs are processed per job, for performance reasons
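To make the interaction between `retention_days` and `keep_last_n` concrete, here is a minimal sketch of one plausible selection rule. It assumes a DPI is removed only when it is both outside the newest `keep_last_n` instances of its job and older than `retention_days`; the real source may combine the two settings differently. The `(urn, created)` tuples are a hypothetical simplification of a DPI record.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def select_dpis_to_delete(
    dpis: List[Tuple[str, datetime]],  # hypothetical (urn, created) pairs for one job
    now: datetime,
    retention_days: int = 10,
    keep_last_n: int = 5,
) -> List[str]:
    """Return URNs of DPIs that fall outside both retention rules."""
    newest_first = sorted(dpis, key=lambda d: d[1], reverse=True)
    cutoff = now - timedelta(days=retention_days)
    # The newest `keep_last_n` instances are always kept in this sketch.
    candidates = newest_first[keep_last_n:]
    return [urn for urn, created in candidates if created < cutoff]
```

Under this rule, a job with runs aged 1, 2, 3, 20, 25, 30 and 40 days keeps the five newest runs and selects only the 30- and 40-day-old runs for deletion.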
4. Execution Request Cleanup
Manages DataHub execution request records to prevent accumulation of historical execution data.
Features
- Maintains execution history per ingestion source
- Preserves minimum number of recent requests
- Removes old requests beyond retention period
- Special handling for running/pending requests
- Automatic cleanup of corrupted records
Configuration
source:
  type: datahub-gc
  config:
    execution_request_cleanup:
      enabled: true
      keep_history_min_count: 10
      keep_history_max_count: 1000
      keep_history_max_days: 30
      batch_read_size: 100
      runtime_limit_seconds: 3600
      max_read_errors: 10
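The three `keep_history_*` settings can be read as a per-request decision applied to each ingestion source's history, newest first. The sketch below is one possible interpretation rather than the source's actual logic: the `RUNNING` and `PENDING` status strings are assumed names, and treating such requests as never deletable is a simplification of the "special handling" mentioned above.

```python
from dataclasses import dataclass


@dataclass
class RetentionPolicy:
    keep_history_min_count: int
    keep_history_max_count: int
    keep_history_max_days: int


def should_delete(position: int, age_days: float, status: str,
                  policy: RetentionPolicy) -> bool:
    """Decide whether one execution request should be removed.

    `position` is the request's rank within its ingestion source,
    with 0 being the most recent request.
    """
    if status in ("RUNNING", "PENDING"):  # assumed status names; kept in this sketch
        return False
    if position < policy.keep_history_min_count:
        return False  # always preserve the newest few requests
    if position >= policy.keep_history_max_count:
        return True   # beyond the per-source history cap
    return age_days > policy.keep_history_max_days
```

For example, with a minimum of 2, a maximum of 5 and a 30-day window, the fourth-newest request is kept at 10 days old but removed at 45 days old, and anything ranked past the fifth is removed regardless of age.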
5. Soft-Deleted Entities Cleanup
Manages the permanent removal of soft-deleted entities after a retention period.
Features
- Permanently removes soft-deleted entities after retention period
- Handles entity references cleanup
- Special handling for query entities
- Supports filtering by entity type, platform, or environment
- Concurrent processing with safety limits
Configuration
source:
  type: datahub-gc
  config:
    soft_deleted_entities_cleanup:
      enabled: true
      retention_days: 10
      batch_size: 500
      max_workers: 10
      delay: 0.25
      entity_types: null # Optional list of entity types to clean
      platform: null # Optional platform filter
      env: null # Optional environment filter
      query: null # Optional custom query filter
      limit_entities_delete: 25000
      futures_max_at_time: 1000
      runtime_limit_seconds: 7200
Performance Considerations
- Concurrent processing using thread pools
- Configurable batch sizes for optimal performance
- Rate limiting through configurable delays
- Maximum limits on concurrent operations
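The points above can be combined in a bounded thread-pool loop. This is a sketch under assumptions, not the source's code: `delete_fn` is a hypothetical per-entity delete callable, and the parameter names simply mirror the `max_workers`, `futures_max_at_time`, `batch_size`, `delay` and `limit_entities_delete` options to show how such limits might fit together.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Iterable


def delete_concurrently(
    urns: Iterable[str],
    delete_fn: Callable[[str], None],  # hypothetical delete call for one entity
    max_workers: int = 10,
    futures_max_at_time: int = 1000,
    batch_size: int = 500,
    delay: float = 0.25,
    limit_entities_delete: int = 25000,
) -> int:
    """Submit deletions to a thread pool while respecting the limits.

    Returns the number of deletions submitted.
    """
    submitted = 0
    pending = set()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for urn in urns:
            if submitted >= limit_entities_delete:
                break  # overall cap on deletions reached
            # Cap the number of in-flight futures before submitting more.
            while len(pending) >= futures_max_at_time:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for finished in done:
                    finished.result()  # re-raise any worker exception
            pending.add(pool.submit(delete_fn, urn))
            submitted += 1
            if submitted % batch_size == 0:
                time.sleep(delay)  # simple rate limiting between batches
        for future in pending:
            future.result()
    return submitted
```

Capping in-flight futures keeps memory use bounded when the candidate list is large, while the delay spreads load on the backend between batches.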
Reporting
Each cleanup task maintains detailed reports including:
- Number of entities processed
- Number of entities removed
- Errors encountered
- Sample of affected entities
- Runtime statistics
- Task-specific metrics
DataHubGcSource is responsible for performing garbage collection tasks on DataHub.
This source performs the following tasks:
- Cleans up expired tokens.
- Truncates Elasticsearch indices based on configuration.
- Cleans up data processes and soft-deleted entities if configured.
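Because `dry_run` is only supported for data process cleanup and soft-deleted entities cleanup, a cautious first run might enable only those two tasks. The sketch below builds such a recipe as a plain Python dictionary; `build_gc_recipe` is a hypothetical helper, and the choice to disable the other tasks is an illustration rather than a recommendation from the source.

```python
import json
from typing import Any, Dict


def build_gc_recipe(dry_run: bool = True) -> Dict[str, Any]:
    """Return a recipe dict that limits the run to tasks supporting dry_run."""
    return {
        "source": {
            "type": "datahub-gc",
            "config": {
                "dry_run": dry_run,
                # Tasks without dry-run support are switched off here.
                "cleanup_expired_tokens": False,
                "truncate_indices": False,
                "execution_request_cleanup": {"enabled": False},
                "dataprocess_cleanup": {
                    "enabled": True,
                    "retention_days": 10,
                    "keep_last_n": 5,
                },
                "soft_deleted_entities_cleanup": {
                    "enabled": True,
                    "retention_days": 10,
                },
            },
        }
    }


recipe = build_gc_recipe()
print(json.dumps(recipe, indent=2))
# With the acryl-datahub package installed, a dict like this is typically
# passed to Pipeline.create(...) from datahub.ingestion.run.pipeline; that
# step is omitted here, and a sink section may also be required.
```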
CLI based Ingestion
Config Details
Note that a dot (.) is used to denote nested fields in the YAML recipe.
| Field | Type | Description | Default |
|---|---|---|---|
| cleanup_expired_tokens | boolean | Whether to clean up expired tokens or not | True |
| dry_run | boolean | Whether to perform a dry run or not. This is only supported for dataprocess cleanup and soft deleted entities cleanup. | False |
| truncate_index_older_than_days | integer | Indices older than this number of days will be truncated | 30 |
| truncate_indices | boolean | Whether to truncate elasticsearch indices or not which can be safely truncated | True |
| truncation_sleep_between_seconds | integer | Sleep between truncation monitoring. | 30 |
| truncation_watch_until | integer | Wait for truncation of indices until this number of documents are left | 10000 |
| dataprocess_cleanup | DataProcessCleanupConfig | Configuration for data process cleanup | |
| dataprocess_cleanup.batch_size | integer | The number of entities to get in a batch from API | 500 |
| dataprocess_cleanup.delay | number | Delay between each batch | 0.25 |
| dataprocess_cleanup.delete_empty_data_flows | boolean | Whether to delete Data Flows without runs | False |
| dataprocess_cleanup.delete_empty_data_jobs | boolean | Whether to delete Data Jobs without runs | False |
| dataprocess_cleanup.enabled | boolean | Whether to do data process cleanup. | True |
| dataprocess_cleanup.hard_delete_entities | boolean | Whether to hard delete entities | False |
| dataprocess_cleanup.keep_last_n | integer | Number of latest aspects to keep | 5 |
| dataprocess_cleanup.max_workers | integer | The number of workers to use for deletion | 10 |
| dataprocess_cleanup.retention_days | integer | Number of days to retain metadata in DataHub | 10 |
| dataprocess_cleanup.aspects_to_clean | array | List of aspect names to clean up | ['DataprocessInstance'] |
| dataprocess_cleanup.aspects_to_clean.string | string | | |
| execution_request_cleanup | DatahubExecutionRequestCleanupConfig | Configuration for execution request cleanup | |
| execution_request_cleanup.batch_read_size | integer | Number of records per read operation | 100 |
| execution_request_cleanup.enabled | boolean | Global switch for this cleanup task | True |
| execution_request_cleanup.keep_history_max_count | integer | Maximum number of execution requests to keep, per ingestion source | 1000 |
| execution_request_cleanup.keep_history_max_days | integer | Maximum number of days to keep execution requests for, per ingestion source | 90 |
| execution_request_cleanup.keep_history_min_count | integer | Minimum number of execution requests to keep, per ingestion source | 10 |
| execution_request_cleanup.limit_entities_delete | integer | Max number of execution requests to hard delete. | 10000 |
| execution_request_cleanup.max_read_errors | integer | Maximum number of read errors before aborting | 10 |
| execution_request_cleanup.runtime_limit_seconds | integer | Maximum runtime in seconds for the cleanup task | 3600 |
| soft_deleted_entities_cleanup | SoftDeletedEntitiesCleanupConfig | Configuration for soft deleted entities cleanup | |
| soft_deleted_entities_cleanup.batch_size | integer | The number of entities to get in a batch from GraphQL | 500 |
| soft_deleted_entities_cleanup.delay | number | Delay between each batch | 0.25 |
| soft_deleted_entities_cleanup.enabled | boolean | Whether to do soft deletion cleanup. | True |
| soft_deleted_entities_cleanup.futures_max_at_time | integer | Max number of futures to have at a time. | 1000 |
| soft_deleted_entities_cleanup.limit_entities_delete | integer | Max number of entities to delete. | 25000 |
| soft_deleted_entities_cleanup.max_workers | integer | The number of workers to use for deletion | 10 |
| soft_deleted_entities_cleanup.platform | string | Platform to cleanup | |
| soft_deleted_entities_cleanup.query | string | Query to filter entities | |
| soft_deleted_entities_cleanup.retention_days | integer | Number of days to retain metadata in DataHub | 10 |
| soft_deleted_entities_cleanup.runtime_limit_seconds | integer | Runtime limit in seconds | 7200 |
| soft_deleted_entities_cleanup.env | string | Environment to cleanup | |
| soft_deleted_entities_cleanup.entity_types | array | List of entity types to cleanup | ['dataset', 'dashboard', 'chart', 'mlmodel', 'mlmo... |
| soft_deleted_entities_cleanup.entity_types.string | string | | |
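The dotted field names used in the options table map onto nested keys in the recipe's `config` section. As a small illustration, the sketch below expands a flat mapping of dotted names into the nested structure a recipe would use; `set_dotted` and `expand_dotted` are hypothetical helpers written for this example and are not part of DataHub.

```python
from typing import Any, Dict


def set_dotted(config: Dict[str, Any], dotted_key: str, value: Any) -> None:
    """Set `value` at the nested location named by `dotted_key`."""
    *parents, leaf = dotted_key.split(".")
    node = config
    for part in parents:
        node = node.setdefault(part, {})  # create intermediate mappings as needed
    node[leaf] = value


def expand_dotted(flat: Dict[str, Any]) -> Dict[str, Any]:
    """Turn {'a.b': 1} style keys into nested dictionaries."""
    nested: Dict[str, Any] = {}
    for key, value in flat.items():
        set_dotted(nested, key, value)
    return nested
```

For instance, `dataprocess_cleanup.keep_last_n` becomes a `keep_last_n` key inside a `dataprocess_cleanup` mapping, while a top-level field such as `dry_run` stays where it is.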
The JSONSchema for this configuration is inlined below.
{
  "title": "DataHubGcSourceConfig",
  "type": "object",
  "properties": {
    "dry_run": {
      "title": "Dry Run",
      "description": "Whether to perform a dry run or not. This is only supported for dataprocess cleanup and soft deleted entities cleanup.",
      "default": false,
      "type": "boolean"
    },
    "cleanup_expired_tokens": {
      "title": "Cleanup Expired Tokens",
      "description": "Whether to clean up expired tokens or not",
      "default": true,
      "type": "boolean"
    },
    "truncate_indices": {
      "title": "Truncate Indices",
      "description": "Whether to truncate elasticsearch indices or not which can be safely truncated",
      "default": true,
      "type": "boolean"
    },
    "truncate_index_older_than_days": {
      "title": "Truncate Index Older Than Days",
      "description": "Indices older than this number of days will be truncated",
      "default": 30,
      "type": "integer"
    },
    "truncation_watch_until": {
      "title": "Truncation Watch Until",
      "description": "Wait for truncation of indices until this number of documents are left",
      "default": 10000,
      "type": "integer"
    },
    "truncation_sleep_between_seconds": {
      "title": "Truncation Sleep Between Seconds",
      "description": "Sleep between truncation monitoring.",
      "default": 30,
      "type": "integer"
    },
    "dataprocess_cleanup": {
      "title": "Dataprocess Cleanup",
      "description": "Configuration for data process cleanup",
      "allOf": [
        {
          "$ref": "#/definitions/DataProcessCleanupConfig"
        }
      ]
    },
    "soft_deleted_entities_cleanup": {
      "title": "Soft Deleted Entities Cleanup",
      "description": "Configuration for soft deleted entities cleanup",
      "allOf": [
        {
          "$ref": "#/definitions/SoftDeletedEntitiesCleanupConfig"
        }
      ]
    },
    "execution_request_cleanup": {
      "title": "Execution Request Cleanup",
      "description": "Configuration for execution request cleanup",
      "allOf": [
        {
          "$ref": "#/definitions/DatahubExecutionRequestCleanupConfig"
        }
      ]
    }
  },
  "additionalProperties": false,
  "definitions": {
    "DataProcessCleanupConfig": {
      "title": "DataProcessCleanupConfig",
      "type": "object",
      "properties": {
        "enabled": {
          "title": "Enabled",
          "description": "Whether to do data process cleanup.",
          "default": true,
          "type": "boolean"
        },
        "retention_days": {
          "title": "Retention Days",
          "description": "Number of days to retain metadata in DataHub",
          "default": 10,
          "type": "integer"
        },
        "aspects_to_clean": {
          "title": "Aspects To Clean",
          "description": "List of aspect names to clean up",
          "default": [
            "DataprocessInstance"
          ],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "keep_last_n": {
          "title": "Keep Last N",
          "description": "Number of latest aspects to keep",
          "default": 5,
          "type": "integer"
        },
        "delete_empty_data_jobs": {
          "title": "Delete Empty Data Jobs",
          "description": "Whether to delete Data Jobs without runs",
          "default": false,
          "type": "boolean"
        },
        "delete_empty_data_flows": {
          "title": "Delete Empty Data Flows",
          "description": "Whether to delete Data Flows without runs",
          "default": false,
          "type": "boolean"
        },
        "hard_delete_entities": {
          "title": "Hard Delete Entities",
          "description": "Whether to hard delete entities",
          "default": false,
          "type": "boolean"
        },
        "batch_size": {
          "title": "Batch Size",
          "description": "The number of entities to get in a batch from API",
          "default": 500,
          "type": "integer"
        },
        "max_workers": {
          "title": "Max Workers",
          "description": "The number of workers to use for deletion",
          "default": 10,
          "type": "integer"
        },
        "delay": {
          "title": "Delay",
          "description": "Delay between each batch",
          "default": 0.25,
          "type": "number"
        }
      },
      "additionalProperties": false
    },
    "SoftDeletedEntitiesCleanupConfig": {
      "title": "SoftDeletedEntitiesCleanupConfig",
      "type": "object",
      "properties": {
        "enabled": {
          "title": "Enabled",
          "description": "Whether to do soft deletion cleanup.",
          "default": true,
          "type": "boolean"
        },
        "retention_days": {
          "title": "Retention Days",
          "description": "Number of days to retain metadata in DataHub",
          "default": 10,
          "type": "integer"
        },
        "batch_size": {
          "title": "Batch Size",
          "description": "The number of entities to get in a batch from GraphQL",
          "default": 500,
          "type": "integer"
        },
        "delay": {
          "title": "Delay",
          "description": "Delay between each batch",
          "default": 0.25,
          "type": "number"
        },
        "max_workers": {
          "title": "Max Workers",
          "description": "The number of workers to use for deletion",
          "default": 10,
          "type": "integer"
        },
        "entity_types": {
          "title": "Entity Types",
          "description": "List of entity types to cleanup",
          "default": [
            "dataset",
            "dashboard",
            "chart",
            "mlmodel",
            "mlmodelGroup",
            "mlfeatureTable",
            "mlfeature",
            "mlprimaryKey",
            "dataFlow",
            "dataJob",
            "glossaryTerm",
            "glossaryNode",
            "tag",
            "role",
            "corpuser",
            "corpGroup",
            "container",
            "domain",
            "dataProduct",
            "notebook",
            "businessAttribute",
            "schemaField",
            "query",
            "dataProcessInstance"
          ],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "platform": {
          "title": "Platform",
          "description": "Platform to cleanup",
          "type": "string"
        },
        "env": {
          "title": "Env",
          "description": "Environment to cleanup",
          "type": "string"
        },
        "query": {
          "title": "Query",
          "description": "Query to filter entities",
          "type": "string"
        },
        "limit_entities_delete": {
          "title": "Limit Entities Delete",
          "description": "Max number of entities to delete.",
          "default": 25000,
          "type": "integer"
        },
        "futures_max_at_time": {
          "title": "Futures Max At Time",
          "description": "Max number of futures to have at a time.",
          "default": 1000,
          "type": "integer"
        },
        "runtime_limit_seconds": {
          "title": "Runtime Limit Seconds",
          "description": "Runtime limit in seconds",
          "default": 7200,
          "type": "integer"
        }
      },
      "additionalProperties": false
    },
    "DatahubExecutionRequestCleanupConfig": {
      "title": "DatahubExecutionRequestCleanupConfig",
      "type": "object",
      "properties": {
        "keep_history_min_count": {
          "title": "Keep History Min Count",
          "description": "Minimum number of execution requests to keep, per ingestion source",
          "default": 10,
          "type": "integer"
        },
        "keep_history_max_count": {
          "title": "Keep History Max Count",
          "description": "Maximum number of execution requests to keep, per ingestion source",
          "default": 1000,
          "type": "integer"
        },
        "keep_history_max_days": {
          "title": "Keep History Max Days",
          "description": "Maximum number of days to keep execution requests for, per ingestion source",
          "default": 90,
          "type": "integer"
        },
        "batch_read_size": {
          "title": "Batch Read Size",
          "description": "Number of records per read operation",
          "default": 100,
          "type": "integer"
        },
        "enabled": {
          "title": "Enabled",
          "description": "Global switch for this cleanup task",
          "default": true,
          "type": "boolean"
        },
        "runtime_limit_seconds": {
          "title": "Runtime Limit Seconds",
          "description": "Maximum runtime in seconds for the cleanup task",
          "default": 3600,
          "type": "integer"
        },
        "limit_entities_delete": {
          "title": "Limit Entities Delete",
          "description": "Max number of execution requests to hard delete.",
          "default": 10000,
          "type": "integer"
        },
        "max_read_errors": {
          "title": "Max Read Errors",
          "description": "Maximum number of read errors before aborting",
          "default": 10,
          "type": "integer"
        }
      },
      "additionalProperties": false
    }
  }
}
Code Coordinates
- Class Name: datahub.ingestion.source.gc.datahub_gc.DataHubGcSource
Questions
If you've got any questions on configuring ingestion for DataHubGc, feel free to ping us on our Slack.