Implement SharePoint data extraction sources and configuration classes #661
WangCHEN9 wants to merge 20 commits into dlt-hub:master from
Conversation
…ion definitions in SharePoint modules
…tFilesConfig classes
Hi @WangCHEN9, thanks for the contribution! Could you add the tests you have written for the source? Btw, please rebase on master; some import failures that were causing CI errors have been fixed.
- Updated `pyproject.toml` to include SharePoint dependencies.
- Created `README.md` for SharePoint source documentation.
- Implemented SharePoint client in `helpers.py` for API interaction.
- Added configuration classes in `sharepoint_files_config.py` for lists and files.
- Developed extraction functions in `__init__.py` for SharePoint lists and files.
- Created unit tests for SharePoint source in `test_sharepoint_source.py`.
- Added requirements file for SharePoint source dependencies.
…epointFilesSource tests for clarity
Added tests
It works well, but a few improvements are needed imo 🙂. Mainly around pagination (I know it was marked as a todo, but it should be fairly easy to add here), some edge cases that could cause runtime errors, and simplifying parts to better follow dlt conventions
If you have any questions, I’m happy to discuss further here or in the community Slack
sources/sharepoint/__init__.py
Outdated
def get_files(
    sharepoint_files_config: SharepointFilesConfig,
    last_update_timestamp: dlt.sources.incremental = dlt.sources.incremental(
        cursor_path="lastModifiedDateTime",
        initial_value="2020-01-01T00:00:00Z",
        primary_key=(),
    ),
):
    current_last_value = last_update_timestamp.last_value
    logger.debug(f"current_last_value: {current_last_value}")
    for file_item in client.get_files_from_path(
        folder_path=sharepoint_files_config.folder_path,
        file_name_startswith=sharepoint_files_config.file_name_startswith,
        pattern=sharepoint_files_config.pattern,
    ):
        logger.debug(
            "filtering files based on lastModifiedDateTime, compare to last_value:"
            f" {current_last_value}"
        )
        if (
            file_item["lastModifiedDateTime"] > current_last_value
            or not sharepoint_files_config.is_file_incremental
        ):
            logger.info(
                f"Processing file after lastModifiedDateTime filter: {file_item['name']}"
            )
wondering what the reason for doing manual incremental filtering here is - I'm thinking something like this is more idiomatic and readable:
@dlt.source(name="sharepoint_files", max_table_nesting=0)
def sharepoint_files(
    folder_path: str = dlt.config.value,
    table_name: str = dlt.config.value,
    file_type: str = dlt.config.value,
    file_name_prefix: Optional[str] = dlt.config.value,
    file_name_pattern: Optional[str] = dlt.config.value,
    pandas_kwargs: Optional[Dict] = dlt.config.value,
    credentials: SharepointCredentials = dlt.secrets.value,
):
    folder_path = validate_folder_path(folder_path)
    file_type_enum = FileType(file_type)
    client = SharepointClient(**credentials)
    client.connect()

    @dlt.resource(name=f"{table_name}__files", selected=False)
    def file_items(
        last_modified: dlt.sources.incremental[str] = dlt.sources.incremental(
            cursor_path="lastModifiedDateTime",
            initial_value="1970-01-01T00:00:00Z",
        ),
    ):
        for item in client.get_files_from_path(
            folder_path=folder_path,
            file_name_startswith=file_name_prefix or "",
            pattern=file_name_pattern,
        ):
            yield item

    @dlt.transformer(name=table_name)
    def file_data(file_item: Dict):
        file_io = client.get_file_bytes_io(file_item)
        pd_func = file_type_enum.get_pd_function()
        kwargs = pandas_kwargs or {}
        chunksize = kwargs.get("chunksize")
        if chunksize and file_type_enum == FileType.CSV:
            with pd_func(file_io, **kwargs) as reader:
                for chunk in reader:
                    yield chunk
        else:
            clean_kwargs = {k: v for k, v in kwargs.items() if k != "chunksize"}
            yield pd_func(file_io, **clean_kwargs)

    return file_items | file_data
same goes for the sharepoint_list source
@dlt.source(name="sharepoint_list", max_table_nesting=0)
def sharepoint_list(
    list_title: str = dlt.config.value,
    table_name: str = dlt.config.value,
    select: Optional[str] = dlt.config.value,
    credentials: SharepointCredentials = dlt.secrets.value,
):
    client = SharepointClient(**credentials)
    client.connect()

    @dlt.resource(name=table_name)
    def list_items():
        data = client.get_items_from_list(
            list_title=list_title,
            select=select,
        )
        yield from data

    return list_items
> wondering what the reason for doing manual incremental filtering here is - I'm thinking something like this is more idiomatic and readable:
I used this approach initially, but it caused the job to run in incremental mode every time, and I could not identify the root cause. Therefore, I switched to applying a manual filter, as shown here.
Setting `is_file_incremental=False` is pretty useful for lots of use cases.
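For illustration, the manual filter being debated here can be reduced to a few lines of plain Python (the names below are illustrative, not the PR's actual helpers). Because Graph API `lastModifiedDateTime` values are ISO-8601 UTC strings, plain string comparison orders them chronologically:

```python
# Sketch of the manual incremental filter under discussion (illustrative names).
# ISO-8601 UTC timestamps compare lexicographically in chronological order.

def filter_new_files(file_items, last_value, is_file_incremental=True):
    """Yield only items modified after the cursor, unless incremental is off."""
    for item in file_items:
        if not is_file_incremental or item["lastModifiedDateTime"] > last_value:
            yield item

files = [
    {"name": "a.csv", "lastModifiedDateTime": "2023-01-01T00:00:00Z"},
    {"name": "b.csv", "lastModifiedDateTime": "2024-06-01T12:00:00Z"},
]

# Incremental on: only files newer than the cursor pass through.
new = list(filter_new_files(files, "2024-01-01T00:00:00Z"))
# Incremental off: everything passes through.
everything = list(
    filter_new_files(files, "2024-01-01T00:00:00Z", is_file_incremental=False)
)
```

The same on/off behavior is what dlt's `write_disposition` and `incremental` configuration provide without a custom flag.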
sources/sharepoint/helpers.py
Outdated
# TODO, pagination not yet implemented
logger.warning(
    "Pagination is not implemented for get_items_from_list, "
    "it will return only first page of items."
)
all_lists = self.get_all_lists_in_site()
since the client has the paginator already defined, I think we can just go with something like
def get_items_from_list(self, list_title: str, select: Optional[str] = None) -> Iterator[Dict]:
    all_lists = self.get_all_lists_in_site()
    possible_list_titles = [x["displayName"] for x in all_lists]
    if list_title not in possible_list_titles:
        raise ValueError(
            f"List with title '{list_title}' not found in site {self.site_id}. "
            f"Available lists: {possible_list_titles}"
        )
    # Get the list ID
    list_id = next(
        x["id"] for x in all_lists if x["displayName"] == list_title
    )
    url = f"{self.graph_site_url}/lists/{list_id}/items?expand=fields"
    if select:
        url += f"(select={select})"
    # Paginate through all results
    item_count = 0
    for page in self.client.paginate(url):
        page.raise_for_status()
        for item in page.json().get("value", []):
            item_count += 1
            yield item.get("fields", {})
    logger.info(f"Got {item_count} items from list: {list_title}")

wdyt?
I am not very familiar with self.client.paginate, but I have implemented pagination based on the API documentation.
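As a side note, the Graph pagination contract that both approaches rely on can be sketched independently of the rest client: each response page carries an `@odata.nextLink` URL until the last page, which omits it. A minimal sketch with a stubbed fetcher (the URLs and page shapes here are made up for illustration):

```python
# Stub of Microsoft Graph-style pagination: follow "@odata.nextLink" until it
# disappears. fetch_page stands in for the real HTTP call; all names here are
# illustrative, not the PR's actual helpers.

PAGES = {
    "/items?page=1": {"value": [1, 2], "@odata.nextLink": "/items?page=2"},
    "/items?page=2": {"value": [3]},  # no nextLink -> last page
}

def fetch_page(url):
    return PAGES[url]

def iter_all_items(start_url):
    url = start_url
    while url:
        page = fetch_page(url)
        yield from page.get("value", [])
        url = page.get("@odata.nextLink")

items = list(iter_all_items("/items?page=1"))
```

dlt's rest client `paginate` handles this link-following internally, which is why the reviewer's version needs no explicit nextLink handling.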
…d configuration files
…ntFilesConfig tests
Hi, I did an update and addressed most of the points.
anuunchin left a comment
Thanks for the updates! If you'd prefer, I'm happy to take this over to implement the remaining changes and expedite the merge. Let me know (you’ll definitely be credited for the huge work you’ve already done)
logger.warning(f"No lists found in {url}")
return filtered_lists

def get_items_from_list(
Nice that you included the pagination here, I still see a good opportunity to make use of the rest client's built-in pagination functionality, while keeping the logic:
def get_items_from_list(
    self, list_title: str, select: Optional[str] = None
) -> Iterator[Dict[str, Any]]:
    list_id = self._get_list_id(list_title)
    url = f"/lists/{list_id}/items"
    params = {"$expand": "fields"}
    if select:
        params["$expand"] = f"fields($select={select})"
    for page in self.client.paginate(url, params=params):
        for item in page.json().get("value", []):
            yield item.get("fields", {})

def _get_list_id(self, list_title: str) -> str:
    all_lists = self.get_all_lists_in_site()
    for lst in all_lists:
        if lst["displayName"] == list_title:
            return lst["id"]
    available = [x["displayName"] for x in all_lists]
    raise ValueError(
        f"List with title '{list_title}' not found in site {self.site_id}. "
        f"Available lists: {available}"
    )

PS: separate helper function for getting the list id from title for more readability
client.connect()
logger.info(f"Connected to SharePoint site: {client.site_info}")

def get_pipe(sharepoint_list_config: SharepointListConfig):  # type: ignore[no-untyped-def]
Let's flatten the structure here so that it's more readable:
@dlt.source(name="sharepoint_list", max_table_nesting=0)
def sharepoint_list(
    sharepoint_list_config: SharepointListConfig,
    credentials: SharepointCredentials = dlt.secrets.value,
) -> Iterable[DltResource]:
    client = SharepointClient(**credentials)
    client.connect()
    logger.info(f"Connected to SharePoint site: {client.site_info}")

    @dlt.resource(name=sharepoint_list_config.table_name)
    def get_records() -> Iterator[TDataItems]:
        yield from client.get_items_from_list(
            list_title=sharepoint_list_config.list_title,
            select=sharepoint_list_config.select,
        )

    yield get_records
This will also make the type ignores unnecessary, which should generally be avoided.
    f"No items found in list: {list_title}, with select: {select}"
)

def get_files_from_path(
Based on the docs, it appears to be a paginated endpoint, therefore suggesting:
def get_files_from_path(
    self, folder_path: str, file_name_startswith: str, pattern: Optional[str] = None
) -> Iterator[Dict[str, Any]]:
    folder_url = (
        f"/drive/root:/{folder_path}:/children"
        f"?$filter=startswith(name, '{file_name_startswith}')"
    )
    logger.debug(f"Getting files from: {folder_url}")
    compiled_pattern = re.compile(pattern) if pattern else None
    for page in self.client.paginate(folder_url):
        for item in page.json().get("value", []):
            if "file" not in item:
                continue
            if compiled_pattern and not compiled_pattern.search(item["name"]):
                continue
            yield item

On that note, I don't think we should have the filter param enabled here, as it doesn't seem to be reliably supported, and the docs page doesn't include the filter parameter either.
Or, alternatively, let's document that it may be unreliable
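If the `$filter` query parameter were dropped, the same filtering could be done entirely client-side. A sketch of that fallback, with illustrative names (not the PR's actual code): skip folder items (no `file` facet in the Graph item), then apply the prefix and optional regex locally:

```python
import re

# Client-side fallback for file filtering (illustrative names, assumed item
# shapes): Graph drive items representing files carry a "file" facet, folders
# don't, so filtering on that key skips subfolders.

def filter_files(items, prefix="", pattern=None):
    compiled = re.compile(pattern) if pattern else None
    for item in items:
        if "file" not in item:  # skip folders
            continue
        name = item["name"]
        if not name.startswith(prefix):
            continue
        if compiled and not compiled.search(name):
            continue
        yield item

items = [
    {"name": "report_2024.csv", "file": {}},
    {"name": "report_old.txt", "file": {}},
    {"name": "subfolder"},  # no "file" facet -> treated as a folder
]

csvs = [i["name"] for i in filter_files(items, prefix="report", pattern=r"\.csv$")]
```

This trades one larger unfiltered listing per folder for independence from server-side `$filter` support.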
yield get_pipe(sharepoint_list_config=sharepoint_list_config)

@dlt.source(name="sharepoint_files", max_table_nesting=0)
To continue the conversation about manual incremental loading, the motivation behind is_file_incremental=False is valid, but that's what dlt's write_disposition is for (which is also controllable from configs), and I don't see the need to add another layer of logic with the same functionality
FYI, I'm also intending to add additional tests after the PR is merged (with real creds on our end)
class SharepointFilesConfig(BaseModel):
    """Configuration for SharePoint file extraction and processing.

    Attributes:
        file_type: Type of files to process (CSV, Excel, etc.)
        folder_path: Path to the SharePoint folder containing files
        table_name: Name of the destination table for file data
        file_name_startswith: Prefix filter for file names
        pattern: Optional regex pattern for additional file filtering
        pandas_kwargs: Additional arguments to pass to pandas read function
        is_file_incremental: Enable incremental loading based on file modification time

    Note:
        The pattern attribute is automatically prefixed with file_name_startswith.
        Folder paths are validated and normalized during initialization.
    """

    file_type: FileType
    folder_path: str
    table_name: str
    file_name_startswith: str
    pattern: Optional[str] = ".*"
    pandas_kwargs: Dict[str, Any] = {}
    is_file_incremental: bool = False

    def __init__(self, **data: Any) -> None:
        super().__init__(**data)
        self.folder_path = validate_folder_path(self.folder_path)
        self.pattern = f"^{self.file_name_startswith}{self.pattern}"
This would become
@configspec
class SharepointFilesConfig(BaseConfiguration):
    file_type: FileType = None
    folder_path: str = None
    table_name: str = None
    file_name_startswith: str = ""
    pattern: Optional[str] = ".*"
    pandas_kwargs: Dict[str, Any] = field(default_factory=dict)

    def on_resolved(self) -> None:
        """Called after config resolution - validate and normalize."""
        self.folder_path = validate_folder_path(self.folder_path)
        self.pattern = f"^{self.file_name_startswith}{self.pattern}"
The usage of BaseConfiguration will allow:
import dlt

from sharepoint import sharepoint_list, sharepoint_files

if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="sharepoint_to_duckdb",
        destination="duckdb",
        dataset_name="sharepoint_data",
    )

    # Load SharePoint List
    # Credentials auto-loaded from .dlt/secrets.toml
    # Config auto-loaded from .dlt/config.toml
    print("Loading SharePoint List data...")
    list_load_info = pipeline.run(sharepoint_list())
    print(list_load_info)

    # Load SharePoint Files
    print("Loading SharePoint Files data...")
    files_load_info = pipeline.run(sharepoint_files())
    print(files_load_info)
i.e., let's not imply hard-coding credentials in the pipeline script.
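To make that concrete: by dlt convention, secrets live in `.dlt/secrets.toml` and are resolved automatically via `dlt.secrets.value`. A sketch of what that file might look like for this source — the section path and field names below are hypothetical, since the actual `SharepointCredentials` fields aren't shown in this diff:

```toml
# Hypothetical sketch - adjust section path and keys to the actual
# SharepointCredentials spec in this PR.
[sources.sharepoint.credentials]
tenant_id = "..."
client_id = "..."
client_secret = "..."
site_url = "https://yourcompany.sharepoint.com/sites/yoursite"
```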
@configspec
class SharepointCredentials(BaseConfiguration):
CredentialsConfiguration is more appropriate imo; you can look into other sources for usage examples (salesforce).
The branch needs to be rebased on master, formatted with `make format`, and checked with the linter using `make lint`. I know I mentioned this earlier, but it's still needed, so I wanted to remind you again:
- One file is not formatted
- The linter is picking up an error that has already been resolved in master
Yes, please take it over!
Introduce classes and functions for extracting data from SharePoint lists and files, including configuration for file types and paths. This implementation supports both list and file data extraction into a DuckDB pipeline.