@task(inject_context=True)
def adaptive_doc(context, doc, model_ctx):
# まず文書構造を把握
analysis = analyze_document(doc)
# 最適な分割方式を選択
chunks = choose_chunker(analysis, model_ctx)(doc)
# チャンク内容に応じて「必要な処理タスク」を生成(動的ノード生成)
tasks = [create_chunk_task(chunk, i, model_ctx)
for i, chunk in enumerate(chunks)]
# fan-out / fan-in を1行で
context.next_task(
parallel(*tasks) >> aggregate(analysis, model_ctx)
)
@task(inject_context=True)
def aggregate(context, analysis, model_ctx, chunk_results):
report = merge_results(chunk_results)
critique = critique_agent(report, analysis)
if critique.verdict == "REVISE":
# 不合格チャンクだけ再処理タスクを動的生成(再帰)
redo_tasks = [create_chunk_task(
chunk, i, model_ctx,
feedback=critique.feedback_for(i)
)
for i in critique.failed_chunk_ids]
# 再処理結果で該当部分だけ差し替え → 再集約
pipeline = parallel(*redo_tasks)
>> partial_update(report, critique)
context.next_task(pipeline)
else:
context.next_task(output_report(report))
Last active
February 6, 2026 17:15
-
-
Save myui/545b38d89bf25210242a22036c4e6420 to your computer and use it in GitHub Desktop.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment