Iceberg
Important Capabilities
Capability | Status | Notes |
---|---|---|
Data Profiling | ✅ | Optionally enabled via configuration. |
Descriptions | ✅ | Enabled by default. |
Detect Deleted Entities | ✅ | Enabled via stateful ingestion. |
Domains | ❌ | Currently not supported. |
Extract Ownership | ✅ | Automatically ingests ownership information from table properties, based on `user_ownership_property` and `group_ownership_property`. |
Partition Support | ❌ | Currently not supported. |
Platform Instance | ✅ | Optionally enabled via configuration; an Iceberg instance represents the catalog name where the table is stored. |
Integration Details
The DataHub Iceberg source plugin extracts metadata from Iceberg tables stored in a distributed or local file system. Typically, Iceberg tables are stored in a distributed file system like S3 or Azure Data Lake Storage (ADLS) and registered in a catalog. There are various catalog implementations like Filesystem-based, RDBMS-based or even REST-based catalogs. This Iceberg source plugin relies on the pyiceberg library.
CLI based Ingestion
Config Details
Note that a `.` is used to denote nested fields in the YAML recipe.
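For example, a field listed in the table below as `profiling.enabled` corresponds to the following nesting in a recipe:

source:
  type: "iceberg"
  config:
    profiling:
      enabled: true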
Field | Description |
---|---|
catalog ✅ map(str,object) | Catalog configuration where to find Iceberg tables. Only one catalog specification is supported. The format is the same as [pyiceberg's catalog configuration](https://py.iceberg.apache.org/configuration/), where the catalog name is specified as the object name and attributes are set as key-value pairs. |
group_ownership_property string | Iceberg table property to look for a CorpGroup owner. Can only hold a single group value. If property has no value, no owner information will be emitted. |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details. |
processing_threads integer | How many threads will be processing tables Default: 1 |
user_ownership_property string | Iceberg table property to look for a CorpUser owner. Can only hold a single user value. If property has no value, no owner information will be emitted. Default: owner |
env string | The environment that all assets produced by this connector belong to Default: PROD |
namespace_pattern AllowDenyPattern | Regex patterns for namespaces to filter in ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
namespace_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
namespace_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
namespace_pattern.allow.string string | |
namespace_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
namespace_pattern.deny.string string | |
table_pattern AllowDenyPattern | Regex patterns for tables to filter in ingestion. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
table_pattern.allow array | List of regex patterns to include in ingestion Default: ['.*'] |
table_pattern.allow.string string | |
table_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
table_pattern.deny.string string | |
profiling IcebergProfilingConfig | Default: {'enabled': False, 'include_field_null_count': Tru... |
profiling.enabled boolean | Whether profiling should be done. Default: False |
profiling.include_field_max_value boolean | Whether to profile for the max value of numeric columns. Default: True |
profiling.include_field_min_value boolean | Whether to profile for the min value of numeric columns. Default: True |
profiling.include_field_null_count boolean | Whether to profile for the number of nulls for each column. Default: True |
profiling.operation_config OperationConfig | Experimental feature. To specify operation configs. |
profiling.operation_config.lower_freq_profile_enabled boolean | Whether to do profiling at a lower frequency or not. This does not do any scheduling; it only adds additional checks for when not to run profiling. Default: False |
profiling.operation_config.profile_date_of_month integer | Number between 1 and 31 for the date of the month (both inclusive). If not specified, it defaults to Nothing and this field does not take effect. |
profiling.operation_config.profile_day_of_week integer | Number between 0 and 6 for the day of the week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, it defaults to Nothing and this field does not take effect. |
stateful_ingestion StatefulStaleMetadataRemovalConfig | Iceberg Stateful Ingestion Config. |
stateful_ingestion.enabled boolean | Whether or not to enable stateful ingestion. Effective default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False. Default: False |
stateful_ingestion.fail_safe_threshold number | Prevents a large number of soft deletes and blocks the state from committing when accidental changes to the source configuration cause the relative change (in percent) of entities, compared to the previous state, to exceed the 'fail_safe_threshold'. Default: 75.0 |
stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True |
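To illustrate how several of these options fit together, here is a sketch of a config fragment; the `owning_group` property name and the `analytics.*` pattern are placeholders rather than defaults, and the required `catalog` block is omitted for brevity:

source:
  type: "iceberg"
  config:
    user_ownership_property: "owner"          # default table property holding a CorpUser owner
    group_ownership_property: "owning_group"  # placeholder property name for a CorpGroup owner
    namespace_pattern:
      allow:
        - "analytics.*"                       # only ingest namespaces matching this regex
    processing_threads: 4
    profiling:
      enabled: true
      operation_config:
        lower_freq_profile_enabled: true
        profile_day_of_week: 0                # only profile on Mondays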
The JSONSchema for this configuration is inlined below.
{
"title": "IcebergSourceConfig",
"description": "Base configuration class for stateful ingestion for source configs to inherit from.",
"type": "object",
"properties": {
"env": {
"title": "Env",
"description": "The environment that all assets produced by this connector belong to",
"default": "PROD",
"type": "string"
},
"platform_instance": {
"title": "Platform Instance",
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details.",
"type": "string"
},
"stateful_ingestion": {
"title": "Stateful Ingestion",
"description": "Iceberg Stateful Ingestion Config.",
"allOf": [
{
"$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
}
]
},
"catalog": {
"title": "Catalog",
"description": "Catalog configuration where to find Iceberg tables. Only one catalog specification is supported. The format is the same as [pyiceberg's catalog configuration](https://py.iceberg.apache.org/configuration/), where the catalog name is specified as the object name and attributes are set as key-value pairs.",
"type": "object",
"additionalProperties": {
"type": "object"
}
},
"table_pattern": {
"title": "Table Pattern",
"description": "Regex patterns for tables to filter in ingestion.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"namespace_pattern": {
"title": "Namespace Pattern",
"description": "Regex patterns for namespaces to filter in ingestion.",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"allOf": [
{
"$ref": "#/definitions/AllowDenyPattern"
}
]
},
"user_ownership_property": {
"title": "User Ownership Property",
"description": "Iceberg table property to look for a `CorpUser` owner. Can only hold a single user value. If property has no value, no owner information will be emitted.",
"default": "owner",
"type": "string"
},
"group_ownership_property": {
"title": "Group Ownership Property",
"description": "Iceberg table property to look for a `CorpGroup` owner. Can only hold a single group value. If property has no value, no owner information will be emitted.",
"type": "string"
},
"profiling": {
"title": "Profiling",
"default": {
"enabled": false,
"include_field_null_count": true,
"include_field_min_value": true,
"include_field_max_value": true,
"operation_config": {
"lower_freq_profile_enabled": false,
"profile_day_of_week": null,
"profile_date_of_month": null
}
},
"allOf": [
{
"$ref": "#/definitions/IcebergProfilingConfig"
}
]
},
"processing_threads": {
"title": "Processing Threads",
"description": "How many threads will be processing tables",
"default": 1,
"type": "integer"
}
},
"required": [
"catalog"
],
"additionalProperties": false,
"definitions": {
"DynamicTypedStateProviderConfig": {
"title": "DynamicTypedStateProviderConfig",
"type": "object",
"properties": {
"type": {
"title": "Type",
"description": "The type of the state provider to use. For DataHub use `datahub`",
"type": "string"
},
"config": {
"title": "Config",
"description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19).",
"default": {},
"type": "object"
}
},
"required": [
"type"
],
"additionalProperties": false
},
"StatefulStaleMetadataRemovalConfig": {
"title": "StatefulStaleMetadataRemovalConfig",
"description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or `datahub_api` is specified, otherwise False",
"default": false,
"type": "boolean"
},
"remove_stale_metadata": {
"title": "Remove Stale Metadata",
"description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
"default": true,
"type": "boolean"
},
"fail_safe_threshold": {
"title": "Fail Safe Threshold",
"description": "Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.",
"default": 75.0,
"minimum": 0.0,
"maximum": 100.0,
"type": "number"
}
},
"additionalProperties": false
},
"AllowDenyPattern": {
"title": "AllowDenyPattern",
"description": "A class to store allow deny regexes",
"type": "object",
"properties": {
"allow": {
"title": "Allow",
"description": "List of regex patterns to include in ingestion",
"default": [
".*"
],
"type": "array",
"items": {
"type": "string"
}
},
"deny": {
"title": "Deny",
"description": "List of regex patterns to exclude from ingestion.",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"ignoreCase": {
"title": "Ignorecase",
"description": "Whether to ignore case sensitivity during pattern matching.",
"default": true,
"type": "boolean"
}
},
"additionalProperties": false
},
"OperationConfig": {
"title": "OperationConfig",
"type": "object",
"properties": {
"lower_freq_profile_enabled": {
"title": "Lower Freq Profile Enabled",
"description": "Whether to do profiling at lower freq or not. This does not do any scheduling just adds additional checks to when not to run profiling.",
"default": false,
"type": "boolean"
},
"profile_day_of_week": {
"title": "Profile Day Of Week",
"description": "Number between 0 to 6 for day of week (both inclusive). 0 is Monday and 6 is Sunday. If not specified, defaults to Nothing and this field does not take affect.",
"type": "integer"
},
"profile_date_of_month": {
"title": "Profile Date Of Month",
"description": "Number between 1 to 31 for date of month (both inclusive). If not specified, defaults to Nothing and this field does not take affect.",
"type": "integer"
}
},
"additionalProperties": false
},
"IcebergProfilingConfig": {
"title": "IcebergProfilingConfig",
"type": "object",
"properties": {
"enabled": {
"title": "Enabled",
"description": "Whether profiling should be done.",
"default": false,
"type": "boolean"
},
"include_field_null_count": {
"title": "Include Field Null Count",
"description": "Whether to profile for the number of nulls for each column.",
"default": true,
"type": "boolean"
},
"include_field_min_value": {
"title": "Include Field Min Value",
"description": "Whether to profile for the min value of numeric columns.",
"default": true,
"type": "boolean"
},
"include_field_max_value": {
"title": "Include Field Max Value",
"description": "Whether to profile for the max value of numeric columns.",
"default": true,
"type": "boolean"
},
"operation_config": {
"title": "Operation Config",
"description": "Experimental feature. To specify operation configs.",
"allOf": [
{
"$ref": "#/definitions/OperationConfig"
}
]
}
},
"additionalProperties": false
}
}
}
Setting up connection to an Iceberg catalog
There are multiple servers compatible with the Iceberg catalog specification. DataHub's Iceberg connector uses the `pyiceberg` library to extract metadata from them. The recipe for the source consists of two parts (see the sketch after this list):
- The `catalog` part, which is passed as-is to the `pyiceberg` library and configures the connection and its details (i.e. authentication). The name of the catalog specified in the recipe has no consequence; it is just a formal requirement of the library. Only one catalog will be considered for ingestion.
- The remaining configuration consists of parameters, such as `env` or `stateful_ingestion`, which are standard DataHub ingestion configuration parameters and are described in the Config Details chapter.
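As an orientation sketch only, the skeleton below shows the two parts side by side; the catalog name, REST uri, and sink endpoint are placeholders for your environment:

pipeline_name: "iceberg-ingestion"   # needed for stateful ingestion
source:
  type: "iceberg"
  config:
    # Part 1: passed as-is to pyiceberg and configures the connection.
    catalog:
      my_catalog:
        type: "rest"
        uri: "http://iceberg-catalog:8181"
    # Part 2: standard DataHub ingestion parameters.
    env: dev
    stateful_ingestion:
      enabled: true
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"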
This chapter showcases several examples of setting up connections to an Iceberg catalog, varying based on the underlying implementation. Iceberg is designed to keep the catalog and the warehouse separate, which is reflected in how we configure it. This is especially visible when using the Iceberg REST Catalog, which can use many blob stores (AWS S3, Azure Blob Storage, MinIO) as a warehouse.
Note that, for advanced users, it is possible to specify a custom catalog client implementation via the `py-catalog-impl` configuration option; refer to the `pyiceberg` documentation for details.
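As a sketch of that advanced option, the setting goes alongside the other catalog properties; the module path below is a hypothetical placeholder for your own catalog class:

source:
  type: "iceberg"
  config:
    catalog:
      demo:
        # hypothetical custom Catalog implementation; replace with your own class path
        py-catalog-impl: "mypackage.mymodule.MyCustomCatalog"
        uri: "http://iceberg-catalog-uri"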
Glue catalog + S3 warehouse
The minimal configuration for connecting to a Glue catalog with an S3 warehouse:
source:
  type: "iceberg"
  config:
    env: dev
    catalog:
      my_catalog:
        type: "glue"
        s3.region: "us-west-2"
        region_name: "us-west-2"
Here `us-west-2` is the region from which you want to ingest. The above configuration will work assuming the pod or environment in which you run the DataHub CLI is already authenticated to AWS and has the proper permissions granted (see below). If you need to specify secrets directly, use the following configuration as a template:
source:
  type: "iceberg"
  config:
    env: dev
    catalog:
      demo:
        type: "glue"
        s3.region: "us-west-2"
        s3.access-key-id: "${AWS_ACCESS_KEY_ID}"
        s3.secret-access-key: "${AWS_SECRET_ACCESS_KEY}"
        s3.session-token: "${AWS_SESSION_TOKEN}"
        aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
        aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
        aws_session_token: "${AWS_SESSION_TOKEN}"
        region_name: "us-west-2"
This example uses references to fill credentials (either from Secrets defined in Managed Ingestion or from environment variables). It is possible (but not recommended, due to security concerns) to provide those values in plaintext, directly in the recipe.
Glue and S3 permissions required
The IAM policy required by the role used by the ingestor for reading metadata from the Glue Iceberg catalog and the S3 warehouse is:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["glue:GetDatabases", "glue:GetTables", "glue:GetTable"],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:ListBucket", "s3:GetObjectVersion"],
"Resource": [
"arn:aws:s3:::<bucket used by the warehouse>",
"arn:aws:s3:::<bucket used by the warehouse>/*"
]
}
]
}
Iceberg REST Catalog + MinIO
The following configuration assumes MinIO authentication is defined using the `s3.*` prefix. Note the `s3.endpoint` setting, which assumes MinIO listens on port `9000` at `minio-host`. The `uri` parameter points at the Iceberg REST Catalog (IRC) endpoint (in this case `iceberg-catalog:8181`).
source:
  type: "iceberg"
  config:
    env: dev
    catalog:
      demo:
        type: "rest"
        uri: "http://iceberg-catalog:8181"
        s3.access-key-id: "${AWS_ACCESS_KEY_ID}"
        s3.secret-access-key: "${AWS_SECRET_ACCESS_KEY}"
        s3.region: "eu-east-1"
        s3.endpoint: "http://minio-host:9000"
Iceberg REST Catalog (with authentication) + S3
This example assumes the IRC requires token authentication (via the `Authorization` header). There are more options available; see https://py.iceberg.apache.org/configuration/#rest-catalog for details. Moreover, the assumption here is that the environment (i.e. pod) is already authenticated to perform actions against AWS S3.
source:
  type: "iceberg"
  config:
    env: dev
    catalog:
      demo:
        type: "rest"
        uri: "http://iceberg-catalog-uri"
        token: "token-value"
        s3.region: "us-west-2"
Special REST connection parameters for resiliency
Unlike the other parameters provided in the dictionary under the `catalog` key, the `connection` parameter is a custom DataHub feature that allows injecting connection-resiliency settings into the REST connection made by the ingestor. `connection` accepts two parameters:
- `timeout` is provided as a number of seconds; it needs to be a whole number (or `null` to turn it off).
- `retry` is a complex object representing the parameters used to create a urllib3 `Retry` object. There are many possible parameters; the most important are `total` (total retries) and `backoff_factor`. See the urllib3 `Retry` documentation for details.
source:
  type: "iceberg"
  config:
    env: dev
    catalog:
      demo:
        type: "rest"
        uri: "http://iceberg-catalog-uri"
        connection:
          retry:
            backoff_factor: 0.5
            total: 3
          timeout: 120
SQL catalog + Azure DLS as the warehouse
This example targets Postgres as the `sql`-type Iceberg catalog and uses Azure DLS as the warehouse.
source:
  type: "iceberg"
  config:
    env: dev
    catalog:
      demo:
        type: sql
        uri: postgresql+psycopg2://user:password@sqldatabase.postgres.database.azure.com:5432/icebergcatalog
        adlfs.tenant-id: <Azure tenant ID>
        adlfs.account-name: <Azure storage account name>
        adlfs.client-id: <Azure Client/Application ID>
        adlfs.client-secret: <Azure Client Secret>
Concept Mapping
This ingestion source maps the following Source System Concepts to DataHub Concepts:
Source Concept | DataHub Concept | Notes |
---|---|---|
iceberg | Data Platform | |
Table | Dataset | An Iceberg table is registered inside a catalog using a name, where the catalog is responsible for creating, dropping and renaming tables. Catalogs manage a collection of tables that are usually grouped into namespaces. The name of a table is mapped to a Dataset name. If a Platform Instance is configured, it will be used as a prefix: `<platform_instance>.my.namespace.table`. |
Table property | CorpUser as Owner | The value of a table property can be used as the name of a CorpUser owner. This table property name can be configured with the source option `user_ownership_property`. |
Table property | CorpGroup as Owner | The value of a table property can be used as the name of a CorpGroup owner. This table property name can be configured with the source option `group_ownership_property`. |
Namespace | Container | Namespaces are mapped to containers; tables that belong to a namespace are represented by datasets that belong to the container representing their namespace. |
Table schema | SchemaField | Maps to the fields defined within the Iceberg table schema definition. |
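As a sketch of the naming behaviour described above (the instance and table names below are placeholders), configuring a platform instance prefixes the resulting dataset name as follows:

source:
  type: "iceberg"
  config:
    platform_instance: "my_catalog_instance"   # placeholder instance name
    catalog:
      demo:
        type: "rest"
        uri: "http://iceberg-catalog:8181"
# A table "table" in namespace "my.namespace" would then surface as the dataset
# "my_catalog_instance.my.namespace.table" on the iceberg platform.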
Troubleshooting
Exceptions while increasing processing_threads
Each processing thread will open several files/sockets to download manifest files from blob storage. If you see exceptions when increasing the `processing_threads` configuration parameter, try increasing the limit of open files (e.g. using `ulimit` in Linux).
DataHub Iceberg REST Catalog
DataHub also implements the Iceberg REST Catalog. See the Iceberg Catalog documentation for more details.
Code Coordinates
- Class Name: `datahub.ingestion.source.iceberg.iceberg.IcebergSource`
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for Iceberg, feel free to ping us on our Slack.