Skip to main content

Kafka

Overview

Kafka is a streaming or integration platform. Learn more in the official Kafka documentation.

The DataHub integration for Kafka covers streaming/integration entities such as topics, connectors, pipelines, or jobs. Depending on module capabilities, it can also capture features such as lineage, usage, profiling, ownership, tags, and stateful deletion detection.

Concept Mapping

While the specific concept mapping is still pending, this shows the generic concept mapping in DataHub.

Source ConceptDataHub ConceptNotes
Platform/account/project scopePlatform Instance, ContainerOrganizes assets within the platform context.
Core technical asset (for example table/view/topic/file)DatasetPrimary ingested technical asset.
Schema fields / columnsSchemaFieldIncluded when schema extraction is supported.
Ownership and collaboration principalsCorpUser, CorpGroupEmitted by modules that support ownership and identity metadata.
Dependencies and processing relationshipsLineage edgesAvailable when lineage extraction is supported and enabled.

Module kafka

Certified

Important Capabilities

CapabilityStatusNotes
Column-level LineageNot supported.
Data ProfilingNot supported.
DescriptionsSet dataset description to top level doc field for Avro schema.
Detect Deleted EntitiesEnabled by default via stateful ingestion.
Platform InstanceFor multiple Kafka clusters, use the platform_instance configuration.
Schema MetadataSchemas associated with each topic are extracted from the schema registry. Avro and Protobuf (certified), JSON (incubating). Schema references are supported.
Table-Level LineageNot supported. If you use Kafka Connect, the kafka-connect source can generate lineage.
Test ConnectionEnabled by default.

Overview

The kafka module ingests metadata from Kafka into DataHub. It is intended for production ingestion workflows and module-specific capabilities are documented below.

Extract Topics & Schemas from Apache Kafka or Confluent Cloud.

This plugin extracts the following:

  • Topics from the Kafka broker
  • Schemas associated with each topic from the schema registry (Avro, Protobuf and JSON schemas are supported)

Prerequisites

Before running ingestion, ensure network connectivity to the source, valid authentication credentials, and read permissions for metadata APIs required by this module.

Install the Plugin

pip install 'acryl-datahub[kafka]'

Starter Recipe

Check out the following recipe to get started with ingestion! See below for full configuration options.

For general pointers on writing and running a recipe, see our main recipe guide.

source:
type: "kafka"
config:
platform_instance: "YOUR_CLUSTER_ID"
connection:
bootstrap: "broker:9092"
schema_registry_url: http://localhost:8081

sink:
# sink configs


Config Details

Note that a . is used to denote nested fields in the YAML recipe.

FieldDescription
convert_urns_to_lowercase
boolean
Whether to convert dataset urns to lowercase.
Default: False
disable_topic_record_naming_strategy
boolean
Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
Default: False
enable_meta_mapping
boolean
When enabled, applies the mappings that are defined through the meta_mapping directives.
Default: True
external_url_base
One of string, null
Base URL for external platform (e.g. Aiven) where topics can be viewed. The topic name will be appended to this base URL.
Default: None
field_meta_mapping
object
mapping rules that will be executed against field-level schema properties. Refer to the section below on meta automated mappings.
Default: {}
ignore_warnings_on_schema_type
boolean
Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.
Default: False
ingest_schemas_as_entities
boolean
Enables ingesting schemas from schema registry as separate entities, in addition to the topics
Default: False
meta_mapping
object
mapping rules that will be executed against top-level schema properties. Refer to the section below on meta automated mappings.
Default: {}
platform_instance
One of string, null
The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.
Default: None
schema_registry_class
string
The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.
Default: datahub.ingestion.source.confluent_schema_registry...
schema_tags_field
string
The field name in the schema metadata that contains the tags to be added to the dataset.
Default: tags
strip_user_ids_from_email
boolean
Whether or not to strip email id while adding owners using meta mappings.
Default: False
tag_prefix
string
Prefix added to tags during ingestion.
Default:
topic_subject_map
map(str,string)
env
string
The environment that all assets produced by this connector belong to
Default: PROD
connection
KafkaConsumerConnectionConfig
Configuration class for holding connectivity information for Kafka consumers
connection.bootstrap
string
Default: localhost:9092
connection.client_timeout_seconds
integer
The request timeout used when interacting with the Kafka APIs.
Default: 60
connection.consumer_config
object
Extra consumer config serialized as JSON. These options will be passed into Kafka's DeserializingConsumer. See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#deserializingconsumer and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md .
connection.schema_registry_config
object
Extra schema registry config serialized as JSON. These options will be passed into Kafka's SchemaRegistryClient. https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html?#schemaregistryclient
connection.schema_registry_url
string
Schema registry URL. Can be overridden with KAFKA_SCHEMAREGISTRY_URL environment variable, or will use DATAHUB_GMS_BASE_PATH if not set.
domain
map(str,AllowDenyPattern)
A class to store allow deny regexes
domain.key.allow
array
List of regex patterns to include in ingestion
Default: ['.*']
domain.key.allow.string
string
domain.key.ignoreCase
One of boolean, null
Whether to ignore case sensitivity during pattern matching.
Default: True
domain.key.deny
array
List of regex patterns to exclude from ingestion.
Default: []
domain.key.deny.string
string
topic_patterns
AllowDenyPattern
A class to store allow deny regexes
topic_patterns.ignoreCase
One of boolean, null
Whether to ignore case sensitivity during pattern matching.
Default: True
topic_patterns.allow
array
List of regex patterns to include in ingestion
Default: ['.*']
topic_patterns.allow.string
string
topic_patterns.deny
array
List of regex patterns to exclude from ingestion.
Default: []
topic_patterns.deny.string
string
stateful_ingestion
One of StatefulStaleMetadataRemovalConfig, null
Default: None
stateful_ingestion.enabled
boolean
Whether or not to enable stateful ingest. Default: True if a pipeline_name is set and either a datahub-rest sink or datahub_api is specified, otherwise False
Default: False
stateful_ingestion.fail_safe_threshold
number
Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.
Default: 75.0
stateful_ingestion.remove_stale_metadata
boolean
Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.
Default: True

Capabilities

note

Stateful Ingestion is available only when a Platform Instance is assigned to this source.

Use the Important Capabilities table above as the source of truth for supported features and whether additional configuration is required.

Connecting to Confluent Cloud

If using Confluent Cloud you can use a recipe like this. In this consumer_config.sasl.username and consumer_config.sasl.password are the API credentials that you get (in the Confluent UI) from your cluster -> Data Integration -> API Keys. schema_registry_config.basic.auth.user.info has API credentials for Confluent schema registry which you get (in Confluent UI) from Schema Registry -> API credentials.

When creating API Key for the cluster ensure that the ACLs associated with the key are set like below. This is required for DataHub to read topic metadata from topics in Confluent Cloud.

Topic Name = *
Permission = ALLOW
Operation = DESCRIBE
Pattern Type = LITERAL
source:
type: "kafka"
config:
platform_instance: "YOUR_CLUSTER_ID"
connection:
bootstrap: "abc-defg.eu-west-1.aws.confluent.cloud:9092"
consumer_config:
security.protocol: "SASL_SSL"
sasl.mechanism: "PLAIN"
sasl.username: "${CLUSTER_API_KEY_ID}"
sasl.password: "${CLUSTER_API_KEY_SECRET}"
schema_registry_url: "https://abc-defgh.us-east-2.aws.confluent.cloud"
schema_registry_config:
basic.auth.user.info: "${REGISTRY_API_KEY_ID}:${REGISTRY_API_KEY_SECRET}"

sink:
# sink configs

If you are trying to add domains to your topics you can use a configuration like below.

source:
type: "kafka"
config:
# ...connection block
domain:
"urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810":
allow:
- ".*"
"urn:li:domain:d6ec9868-6736-4b1f-8aa6-fee4c5948f17":
deny:
- ".*"

Note that the domain in config above can be either an urn or a domain id (i.e. urn:li:domain:13ae4d85-d955-49fc-8474-9004c663a810 or simply 13ae4d85-d955-49fc-8474-9004c663a810). The Domain should exist in your DataHub instance before ingesting data into the Domain. To create a Domain on DataHub, check out the Domains User Guide.

If you are using a non-default subject naming strategy in the schema registry, such as RecordNameStrategy, the mapping for the topic's key and value schemas to the schema registry subject names should be provided via topic_subject_map as shown in the configuration below.

source:
type: "kafka"
config:
# ...connection block
# Defines the mapping for the key & value schemas associated with a topic & the subject name registered with the
# kafka schema registry.
topic_subject_map:
# Defines both key & value schema for topic 'my_topic_1'
"my_topic_1-key": "io.acryl.Schema1"
"my_topic_1-value": "io.acryl.Schema2"
# Defines only the value schema for topic 'my_topic_2' (the topic doesn't have a key schema).
"my_topic_2-value": "io.acryl.Schema3"

Custom Schema Registry

The Kafka Source uses the schema registry to figure out the schema associated with both key and value for the topic. By default it uses the Confluent's Kafka Schema registry and supports the AVRO and PROTOBUF schema types.

If you're using a custom schema registry, or you are using schema type other than AVRO or PROTOBUF, then you can provide your own custom implementation of the KafkaSchemaRegistryBase class, and implement the get_schema_metadata(topic, platform_urn) method that given a topic name would return object of SchemaMetadata containing schema for that topic. Please refer datahub.ingestion.source.confluent_schema_registry::ConfluentSchemaRegistry for sample implementation of this class.

class KafkaSchemaRegistryBase(ABC):
@abstractmethod
def get_schema_metadata(
self, topic: str, platform_urn: str
) -> Optional[SchemaMetadata]:
pass

The custom schema registry class can be configured using the schema_registry_class config param of the kafka source as shown below.

source:
type: "kafka"
config:
# Set the custom schema registry implementation class
schema_registry_class: "datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
# Coordinates
connection:
bootstrap: "broker:9092"
schema_registry_url: http://localhost:8081

OAuth Callback

The OAuth callback function can be set up for both Kafka sources (consumers) and sinks (producers):

  • For sources: config.connection.consumer_config.oauth_cb
  • For sinks: config.connection.producer_config.oauth_cb

You need to specify a Python function reference in the format <python-module>:<function-name>.

For example, in the configuration oauth:create_token, create_token is a function defined in oauth.py, and oauth.py must be accessible in the PYTHONPATH.

Deploying Custom OAuth Callbacks

For Built-in Callbacks (Recommended):

DataHub includes pre-built OAuth callbacks for common use cases:

  • AWS MSK IAM: datahub_actions.utils.kafka_msk_iam:oauth_cb
  • Azure Event Hubs: datahub_actions.utils.kafka_eventhubs_auth:oauth_cb

Important: To use these built-in callbacks, you must install the acryl-datahub-actions package:

pip install acryl-datahub-actions>=1.3.1.2

For Custom OAuth Callbacks:

If you need to implement a custom OAuth callback, you must ensure your Python module is accessible to the DataHub process, e.g. adding it via PYTHONPATH=/path/to/your/module:$PYTHONPATH or pip install my-oauth-package.

Example for Kafka Source:

source:
type: "kafka"
config:
# Set the custom schema registry implementation class
schema_registry_class: "datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry"
# Coordinates
connection:
bootstrap: "broker:9092"
schema_registry_url: http://localhost:8081
consumer_config:
security.protocol: "SASL_PLAINTEXT"
sasl.mechanism: "OAUTHBEARER"
oauth_cb: "oauth:create_token"
# sink configs

Example for Kafka Sink (e.g., MSK IAM authentication):

sink:
type: "datahub-kafka"
config:
connection:
bootstrap: "b-1.msk.us-west-2.amazonaws.com:9098"
schema_registry_url: "http://datahub-gms:8080/schema-registry/api/"
producer_config:
security.protocol: "SASL_SSL"
sasl.mechanism: "OAUTHBEARER"
sasl.oauthbearer.method: "default"
oauth_cb: "datahub_actions.utils.kafka_msk_iam:oauth_cb"

Enriching DataHub metadata with automated meta mapping

note

Meta mapping is currently only available for Avro schemas, and requires that those Avro schemas are pushed to the schema registry.

Avro schemas are permitted to have additional attributes not defined by the specification as arbitrary metadata. A common pattern is to utilize this for business metadata. The Kafka source has the ability to transform this directly into DataHub Owners, Tags and Terms.

Simple tags

If you simply have a list of tags embedded into an Avro schema (either at the top-level or for an individual field), you can use the schema_tags_field config.

Example Avro schema:

{
"name": "sampleRecord",
"type": "record",
"tags": ["tag1", "tag2"],
"fields": [
{
"name": "field_1",
"type": "string",
"tags": ["tag3", "tag4"]
}
]
}

The name of the field containing a list of tags can be configured with the schema_tags_field property:

config:
schema_tags_field: tags
meta mapping

You can also map specific Avro fields into Owners, Tags and Terms using meta mapping.

Example Avro schema:

{
"name": "sampleRecord",
"type": "record",
"owning_team": "@Data-Science",
"data_tier": "Bronze",
"fields": [
{
"name": "field_1",
"type": "string",
"gdpr": {
"pii": true
}
}
]
}

This can be mapped to DataHub metadata with meta_mapping config:

config:
meta_mapping:
owning_team:
match: "^@(.*)"
operation: "add_owner"
config:
owner_type: group
data_tier:
match: "Bronze|Silver|Gold"
operation: "add_term"
config:
term: "{{ $match }}"
field_meta_mapping:
gdpr.pii:
match: true
operation: "add_tag"
config:
tag: "pii"

The underlying implementation is similar to dbt meta mapping, which has more detailed examples that can be used for reference.

Limitations

Module behavior is constrained by source APIs, permissions, and metadata exposed by the platform. Refer to capability notes for unsupported or conditional features.

PROTOBUF Schema Type Limitations

The current implementation of the support for PROTOBUF schema type has the following limitations:

  • Recursive types are not supported.
  • If the schemas of different topics define a type in the same package, the source would raise an exception.

In addition to this, maps are represented as arrays of messages. The following message,

message MessageWithMap {
map<int, string> map_1 = 1;
}

becomes:

message Map1Entry {
int key = 1;
string value = 2/
}
message MessageWithMap {
repeated Map1Entry map_1 = 1;
}

Troubleshooting

If ingestion fails, validate credentials, permissions, connectivity, and scope filters first. Then review ingestion logs for source-specific errors and adjust configuration accordingly.

Code Coordinates

  • Class Name: datahub.ingestion.source.kafka.kafka.KafkaSource
  • Browse on GitHub
Questions?

If you've got any questions on configuring ingestion for Kafka, feel free to ping us on our Slack.

💡 Contributing to this documentation

This page is auto-generated from the underlying source code. To make changes, please edit the relevant source files in the metadata-ingestion directory.

Tip: For quick typo fixes or documentation updates, you can click the ✏️ Edit icon directly in the GitHub UI to open a Pull Request. For larger changes and PR naming conventions, please refer to our Contributing Guide.