import fs from "fs";
import readline from "readline";
import { Client } from "@langchain/langgraph-sdk";
import cliProgress from "cli-progress";
import pLimit from "p-limit";

const INPUT_FILE = process.env.INPUT_FILE ?? "../../data/claims.json";
const OUTPUT_FILE = process.env.OUTPUT_FILE ?? "../../data/results.jsonl";
const API_URL = process.env.API_URL ?? "http://localhost:2024";
const AGENT_NAME = process.env.AGENT ?? "agent";

/**
 * Modes
 *   claim    -> claims from DBKF (records with `text` and `dateCreated`)
 *   verifier -> JSONL claims to test reranking with (records with `text`,
 *               `date`, `events`, `normalized`)
 */
const MODE = process.env.MODE ?? "claim";
const SUPPORTED_MODES = ["claim", "verifier"];

const MAX_CONCURRENCY = parseInt(process.env.MAX_CONCURRENCY ?? "5", 10);
const OFFSET = parseInt(process.env.OFFSET ?? "0", 10);
const LIMIT = process.env.LIMIT ? parseInt(process.env.LIMIT, 10) : null;

const client = new Client({ apiUrl: API_URL });

type Claim = {
  documentUrl: string;
  text: string;
  dateCreated: string;
  [key: string]: any;
};

type VerifierInput = {
  documentUrl?: string;
  text?: string;
  normalized?: string;
  /** Alternative key for `normalized`; used only when `normalized` is absent. */
  normalizedClaim?: string;
  events?: any;
  run_id: string;
  date?: string;
  [key: string]: any;
};

type InputRecord = Claim | VerifierInput;

type ResultRecord = {
  documentUrl?: string;
  text?: string;
  status: "success" | "error" | "wrapper_crash";
  normalized?: string;
  events?: any;
  run_id: string;
  date?: string;
  // error handling
  error?: string;
  dump?: any;
};

/** Best-effort human-readable message for any thrown value. */
function errorMessage(err: unknown): string {
  if (err instanceof Error) return err.message;
  if (
    typeof err === "object" &&
    err !== null &&
    "message" in err &&
    typeof (err as { message: unknown }).message === "string"
  ) {
    return (err as { message: string }).message;
  }
  return String(err);
}

/** Appends one result as a single JSON line to OUTPUT_FILE (synchronous write). */
function appendResult(record: ResultRecord): void {
  fs.appendFileSync(OUTPUT_FILE, JSON.stringify(record) + "\n");
}

/**
 * Reads a JSON Lines file, skipping blank lines.
 * @throws Error naming the file and 1-based line number when a line is not valid JSON.
 */
async function readJSONL(file: string): Promise<unknown[]> {
  const stream = fs.createReadStream(file);
  const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
  const results: unknown[] = [];
  let lineNumber = 0;
  try {
    for await (const line of rl) {
      lineNumber++;
      if (line.trim().length === 0) continue;
      try {
        results.push(JSON.parse(line));
      } catch (err: unknown) {
        throw new Error(`${file}:${lineNumber}: invalid JSON (${errorMessage(err)})`);
      }
    }
  } finally {
    // Release the file handle even when a parse error aborts the loop early.
    rl.close();
    stream.destroy();
  }
  return results;
}

/**
 * Loads INPUT_FILE as JSONL (".jsonl" suffix) or as a single JSON document.
 * @throws Error when the parsed content is not an array of records.
 */
async function loadInputs(): Promise<InputRecord[]> {
  const parsed: unknown = INPUT_FILE.endsWith(".jsonl")
    ? await readJSONL(INPUT_FILE)
    : JSON.parse(fs.readFileSync(INPUT_FILE, "utf-8"));
  if (!Array.isArray(parsed)) {
    throw new Error(`${INPUT_FILE}: expected a JSON array of records`);
  }
  // NOTE(review): individual records are not schema-validated here.
  return parsed as InputRecord[];
}

/**
 * Maps an input record to the agent's input payload for the current MODE.
 * @throws Error for an unsupported MODE.
 */
function buildAgentInput(record: InputRecord) {
  if (MODE === "claim") {
    const claim = record as Claim;
    return { disinformationTitle: claim.text, date: claim.dateCreated };
  }
  if (MODE === "verifier") {
    const v = record as VerifierInput;
    return {
      disinformationTitle: v.text,
      date: v.date,
      proposedTriggerEvent: v.events,
      // VerifierInput (and this script's own output) stores the claim under
      // `normalized`; fall back to `normalizedClaim` for inputs using that key.
      normalizedClaim: v.normalized ?? v.normalizedClaim,
      proposedTriggerEventIndex: -1,
    };
  }
  throw new Error(`Unknown mode: ${MODE}`);
}

/** Returns the record's date from the same field buildAgentInput uses for MODE. */
function recordDate(record: InputRecord): string | undefined {
  return MODE === "claim" ? record.dateCreated : record.date;
}

/**
 * Runs the agent for one record on a fresh thread and converts the outcome
 * into a ResultRecord. Never throws: failures become "error"/"wrapper_crash".
 */
async function processRecord(record: InputRecord): Promise<ResultRecord> {
  const base = {
    documentUrl: record.documentUrl,
    text: record.text,
    date: recordDate(record),
  };
  // Tracked outside the try so crash records still reference the thread.
  let threadId: string | null = null;
  try {
    // Build the input first so a bad record fails before a thread is created.
    const input = buildAgentInput(record);
    const thread = await client.threads.create();
    threadId = thread.thread_id;
    const stream = client.runs.stream(threadId, AGENT_NAME, {
      input,
      streamMode: "values",
      config: { recursion_limit: 100 },
    });

    // Only the last chunk is kept: on failure its event is "error", otherwise
    // its `data` is expected to hold the agent's final state.
    let lastChunk: { event: string; data?: any } | null = null;
    for await (const chunk of stream) {
      lastChunk = chunk;
    }

    if (lastChunk === null) {
      return {
        ...base,
        status: "error",
        error: "stream produced no output",
        run_id: threadId,
      };
    }
    if (lastChunk.event === "error") {
      return { ...base, status: "error", dump: lastChunk, run_id: threadId };
    }
    return {
      ...base,
      status: "success",
      events: lastChunk.data?.proposedTriggerEvent,
      normalized: lastChunk.data?.normalizedClaim,
      run_id: threadId,
    };
  } catch (err: unknown) {
    return {
      ...base,
      status: "wrapper_crash",
      error: errorMessage(err),
      run_id: threadId ?? "NONE",
    };
  }
}

/** Fails fast on invalid environment configuration before any work is done. */
function validateConfig(): void {
  if (!SUPPORTED_MODES.includes(MODE)) {
    throw new Error(`Unknown mode: ${MODE}`);
  }
  if (!Number.isInteger(OFFSET) || OFFSET < 0) {
    throw new Error(`OFFSET must be a non-negative integer, got "${process.env.OFFSET}"`);
  }
  if (LIMIT !== null && (!Number.isInteger(LIMIT) || LIMIT < 0)) {
    throw new Error(`LIMIT must be a non-negative integer, got "${process.env.LIMIT}"`);
  }
  if (!Number.isInteger(MAX_CONCURRENCY) || MAX_CONCURRENCY < 1) {
    throw new Error(
      `MAX_CONCURRENCY must be a positive integer, got "${process.env.MAX_CONCURRENCY}"`
    );
  }
}

/**
 * Loads inputs, selects the [OFFSET, OFFSET + LIMIT) window, and processes it
 * with at most MAX_CONCURRENCY concurrent agent runs, appending each result
 * to OUTPUT_FILE as soon as it completes.
 */
async function main(): Promise<void> {
  validateConfig();

  console.log("Reading input file...");
  const allRecords = await loadInputs();
  console.log(`Loaded ${allRecords.length} records`);

  const records = allRecords.slice(
    OFFSET,
    LIMIT !== null ? OFFSET + LIMIT : undefined
  );
  console.log(
    `Processing ${records.length} records (offset=${OFFSET}, limit=${LIMIT ?? "∞"})`
  );

  // Create the output file if missing without truncating it, so runs over
  // different OFFSET windows accumulate into the same file.
  fs.writeFileSync(OUTPUT_FILE, "", { flag: "a" });

  const limit = pLimit(MAX_CONCURRENCY);
  const progressBar = new cliProgress.SingleBar(
    { format: "Progress |{bar}| {percentage}% | {value}/{total} | ETA: {eta}s" },
    cliProgress.Presets.shades_classic
  );
  progressBar.start(records.length, 0);

  let completed = 0;
  try {
    await Promise.all(
      records.map((record) =>
        limit(async () => {
          appendResult(await processRecord(record));
          completed++;
          progressBar.update(completed);
        })
      )
    );
  } finally {
    // Restore the terminal even if a task rejects (e.g. a failed file write).
    progressBar.stop();
  }

  console.log("Processing complete");
}

main().catch((err: unknown) => {
  console.error("Fatal error:", err);
  process.exitCode = 1;
});