transformations - more decoupling #2718
base: devel
Conversation
# TODO: why? don't we prevent empty column schemas above?
all_columns = {**computed_columns, **(columns or {})}
I have added a test for columns merging, and the columns yielded from here overwrite all incoming columns definitions from the decorator, which I think generally makes sense in dlt, but not for the transformations. I would maybe keep this topic open for now until we decide how this should work exactly.
Wait: you are overwriting computed_columns with columns, which come from the decorator. So how are they not overwritten at the end?
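A plain-Python illustration of the merge order in question (the hint values are made up, only to show dict precedence): with `{**computed_columns, **(columns or {})}`, keys from the decorator's `columns` replace whole entries coming from `computed_columns`.

```python
# Hypothetical hint values, only to demonstrate dict-merge precedence.
computed_columns = {"price": {"data_type": "decimal"}, "name": {"data_type": "text"}}
columns = {"price": {"precision": 20, "scale": 2}}  # as passed to the decorator

all_columns = {**computed_columns, **(columns or {})}
print(all_columns["price"])  # {'precision': 20, 'scale': 2}: the computed entry is replaced wholesale
print(all_columns["name"])   # {'data_type': 'text'}: keys not present in the decorator survive
```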
@@ -31,7 +33,6 @@ def sqlglot_schema() -> SQLGlotSchema:
QUERY_KNOWN_TABLE_STAR_SELECT = "SELECT * FROM table_1"
QUERY_UNKNOWN_TABLE_STAR_SELECT = "SELECT * FROM table_unknown"
QUERY_ANONYMOUS_SELECT = "SELECT LEN(col_varchar) FROM table_1"
-QUERY_GIBBERISH = "%&/ GIBBERISH"
This cannot happen anymore, as non-parseable SQL strings are already caught in the dataset.
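For reference, a minimal sketch of how an unparseable string is rejected by sqlglot itself (the dialect name is just an example); the dataset layer can surface this before any schema checks run:

```python
import sqlglot
from sqlglot.errors import ParseError

try:
    sqlglot.parse_one("%&/ GIBBERISH", read="duckdb")
except ParseError as exc:
    # unparseable SQL never reaches lineage/schema resolution
    print(f"rejected: {exc}")
```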
def inventory_original(dataset: SupportsReadableDataset[Any]) -> Any:
    return dataset["inventory"]

@dlt.transformation(columns={"price": {"precision": 20, "scale": 2}})
I think this is the correct behavior; we should allow setting the precision for an existing column here. Or would you rather do this in the function body? I am not sure.
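A hedged sketch of how the decorated transformation might continue (the decorator line and the `dataset["inventory"]` access are from the diff above; the function name, body, and `Any` typing are assumptions):

```python
from typing import Any

import dlt

@dlt.transformation(columns={"price": {"precision": 20, "scale": 2}})
def inventory_with_precision(dataset: Any) -> Any:  # typed as SupportsReadableDataset[Any] in the diff
    # reuse the existing "inventory" table; only the "price" hints are overridden
    return dataset["inventory"]
```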
Will be translated to:

```sql
INSERT INTO
```
This is the actually generated INSERT statement. I realized we were not quoting identifiers, so I added this. We now have two subqueries here, one stemming from the normalization in the extract and one from @anuunchin's normalizer work. We can probably still optimize here?
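As a side note, sqlglot can force-quote every identifier at render time; a minimal sketch (this only illustrates the quoting effect, not necessarily how the INSERT is built here):

```python
import sqlglot

stmt = sqlglot.parse_one("INSERT INTO items (a, b) SELECT a, b FROM staging", read="duckdb")
# identify=True quotes every identifier in the generated SQL
print(stmt.sql(dialect="duckdb", identify=True))
# INSERT INTO "items" ("a", "b") SELECT "a", "b" FROM "staging"
```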
)
elif not add_dlt_load_id and add_dlt_id:
    assert (
        normalized_query
-       == 'SELECT _dlt_subquery."a" AS a, _dlt_subquery."b" AS b, UUID() AS _dlt_id FROM'
+       == 'SELECT _dlt_subquery."a" AS "a", _dlt_subquery."b" AS "b", UUID() AS "_dlt_id" FROM'
The model item normalizer will now always use quoted identifiers on the outer queries, regardless of whether the inner select has quoted identifiers. Since all queries arriving here now go through query normalization in the dataset, where quotes are applied, this should actually be fine.
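A rough sketch of the outer-query shape asserted above, built with sqlglot expression helpers (the exact node construction in dlt may differ):

```python
import sqlglot.expressions as sge

# build the outer projection with explicitly quoted column and alias identifiers
outer = sge.select(
    sge.column("a", table="_dlt_subquery", quoted=True).as_("a", quoted=True),
    sge.column("b", table="_dlt_subquery", quoted=True).as_("b", quoted=True),
)
# the aliases stay quoted regardless of how the inner (subquery) select quotes its identifiers
print(outer.sql(dialect="duckdb"))
```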
def _query(self) -> sge.Query:
    from dlt.helpers.ibis import duckdb_compiler

    select_query = duckdb_compiler.to_sqlglot(self._ibis_object)
For converting an ibis expression into sqlglot, you always need a compiler, which is destination-dependent. I think the compiler will fail if you use functions that are known not to exist on a specific destination, and certain materializations will also occur in this step. If you have an expression that counts rows, the alias for the result will be created here, which will be CountStar(*) for duckdb, but which is not a valid identifier for bigquery, for example (see the comment in the query normalizer).
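A hedged sketch of that compile step (the table and column are made up; `duckdb_compiler.to_sqlglot` is taken from the diff above, and the exact auto-generated alias text can differ between ibis versions):

```python
import ibis
from dlt.helpers.ibis import duckdb_compiler  # exposed per the diff above

items = ibis.table({"qty": "int64"}, name="items")
# naming the metric explicitly avoids the auto-generated "CountStar(*)"-style alias
counted = items.aggregate(row_count=items.count())
select_query = duckdb_compiler.to_sqlglot(counted)
print(select_query.sql(dialect="duckdb"))
```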
dlt/destinations/dataset/utils.py (outdated)
@@ -144,6 +145,8 @@ def normalize_query(
        if len(expanded_path) == 3:
            if node.db != expanded_path[0]:
                node.set("catalog", sqlglot.to_identifier(expanded_path[0], quoted=False))
+       if isinstance(node, sge.Alias):
+           node.set("alias", naming.normalize_identifier(node.alias))
Here I make sure that aliases created by the user, or automatically by ibis in the compile step above, adhere to the naming convention of the current destination. The alternative would be to use a different compiler for every destination. Normalizing the alias in the ModelNormalizer step is too late, since this code is also used for just accessing the data. Without this change, test_row_counts will fail for bigquery because an alias with invalid symbols remains in the query. All other destinations seem to accept this.
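A standalone sketch of that idea; the string munging below is only a stand-in for dlt's NamingConvention, while `node.set("alias", ...)` mirrors the diff:

```python
import sqlglot
import sqlglot.expressions as sge

query = sqlglot.parse_one('SELECT COUNT(*) AS "CountStar()" FROM items', read="duckdb")
for alias_node in query.find_all(sge.Alias):
    # stand-in for naming.normalize_identifier(): strip symbols BigQuery would reject
    normalized = alias_node.alias.replace("(", "_").replace(")", "_").lower()
    alias_node.set("alias", sge.to_identifier(normalized, quoted=False))
print(query.sql(dialect="bigquery"))
```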
I have reverted this change and am now using the bigquery compiler only for bigquery destinations. Maybe we should be using the correct compiler for each destination type, but it seems duckdb works for all. I am not sure about this one.
…t of unneeded transpiling; clean up make_transformation function; tests still pending
normalize aliases in normalization step
Force-pushed 211cbfc to 1f5a787
…uery compiler for bigquery destinations
…-more-decoupling # Conflicts: # docs/website/docs/general-usage/transformations/index.md
Force-pushed 848233e to 28b7361
# NOTE: We can use the duckdb compiler for all dialects except for bigquery
# as bigquery is more strict about identifier naming and will not accept
# identifiers generated by the duckdb compiler for anonymous columns
destination_dialect = self._dataset.sql_client.capabilities.sqlglot_dialect
I'll keep the old notes:
# NOTE: ibis is optimized for reading a real schema from db and pushing back optimized sql
# - it quotes all identifiers, there's no option to get unquoted query
# - it converts full lists of column names back into star
# - it optimizes the query inside, adds meaningless aliases to all tables etc.
For example: I bet the BigQuery problem is a quotation problem where "-quoted identifiers are not accepted.
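A quick, illustrative check of that guess: sqlglot renders quoted identifiers with backticks for BigQuery, so double-quoted identifiers produced for another dialect would not be valid there verbatim.

```python
import sqlglot

# the same quoted identifier, rendered for duckdb vs. bigquery
print(sqlglot.transpile('SELECT "col a" FROM t', read="duckdb", write="duckdb")[0])    # SELECT "col a" FROM t
print(sqlglot.transpile('SELECT "col a" FROM t', read="duckdb", write="bigquery")[0])  # SELECT `col a` FROM t
```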
"Must be an SQL SELECT statement." | ||
) | ||
|
||
return query.sql(dialect=self._dataset.sql_client.capabilities.sqlglot_dialect) |
Why is provided_dialect not used here?
# derived / cached properties
self._opened_sql_client: SqlClientBase[Any] = None
self._columns_schema: TTableSchemaColumns = None
self._qualified_query: sge.Query = None
self._normalized_query: sge.Query = None
self.__qualified_query: sge.Query = None
Is this intended? The names will be mangled, which is maybe good because we implement __getitem__ to get column names here.
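For reference, how the mangling plays out (the class name here is illustrative):

```python
class ReadableRelation:
    def __init__(self) -> None:
        # a double leading underscore is rewritten to _ReadableRelation__qualified_query,
        # so dynamic lookups such as __getitem__/__getattr__ cannot collide with it by name
        self.__qualified_query = None

r = ReadableRelation()
print("_ReadableRelation__qualified_query" in vars(r))  # True
```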
@@ -108,6 +108,7 @@ def normalize_query(
    sqlglot_schema: SQLGlotSchema,
    qualified_query: sge.Query,
    sql_client: SqlClientBase[Any],
+   naming: NamingConvention,
Not needed anymore, and I think it should not be needed here.
"For sql transformations all data_types of columns must be known. " | ||
+ "Please run with strict lineage or provide data_type hints " | ||
+ f"for following columns: {unknown_column_types}", | ||
) | ||
yield dlt.mark.with_hints( |
This is something I wrote in the old PR: could we place all_columns in the SqlModel? Or better: could we place a relation in it? Then:
- the user is able to use dlt.mark on the model (i.e. their own custom hints or table name)
- all_columns = {**computed_columns, **(columns or {})} is not needed; columns will be applied by compute_table_schema in DltResource
- you can use the relation / computed_columns in SqlModel to apply them to the resource in the extractor. Look how we call this method:

root_table_schema = resource.compute_table_schema(items, meta)

and

def compute_table_schema(self, item: TDataItem = None, meta: Any = None) -> TTableSchema:
    """Computes the table schema based on hints and column definitions passed during resource creation.
    `item` parameter is used to resolve table hints based on data.
    `meta` parameter is taken from Pipe and may further specify table name if variant is to be used
    """

So SqlModel will be passed there and you can use it to extract computed hints. Or you can implement _compute_schema on ModelExtractor for fully custom logic.
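A hedged sketch of the first part of this suggestion (the function name, `model`, and `computed_columns` are placeholders; `dlt.mark.with_hints` appears in the diff above and `dlt.mark.make_hints` is the usual way to build hints for it):

```python
import dlt

def emit_model(model, computed_columns):
    # attach the computed hints to the yielded item instead of merging them into the
    # decorator columns; compute_table_schema in DltResource can then apply the
    # decorator-level columns on top
    yield dlt.mark.with_hints(
        model,
        dlt.mark.make_hints(columns=computed_columns),
    )
```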
Description
This PR changes the readable dataset to handle sqlglot expressions directly where possible, without intermediary steps in which SQL strings are generated and re-parsed.