@@ -387,6 +387,7 @@ def pre_ingest_process(
387387 target : Optional [str ] = None ,
388388 collection_id : Optional [str ] = None ,
389389 metadata : Optional [IngestionMetadata ] = None ,
390+ method : Optional [str ] = None ,
390391 ) -> Tuple [List [Dict ], Optional [IngestionMetadata ]]:
391392 """
392393 Do some processing on the ingestion payload before passing it to the
@@ -403,6 +404,7 @@ def pre_ingest_process(self,
403404 target: Optional[str],
404405 collection_id: Optional[str],
405406 metadata: Optional[IngestionMetadata],
407+ method: Optional[str],
406408 ) -> Tuple[List[Dict], Optional[IngestionMetadata]]:
407409 # Ensure all values in the payload are strings
408410 processed_payload = \
@@ -455,6 +457,7 @@ def pre_ingest_process(self,
455457 target: Optional[str],
456458 collection_id: Optional[str],
457459 metadata: Optional[IngestionMetadata],
460+ method: Optional[str],
458461 ) -> Tuple[List[Dict], Optional[IngestionMetadata]]:
459462 if metadata:
460463 metadata[IIngesterPlugin.UPDATED_DYNAMIC_PARAMS] = {
@@ -468,6 +471,23 @@ def pre_ingest_process(self,
468471 return payload, metadata
469472 NOTE: A read-only parameter. Whatever modifications are done to
470473 this object, once returned, it is treated as a new object.
474+ :param method: string
475+ (Optional) A string indicating what ingestion method will be used to
476+ ingest the passed payload. This identifier can be used by the
477+ pre-processor in case some special behavior is necessary for specific
478+ ingestion methods. For example:
479+ .. code-block:: python
480+ def pre_ingest_process(self,
481+ payload: List[dict],
482+ destination_table: Optional[str],
483+ target: Optional[str],
484+ collection_id: Optional[str],
485+ metadata: Optional[IngestionMetadata],
486+ method: Optional[str],
487+ ) -> Tuple[List[Dict], Optional[IngestionMetadata]]:
488+ if method == "http":
489+ # do something only for http ingestion
490+ return payload, metadata
471491 :return: Tuple[List[Dict], Optional[IngestionMetadata]], a tuple
472492 containing the processed payload objects and an
473493 IngestionMetadata object with ingestion metadata information.
@@ -491,6 +511,7 @@ def post_ingest_process(
491511 collection_id : Optional [str ] = None ,
492512 metadata : Optional [IngestionMetadata ] = None ,
493513 exception : Optional [Exception ] = None ,
514+ method : Optional [str ] = None ,
494515 ) -> Optional [IngestionMetadata ]:
495516 """
496517 Do post-ingestion processing of the ingestion payload
@@ -508,6 +529,7 @@ def post_ingest_process(
508529 collection_id: Optional[str],
509530 metadata: Optional[IngestionMetadata],
510531 exception: Optional[Exception],
532+ method: Optional[str],
511533 ) -> Optional[IngestionMetadata]:
512534
513535 # Prepare telemetry
@@ -567,6 +589,11 @@ def post_ingest_process(
567589 A caught exception (if any) encountered while ingesting data.
568590 NOTE: A read-only parameter. Whatever modifications are done to
569591 this object, once returned, it is treated as a new object.
592+ :param method: string
593+ (Optional) A string indicating what ingestion method was used to
594+ ingest the passed payload. This identifier can be used by the
595+ post-processor in case some special behavior is necessary for specific
596+ ingestion methods.
570597 :return: Optional[IngestionMetadata], an IngestionMetadata object
571598 with information about this and possibly the previous processes (
572599 pre-ingestion, ingestion, post-ingestion).
0 commit comments