|
| 1 | +# coding=utf-8 |
| 2 | +from typing import Dict, Set, Tuple |
| 3 | + |
| 4 | +from .compile import CompileTask |
| 5 | +from .runnable import ManifestTask |
| 6 | +from dbt.exceptions import warn_or_error, ValidationException |
| 7 | +from dbt.adapters.factory import get_adapter |
| 8 | +from dbt.contracts.graph.parsed import ( |
| 9 | + ParsedModelNode, |
| 10 | +) |
| 11 | +from dbt.contracts.project import PruneModelsAction |
| 12 | + |
| 13 | + |
| 14 | +class ManageTask(CompileTask): |
| 15 | + def run(self): |
| 16 | + ManifestTask._runtime_initialize(self) |
| 17 | + models_in_codebase = self.manifest.nodes.keys() |
| 18 | + adapter = get_adapter(self.config) |
| 19 | + |
| 20 | + with adapter.connection_named("master"): |
| 21 | + required_schemas = self.get_model_schemas(adapter, models_in_codebase) |
| 22 | + self.populate_adapter_cache(adapter, required_schemas) |
| 23 | + |
| 24 | + adapter.clear_transaction() |
| 25 | + self._prune_models(adapter) |
| 26 | + |
| 27 | + def _prune_models(self, adapter): |
| 28 | + self._assert_schema_uniqueness() |
| 29 | + |
| 30 | + if len(self.config.managed_schemas) == 0: |
| 31 | + warn_or_error("No schema's configured to manage") |
| 32 | + return |
| 33 | + |
| 34 | + models_in_codebase: Set[Tuple[str, str, str]] = set( |
| 35 | + (n.config.database, n.config.schema, n.config.alias) |
| 36 | + for n in self.manifest.nodes.values() |
| 37 | + if isinstance(n, ParsedModelNode) |
| 38 | + ) |
| 39 | + |
| 40 | + # get default 'database' + 'schema' for active target |
| 41 | + creds = adapter.connections.profile.credentials |
| 42 | + default_database, default_schema = creds.database, creds.schema |
| 43 | + |
| 44 | + for config in self.config.managed_schemas: |
| 45 | + database = config.database or default_database |
| 46 | + schema = config.schema or default_schema |
| 47 | + |
| 48 | + models_in_database: Dict[Tuple[str, str, str], str] = { |
| 49 | + (database, schema, relation.identifier): relation |
| 50 | + for relation in adapter.list_relations(database, schema) |
| 51 | + } |
| 52 | + if len(models_in_database) == 0: |
| 53 | + warn_or_error( |
| 54 | + f"No objects in managed schema '{database}.{schema}'" |
| 55 | + ) |
| 56 | + |
| 57 | + should_act_upon = models_in_database.keys() - models_in_codebase |
| 58 | + |
| 59 | + for (target_database, target_schema, target_identifier) in sorted(should_act_upon): |
| 60 | + target_action = config.prune_models or PruneModelsAction.SKIP |
| 61 | + if target_action == PruneModelsAction.WARN: |
| 62 | + warn_or_error( |
| 63 | + f"Found unused model {target_database}.{target_schema}.{target_identifier}" |
| 64 | + ) |
| 65 | + elif target_action == PruneModelsAction.DROP: |
| 66 | + adapter.drop_relation( |
| 67 | + models_in_database[(target_database, target_schema, target_identifier)] |
| 68 | + ) |
| 69 | + |
| 70 | + def _assert_schema_uniqueness(self): |
| 71 | + schemas = set() |
| 72 | + |
| 73 | + for config in self.config.managed_schemas: |
| 74 | + schema = (config.database, config.schema) |
| 75 | + if schema in schemas: |
| 76 | + raise ValidationException(f"Duplicate schema found: {schema}") |
| 77 | + schemas.add(schema) |
| 78 | + |
| 79 | + def interpret_results(self, results): |
| 80 | + return True |
0 commit comments