ULUSOY_DIGITAL
← Tüm Yazılar
Teknoloji

How to Build a Document Intelligence Backend with iii Using Workers, Functions, and Cron Triggers

Otomatik Bot 04 June 2026 0 görüntülenme

In this tutorial, we build a document-intelligence workflow with iii. We begin by installing the iii engine and Python SDK, then start the engine as a background process and connect a Python worker to it. After the setup, we register separate functions for text normalization, tokenization, sentiment analysis, keyword extraction, reporting, and heartbeat tracking. We then combine these functions into a single analysis pipeline and run the same logic via direct invocation, an HTTP endpoint, fire-and-forget execution, and a scheduled cron trigger. Along the way, we also track basic runtime state, making the workflow feel closer to a real backend system than a static notebook demo. Check out the FULL CODES here.

import os, sys, subprocess, time, socket, json, threading
from collections import Counter
HOME    = os.path.expanduser("~")
BIN_DIR = f"{HOME}/.local/bin"
os.environ["PATH"] = BIN_DIR + os.pathsep + os.environ.get("PATH", "")
def sh(cmd):
   print(f"$ {cmd}")
   subprocess.run(cmd, shell=True, check=True)
if not os.path.exists(f"{BIN_DIR}/iii"):
   sh(f"curl -fsSL https://install.iii.dev/iii/main/install.sh | BIN_DIR={BIN_DIR} sh")
sh(f"{sys.executable} -m pip install -q iii-sdk requests")
III = f"{BIN_DIR}/iii"
sh(f"{III} --version")

We start by importing the required Python modules and setting up the local binary path for the III engine. We define a small helper function to run shell commands and install the III engine if it is not already available. We also install the Python SDK and requests package, then verify the iii installation by checking its version.

WS_URL, HTTP_URL = "ws://localhost:49134", "http://localhost:3111"
engine_log = open("/tmp/iii-engine.log", "w")
engine = subprocess.Popen([III, "--use-default-config"],
                         stdout=engine_log, stderr=subprocess.STDOUT)
def wait_port(host, port, timeout=90):
   end = time.time() + timeout
   while time.time() < end:
       with socket.socket() as s:
           s.settimeout(1)
           try:
               s.connect((host, port)); return True
           except OSError:
               time.sleep(0.5)
   return False
assert wait_port("localhost", 49134), "engine never came up — see /tmp/iii-engine.log"
print(f"✓ engine up — WS {WS_URL} | HTTP {HTTP_URL}")
from iii import register_worker
try:
   from iii import TriggerAction
except Exception:
   TriggerAction = None
worker = register_worker(WS_URL)
_STATE = {"docs_analyzed": 0, "heartbeats": 0, "keyword_totals": Counter()}
_LOCK  = threading.Lock()
POSITIVE = {"good","great","love","excellent","happy","fast","reliable","amazing","best","win"}
NEGATIVE = {"bad","terrible","hate","slow","broken","sad","worst","bug","crash","fail"}

We launch the iii engine as a background process and wait for its WebSocket port to become available. We then connect a Python worker to the running engine and prepare optional support for fire-and-forget triggers. We also define a shared in-memory state, a thread lock, and simple positive and negative word sets for sentiment analysis.

def normalize(data):
   return {"text": (data.get("text") or "").strip().lower()}
def tokenize(data):
   text   = data.get("text", "")
   cleaned = "".join(c if (c.isalnum() or c.isspace()) else " " for c in text)
   tokens = [t for t in cleaned.split() if t]
   return {"tokens": tokens, "count": len(tokens)}
def sentiment(data):
   toks  = data.get("tokens", [])
   pos   = sum(t in POSITIVE for t in toks)
   neg   = sum(t in NEGATIVE for t in toks)
   score = pos - neg
   label = "positive" if score > 0 else "negative" if score < 0 else "neutral"
   return {"label": label, "score": score, "pos": pos, "neg": neg}
def keywords(data):
   toks = data.get("tokens", [])
   stop = {"the","a","an","is","it","to","of","and","in","for","on","how"}
   freq = Counter(t for t in toks if t not in stop and len(t) > 2)
   return {"keywords": freq.most_common(data.get("top_n", 5))}
def analyze(data):
   norm = worker.trigger({"function_id": "text::normalize", "payload": {"text": data.get("text","")}})
   toks = worker.trigger({"function_id": "text::tokenize",  "payload": norm})
   sent = worker.trigger({"function_id": "text::sentiment", "payload": toks})
   keys = worker.trigger({"function_id": "text::keywords",  "payload": {**toks, "top_n": data.get("top_n", 5)}})
   with _LOCK:
       _STATE["docs_analyzed"] += 1
       for k, c in keys["keywords"]:
           _STATE["keyword_totals"][k] += c
       n = _STATE["docs_analyzed"]
   return {"tokens": toks["count"], "sentiment": sent, "keywords": keys["keywords"], "docs_analyzed": n}
def report(data):
   with _LOCK:
       return {"docs_analyzed": _STATE["docs_analyzed"],
               "heartbeats":    _STATE["heartbeats"],
               "top_keywords_all_docs": _STATE["keyword_totals"].most_common(5)}
def http_analyze(data):
   body   = data.get("body") or {}
   result = worker.trigger({"function_id": "pipeline::analyze", "payload": body})
   return {"status_code": 200, "body": result, "headers": {"Content-Type": "application/json"}}
def heartbeat(data):
   with _LOCK:
       _STATE["heartbeats"] += 1
   return {"ok": True}
for fid, fn in [
   ("text::normalize", normalize), ("text::tokenize", tokenize),
   ("text::sentiment", sentiment), ("text::keywords", keywords),
   ("pipeline::analyze", analyze), ("stats::report", report),
   ("http::analyze", http_analyze), ("cron::heartbeat", heartbeat),
]:
   worker.register_function(fid, fn)

We define the core functions used in the text-analysis workflow, including normalization, tokenization, sentiment detection, and keyword extraction. We then create an analysis function that routes each step through the III engine instead of calling everything directly. We also add reporting, HTTP handling, and heartbeat functions before registering all of them with the worker.

worker.register_trigger({"type": "http", "function_id": "http::analyze",
                        "config": {"api_path": "/analyze", "http_method": "POST"}})
cron_ok = False
try:
   worker.register_trigger({"type": "cron", "function_id": "cron::heartbeat",
                            "config": {"schedule": "*/2 * * * * *"}})
   cron_ok = True
except Exception as e:
   print("cron trigger skipped:", e)
try:
   worker.connect()
except Exception:
   pass
time.sleep(2)

We register an HTTP trigger so that the analysis pipeline can be invoked via a POST request. We also try to register a cron trigger that runs the heartbeat function on a fixed schedule, while safely skipping it if the engine build does not support that schema. We then connect the worker and pause briefly so the registered functions and triggers are ready to use.

print("\n=== A) Direct invocation — orchestrated through the engine ===")
docs = [
   "iii makes the backend amazing and fast, I love how reliable it is",
   "The legacy gateway was slow and broken, a terrible buggy experience",
   "Workers register functions and triggers; the engine routes every call",
]
for d in docs:
   r = worker.trigger({"function_id": "pipeline::analyze", "payload": {"text": d, "top_n": 4}})
   print(f"  [{r['sentiment']['label']:>8}] tokens={r['tokens']:>2}  keywords={r['keywords']}")
print("\n=== B) The SAME function over HTTP (:3111) — zero handler changes ===")
import requests
try:
   resp = requests.post(f"{HTTP_URL}/analyze",
                        json={"text": "great great product, best ever", "top_n": 3}, timeout=10)
   print("  HTTP", resp.status_code, "->", resp.json())
except Exception as e:
   print("  HTTP call failed (engine HTTP module/version?):", e)
print("\n=== C) Fire-and-forget invocation ===")
if TriggerAction:
   worker.trigger({"function_id": "pipeline::analyze",
                   "payload": {"text": "async win, no waiting"},
                   "action": TriggerAction.Void()})
   print("  dispatched (no result awaited)")
else:
   print("  TriggerAction not in this SDK build — skipping")
print("\n=== D) Cron trigger firing on its own ===")
if cron_ok:
   time.sleep(5)
   print("  heartbeats so far:",
         worker.trigger({"function_id": "stats::report", "payload": {}})["heartbeats"])
else:
   print("  cron not registered on this engine build")
print("\n=== E) Aggregate state report ===")
print(json.dumps(worker.trigger({"function_id": "stats::report", "payload": {}}), indent=2))
print("\nTraces/metrics: run `iii console` locally, or scrape Prometheus at :9464")
print("engine log tail:")
print(subprocess.run(["tail", "-n", "8", "/tmp/iii-engine.log"],
                    capture_output=True, text=True).stdout)

We test the complete III workflow by sending sample text documents through the registered analysis pipeline. We then call the same logic through HTTP, try fire-and-forget execution, and check whether the cron heartbeat is running. Finally, we print the aggregate state report and show the engine log tail for basic runtime visibility.

In conclusion, we have a working III system that processes text using modular, registered functions rather than a single fixed script. We analyzed sample documents, exposed the pipeline through HTTP, tested async-style execution, tracked heartbeat activity, and printed an aggregate state report. The tutorial keeps the example readable while showing the main working pattern of iii: define functions once, register them with a worker, and reuse them through different triggers and execution paths. It also shows how small functions can be cleanly connected as the workflow grows into something more production-ready.


Check out the FULL CODES hereAlso, feel free to follow us on Twitter and don’t forget to join our 150k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

Need to partner with us for promoting your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar etc.? Connect with us

The post How to Build a Document Intelligence Backend with iii Using Workers, Functions, and Cron Triggers appeared first on MarkTechPost.

📌 Kaynak: MarkTechPost

#Teknoloji