diff --git a/droidrun/agent/codeact/tools_agent.py b/droidrun/agent/codeact/tools_agent.py
index 0cc8896..cf417d9 100644
--- a/droidrun/agent/codeact/tools_agent.py
+++ b/droidrun/agent/codeact/tools_agent.py
@@ -37,6 +37,10 @@ from droidrun.agent.codeact.xml_parser import (
)
from droidrun.agent.common.constants import LLM_HISTORY_LIMIT
from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent
+from droidrun.agent.droid.events import (
+ ExternalUserMessageAppliedEvent,
+ ExternalUserMessageDroppedEvent,
+)
from droidrun.agent.usage import get_usage_from_response
from droidrun.agent.utils.chat_utils import limit_history
from droidrun.agent.utils.inference import acall_with_retries
@@ -211,6 +215,18 @@ class FastAgent(Workflow):
# Check then bump step counter
if self.shared_state.step_number >= self.max_steps:
+ pending = self.shared_state.drain_user_messages()
+ if pending:
+ logger.warning(
+ f"⚠️ Dropping {len(pending)} external user message(s) at max steps"
+ )
+ ctx.write_event_to_stream(
+ ExternalUserMessageDroppedEvent(
+ message_ids=[m.id for m in pending],
+ reason="max_steps_reached",
+ step_number=self.shared_state.step_number,
+ )
+ )
event = FastAgentEndEvent(
success=False,
reason=f"Reached max step count of {self.max_steps} steps",
@@ -451,6 +467,19 @@ class FastAgent(Workflow):
# Check if complete() was called successfully
if self.shared_state.finished:
+ if self.shared_state.pending_user_messages:
+ logger.info(
+ "⏸️ complete() called but external messages pending, continuing",
+ extra={"color": "cyan"},
+ )
+ self.shared_state.finished = False
+ self.shared_state.success = None
+ self.shared_state.answer = ""
+ results_xml = format_tool_results(results)
+ event = FastAgentOutputEvent(output=results_xml)
+ ctx.write_event_to_stream(event)
+ return event
+
logger.debug("✅ Task marked as complete via complete() tool")
success = (
@@ -493,7 +522,26 @@ class FastAgent(Workflow):
"""Add execution result to history and loop back."""
output = ev.output or "Tool executed, but produced no output."
- # Add results as user message
+ drained = self.shared_state.drain_user_messages()
+ if drained:
+ external_block = "\n".join(
+ f"\n{m.message}\n"
+ for m in drained
+ )
+ output += "\n" + external_block
+ logger.info(
+ f"📩 Applied {len(drained)} external user message(s)",
+ extra={"color": "cyan"},
+ )
+ ctx.write_event_to_stream(
+ ExternalUserMessageAppliedEvent(
+ message_ids=[m.id for m in drained],
+ consumer="fast_agent",
+ step_number=self.shared_state.step_number,
+ )
+ )
+
+ # Add results (+ any external messages) as a single user message
self.shared_state.message_history.append(
ChatMessage(role="user", content=output)
)
diff --git a/droidrun/agent/droid/droid_agent.py b/droidrun/agent/droid/droid_agent.py
index a5ec8c0..0f6c03b 100644
--- a/droidrun/agent/droid/droid_agent.py
+++ b/droidrun/agent/droid/droid_agent.py
@@ -27,6 +27,7 @@ from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent
from droidrun.agent.droid.events import (
ExecutorInputEvent,
ExecutorResultEvent,
+ ExternalUserMessageDroppedEvent,
FastAgentExecuteEvent,
FastAgentResultEvent,
FinalizeEvent,
@@ -38,7 +39,7 @@ from droidrun.agent.droid.events import (
TextManipulatorInputEvent,
TextManipulatorResultEvent,
)
-from droidrun.agent.droid.state import DroidAgentState
+from droidrun.agent.droid.state import DroidAgentState, QueuedUserMessage
from droidrun.agent.executor import ExecutorAgent
from droidrun.agent.external import load_agent
from droidrun.agent.manager import ManagerAgent, StatelessManagerAgent
@@ -591,6 +592,19 @@ class DroidAgent(Workflow):
ctx.write_event_to_stream(event)
return event
+ # ========================================================================
+ # External user message injection
+ # ========================================================================
+
+ def send_user_message(self, message: str) -> QueuedUserMessage:
+ queued = self.shared_state.queue_user_message(message)
+ logger.info(
+ f"📩 External user message queued [id={queued.id}] "
+ f"(queue length: {len(self.shared_state.pending_user_messages)})",
+ extra={"color": "cyan"},
+ )
+ return queued
+
# ========================================================================
# execute_task — FastAgent / CodeActAgent
# ========================================================================
@@ -702,6 +716,18 @@ class DroidAgent(Workflow):
"""Run Manager planning phase."""
if self.shared_state.step_number >= self.config.agent.max_steps:
logger.warning(f"⚠️ Reached maximum steps ({self.config.agent.max_steps})")
+ pending = self.shared_state.drain_user_messages()
+ if pending:
+ logger.warning(
+ f"⚠️ Dropping {len(pending)} external user message(s) at max steps"
+ )
+ ctx.write_event_to_stream(
+ ExternalUserMessageDroppedEvent(
+ message_ids=[m.id for m in pending],
+ reason="max_steps_reached",
+ step_number=self.shared_state.step_number,
+ )
+ )
return FinalizeEvent(
success=False,
reason=f"Reached maximum steps ({self.config.agent.max_steps})",
@@ -741,10 +767,18 @@ class DroidAgent(Workflow):
| ScripterExecutorInputEvent
| FinalizeEvent
| TextManipulatorInputEvent
+ | ManagerInputEvent
):
"""Process Manager output and decide next step."""
# Check for answer-type termination
if ev.answer.strip():
+ if self.shared_state.pending_user_messages:
+ logger.info(
+ "⏸️ Manager tried to finish but external messages pending, "
+ "looping back to Manager",
+ extra={"color": "cyan"},
+ )
+ return ManagerInputEvent()
success = ev.success if ev.success is not None else True
self.shared_state.progress_summary = f"Answer: {ev.answer}"
return FinalizeEvent(success=success, reason=ev.answer)
@@ -997,6 +1031,7 @@ class DroidAgent(Workflow):
@step
async def finalize(self, ctx: Context, ev: FinalizeEvent) -> ResultEvent:
+ self.shared_state.workflow_completed = True
ctx.write_event_to_stream(ev)
capture(
DroidAgentFinalizeEvent(
diff --git a/droidrun/agent/droid/events.py b/droidrun/agent/droid/events.py
index e069daf..8877d03 100644
--- a/droidrun/agent/droid/events.py
+++ b/droidrun/agent/droid/events.py
@@ -5,7 +5,7 @@ These events route between DroidAgent and child agents.
For internal agent events, see each agent's events.py file.
"""
-from typing import Dict, Optional
+from typing import Dict, List, Optional
from llama_index.core.workflow import Event, StopEvent
from pydantic import BaseModel
@@ -99,6 +99,23 @@ class TextManipulatorResultEvent(Event):
code_ran: str
+# ============================================================================
+# EXTERNAL USER MESSAGE EVENTS
+# ============================================================================
+
+
+class ExternalUserMessageAppliedEvent(Event):
+ message_ids: List[str]
+ consumer: str
+ step_number: int
+
+
+class ExternalUserMessageDroppedEvent(Event):
+ message_ids: List[str]
+ reason: str
+ step_number: int
+
+
# ============================================================================
# FINALIZATION EVENTS
# ============================================================================
diff --git a/droidrun/agent/droid/state.py b/droidrun/agent/droid/state.py
index 44513c3..c6507d0 100644
--- a/droidrun/agent/droid/state.py
+++ b/droidrun/agent/droid/state.py
@@ -1,6 +1,7 @@
from __future__ import annotations
from typing import Dict, List, Optional
+from uuid import uuid4
from llama_index.core.base.llms.types import ChatMessage
from pydantic import BaseModel, ConfigDict, Field
@@ -8,6 +9,12 @@ from pydantic import BaseModel, ConfigDict, Field
from droidrun.telemetry import PackageVisitEvent, capture
+class QueuedUserMessage(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid4()))
+ message: str
+ queued_at_step: int = 0
+
+
class DroidAgentState(BaseModel):
"""
State model for DroidAgent workflow - shared across parent and child workflows.
@@ -111,6 +118,12 @@ class DroidAgentState(BaseModel):
text_manipulation_history: List[Dict] = Field(default_factory=list)
last_text_manipulation_success: bool = False
+ # ========================================================================
+ # External User Messages (mid-run injection queue)
+ # ========================================================================
+ pending_user_messages: List[QueuedUserMessage] = Field(default_factory=list)
+ workflow_completed: bool = False
+
# ========================================================================
# Custom Variables (user-defined)
# ========================================================================
@@ -149,6 +162,22 @@ class DroidAgentState(BaseModel):
self.success = success
self.answer = answer or "Task completed successfully."
+ def queue_user_message(self, message: str) -> QueuedUserMessage:
+ if not message or not message.strip():
+ raise ValueError("Cannot queue an empty or whitespace-only message.")
+ if self.workflow_completed:
+ raise RuntimeError("Cannot queue messages: agent has already finished.")
+ queued = QueuedUserMessage(message=message, queued_at_step=self.step_number)
+ self.pending_user_messages.append(queued)
+ return queued
+
+ def drain_user_messages(self) -> list[QueuedUserMessage]:
+ if not self.pending_user_messages:
+ return []
+ messages = list(self.pending_user_messages)
+ self.pending_user_messages.clear()
+ return messages
+
def update_current_app(self, package_name: str, activity_name: str):
"""
Update package and activity together, capturing telemetry event only once.
diff --git a/droidrun/agent/manager/manager_agent.py b/droidrun/agent/manager/manager_agent.py
index 39f5e2c..e9d0a54 100644
--- a/droidrun/agent/manager/manager_agent.py
+++ b/droidrun/agent/manager/manager_agent.py
@@ -28,6 +28,7 @@ from opentelemetry import trace
from pydantic import BaseModel
from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent
+from droidrun.agent.droid.events import ExternalUserMessageAppliedEvent
from droidrun.agent.manager.events import (
ManagerContextEvent,
ManagerPlanDetailsEvent,
@@ -455,6 +456,26 @@ class ManagerAgent(Workflow):
# Build user message and add to history
user_content = self._build_user_message_content()
+
+ drained = self.shared_state.drain_user_messages()
+ if drained:
+ external_block = "\n".join(
+ f"\n{m.message}\n"
+ for m in drained
+ )
+ user_content += "\n" + external_block + "\n"
+ logger.info(
+ f"📩 Applied {len(drained)} external user message(s)",
+ extra={"color": "cyan"},
+ )
+ ctx.write_event_to_stream(
+ ExternalUserMessageAppliedEvent(
+ message_ids=[m.id for m in drained],
+ consumer="manager",
+ step_number=self.shared_state.step_number,
+ )
+ )
+
self.shared_state.message_history.append(
ChatMessage(role="user", content=user_content)
)
diff --git a/droidrun/config/prompts/codeact/tools_system.jinja2 b/droidrun/config/prompts/codeact/tools_system.jinja2
index 9c9cd31..22fe140 100644
--- a/droidrun/config/prompts/codeact/tools_system.jinja2
+++ b/droidrun/config/prompts/codeact/tools_system.jinja2
@@ -123,6 +123,7 @@ When done:
{% endif %}
- Use `complete` to signal task completion (success or failure)
- Use `remember` to store important information for later steps
+- Messages in `` tags are live corrections or new instructions from the user sent mid-execution. They override earlier assumptions — honor the most recent user guidance on your next step
{% if output_schema %}
diff --git a/droidrun/config/prompts/manager/system.jinja2 b/droidrun/config/prompts/manager/system.jinja2
index c098520..cbc8752 100644
--- a/droidrun/config/prompts/manager/system.jinja2
+++ b/droidrun/config/prompts/manager/system.jinja2
@@ -156,6 +156,8 @@ You should:
{% endif %}
---
+If an `` block is present, it contains live corrections or new instructions from the user sent during execution. Treat these as the newest guidance: revise your plan accordingly, but preserve already-completed progress unless the message explicitly changes course.
+
Carefully assess the current status and the provided screenshot. Check if the current plan needs to be revised.
Determine if the user request has been fully completed. If you are confident that no further actions are required and the task succeeded, use the request_accomplished tag with success="true" and a confirmation message. If the task failed and cannot be completed, use success="false" with an explanation. If the user request is not finished, update the plan and don't use the request_accomplished tag. If you are stuck with errors, think step by step about whether the overall plan needs to be revised to address the error.
NOTE: 1. If the current situation prevents proceeding with the original plan or requires clarification from the user, make reasonable assumptions and revise the plan accordingly. Act as though you are the user in such cases. 2. Please refer to the helpful information and steps in the Guidelines first for planning. 3. If the first subgoal in plan has been completed, please update the plan in time according to the screenshot and progress to ensure that the next subgoal is always the first item in the plan. 4. If the first subgoal is not completed, please copy the previous round's plan or update the plan based on the completion of the subgoal.