Conversation
| from prefect.blocks.system import Secret | ||
| import os | ||
| import dlt | ||
| import copy |
There was a problem hiding this comment.
its good practice to order imports from most general to specific and also group them
python standards, at the top, other libraries, imports from other files should be at the bottom
| import dlt | ||
| import copy | ||
| from datetime import datetime, timedelta, timezone | ||
| from prefect import flow, task |
There was a problem hiding this comment.
the user must install this prefect. so you should add this into the requireemtns.txt file or into the readme
There was a problem hiding this comment.
Yup, will do. Along with all the CLI commands the audience would need for the entire course.
| import example_pipeline,copy | ||
| # Copy pipeline config so modifications don’t leak between tasks | ||
| cfg = copy.deepcopy(example_pipeline.config) | ||
| config = copy.deepcopy(example_pipeline.config) |
There was a problem hiding this comment.
why are you copying the pipeline config and passing it to the source?
There was a problem hiding this comment.
ah i see. this is misleading name. the file example_pipeline.py actually defines your source.
suggestions:
in the file: rename github_source -> source
rename file github_source.py (or github_rest_api) then it would be from github_source import config or github_source.config`
There was a problem hiding this comment.
also, this file should be named prefect_backfill_example.py
spell it out
There was a problem hiding this comment.
- Yeah, these file names were just for testing. I will rename them as you suggested here now and for the course.
- dlt_pipeline.py was something I created to share the rest_api_source with Aashish and Alena to use in their course.
- The reason I'm doing a copy is more for safety reasons since we are sharing the same config across multiple pipelines running concurrently - so that any changes made in one place don’t affect the original configuration or other tasks using it. It's not necessary since all the other pipelines don't share the same resources. We can remove it. Let me know.
There was a problem hiding this comment.
re3:
only deepcopy if you modify and use it in different pipelines. here you load it from the other file, modify it and use it. mind that you don't modify the original config. if another piece of code running on a different thread that runs in parallel is also importing this config it will have its own copy which is the same as the file
| # pick just one resource from your dlt source | ||
| src = rest_api_source(cfg).with_resources(resource_name) | ||
| source = rest_api_source(config).with_resources(resource_name) | ||
| # unique pipeline per resource avoids dlt state clashes |
There was a problem hiding this comment.
thats a useful comment. also add: "creates separate pipeline working directories, isolates task pipelines" or something like that
There was a problem hiding this comment.
Will do - adds more context.
| # Get today's date in UTC and subtract one day to get yesterday's date | ||
| end_dt = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) | ||
| start_dt = end_dt - timedelta(days=1) | ||
| # Get today's datetime in UTC and subtract one day to get yesterday's date |
There was a problem hiding this comment.
actually its a datetime not a date. its always useful to be correct about these differences
There was a problem hiding this comment.
Yup, will change it to datetime.
| res["endpoint"]["incremental"]["end_value"] = end_date | ||
| # pick just one resource from your dlt source | ||
| src = rest_api_source(cfg).with_resources(resource_name) | ||
| source = rest_api_source(config).with_resources(resource_name) |
There was a problem hiding this comment.
this is a bit all over the place:
pipeline_example.py you define the github_source
dlt_pipeline.py also defines it
you then use neither of the above and instead take config and instantiate the rest_api_source with it
I think your files need to be named after what they define, you can include an example way to run the source either commented out, or what's cleaner in a main clause so that it wont be executed if users import the file
here you should then .with_resources, which returns a clone of the original source with just your resource selected:
source = github_source(config).with_resources(resource_name)
There was a problem hiding this comment.
I think we addressed this issue earlier. Will rename and that should clear up the confusion.
few things to make this feel like the rest of our codebase and educational materials:
spell out the names: src -> source, cfg -> config
it makes things easier to read (code is more often read then written so that what its optimized for)