diff --git a/src/unstract/clone/client.py b/src/unstract/clone/client.py index ed8a70b..3f79499 100644 --- a/src/unstract/clone/client.py +++ b/src/unstract/clone/client.py @@ -921,3 +921,62 @@ def list_agentic_registries( params["agentic_project"] = agentic_project result = self._request("GET", "agentic-studio-registry/", params=params) return result if isinstance(result, list) else (result or {}).get("results", []) + + def list_agentic_documents(self, project_id: str) -> list[dict[str, Any]]: + """List a project's uploaded documents. Rows carry ``id`` and + ``original_filename``. Agentic docs are a store of their own, distinct + from Prompt Studio ``prompt-document`` rows. + """ + result = self._request( + "GET", "agentic/documents/", params={"project_id": project_id} + ) + return result if isinstance(result, list) else (result or {}).get("results", []) + + def download_agentic_document(self, document_id: str) -> bytes: + """Download an agentic document's original bytes. + + The ``file`` route serves the raw binary (not a JSON envelope), so this + bypasses the JSON-decoding request path. + """ + url = self._url(f"agentic/documents/{document_id}/file/") + logger.debug("GET %s", url) + resp = self._session.get(url, timeout=self.timeout, verify=self.verify) + if not 200 <= resp.status_code < 300: + raise PlatformAPIError( + f"GET agentic/documents/{document_id}/file/ " + f"returned {resp.status_code}", + status_code=resp.status_code, + body=resp.text[:2000], + ) + return resp.content + + def upload_agentic_document( + self, project_id: str, file_name: str, data: bytes, mime_type: str + ) -> dict[str, Any]: + """Upload a document into a target agentic project. + + Creates the ``AgenticDocument`` row; extraction/summary stay a UI step, + as with Prompt Studio uploads. Callers pre-check filenames to avoid + duplicates on re-runs. + """ + files = {"file": (file_name, data, mime_type)} + return self._request( + "POST", f"agentic/projects/{project_id}/documents/upload/", files=files + ) + + def list_agentic_verified_data(self, project_id: str) -> list[dict[str, Any]]: + """List a project's verified (ground-truth) data rows. Each carries + ``document_name``, ``document``, and ``data`` (the curated JSON). + """ + result = self._request( + "GET", "agentic/verified-data/", params={"project_id": project_id} + ) + return result if isinstance(result, list) else (result or {}).get("results", []) + + def create_agentic_verified_data(self, payload: dict[str, Any]) -> dict[str, Any]: + """Create a verified-data row (``project``, ``document``, ``data``). + + One row per (project, document); callers pre-check to avoid a + uniqueness clash on re-runs. + """ + return self._request("POST", "agentic/verified-data/", json=payload) diff --git a/src/unstract/clone/phases/agentic_studio.py b/src/unstract/clone/phases/agentic_studio.py index 6f51311..1ee0d1a 100644 --- a/src/unstract/clone/phases/agentic_studio.py +++ b/src/unstract/clone/phases/agentic_studio.py @@ -22,6 +22,11 @@ per-run ``agentic_prompt_version`` table and the child's ``parent_version`` resolves through it. ``project`` is bound to the target id. - **schemas**: bound to the target ``project`` and created. + - **documents**: source uploads re-uploaded to the target project (skipping + filenames already present); they live in their own store, not the Prompt + Studio document table. + - **verified data**: ground-truth rows re-pointed to the cloned document by + filename. Extracted/comparison data is regenerable and not cloned. 3. **Registry**: if the project has an active schema + prompt, republish its registry entry via the ``export`` action (mirror of custom_tool) and record an ``agentic_studio_registry`` remap. Projects with no source @@ -35,6 +40,7 @@ from __future__ import annotations import logging +import mimetypes import threading from typing import Any @@ -170,6 +176,8 @@ def _clone_project( self._replicate_share(src, name, tgt_project_id, result, lock) self._clone_prompt_versions(name, src_project_id, tgt_project_id, result, lock) self._clone_schemas(name, src_project_id, tgt_project_id, result, lock) + self._clone_documents(name, src_project_id, tgt_project_id, result, lock) + self._clone_verified_data(name, src_project_id, tgt_project_id, result, lock) self._republish_registry(name, src_project_id, tgt_project_id, result, lock) def _build_project_payload( @@ -422,6 +430,201 @@ def _clone_schemas( with lock: result.created += 1 + # ----- documents ----- + + def _clone_documents( + self, + name: str, + src_project_id: str, + tgt_project_id: str, + result: PhaseResult, + lock: threading.Lock, + ) -> None: + try: + src_docs = self.ctx.source.list_agentic_documents(src_project_id) + except Exception as e: + logger.exception("agentic '%s': document listing failed: %s", name, e) + with lock: + result.failed += 1 + result.errors.append(f"agentic {name} list documents: {e}") + return + if not src_docs: + return + + # Honour the file strategy: 'skip' means no binary transfer, like the + # files and lookups phases. Operator re-uploads on target. + if self.ctx.options.file_strategy == "skip": + with lock: + result.skipped += len(src_docs) + result.warnings.append( + f"agentic {name}: {len(src_docs)} document(s) not copied " + "(file_strategy=skip) — upload them manually on target" + ) + return + + try: + tgt_docs = self.ctx.target.list_agentic_documents(tgt_project_id) + except Exception as e: + logger.warning( + "agentic '%s': target document listing failed " + "(re-run may duplicate): %s", + name, + e, + ) + tgt_docs = [] + target_names = {d.get("original_filename") for d in tgt_docs} + + for src in src_docs: + file_name = src.get("original_filename") + src_doc_id = src.get("id") + if not file_name or not src_doc_id: + continue + if file_name in target_names: + with lock: + result.skipped += 1 + logger.info( + "agentic '%s': document '%s' already on target — skipping", + name, + file_name, + ) + continue + self._clone_one_document( + name, tgt_project_id, src_doc_id, file_name, result, lock + ) + + def _clone_one_document( + self, + name: str, + tgt_project_id: str, + src_doc_id: str, + file_name: str, + result: PhaseResult, + lock: threading.Lock, + ) -> None: + try: + raw = self.ctx.source.download_agentic_document(src_doc_id) + except Exception as e: + logger.exception( + "agentic '%s': document '%s' download failed: %s", name, file_name, e + ) + with lock: + result.failed += 1 + result.errors.append(f"agentic {name} download {file_name}: {e}") + return + + if len(raw) > self.ctx.options.max_file_size: + with lock: + result.skipped += 1 + result.warnings.append( + f"agentic {name}: document {file_name} exceeds size cap — " + "upload it manually on target" + ) + return + + mime = mimetypes.guess_type(file_name)[0] or "application/pdf" + try: + self.ctx.target.upload_agentic_document( + tgt_project_id, file_name, raw, mime + ) + except Exception as e: + logger.exception( + "agentic '%s': document '%s' upload failed: %s", name, file_name, e + ) + with lock: + result.failed += 1 + result.errors.append(f"agentic {name} upload {file_name}: {e}") + return + with lock: + result.created += 1 + logger.info("agentic '%s': uploaded document '%s'", name, file_name) + + # ----- verified (ground-truth) data ----- + + def _clone_verified_data( + self, + name: str, + src_project_id: str, + tgt_project_id: str, + result: PhaseResult, + lock: threading.Lock, + ) -> None: + try: + src_rows = self.ctx.source.list_agentic_verified_data(src_project_id) + except Exception as e: + logger.exception("agentic '%s': verified-data listing failed: %s", name, e) + with lock: + result.failed += 1 + result.errors.append(f"agentic {name} list verified-data: {e}") + return + if not src_rows: + return + + # Verified data FKs a document; under 'skip' no documents are copied, + # so skip the rows too — matching the plan and _clone_documents. + if self.ctx.options.file_strategy == "skip": + with lock: + result.skipped += len(src_rows) + result.warnings.append( + f"agentic {name}: {len(src_rows)} verified-data row(s) not " + "copied (file_strategy=skip)" + ) + return + + # Verified data FKs a document; map source rows to target docs by + # filename, the only identity stable across orgs. + try: + tgt_docs = self.ctx.target.list_agentic_documents(tgt_project_id) + tgt_existing = self.ctx.target.list_agentic_verified_data(tgt_project_id) + except Exception as e: + logger.warning( + "agentic '%s': target verified-data lookup failed " + "(re-run may duplicate): %s", + name, + e, + ) + tgt_docs, tgt_existing = [], [] + doc_id_by_name = {d.get("original_filename"): d.get("id") for d in tgt_docs} + verified_doc_ids = {r.get("document") for r in tgt_existing} + + for src in src_rows: + file_name = src.get("document_name") + tgt_doc_id = doc_id_by_name.get(file_name) + if not tgt_doc_id: + with lock: + result.skipped += 1 + result.warnings.append( + f"agentic {name}: verified data for '{file_name}' skipped — " + "document not on target" + ) + continue + if tgt_doc_id in verified_doc_ids: + with lock: + result.skipped += 1 + continue + payload = { + "project": tgt_project_id, + "document": tgt_doc_id, + "data": src.get("data"), + } + try: + self.ctx.target.create_agentic_verified_data(payload) + except Exception as e: + logger.exception( + "agentic '%s': verified data for '%s' create failed: %s", + name, + file_name, + e, + ) + with lock: + result.failed += 1 + result.errors.append( + f"agentic {name} create verified-data {file_name}: {e}" + ) + continue + with lock: + result.created += 1 + logger.info("agentic '%s': cloned verified data for '%s'", name, file_name) + # ----- registry ----- def _republish_registry( @@ -507,8 +710,9 @@ def _plan_children( result: PhaseResult, lock: threading.Lock, ) -> None: - """Dry-run: count source prompt versions + schemas as planned and - record planned prompt-version ids so downstream resolves don't miss. + """Dry-run: count source prompt versions + schemas + documents as + planned and record planned prompt-version ids so downstream resolves + don't miss. """ try: src_versions = self.ctx.source.list_agentic_prompt_versions( @@ -522,11 +726,27 @@ def _plan_children( ) except Exception: src_schemas = [] + try: + src_docs = self.ctx.source.list_agentic_documents(src_project_id) + except Exception: + src_docs = [] + try: + src_verified = self.ctx.source.list_agentic_verified_data(src_project_id) + except Exception: + src_verified = [] with lock: for v in src_versions: self.ctx.remap.record_planned("agentic_prompt_version", v["id"]) result.created += 1 result.created += len(src_schemas) + # Documents move only when the file strategy copies binaries. + # Verified data FKs a document, so skipping files strands it too. + if self.ctx.options.file_strategy == "skip": + result.skipped += len(src_docs) + result.skipped += len(src_verified) + else: + result.created += len(src_docs) + result.created += len(src_verified) # ----- settings ----- diff --git a/src/unstract/clone/phases/tool_instance.py b/src/unstract/clone/phases/tool_instance.py index af20171..1220d43 100644 --- a/src/unstract/clone/phases/tool_instance.py +++ b/src/unstract/clone/phases/tool_instance.py @@ -108,12 +108,17 @@ def _clone_workflow_tools( src_ti_id = src_ti["id"] src_tool_id = src_ti["tool_id"] + # tool_id is a registry id: Prompt Studio tools register under + # prompt_studio_registry, exported agentic projects under + # agentic_studio_registry. Resolve against both. with lock: - tgt_tool_id = self.ctx.remap.resolve("prompt_studio_registry", src_tool_id) + tgt_tool_id = self.ctx.remap.resolve( + "prompt_studio_registry", src_tool_id + ) or self.ctx.remap.resolve("agentic_studio_registry", src_tool_id) if not tgt_tool_id: logger.warning( "skipping tool_instance %s — no registry remap for tool_id %s " - "(custom tool likely unpublished on source)", + "(tool likely unpublished on source)", src_ti_id, src_tool_id, ) diff --git a/tests/clone/test_agentic_studio_phase.py b/tests/clone/test_agentic_studio_phase.py index 4521ef4..e476c11 100644 --- a/tests/clone/test_agentic_studio_phase.py +++ b/tests/clone/test_agentic_studio_phase.py @@ -39,6 +39,9 @@ def __init__( settings=None, registries=None, users=None, + documents=None, + document_blobs=None, + verified_data=None, ): self.projects = list(projects or []) self.users = list(users or []) @@ -50,6 +53,12 @@ def __init__( self.settings = list(settings or []) # project_id -> list of registry rows self.registries = {k: list(v) for k, v in (registries or {}).items()} + # project_id -> list of document rows + self.documents = {k: list(v) for k, v in (documents or {}).items()} + # document_id -> raw bytes + self.document_blobs = dict(document_blobs or {}) + # project_id -> list of verified-data rows + self.verified_data = {k: list(v) for k, v in (verified_data or {}).items()} self.created_projects: list[dict] = [] self.created_versions: list[dict] = [] @@ -57,6 +66,8 @@ def __init__( self.created_settings: list[dict] = [] self.updated_settings: list[tuple[str, dict]] = [] self.exported_projects: list[str] = [] + self.uploaded_documents: list[tuple[str, str, bytes]] = [] + self.created_verified_data: list[dict] = [] self._next_id = 1 def _mint(self, prefix: str) -> str: @@ -141,6 +152,34 @@ def export_agentic_project(self, project_id, *, force=True): def list_agentic_registries(self, *, agentic_project=None): return list(self.registries.get(agentic_project, [])) + # ----- documents ----- + + def list_agentic_documents(self, project_id): + return list(self.documents.get(project_id, [])) + + def download_agentic_document(self, document_id): + return self.document_blobs.get(document_id, b"%PDF-fake") + + def upload_agentic_document(self, project_id, file_name, data, mime_type): + self.uploaded_documents.append((project_id, file_name, data)) + row = {"id": self._mint("tgt-doc"), "original_filename": file_name} + self.documents.setdefault(project_id, []).append(row) + return {"data": [{"document_name": file_name}]} + + # ----- verified data ----- + + def list_agentic_verified_data(self, project_id): + return list(self.verified_data.get(project_id, [])) + + def create_agentic_verified_data(self, payload): + new = dict(payload) + new["id"] = self._mint("tgt-vd") + self.created_verified_data.append(new) + self.verified_data.setdefault(payload["project"], []).append( + {"document": payload["document"], "data": payload["data"]} + ) + return new + def _ctx(source, target, *, remap=None, **opt_overrides): return CloneContext( @@ -507,3 +546,177 @@ def test_source_group_share_replicated_via_remap(): assert len(tgt.shared_projects) == 1 _, payload = tgt.shared_projects[0] assert payload["shared_groups"] == [70] + + +def test_documents_cloned_skipping_those_already_on_target(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={ + "src-p": [ + {"id": "d-new", "original_filename": "new.pdf"}, + {"id": "d-dup", "original_filename": "dup.pdf"}, + ] + }, + document_blobs={"d-new": b"%PDF-new", "d-dup": b"%PDF-dup"}, + ) + tgt = FakeClient() + # Adopt an existing target project so the dup doc can be pre-seeded on it. + tgt.projects.append({"id": "tgt-proj-0001", "name": "Receipts"}) + tgt.documents["tgt-proj-0001"] = [{"id": "t-dup", "original_filename": "dup.pdf"}] + ctx = _ctx(src, tgt) + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert result.failed == 0 + # 'new.pdf' uploaded, 'dup.pdf' skipped (already present). + assert tgt.uploaded_documents == [("tgt-proj-0001", "new.pdf", b"%PDF-new")] + assert result.skipped == 1 + + +def test_dry_run_counts_documents_without_uploading(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={ + "src-p": [ + {"id": "d1", "original_filename": "a.pdf"}, + {"id": "d2", "original_filename": "b.pdf"}, + ] + }, + ) + tgt = FakeClient() + ctx = _ctx(src, tgt, dry_run=True) + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.uploaded_documents == [] + # 1 project + 2 documents predicted as creates. + assert result.created == 3 + + +def test_verified_data_cloned_mapped_to_target_doc_by_filename(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={"src-p": [{"id": "d1", "original_filename": "report.pdf"}]}, + document_blobs={"d1": b"%PDF-r"}, + verified_data={ + "src-p": [ + {"document_name": "report.pdf", "document": "d1", "data": {"x": 1}}, + # Ground truth for a doc that won't exist on target → skipped. + {"document_name": "gone.pdf", "document": "d9", "data": {"y": 2}}, + ] + }, + ) + tgt = FakeClient() + ctx = _ctx(src, tgt) + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert result.failed == 0 + assert len(tgt.created_verified_data) == 1 + row = tgt.created_verified_data[0] + # Re-pointed to the freshly uploaded target document, not the source id. + uploaded_doc_id = tgt.documents[row["project"]][0]["id"] + assert row["document"] == uploaded_doc_id + assert row["data"] == {"x": 1} + # 'gone.pdf' ground truth skipped (no matching target document). + assert any("gone.pdf" in w for w in result.warnings) + + +def test_verified_data_skipped_when_already_on_target(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={"src-p": [{"id": "d1", "original_filename": "report.pdf"}]}, + verified_data={ + "src-p": [ + {"document_name": "report.pdf", "document": "d1", "data": {"x": 1}} + ] + }, + ) + tgt = FakeClient() + # Adopt an existing target project pre-seeded with the doc + its verified row. + tgt.projects.append({"id": "tgt-proj-0001", "name": "Receipts"}) + tgt.documents["tgt-proj-0001"] = [{"id": "t-d1", "original_filename": "report.pdf"}] + tgt.verified_data["tgt-proj-0001"] = [{"document": "t-d1", "data": {"x": 1}}] + ctx = _ctx(src, tgt) + + AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.created_verified_data == [] + + +def test_dry_run_counts_verified_data_without_writing(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + verified_data={ + "src-p": [{"document_name": "a.pdf", "document": "d1", "data": {}}] + }, + ) + tgt = FakeClient() + ctx = _ctx(src, tgt, dry_run=True) + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.created_verified_data == [] + # 1 project + 1 verified-data row predicted as creates. + assert result.created == 2 + + +def test_documents_not_copied_under_file_strategy_skip(): + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={"src-p": [{"id": "d1", "original_filename": "report.pdf"}]}, + document_blobs={"d1": b"%PDF-r"}, + ) + tgt = FakeClient() + ctx = _ctx(src, tgt, file_strategy="skip") + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.uploaded_documents == [] + assert result.skipped == 1 + + +def test_verified_data_not_copied_under_skip_even_if_doc_on_target(): + # Docs reached the target by other means (prior non-skip run / manual + # upload), so doc-by-name maps. Under skip, verified data must still be + # skipped at runtime — matching the plan, not created off the stray doc. + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={"src-p": [{"id": "d1", "original_filename": "report.pdf"}]}, + document_blobs={"d1": b"%PDF-r"}, + verified_data={ + "src-p": [{"document_name": "report.pdf", "document": "d1", "data": {}}] + }, + ) + tgt = FakeClient() + tgt.projects.append({"id": "tgt-proj-0001", "name": "Receipts"}) + tgt.documents["tgt-proj-0001"] = [{"id": "t-d1", "original_filename": "report.pdf"}] + ctx = _ctx(src, tgt, file_strategy="skip") + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.created_verified_data == [] + assert any("verified-data" in w for w in result.warnings) + + +def test_dry_run_skip_strategy_counts_docs_and_verified_as_skipped(): + # Verified data FKs a document; under skip no docs land on target, so the + # plan must forecast verified as skipped too — not as un-creatable creates. + src = FakeClient( + projects=[_src_project("src-p", "Receipts")], + documents={"src-p": [{"id": "d1", "original_filename": "report.pdf"}]}, + document_blobs={"d1": b"%PDF-r"}, + verified_data={ + "src-p": [{"document_name": "report.pdf", "document": "d1", "data": {}}] + }, + ) + tgt = FakeClient() + ctx = _ctx(src, tgt, dry_run=True, file_strategy="skip") + + result = AgenticStudioPhase(ctx).run(CloneReport()) + + assert tgt.uploaded_documents == [] + assert tgt.created_verified_data == [] + # Only the project is a forecast create; doc + verified row both skipped. + assert result.created == 1 + assert result.skipped == 2 diff --git a/tests/clone/test_tool_instance_phase.py b/tests/clone/test_tool_instance_phase.py index a161ab4..b456f6b 100644 --- a/tests/clone/test_tool_instance_phase.py +++ b/tests/clone/test_tool_instance_phase.py @@ -144,6 +144,24 @@ def test_skip_when_registry_remap_missing(): assert tgt.create_calls == [] +def test_resolves_tool_id_via_agentic_registry(): + # An exported agentic project registers under agentic_studio_registry, + # not prompt_studio_registry; its tool_instance must still resolve. + src = FakeClient() + src.instances[SRC_WF] = [_src_ti("src-ti-1", SRC_WF, SRC_REG, {})] + tgt = FakeClient() + remap = RemapTable() + remap.record("workflow", SRC_WF, TGT_WF) + remap.record("agentic_studio_registry", SRC_REG, TGT_REG) + ctx = _ctx(src, tgt, remap=remap) + + result = ToolInstancePhase(ctx).run(CloneReport()) + + assert result.created == 1 + assert result.skipped == 0 + assert tgt.create_calls[0]["tool_id"] == TGT_REG + + def test_adopt_existing_target_instance_and_repatch_metadata(): src = FakeClient() src_meta = {"llm": "My OpenAI"}