From 9df7e523687882974788063784a0ef5a2165a398 Mon Sep 17 00:00:00 2001 From: Chandrasekharan M Date: Tue, 23 Jun 2026 15:23:31 +0530 Subject: [PATCH 1/4] fix(clone): clone agentic project documents + resolve exported agentic tools Two gaps surfaced cloning agentic ("Agentic Prompt Studio") projects: 1. Documents were dropped. Agentic projects keep their uploads in their own store (agentic/documents/), separate from Prompt Studio prompt-documents. The files phase only iterates the custom_tool remap, so agentic docs were never cloned. AgenticStudioPhase now clones them per project (download raw bytes from source, skip names already on target, upload to the target project), and counts them in the dry-run plan. 2. Exported agentic tools were skipped in workflows. A workflow tool_instance references a registry id; exported agentic projects register under agentic_studio_registry, but ToolInstancePhase resolved tool_id only via prompt_studio_registry. It now falls back to the agentic registry, so the "Agentic tool API" workflow lands with its tool wired. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011ja9H1rnSXmPUgQtHm8TNS --- src/unstract/clone/client.py | 42 ++++++++ src/unstract/clone/phases/agentic_studio.py | 109 +++++++++++++++++++- src/unstract/clone/phases/tool_instance.py | 9 +- tests/clone/test_agentic_studio_phase.py | 66 ++++++++++++ tests/clone/test_tool_instance_phase.py | 18 ++++ 5 files changed, 240 insertions(+), 4 deletions(-) diff --git a/src/unstract/clone/client.py b/src/unstract/clone/client.py index ed8a70b..65dfc64 100644 --- a/src/unstract/clone/client.py +++ b/src/unstract/clone/client.py @@ -921,3 +921,45 @@ def list_agentic_registries( params["agentic_project"] = agentic_project result = self._request("GET", "agentic-studio-registry/", params=params) return result if isinstance(result, list) else (result or {}).get("results", []) + + def list_agentic_documents(self, project_id: str) -> list[dict[str, Any]]: + """List a project's uploaded documents. Rows carry ``id`` and + ``original_filename``. Agentic docs are a store of their own, distinct + from Prompt Studio ``prompt-document`` rows. + """ + result = self._request( + "GET", "agentic/documents/", params={"project_id": project_id} + ) + return result if isinstance(result, list) else (result or {}).get("results", []) + + def download_agentic_document(self, document_id: str) -> bytes: + """Download an agentic document's original bytes. + + The ``file`` route serves the raw binary (not a JSON envelope), so this + bypasses the JSON-decoding request path. + """ + url = self._url(f"agentic/documents/{document_id}/file/") + logger.debug("GET %s", url) + resp = self._session.get(url, timeout=self.timeout, verify=self.verify) + if not 200 <= resp.status_code < 300: + raise PlatformAPIError( + f"GET agentic/documents/{document_id}/file/ " + f"returned {resp.status_code}", + status_code=resp.status_code, + body=resp.text[:2000], + ) + return resp.content + + def upload_agentic_document( + self, project_id: str, file_name: str, data: bytes, mime_type: str + ) -> dict[str, Any]: + """Upload a document into a target agentic project. + + Creates the ``AgenticDocument`` row; extraction/summary stay a UI step, + as with Prompt Studio uploads. Callers pre-check filenames to avoid + duplicates on re-runs. + """ + files = {"file": (file_name, data, mime_type)} + return self._request( + "POST", f"agentic/projects/{project_id}/documents/upload/", files=files + ) diff --git a/src/unstract/clone/phases/agentic_studio.py b/src/unstract/clone/phases/agentic_studio.py index 6f51311..0f31dc4 100644 --- a/src/unstract/clone/phases/agentic_studio.py +++ b/src/unstract/clone/phases/agentic_studio.py @@ -35,6 +35,7 @@ from __future__ import annotations import logging +import mimetypes import threading from typing import Any @@ -170,6 +171,7 @@ def _clone_project( self._replicate_share(src, name, tgt_project_id, result, lock) self._clone_prompt_versions(name, src_project_id, tgt_project_id, result, lock) self._clone_schemas(name, src_project_id, tgt_project_id, result, lock) + self._clone_documents(name, src_project_id, tgt_project_id, result, lock) self._republish_registry(name, src_project_id, tgt_project_id, result, lock) def _build_project_payload( @@ -422,6 +424,103 @@ def _clone_schemas( with lock: result.created += 1 + # ----- documents ----- + + def _clone_documents( + self, + name: str, + src_project_id: str, + tgt_project_id: str, + result: PhaseResult, + lock: threading.Lock, + ) -> None: + try: + src_docs = self.ctx.source.list_agentic_documents(src_project_id) + except Exception as e: + logger.exception("agentic '%s': document listing failed: %s", name, e) + with lock: + result.failed += 1 + result.errors.append(f"agentic {name} list documents: {e}") + return + if not src_docs: + return + + try: + tgt_docs = self.ctx.target.list_agentic_documents(tgt_project_id) + except Exception as e: + logger.warning( + "agentic '%s': target document listing failed " + "(re-run may duplicate): %s", + name, + e, + ) + tgt_docs = [] + target_names = {d.get("original_filename") for d in tgt_docs} + + for src in src_docs: + file_name = src.get("original_filename") + src_doc_id = src.get("id") + if not file_name or not src_doc_id: + continue + if file_name in target_names: + with lock: + result.skipped += 1 + logger.info( + "agentic '%s': document '%s' already on target — skipping", + name, + file_name, + ) + continue + self._clone_one_document( + name, tgt_project_id, src_doc_id, file_name, result, lock + ) + + def _clone_one_document( + self, + name: str, + tgt_project_id: str, + src_doc_id: str, + file_name: str, + result: PhaseResult, + lock: threading.Lock, + ) -> None: + try: + raw = self.ctx.source.download_agentic_document(src_doc_id) + except Exception as e: + logger.exception( + "agentic '%s': document '%s' download failed: %s", name, file_name, e + ) + with lock: + result.failed += 1 + result.errors.append(f"agentic {name} download {file_name}: {e}") + return + + if len(raw) > self.ctx.options.max_file_size: + with lock: + result.skipped += 1 + result.warnings.append( + f"agentic {name}: document {file_name} exceeds size cap — " + "upload it manually on target" + ) + return + + mime = mimetypes.guess_type(file_name)[0] or "application/pdf" + try: + self.ctx.target.upload_agentic_document( + tgt_project_id, file_name, raw, mime + ) + except Exception as e: + logger.exception( + "agentic '%s': document '%s' upload failed: %s", name, file_name, e + ) + with lock: + result.failed += 1 + result.errors.append(f"agentic {name} upload {file_name}: {e}") + return + with lock: + result.created += 1 + logger.info("agentic '%s': uploaded document '%s'", name, file_name) + # ----- registry ----- def _republish_registry( @@ -507,8 +606,9 @@ def _plan_children( result: PhaseResult, lock: threading.Lock, ) -> None: - """Dry-run: count source prompt versions + schemas as planned and - record planned prompt-version ids so downstream resolves don't miss. + """Dry-run: count source prompt versions + schemas + documents as + planned and record planned prompt-version ids so downstream resolves + don't miss. """ try: src_versions = self.ctx.source.list_agentic_prompt_versions( @@ -522,11 +622,16 @@ def _plan_children( ) except Exception: src_schemas = [] + try: + src_docs = self.ctx.source.list_agentic_documents(src_project_id) + except Exception: + src_docs = [] with lock: for v in src_versions: self.ctx.remap.record_planned("agentic_prompt_version", v["id"]) result.created += 1 result.created += len(src_schemas) + result.created += len(src_docs) # ----- settings ----- diff --git a/src/unstract/clone/phases/tool_instance.py b/src/unstract/clone/phases/tool_instance.py index af20171..1220d43 100644 --- a/src/unstract/clone/phases/tool_instance.py +++ b/src/unstract/clone/phases/tool_instance.py @@ -108,12 +108,17 @@ def _clone_workflow_tools( src_ti_id = src_ti["id"] src_tool_id = src_ti["tool_id"] + # tool_id is a registry id: Prompt Studio tools register under + # prompt_studio_registry, exported agentic projects under + # agentic_studio_registry. Resolve against both. with lock: - tgt_tool_id = self.ctx.remap.resolve("prompt_studio_registry", src_tool_id) + tgt_tool_id = self.ctx.remap.resolve( + "prompt_studio_registry", src_tool_id + ) or self.ctx.remap.resolve("agentic_studio_registry", src_tool_id) if not tgt_tool_id: logger.warning( "skipping tool_instance %s — no registry remap for tool_id %s " - "(custom tool likely unpublished on source)", + "(tool likely unpublished on source)", src_ti_id, src_tool_id, ) diff --git a/tests/clone/test_agentic_studio_phase.py b/tests/clone/test_agentic_studio_phase.py index 4521ef4..e2ac961 100644 --- a/tests/clone/test_agentic_studio_phase.py +++ b/tests/clone/test_agentic_studio_phase.py @@ -39,6 +39,8 @@ def __init__( settings=None, registries=None, users=None, + documents=None, + document_blobs=None, ): self.projects = list(projects or []) self.users = list(users or []) @@ -50,6 +52,10 @@ def __init__( self.settings = list(settings or []) # project_id -> list of registry rows self.registries = {k: list(v) for k, v in (registries or {}).items()} + # project_id -> list of document rows + self.documents = {k: list(v) for k, v in (documents or {}).items()} + # document_id -> raw bytes + self.document_blobs = dict(document_blobs or {}) self.created_projects: list[dict] = [] self.created_versions: list[dict] = [] @@ -57,6 +63,7 @@ def __init__( self.created_settings: list[dict] = [] self.updated_settings: list[tuple[str, dict]] = [] self.exported_projects: list[str] = [] + self.uploaded_documents: list[tuple[str, str, bytes]] = [] self._next_id = 1 def _mint(self, prefix: str) -> str: @@ -141,6 +148,20 @@ def export_agentic_project(self, project_id, *, force=True): def list_agentic_registries(self, *, agentic_project=None): return list(self.registries.get(agentic_project, [])) + # ----- documents ----- + + def list_agentic_documents(self, project_id): + return list(self.documents.get(project_id, [])) + + def download_agentic_document(self, document_id): + return self.document_blobs.get(document_id, b"%PDF-fake") + + def upload_agentic_document(self, project_id, file_name, data, mime_type): + self.uploaded_documents.append((project_id, file_name, data)) + row = {"id": self._mint("tgt-doc"), "original_filename": file_name} + self.documents.setdefault(project_id, []).append(row) + return {"data": [{"document_name": file_name}]} + def _ctx(source, target, *, remap=None, **opt_overrides): return CloneContext( @@ -507,3 +528,48 @@ def test_source_group_share_replicated_via_remap(): assert len(tgt.shared_projects) == 1 _, payload = tgt.shared_projects[0] assert payload["shared_groups"] == [70] + + +def test_documents_cloned_skipping_those_already_on_target(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={ + "src-p": [ + {"id": "d-new", "original_filename": "new.pdf"}, + {"id": "d-dup", "original_filename": "dup.pdf"}, + ] + }, + document_blobs={"d-new": b"%PDF-new", "d-dup": b"%PDF-dup"}, + ) + tgt = FakeClient() + # Adopt an existing target project so the dup doc can be pre-seeded on it. + tgt.projects.append({"id": "tgt-proj-0001", "name": "Receipts"}) + tgt.documents["tgt-proj-0001"] = [{"id": "t-dup", "original_filename": "dup.pdf"}] + ctx = _ctx(src, tgt) + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert result.failed == 0 + # 'new.pdf' uploaded, 'dup.pdf' skipped (already present). + assert tgt.uploaded_documents == [("tgt-proj-0001", "new.pdf", b"%PDF-new")] + assert result.skipped == 1 + + +def test_dry_run_counts_documents_without_uploading(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={ + "src-p": [ + {"id": "d1", "original_filename": "a.pdf"}, + {"id": "d2", "original_filename": "b.pdf"}, + ] + }, + ) + tgt = FakeClient() + ctx = _ctx(src, tgt, dry_run=True) + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.uploaded_documents == [] + # 1 project + 2 documents predicted as creates. + assert result.created == 3 diff --git a/tests/clone/test_tool_instance_phase.py b/tests/clone/test_tool_instance_phase.py index a161ab4..b456f6b 100644 --- a/tests/clone/test_tool_instance_phase.py +++ b/tests/clone/test_tool_instance_phase.py @@ -144,6 +144,24 @@ def test_skip_when_registry_remap_missing(): assert tgt.create_calls == [] +def test_resolves_tool_id_via_agentic_registry(): + # An exported agentic project registers under agentic_studio_registry, + # not prompt_studio_registry; its tool_instance must still resolve. + src = FakeClient() + src.instances[SRC_WF] = [_src_ti("src-ti-1", SRC_WF, SRC_REG, {})] + tgt = FakeClient() + remap = RemapTable() + remap.record("workflow", SRC_WF, TGT_WF) + remap.record("agentic_studio_registry", SRC_REG, TGT_REG) + ctx = _ctx(src, tgt, remap=remap) + + result = ToolInstancePhase(ctx).run(CloneReport()) + + assert result.created == 1 + assert result.skipped == 0 + assert tgt.create_calls[0]["tool_id"] == TGT_REG + + def test_adopt_existing_target_instance_and_repatch_metadata(): src = FakeClient() src_meta = {"llm": "My OpenAI"} From 20d27e95efb6c982d1b9480f7eb0a15c87f2f373 Mon Sep 17 00:00:00 2001 From: Chandrasekharan M Date: Tue, 23 Jun 2026 16:32:16 +0530 Subject: [PATCH 2/4] feat(clone): clone agentic verified-data; honour skip strategy for docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Agentic verified-data ("ground truth manually verified by user") is curated input, not regenerable output, so it must be cloned. AgenticStudioPhase now re-points each source verified-data row to the cloned target document by filename and recreates it (skipping docs absent on target and rows already present). Extracted/comparison data stays uncloned — both regenerate on a re-run + re-verify. Also honour file_strategy="skip" in the new document path, matching the files and lookups phases: under skip, agentic documents are listed and counted as skipped (operator re-uploads), not transferred. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011ja9H1rnSXmPUgQtHm8TNS --- src/unstract/clone/client.py | 17 ++++ src/unstract/clone/phases/agentic_studio.py | 104 +++++++++++++++++++- tests/clone/test_agentic_studio_phase.py | 101 +++++++++++++++++++ 3 files changed, 221 insertions(+), 1 deletion(-) diff --git a/src/unstract/clone/client.py b/src/unstract/clone/client.py index 65dfc64..3f79499 100644 --- a/src/unstract/clone/client.py +++ b/src/unstract/clone/client.py @@ -963,3 +963,20 @@ def upload_agentic_document( return self._request( "POST", f"agentic/projects/{project_id}/documents/upload/", files=files ) + + def list_agentic_verified_data(self, project_id: str) -> list[dict[str, Any]]: + """List a project's verified (ground-truth) data rows. Each carries + ``document_name``, ``document``, and ``data`` (the curated JSON). + """ + result = self._request( + "GET", "agentic/verified-data/", params={"project_id": project_id} + ) + return result if isinstance(result, list) else (result or {}).get("results", []) + + def create_agentic_verified_data(self, payload: dict[str, Any]) -> dict[str, Any]: + """Create a verified-data row (``project``, ``document``, ``data``). + + One row per (project, document); callers pre-check to avoid a + uniqueness clash on re-runs. + """ + return self._request("POST", "agentic/verified-data/", json=payload) diff --git a/src/unstract/clone/phases/agentic_studio.py b/src/unstract/clone/phases/agentic_studio.py index 0f31dc4..5729bcd 100644 --- a/src/unstract/clone/phases/agentic_studio.py +++ b/src/unstract/clone/phases/agentic_studio.py @@ -22,6 +22,11 @@ per-run ``agentic_prompt_version`` table and the child's ``parent_version`` resolves through it. ``project`` is bound to the target id. - **schemas**: bound to the target ``project`` and created. + - **documents**: source uploads re-uploaded to the target project (skipping + filenames already present); they live in their own store, not the Prompt + Studio document table. + - **verified data**: ground-truth rows re-pointed to the cloned document by + filename. Extracted/comparison data is regenerable and not cloned. 3. **Registry**: if the project has an active schema + prompt, republish its registry entry via the ``export`` action (mirror of custom_tool) and record an ``agentic_studio_registry`` remap. Projects with no source @@ -172,6 +177,7 @@ def _clone_project( self._clone_prompt_versions(name, src_project_id, tgt_project_id, result, lock) self._clone_schemas(name, src_project_id, tgt_project_id, result, lock) self._clone_documents(name, src_project_id, tgt_project_id, result, lock) + self._clone_verified_data(name, src_project_id, tgt_project_id, result, lock) self._republish_registry(name, src_project_id, tgt_project_id, result, lock) def _build_project_payload( @@ -445,6 +451,17 @@ def _clone_documents( if not src_docs: return + # Honour the file strategy: 'skip' means no binary transfer, like the + # files and lookups phases. Operator re-uploads on target. + if self.ctx.options.file_strategy == "skip": + with lock: + result.skipped += len(src_docs) + result.warnings.append( + f"agentic {name}: {len(src_docs)} document(s) not copied " + "(file_strategy=skip) — upload them manually on target" + ) + return + try: tgt_docs = self.ctx.target.list_agentic_documents(tgt_project_id) except Exception as e: @@ -521,6 +538,82 @@ def _clone_one_document( result.created += 1 logger.info("agentic '%s': uploaded document '%s'", name, file_name) + # ----- verified (ground-truth) data ----- + + def _clone_verified_data( + self, + name: str, + src_project_id: str, + tgt_project_id: str, + result: PhaseResult, + lock: threading.Lock, + ) -> None: + try: + src_rows = self.ctx.source.list_agentic_verified_data(src_project_id) + except Exception as e: + logger.exception("agentic '%s': verified-data listing failed: %s", name, e) + with lock: + result.failed += 1 + result.errors.append(f"agentic {name} list verified-data: {e}") + return + if not src_rows: + return + + # Verified data FKs a document; map source rows to target docs by + # filename, the only identity stable across orgs. + try: + tgt_docs = self.ctx.target.list_agentic_documents(tgt_project_id) + tgt_existing = self.ctx.target.list_agentic_verified_data(tgt_project_id) + except Exception as e: + logger.warning( + "agentic '%s': target verified-data lookup failed " + "(re-run may duplicate): %s", + name, + e, + ) + tgt_docs, tgt_existing = [], [] + doc_id_by_name = {d.get("original_filename"): d.get("id") for d in tgt_docs} + verified_doc_ids = {r.get("document") for r in tgt_existing} + + for src in src_rows: + file_name = src.get("document_name") + tgt_doc_id = doc_id_by_name.get(file_name) + if not tgt_doc_id: + with lock: + result.skipped += 1 + result.warnings.append( + f"agentic {name}: verified data for '{file_name}' skipped — " + "document not on target" + ) + continue + if tgt_doc_id in verified_doc_ids: + with lock: + result.skipped += 1 + continue + payload = { + "project": tgt_project_id, + "document": tgt_doc_id, + "data": src.get("data"), + } + try: + self.ctx.target.create_agentic_verified_data(payload) + except Exception as e: + logger.exception( + "agentic '%s': verified data for '%s' create failed: %s", + name, + file_name, + e, + ) + with lock: + result.failed += 1 + result.errors.append( + f"agentic {name} create verified-data {file_name}: {e}" + ) + continue + with lock: + result.created += 1 + logger.info("agentic '%s': cloned verified data for '%s'", name, file_name) + # ----- registry ----- def _republish_registry( @@ -626,12 +719,21 @@ def _plan_children( src_docs = self.ctx.source.list_agentic_documents(src_project_id) except Exception: src_docs = [] + try: + src_verified = self.ctx.source.list_agentic_verified_data(src_project_id) + except Exception: + src_verified = [] with lock: for v in src_versions: self.ctx.remap.record_planned("agentic_prompt_version", v["id"]) result.created += 1 result.created += len(src_schemas) - result.created += len(src_docs) + # Documents move only when the file strategy copies binaries. + if self.ctx.options.file_strategy == "skip": + result.skipped += len(src_docs) + else: + result.created += len(src_docs) + result.created += len(src_verified) # ----- settings ----- diff --git a/tests/clone/test_agentic_studio_phase.py b/tests/clone/test_agentic_studio_phase.py index e2ac961..f0703fb 100644 --- a/tests/clone/test_agentic_studio_phase.py +++ b/tests/clone/test_agentic_studio_phase.py @@ -41,6 +41,7 @@ def __init__( users=None, documents=None, document_blobs=None, + verified_data=None, ): self.projects = list(projects or []) self.users = list(users or []) @@ -56,6 +57,8 @@ def __init__( self.documents = {k: list(v) for k, v in (documents or {}).items()} # document_id -> raw bytes self.document_blobs = dict(document_blobs or {}) + # project_id -> list of verified-data rows + self.verified_data = {k: list(v) for k, v in (verified_data or {}).items()} self.created_projects: list[dict] = [] self.created_versions: list[dict] = [] @@ -64,6 +67,7 @@ def __init__( self.updated_settings: list[tuple[str, dict]] = [] self.exported_projects: list[str] = [] self.uploaded_documents: list[tuple[str, str, bytes]] = [] + self.created_verified_data: list[dict] = [] self._next_id = 1 def _mint(self, prefix: str) -> str: @@ -162,6 +166,20 @@ def upload_agentic_document(self, project_id, file_name, data, mime_type): self.documents.setdefault(project_id, []).append(row) return {"data": [{"document_name": file_name}]} + # ----- verified data ----- + + def list_agentic_verified_data(self, project_id): + return list(self.verified_data.get(project_id, [])) + + def create_agentic_verified_data(self, payload): + new = dict(payload) + new["id"] = self._mint("tgt-vd") + self.created_verified_data.append(new) + self.verified_data.setdefault(payload["project"], []).append( + {"document": payload["document"], "data": payload["data"]} + ) + return new + def _ctx(source, target, *, remap=None, **opt_overrides): return CloneContext( @@ -573,3 +591,86 @@ def test_dry_run_counts_documents_without_uploading(): assert tgt.uploaded_documents == [] # 1 project + 2 documents predicted as creates. assert result.created == 3 + + +def test_verified_data_cloned_mapped_to_target_doc_by_filename(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={"src-p": [{"id": "d1", "original_filename": "report.pdf"}]}, + document_blobs={"d1": b"%PDF-r"}, + verified_data={ + "src-p": [ + {"document_name": "report.pdf", "document": "d1", "data": {"x": 1}}, + # Ground truth for a doc that won't exist on target → skipped. + {"document_name": "gone.pdf", "document": "d9", "data": {"y": 2}}, + ] + }, + ) + tgt = FakeClient() + ctx = _ctx(src, tgt) + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert result.failed == 0 + assert len(tgt.created_verified_data) == 1 + row = tgt.created_verified_data[0] + # Re-pointed to the freshly uploaded target document, not the source id. + uploaded_doc_id = tgt.documents[row["project"]][0]["id"] + assert row["document"] == uploaded_doc_id + assert row["data"] == {"x": 1} + # 'gone.pdf' ground truth skipped (no matching target document). + assert any("gone.pdf" in w for w in result.warnings) + + +def test_verified_data_skipped_when_already_on_target(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={"src-p": [{"id": "d1", "original_filename": "report.pdf"}]}, + verified_data={ + "src-p": [ + {"document_name": "report.pdf", "document": "d1", "data": {"x": 1}} + ] + }, + ) + tgt = FakeClient() + # Adopt an existing target project pre-seeded with the doc + its verified row. + tgt.projects.append({"id": "tgt-proj-0001", "name": "Receipts"}) + tgt.documents["tgt-proj-0001"] = [{"id": "t-d1", "original_filename": "report.pdf"}] + tgt.verified_data["tgt-proj-0001"] = [{"document": "t-d1", "data": {"x": 1}}] + ctx = _ctx(src, tgt) + + AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.created_verified_data == [] + + +def test_dry_run_counts_verified_data_without_writing(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + verified_data={ + "src-p": [{"document_name": "a.pdf", "document": "d1", "data": {}}] + }, + ) + tgt = FakeClient() + ctx = _ctx(src, tgt, dry_run=True) + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.created_verified_data == [] + # 1 project + 1 verified-data row predicted as creates. + assert result.created == 2 + + +def test_documents_not_copied_under_file_strategy_skip(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={"src-p": [{"id": "d1", "original_filename": "report.pdf"}]}, + document_blobs={"d1": b"%PDF-r"}, + ) + tgt = FakeClient() + ctx = _ctx(src, tgt, file_strategy="skip") + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.uploaded_documents == [] + assert result.skipped == 1 From 22e9d93898be07db8a7db61bb218eebac4551a65 Mon Sep 17 00:00:00 2001 From: Chandrasekharan M Date: Tue, 23 Jun 2026 16:52:15 +0530 Subject: [PATCH 3/4] fix(clone): forecast agentic verified-data as skipped under skip strategy Verified data FKs a document; with file_strategy=skip no docs land on target, so a dry-run must predict the rows as skipped rather than as creates that the real run silently drops. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011ja9H1rnSXmPUgQtHm8TNS --- src/unstract/clone/phases/agentic_studio.py | 4 +++- tests/clone/test_agentic_studio_phase.py | 23 +++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/unstract/clone/phases/agentic_studio.py b/src/unstract/clone/phases/agentic_studio.py index 5729bcd..013aac8 100644 --- a/src/unstract/clone/phases/agentic_studio.py +++ b/src/unstract/clone/phases/agentic_studio.py @@ -729,11 +729,13 @@ def _plan_children( result.created += 1 result.created += len(src_schemas) # Documents move only when the file strategy copies binaries. + # Verified data FKs a document, so skipping files strands it too. if self.ctx.options.file_strategy == "skip": result.skipped += len(src_docs) + result.skipped += len(src_verified) else: result.created += len(src_docs) - result.created += len(src_verified) + result.created += len(src_verified) # ----- settings ----- diff --git a/tests/clone/test_agentic_studio_phase.py b/tests/clone/test_agentic_studio_phase.py index f0703fb..f3eb568 100644 --- a/tests/clone/test_agentic_studio_phase.py +++ b/tests/clone/test_agentic_studio_phase.py @@ -674,3 +674,26 @@ def test_documents_not_copied_under_file_strategy_skip(): assert tgt.uploaded_documents == [] assert result.skipped == 1 + + +def test_dry_run_skip_strategy_counts_docs_and_verified_as_skipped(): + # Verified data FKs a document; under skip no docs land on target, so the + # plan must forecast verified as skipped too — not as un-creatable creates. + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={"src-p": [{"id": "d1", "original_filename": "report.pdf"}]}, + document_blobs={"d1": b"%PDF-r"}, + verified_data={ + "src-p": [{"document_name": "report.pdf", "document": "d1", "data": {}}] + }, + ) + tgt = FakeClient() + ctx = _ctx(src, tgt, dry_run=True, file_strategy="skip") + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.uploaded_documents == [] + assert tgt.created_verified_data == [] + # Only the project is a forecast create; doc + verified row both skipped. + assert result.created == 1 + assert result.skipped == 2 From e35b564ad54f5f5ecb6a8908da9a5aa945f78b5a Mon Sep 17 00:00:00 2001 From: Chandrasekharan M Date: Tue, 23 Jun 2026 17:39:59 +0530 Subject: [PATCH 4/4] fix(clone): honour skip strategy in _clone_verified_data at runtime _plan_children already forecasts verified-data as skipped under file_strategy=skip, but the runtime path lacked the matching guard: on a re-run where documents reached the target by other means, it would create verified rows the plan said it would skip. Add the early-return guard, mirroring _clone_documents. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_011ja9H1rnSXmPUgQtHm8TNS --- src/unstract/clone/phases/agentic_studio.py | 11 ++++++++++ tests/clone/test_agentic_studio_phase.py | 23 +++++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/src/unstract/clone/phases/agentic_studio.py b/src/unstract/clone/phases/agentic_studio.py index 013aac8..1ee0d1a 100644 --- a/src/unstract/clone/phases/agentic_studio.py +++ b/src/unstract/clone/phases/agentic_studio.py @@ -559,6 +559,17 @@ def _clone_verified_data( if not src_rows: return + # Verified data FKs a document; under 'skip' no documents are copied, + # so skip the rows too — matching the plan and _clone_documents. + if self.ctx.options.file_strategy == "skip": + with lock: + result.skipped += len(src_rows) + result.warnings.append( + f"agentic {name}: {len(src_rows)} verified-data row(s) not " + "copied (file_strategy=skip)" + ) + return + # Verified data FKs a document; map source rows to target docs by # filename, the only identity stable across orgs. try: diff --git a/tests/clone/test_agentic_studio_phase.py b/tests/clone/test_agentic_studio_phase.py index f3eb568..e476c11 100644 --- a/tests/clone/test_agentic_studio_phase.py +++ b/tests/clone/test_agentic_studio_phase.py @@ -676,6 +676,29 @@ def test_documents_not_copied_under_file_strategy_skip(): assert result.skipped == 1 +def test_verified_data_not_copied_under_skip_even_if_doc_on_target(): + # Docs reached the target by other means (prior non-skip run / manual + # upload), so doc-by-name maps. Under skip, verified data must still be + # skipped at runtime — matching the plan, not created off the stray doc. + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={"src-p": [{"id": "d1", "original_filename": "report.pdf"}]}, + document_blobs={"d1": b"%PDF-r"}, + verified_data={ + "src-p": [{"document_name": "report.pdf", "document": "d1", "data": {}}] + }, + ) + tgt = FakeClient() + tgt.projects.append({"id": "tgt-proj-0001", "name": "Receipts"}) + tgt.documents["tgt-proj-0001"] = [{"id": "t-d1", "original_filename": "report.pdf"}] + ctx = _ctx(src, tgt, file_strategy="skip") + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.created_verified_data == [] + assert any("verified-data" in w for w in result.warnings) + + def test_dry_run_skip_strategy_counts_docs_and_verified_as_skipped(): # Verified data FKs a document; under skip no docs land on target, so the # plan must forecast verified as skipped too — not as un-creatable creates.