Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions src/unstract/clone/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -921,3 +921,62 @@ def list_agentic_registries(
params["agentic_project"] = agentic_project
result = self._request("GET", "agentic-studio-registry/", params=params)
return result if isinstance(result, list) else (result or {}).get("results", [])

def list_agentic_documents(self, project_id: str) -> list[dict[str, Any]]:
    """Return the documents uploaded to an agentic project.

    Rows carry ``id`` and ``original_filename``. Agentic documents live in a
    store of their own, distinct from Prompt Studio ``prompt-document`` rows.

    Args:
        project_id: Agentic project whose documents are listed; sent as the
            ``project_id`` query parameter.

    Returns:
        The response itself when it is a list; otherwise the ``results``
        entry of the response (an empty list when the response is falsy or
        has no ``results`` key).
    """
    query = {"project_id": project_id}
    payload = self._request("GET", "agentic/documents/", params=query)
    if isinstance(payload, list):
        return payload
    # Envelope-style (or empty) response: unwrap ``results``.
    return (payload or {}).get("results", [])

def download_agentic_document(self, document_id: str) -> bytes:
    """Fetch the original bytes of an agentic document.

    The ``file`` route serves the raw binary rather than a JSON envelope, so
    the request goes straight through the session instead of the
    JSON-decoding request path.

    Args:
        document_id: Id of the document to download.

    Returns:
        The response body as bytes.

    Raises:
        PlatformAPIError: If the response status is outside the 2xx range.
            The error carries the status code and the first 2000 characters
            of the response text.
    """
    route = f"agentic/documents/{document_id}/file/"
    url = self._url(route)
    logger.debug("GET %s", url)
    resp = self._session.get(url, timeout=self.timeout, verify=self.verify)
    status = resp.status_code
    if 200 <= status < 300:
        return resp.content
    raise PlatformAPIError(
        f"GET {route} returned {status}",
        status_code=status,
        body=resp.text[:2000],
    )

def upload_agentic_document(
    self, project_id: str, file_name: str, data: bytes, mime_type: str
) -> dict[str, Any]:
    """Upload one document into a target agentic project.

    This creates the ``AgenticDocument`` row; extraction/summary stay a UI
    step, as with Prompt Studio uploads. Callers pre-check filenames to
    avoid duplicates on re-runs.

    Args:
        project_id: Target agentic project id (part of the upload route).
        file_name: Filename sent with the multipart ``file`` field.
        data: Raw document bytes.
        mime_type: Content type sent with the multipart ``file`` field.

    Returns:
        Whatever the underlying ``_request`` call returns for the POST.
    """
    endpoint = f"agentic/projects/{project_id}/documents/upload/"
    multipart = {"file": (file_name, data, mime_type)}
    return self._request("POST", endpoint, files=multipart)

def list_agentic_verified_data(self, project_id: str) -> list[dict[str, Any]]:
    """Return a project's verified (ground-truth) data rows.

    Each row carries ``document_name``, ``document``, and ``data`` (the
    curated JSON).

    Args:
        project_id: Agentic project whose rows are listed; sent as the
            ``project_id`` query parameter.

    Returns:
        The response itself when it is a list; otherwise the ``results``
        entry of the response (an empty list when the response is falsy or
        has no ``results`` key).
    """
    query = {"project_id": project_id}
    payload = self._request("GET", "agentic/verified-data/", params=query)
    if isinstance(payload, list):
        return payload
    # Envelope-style (or empty) response: unwrap ``results``.
    return (payload or {}).get("results", [])

def create_agentic_verified_data(self, payload: dict[str, Any]) -> dict[str, Any]:
    """Create one verified-data row from ``payload``.

    The payload carries ``project``, ``document``, and ``data``. There is one
    row per (project, document); callers pre-check to avoid a uniqueness
    clash on re-runs.

    Args:
        payload: JSON body posted to the verified-data endpoint.

    Returns:
        Whatever the underlying ``_request`` call returns for the POST.
    """
    endpoint = "agentic/verified-data/"
    return self._request("POST", endpoint, json=payload)
224 changes: 222 additions & 2 deletions src/unstract/clone/phases/agentic_studio.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
per-run ``agentic_prompt_version`` table and the child's ``parent_version``
resolves through it. ``project`` is bound to the target id.
- **schemas**: bound to the target ``project`` and created.
- **documents**: source uploads re-uploaded to the target project (skipping
filenames already present); they live in their own store, not the Prompt
Studio document table.
- **verified data**: ground-truth rows re-pointed to the cloned document by
filename. Extracted/comparison data is regenerable and not cloned.
3. **Registry**: if the project has an active schema + prompt, republish its
registry entry via the ``export`` action (mirror of custom_tool) and
record an ``agentic_studio_registry`` remap. Projects with no source
Expand All @@ -35,6 +40,7 @@
from __future__ import annotations

import logging
import mimetypes
import threading
from typing import Any

Expand Down Expand Up @@ -170,6 +176,8 @@ def _clone_project(
self._replicate_share(src, name, tgt_project_id, result, lock)
self._clone_prompt_versions(name, src_project_id, tgt_project_id, result, lock)
self._clone_schemas(name, src_project_id, tgt_project_id, result, lock)
self._clone_documents(name, src_project_id, tgt_project_id, result, lock)
self._clone_verified_data(name, src_project_id, tgt_project_id, result, lock)
self._republish_registry(name, src_project_id, tgt_project_id, result, lock)

def _build_project_payload(
Expand Down Expand Up @@ -422,6 +430,201 @@ def _clone_schemas(
with lock:
result.created += 1

# ----- documents -----

def _clone_documents(
    self,
    name: str,
    src_project_id: str,
    tgt_project_id: str,
    result: PhaseResult,
    lock: threading.Lock,
) -> None:
    """Copy a source project's documents into the target project.

    Documents whose filename already exists on the target are skipped so a
    re-run does not duplicate them. Under ``file_strategy == "skip"`` no
    binaries are transferred and every source document is counted as
    skipped. Outcomes are recorded on ``result``; nothing is raised.

    Args:
        name: Project name, used only in log lines and result messages.
        src_project_id: Source project whose documents are listed.
        tgt_project_id: Target project that receives the uploads.
        result: Shared phase result; its counters and message lists are
            updated under ``lock``.
        lock: Guards every mutation of ``result``.
    """
    try:
        src_docs = self.ctx.source.list_agentic_documents(src_project_id)
    except Exception as e:
        logger.exception("agentic '%s': document listing failed: %s", name, e)
        with lock:
            result.failed += 1
            result.errors.append(f"agentic {name} list documents: {e}")
        return
    if not src_docs:
        return

    # Honour the file strategy: 'skip' means no binary transfer, like the
    # files and lookups phases. Operator re-uploads on target.
    if self.ctx.options.file_strategy == "skip":
        with lock:
            result.skipped += len(src_docs)
            result.warnings.append(
                f"agentic {name}: {len(src_docs)} document(s) not copied "
                "(file_strategy=skip) — upload them manually on target"
            )
        return

    try:
        tgt_docs = self.ctx.target.list_agentic_documents(tgt_project_id)
    except Exception as e:
        # Best effort: without the target listing we cannot dedupe, so carry
        # on and accept that a re-run may upload the same file again.
        logger.warning(
            "agentic '%s': target document listing failed "
            "(re-run may duplicate): %s",
            name,
            e,
        )
        tgt_docs = []
    target_names = {d.get("original_filename") for d in tgt_docs}

    for src in src_docs:
        file_name = src.get("original_filename")
        src_doc_id = src.get("id")
        if not file_name or not src_doc_id:
            # A row without an id or filename cannot be downloaded or
            # deduplicated. Surface it instead of dropping it silently so
            # the result totals still account for every source row.
            with lock:
                result.skipped += 1
                result.warnings.append(
                    f"agentic {name}: source document row without id or "
                    f"filename skipped (id={src_doc_id!r}, "
                    f"filename={file_name!r})"
                )
            continue
        if file_name in target_names:
            with lock:
                result.skipped += 1
            logger.info(
                "agentic '%s': document '%s' already on target — skipping",
                name,
                file_name,
            )
            continue
        self._clone_one_document(
            name, tgt_project_id, src_doc_id, file_name, result, lock
        )
Comment thread
greptile-apps[bot] marked this conversation as resolved.

def _clone_one_document(
    self,
    name: str,
    tgt_project_id: str,
    src_doc_id: str,
    file_name: str,
    result: PhaseResult,
    lock: threading.Lock,
) -> None:
    """Download one source document and upload it to the target project.

    Outcomes are recorded on ``result`` under ``lock``: a download or upload
    error counts as failed, a payload larger than
    ``ctx.options.max_file_size`` counts as skipped (with a warning), and a
    successful upload counts as created. Nothing is raised.

    Args:
        name: Project name, used only in log lines and result messages.
        tgt_project_id: Target project that receives the upload.
        src_doc_id: Id of the document on the source side.
        file_name: Original filename; also used to guess the MIME type.
        result: Shared phase result to update.
        lock: Guards every mutation of ``result``.
    """
    try:
        payload = self.ctx.source.download_agentic_document(src_doc_id)
    except Exception as exc:
        logger.exception(
            "agentic '%s': document '%s' download failed: %s", name, file_name, exc
        )
        with lock:
            result.failed += 1
            result.errors.append(f"agentic {name} download {file_name}: {exc}")
        return

    # The size is only known once the bytes are in hand, so the cap is
    # enforced after the download and before the upload.
    if len(payload) > self.ctx.options.max_file_size:
        with lock:
            result.skipped += 1
            result.warnings.append(
                f"agentic {name}: document {file_name} exceeds size cap — "
                "upload it manually on target"
            )
        return

    guessed_type, _encoding = mimetypes.guess_type(file_name)
    # Unknown extensions fall back to PDF.
    mime = guessed_type or "application/pdf"
    try:
        self.ctx.target.upload_agentic_document(
            tgt_project_id, file_name, payload, mime
        )
    except Exception as exc:
        logger.exception(
            "agentic '%s': document '%s' upload failed: %s", name, file_name, exc
        )
        with lock:
            result.failed += 1
            result.errors.append(f"agentic {name} upload {file_name}: {exc}")
    else:
        with lock:
            result.created += 1
        logger.info("agentic '%s': uploaded document '%s'", name, file_name)

# ----- verified (ground-truth) data -----

def _clone_verified_data(
    self,
    name: str,
    src_project_id: str,
    tgt_project_id: str,
    result: PhaseResult,
    lock: threading.Lock,
) -> None:
    """Re-create a source project's verified (ground-truth) rows on target.

    Each source row is re-pointed to the target document that has the same
    filename. Rows whose document is missing on the target, or whose target
    document already has a verified row, are skipped. Under
    ``file_strategy == "skip"`` no rows are copied. Outcomes are recorded on
    ``result``; nothing is raised.

    Args:
        name: Project name, used only in log lines and result messages.
        src_project_id: Source project whose rows are listed.
        tgt_project_id: Target project that receives the rows.
        result: Shared phase result; its counters and message lists are
            updated under ``lock``.
        lock: Guards every mutation of ``result``.
    """
    try:
        src_rows = self.ctx.source.list_agentic_verified_data(src_project_id)
    except Exception as e:
        logger.exception("agentic '%s': verified-data listing failed: %s", name, e)
        with lock:
            result.failed += 1
            result.errors.append(f"agentic {name} list verified-data: {e}")
        return
    if not src_rows:
        return

    # Verified data FKs a document; under 'skip' no documents are copied,
    # so skip the rows too — matching the plan and _clone_documents.
    if self.ctx.options.file_strategy == "skip":
        with lock:
            result.skipped += len(src_rows)
            result.warnings.append(
                f"agentic {name}: {len(src_rows)} verified-data row(s) not "
                "copied (file_strategy=skip)"
            )
        return

    # Verified data FKs a document; map source rows to target docs by
    # filename, the only identity stable across orgs. Without the target
    # document listing no row can be mapped, so report that once instead of
    # blaming every row on a missing document.
    try:
        tgt_docs = self.ctx.target.list_agentic_documents(tgt_project_id)
    except Exception as e:
        logger.warning(
            "agentic '%s': target document listing failed; "
            "verified data cannot be mapped: %s",
            name,
            e,
        )
        with lock:
            result.skipped += len(src_rows)
            result.warnings.append(
                f"agentic {name}: {len(src_rows)} verified-data row(s) "
                f"skipped — target document listing failed: {e}"
            )
        return

    # The existing-row lookup only drives dedupe, so it is looked up on its
    # own: a failure here must not throw away the document mapping above.
    try:
        tgt_existing = self.ctx.target.list_agentic_verified_data(tgt_project_id)
    except Exception as e:
        logger.warning(
            "agentic '%s': target verified-data lookup failed "
            "(re-run may duplicate): %s",
            name,
            e,
        )
        tgt_existing = []

    doc_id_by_name = {d.get("original_filename"): d.get("id") for d in tgt_docs}
    verified_doc_ids = {r.get("document") for r in tgt_existing}

    for src in src_rows:
        file_name = src.get("document_name")
        tgt_doc_id = doc_id_by_name.get(file_name)
        if not tgt_doc_id:
            with lock:
                result.skipped += 1
                result.warnings.append(
                    f"agentic {name}: verified data for '{file_name}' skipped — "
                    "document not on target"
                )
            continue
        if tgt_doc_id in verified_doc_ids:
            with lock:
                result.skipped += 1
            continue
        payload = {
            "project": tgt_project_id,
            "document": tgt_doc_id,
            "data": src.get("data"),
        }
        try:
            self.ctx.target.create_agentic_verified_data(payload)
        except Exception as e:
            logger.exception(
                "agentic '%s': verified data for '%s' create failed: %s",
                name,
                file_name,
                e,
            )
            with lock:
                result.failed += 1
                result.errors.append(
                    f"agentic {name} create verified-data {file_name}: {e}"
                )
            continue
        # Remember the row just created so a second source row that maps to
        # the same target document is skipped rather than clashing on the
        # one-row-per-document uniqueness rule.
        verified_doc_ids.add(tgt_doc_id)
        with lock:
            result.created += 1
        logger.info("agentic '%s': cloned verified data for '%s'", name, file_name)

# ----- registry -----

def _republish_registry(
Expand Down Expand Up @@ -507,8 +710,9 @@ def _plan_children(
result: PhaseResult,
lock: threading.Lock,
) -> None:
"""Dry-run: count source prompt versions + schemas as planned and
record planned prompt-version ids so downstream resolves don't miss.
"""Dry-run: count source prompt versions + schemas + documents as
planned and record planned prompt-version ids so downstream resolves
don't miss.
"""
try:
src_versions = self.ctx.source.list_agentic_prompt_versions(
Expand All @@ -522,11 +726,27 @@ def _plan_children(
)
except Exception:
src_schemas = []
try:
src_docs = self.ctx.source.list_agentic_documents(src_project_id)
except Exception:
src_docs = []
try:
src_verified = self.ctx.source.list_agentic_verified_data(src_project_id)
except Exception:
src_verified = []
with lock:
for v in src_versions:
self.ctx.remap.record_planned("agentic_prompt_version", v["id"])
result.created += 1
result.created += len(src_schemas)
# Documents move only when the file strategy copies binaries.
# Verified data FKs a document, so skipping files strands it too.
if self.ctx.options.file_strategy == "skip":
result.skipped += len(src_docs)
result.skipped += len(src_verified)
else:
result.created += len(src_docs)
result.created += len(src_verified)

# ----- settings -----

Expand Down
9 changes: 7 additions & 2 deletions src/unstract/clone/phases/tool_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,17 @@ def _clone_workflow_tools(
src_ti_id = src_ti["id"]
src_tool_id = src_ti["tool_id"]

# tool_id is a registry id: Prompt Studio tools register under
# prompt_studio_registry, exported agentic projects under
# agentic_studio_registry. Resolve against both.
with lock:
tgt_tool_id = self.ctx.remap.resolve("prompt_studio_registry", src_tool_id)
tgt_tool_id = self.ctx.remap.resolve(
"prompt_studio_registry", src_tool_id
) or self.ctx.remap.resolve("agentic_studio_registry", src_tool_id)
if not tgt_tool_id:
logger.warning(
"skipping tool_instance %s — no registry remap for tool_id %s "
"(custom tool likely unpublished on source)",
"(tool likely unpublished on source)",
src_ti_id,
src_tool_id,
)
Expand Down
Loading
Loading