DataHub
Overview
The DataHub source is a utility integration that reads metadata from an existing DataHub instance. Learn more in the official DataHub documentation.
This integration covers metadata entities and operational objects relevant to this connector. Depending on module capabilities, it can also capture features such as lineage, usage, profiling, ownership, tags, and stateful deletion detection.
Concept Mapping
The specific concept mapping for this source is still pending; the table below shows the generic concept mapping in DataHub.
| Source Concept | DataHub Concept | Notes |
|---|---|---|
| Platform/account/project scope | Platform Instance, Container | Organizes assets within the platform context. |
| Core technical asset (for example table/view/topic/file) | Dataset | Primary ingested technical asset. |
| Schema fields / columns | SchemaField | Included when schema extraction is supported. |
| Ownership and collaboration principals | CorpUser, CorpGroup | Emitted by modules that support ownership and identity metadata. |
| Dependencies and processing relationships | Lineage edges | Available when lineage extraction is supported and enabled. |
Module datahub
Important Capabilities
| Capability | Status | Notes |
|---|---|---|
| Asset Containers | ✅ | Enabled by default. Supported for types - Database. |
| Detect Deleted Entities | ✅ | Enabled by default via stateful ingestion. |
Overview
Migrate data from one DataHub instance to another.
Pulls data from two locations:
- DataHub database: Versioned aspects
- DataHub Kafka: MCL Log timeseries aspects
All data is first read from the database before timeseries data is ingested from Kafka. To prevent this source from potentially running forever, it does not ingest data produced after the datahub_source ingestion job starts. This stop_time is reflected in the report.
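The cutoff described above can be illustrated with a minimal sketch. It is not the source's implementation: the `Record` shape, the `read_until` helper, and the fixed `stop_time` value are all hypothetical and exist only to show the idea of ignoring anything produced at or after the job start time.

```python
from datetime import datetime, timezone
from typing import Iterable, Iterator, Tuple

# Hypothetical record shape: (timestamp, payload).
Record = Tuple[datetime, str]


def read_until(records: Iterable[Record], stop_time: datetime) -> Iterator[Record]:
    """Yield records in order, stopping at the first one produced at or after stop_time."""
    for ts, payload in records:
        if ts >= stop_time:
            break  # produced after the job started: leave it for a later run
        yield ts, payload


# Fixed for the example; a real run would capture the time at which the job starts.
stop_time = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
records = [
    (datetime(2024, 1, 1, 11, 58, tzinfo=timezone.utc), "aspect-a"),
    (datetime(2024, 1, 1, 11, 59, tzinfo=timezone.utc), "aspect-b"),
    (datetime(2024, 1, 1, 12, 1, tzinfo=timezone.utc), "aspect-c"),
]
ingested = [payload for _, payload in read_until(records, stop_time)]
print(ingested)  # ['aspect-a', 'aspect-b']
```

Because the input is read in chronological order, the loop can stop at the first record past the cutoff instead of scanning the rest.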
Data from the database and Kafka is read in chronological order: by the createdon timestamp in the database and by Kafka offset per partition. To read from the database efficiently, ensure that the createdon column is indexed. Newly created databases should have this index, named timeIndex, by default; on older databases you may have to create it yourself with the statement:
CREATE INDEX timeIndex ON metadata_aspect_v2 (createdon);
If you do not have this index, the source may run incredibly slowly and produce significant database load.
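To see why the index matters, here is a small sketch that uses an in-memory SQLite database as a stand-in (an assumption for illustration only; the source itself targets MySQL or Postgres, and the table below has a simplified, hypothetical set of columns). It compares the query plan for a time-ordered read before and after creating timeIndex.

```python
import sqlite3


def query_plan(conn: sqlite3.Connection) -> str:
    """Return SQLite's plan for a chronological read of the aspect table."""
    rows = conn.execute(
        "EXPLAIN QUERY PLAN "
        "SELECT urn, aspect FROM metadata_aspect_v2 "
        "WHERE createdon >= ? ORDER BY createdon",
        ("2024-01-01",),
    ).fetchall()
    # The human-readable plan detail is the last column of each row.
    return " | ".join(row[-1] for row in rows)


conn = sqlite3.connect(":memory:")
# Simplified stand-in schema; the real table has more columns.
conn.execute(
    "CREATE TABLE metadata_aspect_v2 "
    "(urn TEXT, aspect TEXT, version INTEGER, createdon TEXT)"
)

plan_before = query_plan(conn)
conn.execute("CREATE INDEX timeIndex ON metadata_aspect_v2 (createdon)")
plan_after = query_plan(conn)

print("before:", plan_before)
print("after: ", plan_after)
```

Without the index the plan falls back to a full table scan; with it, the plan references timeIndex. On your own database, use the engine's native tooling (for example its EXPLAIN statement) to confirm the same thing.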
Prerequisites
Before running ingestion, ensure network connectivity to the source, valid authentication credentials, and read permissions for metadata APIs required by this module.
Requires direct access to the database, Kafka broker, and Kafka schema registry of the source DataHub instance.
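A quick way to confirm basic network reachability before running ingestion is a plain TCP check. The sketch below uses only the Python standard library; the `can_connect` helper and the hosts and ports in `endpoints` are hypothetical placeholders. A successful TCP connection does not prove that credentials or permissions are valid.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Hypothetical endpoints; replace with the hosts and ports of your source instance.
endpoints = {
    "database": ("localhost", 3306),
    "kafka broker": ("localhost", 9092),
    "schema registry": ("localhost", 8081),
}

if __name__ == "__main__":
    for name, (host, port) in endpoints.items():
        reachable = can_connect(host, port, timeout=1.0)
        print(f"{name} at {host}:{port}: {'reachable' if reachable else 'NOT reachable'}")
```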
Install the Plugin
pip install 'acryl-datahub[datahub]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
pipeline_name: datahub_source_1
datahub_api:
  server: "http://localhost:8080" # Migrate data from DataHub instance on localhost:8080
  token: "<token>"
source:
  type: datahub
  config:
    include_all_versions: false
    database_connection:
      scheme: "mysql+pymysql" # or "postgresql+psycopg2" for Postgres
      host_port: "<database_host>:<database_port>"
      username: "<username>"
      password: "<password>"
      database: "<database>"
    kafka_connection:
      bootstrap: "<bootstrap_url>:9092"
      schema_registry_url: "<schema_registry_url>:8081"
    stateful_ingestion:
      enabled: true
      ignore_old_state: false
    urn_pattern:
      deny:
        # Ignores all datahub metadata where the urn matches the regex
        - ^denied.urn.*
      allow:
        # Ingests all datahub metadata where the urn matches the regex.
        - ^allowed.urn.*
flags:
  set_system_metadata: false # Replicate system metadata
# Here, we write to a DataHub instance
# You can also use a different sink, e.g. to write the data to a file instead
sink:
  type: datahub-rest
  config:
    server: "<destination_gms_url>"
    token: "<token>"
Config Details
Note that a . is used to denote nested fields in the YAML recipe.
| Field | Description |
|---|---|
| commit_state_interval (One of integer, null) | Number of records to process before committing state. Default: 1000 |
| commit_with_parse_errors (boolean) | Whether to update createdon timestamp and Kafka offset despite parse errors. Enable if you want to ignore the errors. Default: False |
| database_query_batch_size (integer) | Number of records to fetch from the database at a time. Default: 10000 |
| database_table_name (string) | Name of database table containing all versioned aspects. Default: metadata_aspect_v2 |
| drop_duplicate_schema_fields (boolean) | Whether to drop duplicate schema fields in the schemaMetadata aspect. Useful if the source system has duplicate field paths in the db, but we're pushing to a system with server-side duplicate checking. Default: False |
| include_all_versions (boolean) | If enabled, include all versions of each aspect. Otherwise, only include the latest version of each aspect. Default: False |
| include_soft_deleted_entities (boolean) | If enabled, include entities that have been soft deleted. Otherwise, exclude soft-deleted entities. Default: True |
| kafka_topic_name (string) | Name of Kafka topic containing timeseries MCLs. Default: MetadataChangeLog_Timeseries_v1 |
| preserve_system_metadata (boolean) | Copy system metadata from the source system. Default: True |
| query_timeout (One of integer, null) | Timeout for each query in seconds. Default: None |
| database_connection (One of SQLAlchemyConnectionConfig, null) | Database connection config. Default: None |
| database_connection.host_port ❓ (string) | host URL |
| database_connection.scheme ❓ (string) | scheme |
| database_connection.database (One of string, null) | database (catalog). Default: None |
| database_connection.options (object) | Any options specified here will be passed to SQLAlchemy.create_engine as kwargs. To set connection arguments in the URL, specify them under connect_args. |
| database_connection.password (One of string(password), null) | password. Default: None |
| database_connection.sqlalchemy_uri (One of string, null) | URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters. Default: None |
| database_connection.username (One of string, null) | username. Default: None |
| exclude_aspects (array) | Aspect names to exclude from entities that are being ingested. Note: This only makes sense for entities you want to ingest but without certain aspects. To completely exclude entity types, use 'urn_pattern.deny' instead. Warning: Excluding key aspects while keeping others can create invalid entities. Default: ['datahubIngestionCheckpoint', 'datahubIngestionRu... |
| exclude_aspects.string (string) | |
| kafka_connection (One of KafkaConsumerConnectionConfig, null) | Kafka connection config. Default: None |
| kafka_connection.bootstrap (string) | Default: localhost:9092 |
| kafka_connection.client_timeout_seconds (integer) | The request timeout used when interacting with the Kafka APIs. Default: 60 |
| kafka_connection.consumer_config (object) | Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md . |
| kafka_connection.schema_registry_config (object) | Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient |
| kafka_connection.schema_registry_url (string) | Schema registry URL. Can be overridden with KAFKA_SCHEMAREGISTRY_URL environment variable, or will use DATAHUB_GMS_BASE_PATH if not set. |
| urn_pattern (AllowDenyPattern) | A class to store allow deny regexes |
| urn_pattern.ignoreCase (One of boolean, null) | Whether to ignore case sensitivity during pattern matching. Default: True |
| stateful_ingestion (StatefulIngestionConfig) | Basic Stateful Ingestion Specific Configuration for any source. |
| stateful_ingestion.enabled (boolean) | Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False. Default: False |
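The dotted field names in the table correspond to nested keys in the recipe. As an illustration only, the hypothetical `nest` helper below expands dotted names into the nested structure you would write under source.config; it is not part of DataHub.

```python
from typing import Any, Dict


def nest(flat: Dict[str, Any]) -> Dict[str, Any]:
    """Expand dotted keys such as 'a.b' into nested dictionaries."""
    nested: Dict[str, Any] = {}
    for dotted_key, value in flat.items():
        *parents, leaf = dotted_key.split(".")
        node = nested
        for part in parents:
            # Walk down, creating intermediate dictionaries as needed.
            node = node.setdefault(part, {})
        node[leaf] = value
    return nested


flat_config = {
    "include_all_versions": False,
    "database_connection.scheme": "mysql+pymysql",
    "database_connection.host_port": "localhost:3306",
    "kafka_connection.bootstrap": "localhost:9092",
    "stateful_ingestion.enabled": True,
}
config = nest(flat_config)
print(config["database_connection"])
# {'scheme': 'mysql+pymysql', 'host_port': 'localhost:3306'}
```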
The JSONSchema for this configuration is inlined below.
{
"$defs": {
"AllowDenyPattern": {
"additionalProperties": false,
"description": "A class to store allow deny regexes",
"properties": {
"allow": {
"default": [
".*"
],
"description": "List of regex patterns to include in ingestion",
"items": {
"type": "string"
},
"title": "Allow",
"type": "array"
},
"deny": {
"default": [],
"description": "List of regex patterns to exclude from ingestion.",
"items": {
"type": "string"
},
"title": "Deny",
"type": "array"
},
"ignoreCase": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": true,
"description": "Whether to ignore case sensitivity during pattern matching.",
"title": "Ignorecase"
}
},
"title": "AllowDenyPattern",
"type": "object"
},
"KafkaConsumerConnectionConfig": {
"additionalProperties": false,
"description": "Configuration class for holding connectivity information for Kafka consumers",
"properties": {
"bootstrap": {
"default": "localhost:9092",
"title": "Bootstrap",
"type": "string"
},
"schema_registry_url": {
"description": "Schema registry URL. Can be overridden with KAFKA_SCHEMAREGISTRY_URL environment variable, or will use DATAHUB_GMS_BASE_PATH if not set.",
"title": "Schema Registry Url",
"type": "string"
},
"schema_registry_config": {
"additionalProperties": true,
"description": "Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient",
"title": "Schema Registry Config",
"type": "object"
},
"client_timeout_seconds": {
"default": 60,
"description": "The request timeout used when interacting with the Kafka APIs.",
"title": "Client Timeout Seconds",
"type": "integer"
},
"consumer_config": {
"additionalProperties": true,
"description": "Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .",
"title": "Consumer Config",
"type": "object"
}
},
"title": "KafkaConsumerConnectionConfig",
"type": "object"
},
"SQLAlchemyConnectionConfig": {
"additionalProperties": false,
"properties": {
"username": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "username",
"title": "Username"
},
"password": {
"anyOf": [
{
"format": "password",
"type": "string",
"writeOnly": true
},
{
"type": "null"
}
],
"default": null,
"description": "password",
"title": "Password"
},
"host_port": {
"description": "host URL",
"title": "Host Port",
"type": "string"
},
"database": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "database (catalog)",
"title": "Database"
},
"scheme": {
"description": "scheme",
"title": "Scheme",
"type": "string"
},
"sqlalchemy_uri": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters.",
"title": "Sqlalchemy Uri"
},
"options": {
"additionalProperties": true,
"description": "Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs. To set connection arguments in the URL, specify them under `connect_args`.",
"title": "Options",
"type": "object"
}
},
"required": [
"host_port",
"scheme"
],
"title": "SQLAlchemyConnectionConfig",
"type": "object"
},
"StatefulIngestionConfig": {
"additionalProperties": false,
"description": "Basic Stateful Ingestion Specific Configuration for any source.",
"properties": {
"enabled": {
"default": false,
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"title": "Enabled",
"type": "boolean"
}
},
"title": "StatefulIngestionConfig",
"type": "object"
}
},
"properties": {
"stateful_ingestion": {
"$ref": "#/$defs/StatefulIngestionConfig",
"default": {
"enabled": true,
"max_checkpoint_state_size": 16777216,
"state_provider": {
"config": {},
"type": "datahub"
},
"ignore_old_state": false,
"ignore_new_state": false
},
"description": "Stateful Ingestion Config"
},
"database_connection": {
"anyOf": [
{
"$ref": "#/$defs/SQLAlchemyConnectionConfig"
},
{
"type": "null"
}
],
"default": null,
"description": "Database connection config"
},
"kafka_connection": {
"anyOf": [
{
"$ref": "#/$defs/KafkaConsumerConnectionConfig"
},
{
"type": "null"
}
],
"default": null,
"description": "Kafka connection config"
},
"include_all_versions": {
"default": false,
"description": "If enabled, include all versions of each aspect. Otherwise, only include the latest version of each aspect. ",
"title": "Include All Versions",
"type": "boolean"
},
"include_soft_deleted_entities": {
"default": true,
"description": "If enabled, include entities that have been soft deleted. Otherwise, include all entities regardless of removal status. ",
"title": "Include Soft Deleted Entities",
"type": "boolean"
},
"exclude_aspects": {
"default": [
"datahubIngestionCheckpoint",
"datahubIngestionRunSummary",
"testResults"
],
"description": "Aspect names to exclude from entities that are being ingested. Note: This only makes sense for entities you want to ingest but without certain aspects. To completely exclude entity types, use 'urn_pattern.deny' instead. Warning: Excluding key aspects while keeping others can create invalid entities.",
"items": {
"type": "string"
},
"title": "Exclude Aspects",
"type": "array",
"uniqueItems": true
},
"database_query_batch_size": {
"default": 10000,
"description": "Number of records to fetch from the database at a time",
"title": "Database Query Batch Size",
"type": "integer"
},
"database_table_name": {
"default": "metadata_aspect_v2",
"description": "Name of database table containing all versioned aspects",
"title": "Database Table Name",
"type": "string"
},
"kafka_topic_name": {
"default": "MetadataChangeLog_Timeseries_v1",
"description": "Name of kafka topic containing timeseries MCLs",
"title": "Kafka Topic Name",
"type": "string"
},
"commit_state_interval": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": 1000,
"description": "Number of records to process before committing state",
"title": "Commit State Interval"
},
"commit_with_parse_errors": {
"default": false,
"description": "Whether to update createdon timestamp and kafka offset despite parse errors. Enable if you want to ignore the errors.",
"title": "Commit With Parse Errors",
"type": "boolean"
},
"urn_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [
"urn:li:dataHubIngestionSource:.*",
"urn:li:dataHubSecret:.*",
"urn:li:globalSettings:.*",
"urn:li:dataHubExecutionRequest:.*"
],
"ignoreCase": true
},
"description": "URN patterns to filter entities. Defaults exclude environment-specific entities (ingestion sources, secrets, settings) to prevent copying encrypted credentials and creating broken entities. Recommended to keep these defaults when customizing."
},
"drop_duplicate_schema_fields": {
"default": false,
"description": "Whether to drop duplicate schema fields in the schemaMetadata aspect. Useful if the source system has duplicate field paths in the db, but we're pushing to a system with server-side duplicate checking.",
"title": "Drop Duplicate Schema Fields",
"type": "boolean"
},
"query_timeout": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Timeout for each query in seconds. ",
"title": "Query Timeout"
},
"preserve_system_metadata": {
"default": true,
"description": "Copy system metadata from the source system",
"title": "Preserve System Metadata",
"type": "boolean"
}
},
"title": "DataHubSourceConfig",
"type": "object"
}
Capabilities
Use the Important Capabilities table above as the source of truth for supported features and whether additional configuration is required.
Stateful Ingestion
The source checkpoints by database createdon and Kafka offsets so interrupted runs can resume without restarting from scratch. Use stateful_ingestion.ignore_old_state or a distinct pipeline_name when you want a full replay.
On first run, the source will read from the earliest data in the database and the earliest kafka offsets. Every commit_state_interval (default 1000) records, the source will store a checkpoint to remember its place, i.e. the last createdon timestamp and kafka offsets. This allows you to stop and restart the source without losing much progress, but note that you will re-ingest some data at the start of the new run.
If any errors are encountered in the ingestion process, e.g. an aspect cannot be emitted due to network errors, the source will keep running but will stop committing checkpoints, unless commit_with_parse_errors (default false) is set. Thus, if you re-run the ingestion, you can re-ingest the data that was missed, but note that it will also re-ingest all subsequent data.
If you want to re-ingest all data, you can set a different pipeline_name in your recipe, or set stateful_ingestion.ignore_old_state: true.
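The checkpointing behavior described in this section can be modeled with a short simulation. This is a simplified sketch, not the source's code: the `run` function, the `Record` shape, and `flaky_emit` are hypothetical, and only the option names commit_state_interval and commit_with_parse_errors come from the configuration above.

```python
from typing import Callable, Iterable, List, Tuple

# Hypothetical record shape: (createdon, payload).
Record = Tuple[int, str]


def run(
    records: Iterable[Record],
    emit: Callable[[str], None],
    commit_state_interval: int = 1000,
    commit_with_parse_errors: bool = False,
) -> List[int]:
    """Process records in order and return the createdon values that were checkpointed."""
    checkpoints: List[int] = []
    had_error = False
    for count, (createdon, payload) in enumerate(records, start=1):
        try:
            emit(payload)
        except Exception:
            had_error = True  # keep running, but remember that something failed
        may_commit = commit_with_parse_errors or not had_error
        if count % commit_state_interval == 0 and may_commit:
            checkpoints.append(createdon)
    return checkpoints


def flaky_emit(payload: str) -> None:
    """Stand-in emitter that fails on one specific payload."""
    if payload == "bad":
        raise ValueError("could not emit")


records = [(1, "a"), (2, "b"), (3, "c"), (4, "bad"), (5, "e"), (6, "f")]
print(run(records, flaky_emit, commit_state_interval=2))  # [2]
print(run(records, flaky_emit, commit_state_interval=2, commit_with_parse_errors=True))  # [2, 4, 6]
```

In the first call, the failure at the fourth record freezes the checkpoint at 2, so a re-run would pick up from there and replay everything after it. In the second call, checkpoints keep advancing and the failed record is effectively skipped.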
Limitations
- Can only pull timeseries aspects retained by Kafka; the default retention is 90 days.
- Does not detect hard timeseries deletions, e.g. those made via a datahub delete command using the CLI. Data deleted in this way will still exist in the destination instance.
- If a large number of aspects share the exact same createdon timestamp, stateful ingestion cannot save a checkpoint partway through that timestamp. On a subsequent run, all aspects for that timestamp will be ingested again.
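The last limitation follows from checkpointing by timestamp alone. The sketch below assumes, for illustration, that a resumed run reads every row whose createdon is at or after the stored checkpoint; the `resume_from` helper and the row shape are hypothetical.

```python
from typing import List, Tuple

# Hypothetical row shape: (createdon, urn).
Row = Tuple[int, str]


def resume_from(rows: List[Row], checkpoint: int) -> List[Row]:
    """Return rows at or after the checkpoint timestamp, in createdon order."""
    return sorted(row for row in rows if row[0] >= checkpoint)


rows = [(100, "urn:a"), (200, "urn:b"), (200, "urn:c"), (200, "urn:d"), (300, "urn:e")]

# Suppose the previous run stopped after urn:c. The only position it can
# record is createdon=200, so every row with that timestamp is read again.
replayed = resume_from(rows, checkpoint=200)
print([urn for _, urn in replayed])  # ['urn:b', 'urn:c', 'urn:d', 'urn:e']
```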
Performance
For large migrations, ensure metadata_aspect_v2.createdon is indexed (timeIndex), enable async ingestion on the destination, and scale consumers/GMS/Elasticsearch workers as needed.
- Enable async ingestion
- Use standalone consumers (mae-consumer and mce-consumer)
  - If you are migrating large amounts of data, consider scaling consumer replicas.
- Increase the number of gms pods to add redundancy and increase resilience to node evictions
  - If you are migrating large amounts of data, consider increasing elasticsearch's thread count via the ELASTICSEARCH_THREAD_COUNT environment variable.
Exclusions
You will likely want to exclude some urn types from your ingestion, as they contain instance-specific metadata, such as settings, roles, policies, ingestion sources, and ingestion runs. For example, you will likely want to start with this:
source:
  config:
    urn_pattern: # URN pattern to ignore/include in the ingestion
      deny:
        # Ignores all datahub metadata where the urn matches the regex
        - ^urn:li:role.* # Only exclude if you do not want to ingest roles
        - ^urn:li:dataHubRole.* # Only exclude if you do not want to ingest roles
        - ^urn:li:dataHubPolicy.* # Only exclude if you do not want to ingest policies
        - ^urn:li:dataHubIngestionSource.* # Only exclude if you do not want to ingest ingestion sources
        - ^urn:li:dataHubSecret.*
        - ^urn:li:dataHubExecutionRequest.*
        - ^urn:li:dataHubAccessToken.*
        - ^urn:li:dataHubUpgrade.*
        - ^urn:li:inviteToken.*
        - ^urn:li:globalSettings.*
        - ^urn:li:dataHubStepState.*
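To preview which URNs a deny list like the one above would filter out, the following sketch approximates allow/deny matching with Python's `re` module. The `urn_allowed` function is a hypothetical, simplified stand-in: it assumes patterns are matched from the start of the URN, that deny takes priority over allow, and that matching is case-insensitive by default (mirroring the ignoreCase default in the configuration table).

```python
import re
from typing import List


def urn_allowed(urn: str, allow: List[str], deny: List[str], ignore_case: bool = True) -> bool:
    """Return True if the URN matches an allow pattern and no deny pattern."""
    flags = re.IGNORECASE if ignore_case else 0
    if any(re.match(pattern, urn, flags) for pattern in deny):
        return False  # a deny match always wins
    return any(re.match(pattern, urn, flags) for pattern in allow)


deny = [
    r"^urn:li:dataHubSecret.*",
    r"^urn:li:dataHubPolicy.*",
    r"^urn:li:globalSettings.*",
]
allow = [r".*"]

sample_urns = [
    "urn:li:dataHubSecret:my-secret",
    "urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)",
]
for urn in sample_urns:
    print(urn, "->", "ingest" if urn_allowed(urn, allow, deny) else "skip")
```

Running a handful of representative URNs through a check like this before a large migration is a cheap way to catch a pattern that is too broad or too narrow.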
Limitations
Module behavior is constrained by source APIs, permissions, and metadata exposed by the platform. Refer to capability notes for unsupported or conditional features.
Troubleshooting
If ingestion fails, validate credentials, permissions, connectivity, and scope filters first. Then review ingestion logs for source-specific errors and adjust configuration accordingly.
Code Coordinates
- Class Name: datahub.ingestion.source.datahub.datahub_source.DataHubSource
If you've got any questions on configuring ingestion for DataHub, feel free to ping us on our Slack.
This page is auto-generated from the underlying source code. To make changes, please edit the relevant source files in the metadata-ingestion directory.