Skip to content

Commit e35af2f

Browse files
Fix Sonar issues: cognitive complexity, params, dup, test smells
- legacy_executor: extract _run_pipeline_answer_step helper to drop _handle_structure_pipeline cognitive complexity from 18 to under 15
- legacy_executor: bundle 9 prompt-run scalars into a prompt_run_args dict so _run_line_item_extraction has 8 params (was 15, limit 13)
- legacy_executor: merge implicitly concatenated log string
- structure_tool_task: extract _write_pipeline_outputs helper used by both _execute_structure_tool_impl and _run_agentic_extraction to remove the duplicated INFILE / COPY_TO_FOLDER write block (fixes the 6.1% duplication on new code)
- test_context_retrieval_metrics: use pytest.approx for float compare, drop unused executor local, drop always-true if is_single_pass

Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent 095c7d1 commit e35af2f

File tree

3 files changed

+161
-109
lines changed

3 files changed

+161
-109
lines changed

workers/executor/executors/legacy_executor.py

Lines changed: 76 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -574,23 +574,57 @@ def _handle_structure_pipeline(self, context: ExecutionContext) -> ExecutionResu
574574
input_file_path=input_file_path,
575575
)
576576

577-
# ---- Step 4b: Force full-context retrieval for single pass ----
578-
# Single pass reads the whole file in one LLM call; force
579-
# chunk-size=0 so the fallback path (no cloud plugin) uses
580-
# retrieve_complete_context instead of vector DB retrieval.
577+
# ---- Step 5: Answer prompt / Single pass ----
578+
answer_result = self._run_pipeline_answer_step(
579+
context=context,
580+
answer_params=answer_params,
581+
is_single_pass=is_single_pass,
582+
shim=shim,
583+
step=step,
584+
)
585+
if not answer_result.success:
586+
return answer_result
587+
588+
# ---- Step 6: Merge results ----
589+
structured_output = answer_result.data
590+
self._finalize_pipeline_result(
591+
structured_output=structured_output,
592+
source_file_name=source_file_name,
593+
extracted_text=extracted_text,
594+
index_metrics=index_metrics,
595+
)
596+
597+
shim.stream_log("Pipeline completed successfully")
598+
return ExecutionResult(success=True, data=structured_output)
599+
600+
def _run_pipeline_answer_step(
601+
self,
602+
context: ExecutionContext,
603+
answer_params: dict,
604+
is_single_pass: bool,
605+
shim: ExecutorToolShim,
606+
step: int,
607+
) -> ExecutionResult:
608+
"""Run the answer-prompt step of the structure pipeline.
609+
610+
For single pass, forces ``chunk-size=0`` (full-context retrieval)
611+
and dispatches ``_handle_single_pass_extraction``. Otherwise
612+
dispatches ``_handle_answer_prompt``.
613+
"""
581614
if is_single_pass:
615+
# Single pass reads the whole file in one LLM call; force
616+
# chunk-size=0 so the fallback path (no cloud plugin) uses
617+
# retrieve_complete_context instead of vector DB retrieval.
582618
for output in answer_params.get("outputs", []):
583619
output["chunk-size"] = 0
584620
output["chunk-overlap"] = 0
621+
operation = Operation.SINGLE_PASS_EXTRACTION.value
622+
mode_label = "single pass"
623+
else:
624+
operation = Operation.ANSWER_PROMPT.value
625+
mode_label = "prompt"
585626

586-
# ---- Step 5: Answer prompt / Single pass ----
587-
mode_label = "single pass" if is_single_pass else "prompt"
588627
shim.stream_log(f"Pipeline step {step}: Running {mode_label} execution...")
589-
operation = (
590-
Operation.SINGLE_PASS_EXTRACTION.value
591-
if is_single_pass
592-
else Operation.ANSWER_PROMPT.value
593-
)
594628
answer_ctx = ExecutionContext(
595629
executor_name=context.executor_name,
596630
operation=operation,
@@ -602,23 +636,8 @@ def _handle_structure_pipeline(self, context: ExecutionContext) -> ExecutionResu
602636
log_events_id=context.log_events_id,
603637
)
604638
if is_single_pass:
605-
answer_result = self._handle_single_pass_extraction(answer_ctx)
606-
else:
607-
answer_result = self._handle_answer_prompt(answer_ctx)
608-
if not answer_result.success:
609-
return answer_result
610-
611-
# ---- Step 6: Merge results ----
612-
structured_output = answer_result.data
613-
self._finalize_pipeline_result(
614-
structured_output=structured_output,
615-
source_file_name=source_file_name,
616-
extracted_text=extracted_text,
617-
index_metrics=index_metrics,
618-
)
619-
620-
shim.stream_log("Pipeline completed successfully")
621-
return ExecutionResult(success=True, data=structured_output)
639+
return self._handle_single_pass_extraction(answer_ctx)
640+
return self._handle_answer_prompt(answer_ctx)
622641

623642
@staticmethod
624643
def _inject_table_settings(
@@ -1500,15 +1519,17 @@ def _execute_single_prompt(
15001519
structured_output=structured_output,
15011520
metadata=metadata,
15021521
metrics=metrics,
1503-
run_id=run_id,
1504-
execution_id=execution_id,
1505-
execution_source=execution_source,
1506-
platform_api_key=platform_api_key,
1507-
tool_id=tool_id,
1508-
doc_name=doc_name,
1509-
prompt_name=prompt_name,
1510-
file_path=file_path,
1511-
tool_settings=tool_settings,
1522+
prompt_run_args={
1523+
"run_id": run_id,
1524+
"execution_id": execution_id,
1525+
"execution_source": execution_source,
1526+
"platform_api_key": platform_api_key,
1527+
"tool_id": tool_id,
1528+
"doc_name": doc_name,
1529+
"prompt_name": prompt_name,
1530+
"file_path": file_path,
1531+
"tool_settings": tool_settings,
1532+
},
15121533
shim=shim,
15131534
)
15141535
return
@@ -1724,20 +1745,19 @@ def _run_line_item_extraction(
17241745
structured_output: dict[str, Any],
17251746
metadata: dict[str, Any],
17261747
metrics: dict[str, Any],
1727-
run_id: str,
1728-
execution_id: str,
1729-
execution_source: str,
1730-
platform_api_key: str,
1731-
tool_id: str,
1732-
doc_name: str,
1733-
prompt_name: str,
1734-
file_path: str,
1735-
tool_settings: dict[str, Any],
1748+
prompt_run_args: dict[str, Any],
17361749
shim: Any,
17371750
) -> None:
1738-
"""Delegate LINE_ITEM prompt to the line_item executor plugin."""
1751+
"""Delegate LINE_ITEM prompt to the line_item executor plugin.
1752+
1753+
``prompt_run_args`` bundles the per-prompt scalars passed from
1754+
``_handle_outputs``: ``run_id``, ``execution_id``,
1755+
``execution_source``, ``platform_api_key``, ``tool_id``,
1756+
``doc_name``, ``prompt_name``, ``file_path``, ``tool_settings``.
1757+
"""
17391758
from executor.executors.constants import PromptServiceConstants as PSKeys
17401759

1760+
prompt_name = prompt_run_args["prompt_name"]
17411761
try:
17421762
line_item_executor = ExecutorRegistry.get("line_item")
17431763
except KeyError:
@@ -1750,20 +1770,20 @@ def _run_line_item_extraction(
17501770
line_item_ctx = ExecutionContext(
17511771
executor_name="line_item",
17521772
operation="line_item_extract",
1753-
run_id=run_id,
1754-
execution_source=execution_source,
1773+
run_id=prompt_run_args["run_id"],
1774+
execution_source=prompt_run_args["execution_source"],
17551775
organization_id=context.organization_id,
17561776
request_id=context.request_id,
17571777
executor_params={
17581778
"llm_adapter_instance_id": output.get(PSKeys.LLM, ""),
1759-
"tool_settings": tool_settings,
1779+
"tool_settings": prompt_run_args["tool_settings"],
17601780
"output": output,
17611781
"prompt": output.get(PSKeys.PROMPTX, ""),
1762-
"file_path": file_path,
1763-
"PLATFORM_SERVICE_API_KEY": platform_api_key,
1764-
"execution_id": execution_id,
1765-
"tool_id": tool_id,
1766-
"file_name": doc_name,
1782+
"file_path": prompt_run_args["file_path"],
1783+
"PLATFORM_SERVICE_API_KEY": prompt_run_args["platform_api_key"],
1784+
"execution_id": prompt_run_args["execution_id"],
1785+
"tool_id": prompt_run_args["tool_id"],
1786+
"file_name": prompt_run_args["doc_name"],
17671787
"prompt_name": prompt_name,
17681788
},
17691789
)
@@ -1953,7 +1973,7 @@ def _inject_context_retrieval_metrics(
19531973
elapsed = round(time.monotonic() - start, 4)
19541974
except Exception:
19551975
logger.warning(
1956-
"Could not measure context_retrieval time for " "single_pass (run_id=%s)",
1976+
"Could not measure context_retrieval time for single_pass (run_id=%s)",
19571977
context.run_id,
19581978
)
19591979
return

workers/file_processing/structure_tool_task.py

Lines changed: 78 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -407,33 +407,19 @@ def _execute_structure_tool_impl(params: dict) -> dict:
407407

408408
# ---- Step 7: Write output files ----
409409
# (metadata/metrics merging already done by executor pipeline)
410-
try:
411-
output_path = Path(output_dir_path) / f"{Path(source_file_name).stem}.json"
412-
logger.info("Writing output to %s", output_path)
413-
fs.json_dump(path=output_path, data=structured_output)
414-
415-
# Overwrite INFILE with JSON output (matches Docker-based tool behavior).
416-
# The destination connector reads from INFILE and checks MIME type —
417-
# if we don't overwrite it, INFILE still has the original PDF.
418-
logger.info("Overwriting INFILE with structured output: %s", input_file_path)
419-
fs.json_dump(path=input_file_path, data=structured_output)
420-
421-
# Write to COPY_TO_FOLDER for FS destinations.
422-
# The old Docker flow created this via ToolExecutor._setup_for_run();
423-
# the destination connector expects output at
424-
# {file_execution_dir}/COPY_TO_FOLDER/{filename}.json
425-
copy_to_folder = str(Path(execution_data_dir) / "COPY_TO_FOLDER")
426-
fs.mkdir(copy_to_folder)
427-
copy_output_path = str(
428-
Path(copy_to_folder) / f"{Path(source_file_name).stem}.json"
429-
)
430-
fs.json_dump(path=copy_output_path, data=structured_output)
431-
logger.info("Output written to COPY_TO_FOLDER: %s", copy_output_path)
432-
433-
logger.info("Output written successfully to workflow storage")
434-
except Exception as e:
435-
logger.error("Failed to write output files: %s", e, exc_info=True)
436-
return ExecutionResult.failure(error=f"Error writing output file: {e}").to_dict()
410+
write_error = _write_pipeline_outputs(
411+
fs=fs,
412+
structured_output=structured_output,
413+
output_dir_path=output_dir_path,
414+
input_file_path=input_file_path,
415+
execution_data_dir=execution_data_dir,
416+
source_file_name=source_file_name,
417+
label="structured",
418+
)
419+
if write_error:
420+
return ExecutionResult.failure(
421+
error=f"Error writing output file: {write_error}"
422+
).to_dict()
437423

438424
# Write tool result + tool_metadata to METADATA.json
439425
# (destination connector reads output_type from tool_metadata)
@@ -620,33 +606,80 @@ def _run_agentic_extraction(
620606
elapsed = time.monotonic() - start_time
621607

622608
# Write output files (matches regular pipeline path)
609+
write_error = _write_pipeline_outputs(
610+
fs=fs,
611+
structured_output=structured_output,
612+
output_dir_path=output_dir_path,
613+
input_file_path=input_file_path,
614+
execution_data_dir=execution_data_dir,
615+
source_file_name=source_file_name,
616+
label="agentic",
617+
)
618+
if write_error:
619+
return ExecutionResult.failure(
620+
error=f"Error writing agentic output: {write_error}"
621+
).to_dict()
622+
623+
# Write tool result + tool_metadata to METADATA.json
624+
_write_tool_result(fs, execution_data_dir, structured_output, elapsed)
625+
626+
return ExecutionResult(success=True, data=structured_output).to_dict()
627+
628+
629+
def _write_pipeline_outputs(
630+
fs: Any,
631+
structured_output: dict,
632+
output_dir_path: str,
633+
input_file_path: str,
634+
execution_data_dir: str,
635+
source_file_name: str,
636+
label: str,
637+
) -> str | None:
638+
"""Write structure-tool / agentic outputs to disk.
639+
640+
Mirrors the old Docker tool's output layout so the destination
641+
connector finds what it expects:
642+
643+
1. ``{output_dir_path}/{stem}.json`` — primary output file.
644+
2. INFILE overwritten with JSON (destination connector reads INFILE
645+
and checks MIME type — without this it still sees the original
646+
PDF).
647+
3. ``{execution_data_dir}/COPY_TO_FOLDER/{stem}.json`` — what the
648+
old ``ToolExecutor._setup_for_run()`` created for FS destinations.
649+
650+
Args:
651+
label: Short label for log lines (``"structured"`` or
652+
``"agentic"``).
653+
654+
Returns:
655+
``None`` on success, or the error string on failure.
656+
"""
623657
try:
624-
output_path = Path(output_dir_path) / f"{Path(source_file_name).stem}.json"
625-
logger.info("Writing agentic output to %s", output_path)
658+
stem = Path(source_file_name).stem
659+
output_path = Path(output_dir_path) / f"{stem}.json"
660+
logger.info("Writing %s output to %s", label, output_path)
626661
fs.json_dump(path=output_path, data=structured_output)
627662

628-
# Overwrite INFILE with JSON output so destination connector reads JSON, not PDF
629-
logger.info("Overwriting INFILE with agentic output: %s", input_file_path)
663+
logger.info(
664+
"Overwriting INFILE with %s output: %s", label, input_file_path
665+
)
630666
fs.json_dump(path=input_file_path, data=structured_output)
631667

632-
# Write to COPY_TO_FOLDER for FS destinations (same as regular pipeline)
633668
copy_to_folder = str(Path(execution_data_dir) / "COPY_TO_FOLDER")
634669
fs.mkdir(copy_to_folder)
635-
copy_output_path = str(
636-
Path(copy_to_folder) / f"{Path(source_file_name).stem}.json"
637-
)
670+
copy_output_path = str(Path(copy_to_folder) / f"{stem}.json")
638671
fs.json_dump(path=copy_output_path, data=structured_output)
639-
logger.info("Agentic output written to COPY_TO_FOLDER: %s", copy_output_path)
640-
except Exception as e:
641-
logger.error("Failed to write agentic output files: %s", e, exc_info=True)
642-
return ExecutionResult.failure(
643-
error=f"Error writing agentic output: {e}"
644-
).to_dict()
645-
646-
# Write tool result + tool_metadata to METADATA.json
647-
_write_tool_result(fs, execution_data_dir, structured_output, elapsed)
672+
logger.info(
673+
"%s output written to COPY_TO_FOLDER: %s",
674+
label.capitalize(),
675+
copy_output_path,
676+
)
648677

649-
return ExecutionResult(success=True, data=structured_output).to_dict()
678+
logger.info("Output written successfully to workflow storage")
679+
return None
680+
except Exception as e:
681+
logger.error("Failed to write %s output files: %s", label, e, exc_info=True)
682+
return str(e)
650683

651684

652685
def _write_tool_result(

workers/tests/test_context_retrieval_metrics.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ def test_preserves_existing_context_retrieval(self, mock_fs):
111111
executor._inject_context_retrieval_metrics(result, ctx)
112112

113113
# field_a's existing timing preserved
114-
assert result.data["metrics"]["field_a"]["context_retrieval"]["time_taken(s)"] == 0.999
114+
assert result.data["metrics"]["field_a"]["context_retrieval"][
115+
"time_taken(s)"
116+
] == pytest.approx(0.999)
115117
# field_b gets new timing
116118
assert "context_retrieval" in result.data["metrics"]["field_b"]
117119

@@ -253,8 +255,6 @@ def test_single_pass_forces_chunk_size_zero(self, mock_fs):
253255
fs.exists.return_value = False
254256
mock_fs.return_value = fs
255257

256-
executor = _get_executor()
257-
258258
# Build minimal answer_params with non-zero chunk-size
259259
outputs = [
260260
{
@@ -299,11 +299,10 @@ def test_single_pass_forces_chunk_size_zero(self, mock_fs):
299299
}
300300

301301
# Apply the same logic as _handle_structure_pipeline step 4b
302-
is_single_pass = True
303-
if is_single_pass:
304-
for output in answer_params.get("outputs", []):
305-
output["chunk-size"] = 0
306-
output["chunk-overlap"] = 0
302+
# (single pass forces chunk-size=0 to use full-context retrieval)
303+
for output in answer_params.get("outputs", []):
304+
output["chunk-size"] = 0
305+
output["chunk-overlap"] = 0
307306

308307
# Verify outputs were modified
309308
for output in answer_params["outputs"]:

0 commit comments

Comments
 (0)