Hive Mind Document Processing Pipeline
Documents enter the pipeline from anywhere — chat, files, webhooks, or scheduled jobs
Each stage transforms, validates, and passes data forward — with error handling at every step
[[Entity Name]] wiki-style patterns: extracts all named entities and their types into a structured list. [[A]] VERB [[B]] relationship patterns: builds the knowledge-graph edges between entities. Embedding via /opt/qwen3-embedder/server.py: produces a 2560-dimensional vector. Vector store at localhost:6333: upserts the point with vector + full payload (text, metadata, entities, summary, edges). Fan-out: where the pipeline splits, processes concurrently, and rejoins.
Actual pseudocode from our enrichment system
# [[Entity Name]] pattern extraction — Stage 3
import re

# BUG FIX: pattern previously ended in '\]\]+' — the stray '+' made it also
# match malformed mentions like "[[X]]]". Exactly two closing brackets now.
ENTITY_PATTERN = r'\[\[([^\]]+)\]\]'


def extract_entities(text: str) -> list[dict]:
    """Extract every [[Entity]] wiki-style mention from *text*.

    Returns one dict per mention containing the whitespace-stripped name,
    an inferred type, a fixed confidence of 0.95, and an (initially empty)
    alias list. (Annotation fixed: was ``List[dict]`` with no typing import.)
    """
    raw_matches = re.findall(ENTITY_PATTERN, text)
    entities = []
    for name in raw_matches:
        entities.append({
            "name": name.strip(),
            "type": infer_type(name),  # helper defined elsewhere in the pipeline
            "confidence": 0.95,
            "aliases": [],
        })
    return entities


# [[A]] VERB [[B]] relationship extraction — Stage 6
# BUG FIX: same stray trailing '+' removed from the closing '\]\]'.
RELATION_PATTERN = r'\[\[([^\]]+)\]\]\s+(\w+)\s+\[\[([^\]]+)\]\]'


def extract_relations(text: str) -> list[dict]:
    """Extract (subject, verb, object) triplets from [[A]] VERB [[B]] patterns.

    Each triplet also carries a "loom" label for the verb, used downstream
    for collection routing.
    """
    triplets = []
    for m in re.finditer(RELATION_PATTERN, text):
        triplets.append({
            "subject": m.group(1),
            "verb": m.group(2),
            "object": m.group(3),
            "loom": classify_verb(m.group(2)),  # helper defined elsewhere in the pipeline
        })
    return triplets
# PARA classification — Stage 5
from enum import Enum


class PARA(Enum):
    """PARA-method buckets: Project / Area / Resource / Archive."""
    PROJECT = "P"
    # BUG FIX: was ``Enum.value("A")`` — Enum.value is a descriptor, not a
    # callable; accessing it on the class raises, so the module couldn't load.
    AREA = "A"
    RESOURCE = "R"
    ARCHIVE = "Archive"


def classify_para(doc: "Document") -> PARA:
    """Classify *doc* into a PARA bucket via keyword heuristics.

    Stale documents (>90 days without update) short-circuit to ARCHIVE.
    Otherwise the bucket with the highest heuristic score wins; ties resolve
    to the first max key in dict insertion order (PROJECT, AREA, RESOURCE).
    """
    score = {PARA.PROJECT: 0, PARA.AREA: 0, PARA.RESOURCE: 0}
    # Heuristic scoring: action verbs → PROJECT, stewardship verbs → AREA,
    # reference keywords → RESOURCE.
    if doc.has_verbs("plan,build,launch,create,ship"):
        score[PARA.PROJECT] += 3
    if doc.has_verbs("manage,maintain,support,oversee"):
        score[PARA.AREA] += 3
    if doc.has_keywords("guide,reference,tutorial,docs"):
        score[PARA.RESOURCE] += 3
    if doc.is_stale():  # >90 days no update
        return PARA.ARCHIVE
    return max(score, key=score.get)
# Collection routing — Stage 8
from dataclasses import dataclass


@dataclass
class Loom:
    """A routing target: loom name, backing collection, and the minimum
    classifier confidence required to store a document there."""
    name: str
    collection: str
    threshold: float


LOOMS = [
    Loom("facts", "exocortex-facts", 0.95),
    Loom("decisions", "exocortex-decisions", 0.90),
    Loom("reports", "exocortex-reports", 0.85),
    Loom("predictions", "exocortex-predictions", 0.50),
]


def route_collections(doc: "Document") -> list[str]:
    """Return the de-duplicated list of collections *doc* belongs in.

    A loom's collection is included when the LLM-assigned score meets that
    loom's threshold; the document's PARA collection is always included.
    (Annotation fixed: was ``List[str]`` with no typing import.)
    """
    scores = score_looms(doc)  # LLM classifies loom scores
    hits = [
        loom.collection
        for loom in LOOMS
        if scores.get(loom.name, 0) >= loom.threshold
    ]
    # Always store in PARA collection too
    hits.append(para_collection(doc.para))
    # BUG FIX: ``list(set(hits))`` produced nondeterministic ordering;
    # dict.fromkeys de-duplicates while preserving first-seen order.
    return list(dict.fromkeys(hits))
# Darwinism scoring — quality gate at end of pipeline
def darwinism_score(doc: "Document") -> float:
    """Score document quality in [0.0, 1.0]; low scores gate out weak docs.

    Starts at 1.0, subtracts penalties for missing enrichment components
    (summary, entities, edges) and for short content — note both length
    penalties stack when len(text) < 50 — then adds a bonus for high loom
    confidence and clamps the result to [0.0, 1.0].

    (Annotation quoted as a forward reference so the def doesn't require
    ``Document`` to be in scope at definition time.)
    """
    score = 1.0
    # Penalize missing components
    if not doc.summary:
        score -= 0.2
    if not doc.entities:
        score -= 0.1
    if not doc.edges:
        score -= 0.15
    # Penalize short content — both branches fire for very short text
    if len(doc.text) < 100:
        score -= 0.25
    if len(doc.text) < 50:
        score -= 0.15
    # Bonus for high loom confidence (skipped when loom_score is falsy)
    if doc.loom_score and doc.loom_score >= 0.90:
        score += 0.1
    return max(0.0, min(1.0, score))
Each document is vectorized and stored in one or more collections based on its loom + PARA classification
Every stored point contains the 2560-dim embedding plus the full enriched document payload
Every stage has a fallback path — nothing silently fails
Real-time health metrics from the running enrichment system