SQL Queries
Important Capabilities
| Capability | Status | Notes |
|---|---|---|
| Column-level Lineage | ✅ | Parsed from SQL queries. |
| Table-Level Lineage | ✅ | Parsed from SQL queries. |
This source reads a newline-delimited JSON file containing SQL queries and parses them to generate lineage.
Query File Format
This file should contain one JSON object per line, with the following fields:
- query: string - The SQL query to parse.
- timestamp (optional): number - The timestamp of the query, in seconds since the epoch.
- user (optional): string - The user who ran the query. This value is converted directly into a DataHub user URN.
- operation_type (optional): string - Platform-specific operation type, used if the operation type can't be parsed.
- session_id (optional): string - Session identifier for temporary table resolution across queries.
- downstream_tables (optional): string[] - Fallback list of tables that the query writes to, used if the query can't be parsed.
- upstream_tables (optional): string[] - Fallback list of tables the query reads from, used if the query can't be parsed.
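As a rough illustration of the record shape described above, the following Python sketch builds one line of the query file. The helper name make_query_record is hypothetical and not part of DataHub. It only mirrors the field names listed here and leaves out optional fields that were not set.

```python
import json
import time
from typing import Any, Dict, List, Optional


def make_query_record(
    query: str,
    timestamp: Optional[float] = None,
    user: Optional[str] = None,
    operation_type: Optional[str] = None,
    session_id: Optional[str] = None,
    downstream_tables: Optional[List[str]] = None,
    upstream_tables: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build one query record, omitting optional fields that were not provided."""
    record: Dict[str, Any] = {"query": query}
    optional_fields = {
        "timestamp": timestamp,
        "user": user,
        "operation_type": operation_type,
        "session_id": session_id,
        "downstream_tables": downstream_tables,
        "upstream_tables": upstream_tables,
    }
    for key, value in optional_fields.items():
        if value is not None:
            record[key] = value
    return record


record = make_query_record(
    "INSERT INTO my_table SELECT * FROM staging_table",
    timestamp=time.time(),  # seconds since the epoch
    user="user_a",
    session_id="session-1",
    downstream_tables=["my_database.my_schema.my_table"],
    upstream_tables=["my_database.my_schema.staging_table"],
)

# json.dumps emits no newlines by default, so the result is safe to write as one line.
line = json.dumps(record)
print(line)
```

Appending each such line to a file, followed by a newline character, produces the format this source expects.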
Lazy Schema Loading:
- Fetches schemas on-demand during query parsing instead of bulk loading all schemas upfront
- Caches fetched schemas for future lookups to avoid repeated network requests
- Reduces startup time and memory usage, because only schemas referenced by the parsed queries are loaded
- Scales to platforms with many tables, since schemas that no query references are never fetched
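The fetch-on-demand and caching behavior listed above can be pictured with a small Python sketch. This is not the DataHub implementation: the LazySchemaCache class and the fake_fetch function are hypothetical stand-ins, with fake_fetch standing in for a network request.

```python
from typing import Callable, Dict


class LazySchemaCache:
    """Fetch a table schema the first time it is requested, then reuse it."""

    def __init__(self, fetch: Callable[[str], Dict[str, str]]) -> None:
        self._fetch = fetch
        self._cache: Dict[str, Dict[str, str]] = {}
        self.fetch_count = 0  # counts how often the (expensive) fetch actually ran

    def get(self, table: str) -> Dict[str, str]:
        if table not in self._cache:
            self.fetch_count += 1
            self._cache[table] = self._fetch(table)
        return self._cache[table]


def fake_fetch(table: str) -> Dict[str, str]:
    # Hypothetical stand-in for a schema lookup over the network.
    return {"id": "NUMBER", "name": "VARCHAR"}


cache = LazySchemaCache(fake_fetch)
first = cache.get("my_database.my_schema.my_table")
second = cache.get("my_database.my_schema.my_table")  # served from the cache
print(cache.fetch_count)
```

Nothing is fetched until a query references a table, and a table referenced by many queries is still fetched only once.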
Query Processing:
- Loads the entire query file into memory at once
- Processes all queries sequentially before generating metadata work units
- Preserves temp table mappings and lineage relationships to ensure consistent lineage tracking
- Query deduplication is handled automatically by the SQL parsing aggregator
Incremental Lineage
When incremental_lineage is enabled, this source will emit lineage as patches rather than full overwrites.
This allows you to add lineage edges without removing existing ones, which is useful for:
- Gradually building up lineage from multiple sources
- Preserving manually curated lineage
- Avoiding conflicts when multiple ingestion processes target the same datasets
Note: Incremental lineage only applies to UpstreamLineage aspects. Other aspects like queries and usage statistics will still be emitted normally.
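The difference between a patch and a full overwrite can be sketched in Python by modeling a dataset's upstreams as a plain set of table names. This is a deliberate simplification (the real UpstreamLineage aspect carries more than names), and both function names are hypothetical.

```python
from typing import Set


def emit_full_overwrite(existing: Set[str], parsed: Set[str]) -> Set[str]:
    """Full overwrite: the newly parsed upstreams replace whatever was there."""
    return set(parsed)


def emit_patch(existing: Set[str], parsed: Set[str]) -> Set[str]:
    """Patch: the newly parsed upstreams are added; existing edges are kept."""
    return set(existing) | set(parsed)


existing_upstreams = {"my_database.my_schema.manually_curated_source"}
parsed_upstreams = {"my_database.my_schema.orders"}

overwritten = emit_full_overwrite(existing_upstreams, parsed_upstreams)
patched = emit_patch(existing_upstreams, parsed_upstreams)
print(sorted(overwritten))
print(sorted(patched))
```

In this model the manually curated edge survives only in the patched result, which is the case the list above calls preserving manually curated lineage.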
Temporary Table Support
For platforms like Athena that don't have native temporary tables, use the temp_table_patterns
configuration to specify regex patterns that identify tables acting as temporary tables. Each pattern
must match the entire table name. The source then treats matching tables the way it treats native temp
tables on other platforms, so lineage is tracked correctly across temporary table operations.
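To check patterns before putting them in a recipe, a sketch along the following lines may help. It assumes whole-name, case-sensitive matching via Python's re.fullmatch; whether the source applies exactly these semantics (for example, regarding case) is not stated here. The is_temp_table helper and the example patterns are hypothetical.

```python
import re
from typing import List


def is_temp_table(table_name: str, patterns: List[str]) -> bool:
    """Return True if any pattern matches the entire table name."""
    return any(re.fullmatch(pattern, table_name) for pattern in patterns)


# Hypothetical naming conventions for tables used as temporary tables.
temp_table_patterns = [
    r".*\.tmp_.*",             # any table whose name starts with tmp_
    r"staging\.scratch_\d+",   # staging.scratch_<number>
]

for name in [
    "analytics.tmp_orders",
    "analytics.orders",
    "staging.scratch_42",
    "staging.scratch_42_final",
]:
    print(name, is_temp_table(name, temp_table_patterns))
```

Because the whole name has to match, staging.scratch_42_final is not treated as temporary by the second pattern even though it begins with a matching prefix.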
Example Queries File
{"query": "SELECT x FROM my_table", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": [], "upstream_tables": ["my_database.my_schema.my_table"]}
{"query": "INSERT INTO my_table VALUES (1, 'a')", "timestamp": 1689232737.669, "user": "user_b", "downstream_tables": ["my_database.my_schema.my_table"], "upstream_tables": []}
Note that this file does not represent a single JSON object, but instead newline-delimited JSON, in which each line is a separate JSON object.
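The distinction matters when reading the file with ordinary JSON tooling. The Python sketch below uses the two example lines above to show that parsing the whole file as one JSON document fails, while parsing line by line works. The parse_ndjson helper is hypothetical, and skipping blank lines is a choice made for this sketch rather than documented behavior of the source.

```python
import json
from typing import Any, Dict, List

ndjson_text = """\
{"query": "SELECT x FROM my_table", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": [], "upstream_tables": ["my_database.my_schema.my_table"]}
{"query": "INSERT INTO my_table VALUES (1, 'a')", "timestamp": 1689232737.669, "user": "user_b", "downstream_tables": ["my_database.my_schema.my_table"], "upstream_tables": []}
"""


def parse_ndjson(text: str) -> List[Dict[str, Any]]:
    """Parse newline-delimited JSON, one object per non-empty line."""
    records = []
    for line_number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # sketch-level choice: ignore blank lines
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid JSON on line {line_number}: {exc}") from exc
    return records


# Treating the file as a single JSON document fails on the second object.
try:
    json.loads(ndjson_text)
    parsed_as_single_document = True
except json.JSONDecodeError:
    parsed_as_single_document = False

records = parse_ndjson(ndjson_text)
print(parsed_as_single_document, len(records))
```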
CLI based Ingestion
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
datahub_api: # Only necessary if using a non-DataHub sink, e.g. the file sink
  server: http://localhost:8080
  timeout_sec: 60
source:
  type: sql-queries
  config:
    platform: "snowflake"
    default_db: "SNOWFLAKE"
    query_file: "./queries.json"
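The same recipe can also be expressed as a Python dictionary and run programmatically. The sketch below assumes the acryl-datahub package is installed with the sql-queries plugin. The build_recipe and run_recipe helpers are hypothetical names, and no sink is specified, matching the recipe above.

```python
from typing import Any, Dict


def build_recipe(query_file: str = "./queries.json") -> Dict[str, Any]:
    """Return the starter recipe as a dictionary with the same keys as the YAML."""
    return {
        "datahub_api": {
            "server": "http://localhost:8080",
            "timeout_sec": 60,
        },
        "source": {
            "type": "sql-queries",
            "config": {
                "platform": "snowflake",
                "default_db": "SNOWFLAKE",
                "query_file": query_file,
            },
        },
    }


def run_recipe(recipe: Dict[str, Any]) -> None:
    """Run the recipe through DataHub's ingestion pipeline."""
    # Imported lazily so the recipe can be built and inspected without DataHub installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()


recipe = build_recipe()
print(recipe["source"]["type"])
```

Calling run_recipe(recipe) starts the ingestion. It is left uncalled here because it needs a reachable DataHub server and an existing query file.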
Config Details
Note that a . is used to denote nested fields in the YAML recipe.
| Field | Description |
|---|---|
| platform ✅ string | The platform for which to generate data, e.g. snowflake |
| query_file ✅ string | Path to file to ingest |
| default_db One of string, null | The default database to use for unqualified table names Default: None |
| default_schema One of string, null | The default schema to use for unqualified table names Default: None |
| enable_lazy_schema_loading boolean | Enable lazy schema loading for better performance. When enabled, schemas are fetched on-demand instead of bulk loading all schemas upfront, reducing startup time and memory usage. Default: True |
| incremental_lineage boolean | When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. Default: False |
| override_dialect One of string, null | The SQL dialect to use when parsing queries. Overrides automatic dialect detection. Default: None |
| platform_instance One of string, null | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details. Default: None |
| env string | The environment that all assets produced by this connector belong to Default: PROD |
| aws_config One of AwsConnectionConfig, null | AWS configuration for S3 access. Required when query_file is an S3 URI (s3://). Default: None |
| aws_config.aws_access_key_id One of string, null | AWS access key ID. Can be auto-detected, see the AWS boto3 docs for details. Default: None |
| aws_config.aws_advanced_config object | Advanced AWS configuration options. These are passed directly to botocore.config.Config. |
| aws_config.aws_endpoint_url One of string, null | The AWS service endpoint. This is normally constructed automatically, but can be overridden here. Default: None |
| aws_config.aws_profile One of string, null | The named profile to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config. Default: None |
| aws_config.aws_proxy One of string, null | A set of proxy configs to use with AWS. See the botocore.config docs for details. Default: None |
| aws_config.aws_region One of string, null | AWS region code. Default: None |
| aws_config.aws_retry_mode Enum | One of: "legacy", "standard", "adaptive" Default: standard |
| aws_config.aws_retry_num integer | Number of times to retry failed AWS requests. See the botocore.retry docs for details. Default: 5 |
| aws_config.aws_secret_access_key One of string, null | AWS secret access key. Can be auto-detected, see the AWS boto3 docs for details. Default: None |
| aws_config.aws_session_token One of string, null | AWS session token. Can be auto-detected, see the AWS boto3 docs for details. Default: None |
| aws_config.read_timeout number | The timeout for reading from the connection (in seconds). Default: 60 |
| aws_config.aws_role One of string, array, null | AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as boto3's STS.Client.assume_role. Default: None |
| aws_config.aws_role.union One of string, AwsAssumeRoleConfig | |
| aws_config.aws_role.union.RoleArn ❓ string | ARN of the role to assume. |
| aws_config.aws_role.union.ExternalId One of string, null | External ID to use when assuming the role. Default: None |
| temp_table_patterns array | Regex patterns for temporary tables to filter in lineage ingestion. Specify regex to match the entire table name. This is useful for platforms like Athena that don't have native temp tables but use naming patterns for fake temp tables. Default: [] |
| temp_table_patterns.string string | |
| usage BaseUsageConfig | |
| usage.bucket_duration Enum | One of: "DAY", "HOUR" |
| usage.end_time string(date-time) | Latest date of lineage/usage to consider. Default: Current time in UTC |
| usage.format_sql_queries boolean | Whether to format sql queries Default: False |
| usage.include_operational_stats boolean | Whether to display operational stats. Default: True |
| usage.include_read_operational_stats boolean | Whether to report read operational stats. Experimental. Default: False |
| usage.include_top_n_queries boolean | Whether to ingest the top_n_queries. Default: True |
| usage.start_time string(date-time) | Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on bucket_duration). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'. Default: None |
| usage.top_n_queries integer | Number of top queries to save to each table. Default: 10 |
| usage.user_email_pattern AllowDenyPattern | A class to store allow deny regexes |
| usage.user_email_pattern.ignoreCase One of boolean, null | Whether to ignore case sensitivity during pattern matching. Default: True |
The JSONSchema for this configuration is inlined below.
{
"$defs": {
"AllowDenyPattern": {
"additionalProperties": false,
"description": "A class to store allow deny regexes",
"properties": {
"allow": {
"default": [
".*"
],
"description": "List of regex patterns to include in ingestion",
"items": {
"type": "string"
},
"title": "Allow",
"type": "array"
},
"deny": {
"default": [],
"description": "List of regex patterns to exclude from ingestion.",
"items": {
"type": "string"
},
"title": "Deny",
"type": "array"
},
"ignoreCase": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": true,
"description": "Whether to ignore case sensitivity during pattern matching.",
"title": "Ignorecase"
}
},
"title": "AllowDenyPattern",
"type": "object"
},
"AwsAssumeRoleConfig": {
"additionalProperties": true,
"properties": {
"RoleArn": {
"description": "ARN of the role to assume.",
"title": "Rolearn",
"type": "string"
},
"ExternalId": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "External ID to use when assuming the role.",
"title": "Externalid"
}
},
"required": [
"RoleArn"
],
"title": "AwsAssumeRoleConfig",
"type": "object"
},
"AwsConnectionConfig": {
"additionalProperties": false,
"description": "Common AWS credentials config.\n\nCurrently used by:\n - Glue source\n - SageMaker source\n - dbt source",
"properties": {
"aws_access_key_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS access key ID. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"title": "Aws Access Key Id"
},
"aws_secret_access_key": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS secret access key. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"title": "Aws Secret Access Key"
},
"aws_session_token": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS session token. Can be auto-detected, see [the AWS boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) for details.",
"title": "Aws Session Token"
},
"aws_role": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"anyOf": [
{
"type": "string"
},
{
"$ref": "#/$defs/AwsAssumeRoleConfig"
}
]
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS roles to assume. If using the string format, the role ARN can be specified directly. If using the object format, the role can be specified in the RoleArn field and additional available arguments are the same as [boto3's STS.Client.assume_role](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sts.html?highlight=assume_role#STS.Client.assume_role).",
"title": "Aws Role"
},
"aws_profile": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The [named profile](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-profiles.html) to use from AWS credentials. Falls back to default profile if not specified and no access keys provided. Profiles are configured in ~/.aws/credentials or ~/.aws/config.",
"title": "Aws Profile"
},
"aws_region": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS region code.",
"title": "Aws Region"
},
"aws_endpoint_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The AWS service endpoint. This is normally [constructed automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html), but can be overridden here.",
"title": "Aws Endpoint Url"
},
"aws_proxy": {
"anyOf": [
{
"additionalProperties": {
"type": "string"
},
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "A set of proxy configs to use with AWS. See the [botocore.config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) docs for details.",
"title": "Aws Proxy"
},
"aws_retry_num": {
"default": 5,
"description": "Number of times to retry failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.",
"title": "Aws Retry Num",
"type": "integer"
},
"aws_retry_mode": {
"default": "standard",
"description": "Retry mode to use for failed AWS requests. See the [botocore.retry](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html) docs for details.",
"enum": [
"legacy",
"standard",
"adaptive"
],
"title": "Aws Retry Mode",
"type": "string"
},
"read_timeout": {
"default": 60,
"description": "The timeout for reading from the connection (in seconds).",
"title": "Read Timeout",
"type": "number"
},
"aws_advanced_config": {
"additionalProperties": true,
"description": "Advanced AWS configuration options. These are passed directly to [botocore.config.Config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html).",
"title": "Aws Advanced Config",
"type": "object"
}
},
"title": "AwsConnectionConfig",
"type": "object"
},
"BaseUsageConfig": {
"additionalProperties": false,
"properties": {
"bucket_duration": {
"$ref": "#/$defs/BucketDuration",
"default": "DAY",
"description": "Size of the time window to aggregate usage stats."
},
"end_time": {
"description": "Latest date of lineage/usage to consider. Default: Current time in UTC",
"format": "date-time",
"title": "End Time",
"type": "string"
},
"start_time": {
"default": null,
"description": "Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'.",
"format": "date-time",
"title": "Start Time",
"type": "string"
},
"top_n_queries": {
"default": 10,
"description": "Number of top queries to save to each table.",
"exclusiveMinimum": 0,
"title": "Top N Queries",
"type": "integer"
},
"user_email_pattern": {
"$ref": "#/$defs/AllowDenyPattern",
"default": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"description": "regex patterns for user emails to filter in usage."
},
"include_operational_stats": {
"default": true,
"description": "Whether to display operational stats.",
"title": "Include Operational Stats",
"type": "boolean"
},
"include_read_operational_stats": {
"default": false,
"description": "Whether to report read operational stats. Experimental.",
"title": "Include Read Operational Stats",
"type": "boolean"
},
"format_sql_queries": {
"default": false,
"description": "Whether to format sql queries",
"title": "Format Sql Queries",
"type": "boolean"
},
"include_top_n_queries": {
"default": true,
"description": "Whether to ingest the top_n_queries.",
"title": "Include Top N Queries",
"type": "boolean"
}
},
"title": "BaseUsageConfig",
"type": "object"
},
"BucketDuration": {
"enum": [
"DAY",
"HOUR"
],
"title": "BucketDuration",
"type": "string"
}
},
"additionalProperties": false,
"properties": {
"incremental_lineage": {
"default": false,
"description": "When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
"title": "Incremental Lineage",
"type": "boolean"
},
"env": {
"default": "PROD",
"description": "The environment that all assets produced by this connector belong to",
"title": "Env",
"type": "string"
},
"platform_instance": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://docs.datahub.com/docs/platform-instances/ for more details.",
"title": "Platform Instance"
},
"query_file": {
"description": "Path to file to ingest",
"title": "Query File",
"type": "string"
},
"platform": {
"description": "The platform for which to generate data, e.g. snowflake",
"title": "Platform",
"type": "string"
},
"usage": {
"$ref": "#/$defs/BaseUsageConfig",
"default": {
"bucket_duration": "DAY",
"end_time": "2025-11-05T08:24:36.810110Z",
"start_time": "2025-11-04T00:00:00Z",
"queries_character_limit": 24000,
"top_n_queries": 10,
"user_email_pattern": {
"allow": [
".*"
],
"deny": [],
"ignoreCase": true
},
"include_operational_stats": true,
"include_read_operational_stats": false,
"format_sql_queries": false,
"include_top_n_queries": true
},
"description": "The usage config to use when generating usage statistics"
},
"default_db": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The default database to use for unqualified table names",
"title": "Default Db"
},
"default_schema": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The default schema to use for unqualified table names",
"title": "Default Schema"
},
"override_dialect": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The SQL dialect to use when parsing queries. Overrides automatic dialect detection.",
"title": "Override Dialect"
},
"temp_table_patterns": {
"default": [],
"description": "Regex patterns for temporary tables to filter in lineage ingestion. Specify regex to match the entire table name. This is useful for platforms like Athena that don't have native temp tables but use naming patterns for fake temp tables.",
"items": {
"type": "string"
},
"title": "Temp Table Patterns",
"type": "array"
},
"enable_lazy_schema_loading": {
"default": true,
"description": "Enable lazy schema loading for better performance. When enabled, schemas are fetched on-demand instead of bulk loading all schemas upfront, reducing startup time and memory usage.",
"title": "Enable Lazy Schema Loading",
"type": "boolean"
},
"aws_config": {
"anyOf": [
{
"$ref": "#/$defs/AwsConnectionConfig"
},
{
"type": "null"
}
],
"default": null,
"description": "AWS configuration for S3 access. Required when query_file is an S3 URI (s3://)."
}
},
"required": [
"query_file",
"platform"
],
"title": "SqlQueriesSourceConfig",
"type": "object"
}
Code Coordinates
- Class Name: datahub.ingestion.source.sql_queries.SqlQueriesSource (Browse on GitHub)
Questions
If you've got any questions on configuring ingestion for SQL Queries, feel free to ping us on our Slack.