From 039b76965bdcce2c1689d80e5c83b68874f542b7 Mon Sep 17 00:00:00 2001 From: johnmalek312 Date: Mon, 9 Mar 2026 17:45:29 +1100 Subject: [PATCH 1/7] feat: add mid-run external user message injection Allow callers to inject user messages into a running DroidAgent via handler.send_event(ExternalUserMessageEvent(message="...")). Messages are queued in shared state and drained at safe checkpoints: - FastAgent: after tool results, before next LLM call - ManagerAgent: merged into synthesized user turn in prepare_context Adds ExternalUserMessageEvent, QueuedEvent, and AppliedEvent for observability. Updates FastAgent and Manager system prompts to instruct the LLM to honor mid-execution user corrections. --- droidrun/agent/codeact/tools_agent.py | 25 +++++++++++- droidrun/agent/droid/droid_agent.py | 29 ++++++++++++++ droidrun/agent/droid/events.py | 40 +++++++++++++++++++ droidrun/agent/droid/state.py | 26 ++++++++++++ droidrun/agent/manager/manager_agent.py | 22 ++++++++++ .../prompts/codeact/tools_system.jinja2 | 1 + droidrun/config/prompts/manager/system.jinja2 | 2 + 7 files changed, 144 insertions(+), 1 deletion(-) diff --git a/droidrun/agent/codeact/tools_agent.py b/droidrun/agent/codeact/tools_agent.py index 0cc8896..dbab6aa 100644 --- a/droidrun/agent/codeact/tools_agent.py +++ b/droidrun/agent/codeact/tools_agent.py @@ -37,6 +37,7 @@ from droidrun.agent.codeact.xml_parser import ( ) from droidrun.agent.common.constants import LLM_HISTORY_LIMIT from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent +from droidrun.agent.droid.events import ExternalUserMessageAppliedEvent from droidrun.agent.usage import get_usage_from_response from droidrun.agent.utils.chat_utils import limit_history from droidrun.agent.utils.inference import acall_with_retries @@ -490,7 +491,7 @@ class FastAgent(Workflow): async def handle_execution_result( self, ctx: Context, ev: FastAgentOutputEvent ) -> FastAgentInputEvent: - """Add execution result to history and loop back.""" + """Add execution result to history, drain external user messages, and loop back.""" output = ev.output or "Tool executed, but produced no output." # Add results as user message @@ -498,6 +499,28 @@ class FastAgent(Workflow): ChatMessage(role="user", content=output) ) + # Drain any external user messages queued during tool execution + drained = self.shared_state.drain_user_messages() + if drained: + for msg in drained: + self.shared_state.message_history.append( + ChatMessage( + role="user", + content=f"\n{msg}\n", + ) + ) + logger.info( + f"πŸ“© Applied {len(drained)} external user message(s)", + extra={"color": "cyan"}, + ) + ctx.write_event_to_stream( + ExternalUserMessageAppliedEvent( + count=len(drained), + consumer="fast_agent", + step_number=self.shared_state.step_number, + ) + ) + return FastAgentInputEvent() @step diff --git a/droidrun/agent/droid/droid_agent.py b/droidrun/agent/droid/droid_agent.py index a5ec8c0..395baf2 100644 --- a/droidrun/agent/droid/droid_agent.py +++ b/droidrun/agent/droid/droid_agent.py @@ -27,6 +27,8 @@ from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent from droidrun.agent.droid.events import ( ExecutorInputEvent, ExecutorResultEvent, + ExternalUserMessageEvent, + ExternalUserMessageQueuedEvent, FastAgentExecuteEvent, FastAgentResultEvent, FinalizeEvent, @@ -591,6 +593,33 @@ class DroidAgent(Workflow): ctx.write_event_to_stream(event) return event + # ======================================================================== + # External user message ingestion + # ======================================================================== + + @step + async def ingest_external_user_message( + self, ctx: Context, ev: ExternalUserMessageEvent + ) -> None: + """Accept an external user message and queue it in shared state. + + This step runs any time during the workflow. It does NOT touch + message_history directly β€” the active agent loop drains the queue + at its next safe checkpoint. + """ + queue_len = self.shared_state.queue_user_message(ev.message) + logger.info( + f"πŸ“© External user message queued (queue length: {queue_len})", + extra={"color": "cyan"}, + ) + ctx.write_event_to_stream( + ExternalUserMessageQueuedEvent( + message=ev.message, + queue_length=queue_len, + step_number=self.shared_state.step_number, + ) + ) + # ======================================================================== # execute_task β€” FastAgent / CodeActAgent # ======================================================================== diff --git a/droidrun/agent/droid/events.py b/droidrun/agent/droid/events.py index e069daf..60928fa 100644 --- a/droidrun/agent/droid/events.py +++ b/droidrun/agent/droid/events.py @@ -99,6 +99,46 @@ class TextManipulatorResultEvent(Event): code_ran: str +# ============================================================================ +# EXTERNAL USER MESSAGE EVENTS +# ============================================================================ + + +class ExternalUserMessageEvent(Event): + """Sent by the caller to inject a user message into the running agent loop. + + Usage:: + + handler = agent.run() + await handler.send_event( + ExternalUserMessageEvent(message="Actually use Chrome"), + step="ingest_external_user_message", + ) + + The message is queued in shared state and drained at the next safe + checkpoint (after tool results in direct mode, or at Manager's + prepare_context in reasoning mode). + """ + + message: str + + +class ExternalUserMessageQueuedEvent(Event): + """Streamed to the caller when an external message is accepted into the queue.""" + + message: str + queue_length: int + step_number: int + + +class ExternalUserMessageAppliedEvent(Event): + """Streamed when queued external messages are drained into the agent loop.""" + + count: int + consumer: str # "fast_agent" or "manager" + step_number: int + + # ============================================================================ # FINALIZATION EVENTS # ============================================================================ diff --git a/droidrun/agent/droid/state.py b/droidrun/agent/droid/state.py index 44513c3..2257ec2 100644 --- a/droidrun/agent/droid/state.py +++ b/droidrun/agent/droid/state.py @@ -111,6 +111,11 @@ class DroidAgentState(BaseModel): text_manipulation_history: List[Dict] = Field(default_factory=list) last_text_manipulation_success: bool = False + # ======================================================================== + # External User Messages (mid-run injection queue) + # ======================================================================== + pending_user_messages: List[str] = Field(default_factory=list) + # ======================================================================== # Custom Variables (user-defined) # ======================================================================== @@ -149,6 +154,27 @@ class DroidAgentState(BaseModel): self.success = success self.answer = answer or "Task completed successfully." + def queue_user_message(self, message: str) -> int: + """Append an external user message to the pending queue. + + Returns: + Current queue length after appending. + """ + self.pending_user_messages.append(message) + return len(self.pending_user_messages) + + def drain_user_messages(self) -> list[str]: + """Drain and return all pending user messages, clearing the queue. + + Returns: + List of messages in FIFO order (empty list if none queued). + """ + if not self.pending_user_messages: + return [] + messages = list(self.pending_user_messages) + self.pending_user_messages.clear() + return messages + def update_current_app(self, package_name: str, activity_name: str): """ Update package and activity together, capturing telemetry event only once. diff --git a/droidrun/agent/manager/manager_agent.py b/droidrun/agent/manager/manager_agent.py index 39f5e2c..de298d5 100644 --- a/droidrun/agent/manager/manager_agent.py +++ b/droidrun/agent/manager/manager_agent.py @@ -28,6 +28,7 @@ from opentelemetry import trace from pydantic import BaseModel from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent +from droidrun.agent.droid.events import ExternalUserMessageAppliedEvent from droidrun.agent.manager.events import ( ManagerContextEvent, ManagerPlanDetailsEvent, @@ -455,6 +456,27 @@ class ManagerAgent(Workflow): # Build user message and add to history user_content = self._build_user_message_content() + + # Drain any external user messages and merge into this turn + drained = self.shared_state.drain_user_messages() + if drained: + block_lines = [""] + for msg in drained: + block_lines.append(msg) + block_lines.append("") + user_content += "\n" + "\n".join(block_lines) + "\n" + logger.info( + f"πŸ“© Applied {len(drained)} external user message(s)", + extra={"color": "cyan"}, + ) + ctx.write_event_to_stream( + ExternalUserMessageAppliedEvent( + count=len(drained), + consumer="manager", + step_number=self.shared_state.step_number, + ) + ) + self.shared_state.message_history.append( ChatMessage(role="user", content=user_content) ) diff --git a/droidrun/config/prompts/codeact/tools_system.jinja2 b/droidrun/config/prompts/codeact/tools_system.jinja2 index 9c9cd31..22fe140 100644 --- a/droidrun/config/prompts/codeact/tools_system.jinja2 +++ b/droidrun/config/prompts/codeact/tools_system.jinja2 @@ -123,6 +123,7 @@ When done: {% endif %} - Use `complete` to signal task completion (success or failure) - Use `remember` to store important information for later steps +- Messages in `` tags are live corrections or new instructions from the user sent mid-execution. They override earlier assumptions β€” honor the most recent user guidance on your next step {% if output_schema %} diff --git a/droidrun/config/prompts/manager/system.jinja2 b/droidrun/config/prompts/manager/system.jinja2 index c098520..cffd53d 100644 --- a/droidrun/config/prompts/manager/system.jinja2 +++ b/droidrun/config/prompts/manager/system.jinja2 @@ -156,6 +156,8 @@ You should: {% endif %} --- +If an `` block is present, it contains live corrections or new instructions from the user sent during execution. Treat these as the newest guidance: revise your plan accordingly, but preserve already-completed progress unless the message explicitly changes course. + Carefully assess the current status and the provided screenshot. Check if the current plan needs to be revised. Determine if the user request has been fully completed. If you are confident that no further actions are required and the task succeeded, use the request_accomplished tag with success="true" and a confirmation message. If the task failed and cannot be completed, use success="false" with an explanation. If the user request is not finished, update the plan and don't use the request_accomplished tag. If you are stuck with errors, think step by step about whether the overall plan needs to be revised to address the error. NOTE: 1. If the current situation prevents proceeding with the original plan or requires clarification from the user, make reasonable assumptions and revise the plan accordingly. Act as though you are the user in such cases. 2. Please refer to the helpful information and steps in the Guidelines first for planning. 3. If the first subgoal in plan has been completed, please update the plan in time according to the screenshot and progress to ensure that the next subgoal is always the first item in the plan. 4. If the first subgoal is not completed, please copy the previous round's plan or update the plan based on the completion of the subgoal. From 5aeae4789e5fab71a6594a978cbf8e823e6611e6 Mon Sep 17 00:00:00 2001 From: johnmalek312 Date: Mon, 9 Mar 2026 18:09:19 +1100 Subject: [PATCH 2/7] feat: add message IDs, completion guards, and dropped message handling - QueuedUserMessage dataclass with uuid4 ID and queued_at_step - Events now carry message IDs for end-to-end tracking - ExternalUserMessageDroppedEvent for max-steps scenario - FastAgent complete() with pending messages: reset and continue loop so LLM sees its own complete() result alongside the user's message - Manager request_accomplished with pending messages: loop back to ManagerInputEvent instead of finalizing - Max steps: drain queue, emit DroppedEvent, finalize as normal - workflow_completed flag on shared state, guarding queue_user_message - Fix device-state injection bug: merge external messages into tool results user message instead of appending separate ChatMessages --- droidrun/agent/codeact/tools_agent.py | 64 +++++++++++++++++++------ droidrun/agent/droid/droid_agent.py | 33 +++++++++++-- droidrun/agent/droid/events.py | 13 ++++- droidrun/agent/droid/state.py | 34 +++++++++---- droidrun/agent/manager/manager_agent.py | 11 +++-- 5 files changed, 122 insertions(+), 33 deletions(-) diff --git a/droidrun/agent/codeact/tools_agent.py b/droidrun/agent/codeact/tools_agent.py index dbab6aa..a63ba86 100644 --- a/droidrun/agent/codeact/tools_agent.py +++ b/droidrun/agent/codeact/tools_agent.py @@ -37,7 +37,10 @@ from droidrun.agent.codeact.xml_parser import ( ) from droidrun.agent.common.constants import LLM_HISTORY_LIMIT from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent -from droidrun.agent.droid.events import ExternalUserMessageAppliedEvent +from droidrun.agent.droid.events import ( + ExternalUserMessageAppliedEvent, + ExternalUserMessageDroppedEvent, +) from droidrun.agent.usage import get_usage_from_response from droidrun.agent.utils.chat_utils import limit_history from droidrun.agent.utils.inference import acall_with_retries @@ -212,6 +215,19 @@ class FastAgent(Workflow): # Check then bump step counter if self.shared_state.step_number >= self.max_steps: + # Drop any pending external messages + pending = self.shared_state.drain_user_messages() + if pending: + logger.warning( + f"⚠️ Dropping {len(pending)} external user message(s) at max steps" + ) + ctx.write_event_to_stream( + ExternalUserMessageDroppedEvent( + message_ids=[m.id for m in pending], + reason="max_steps_reached", + step_number=self.shared_state.step_number, + ) + ) event = FastAgentEndEvent( success=False, reason=f"Reached max step count of {self.max_steps} steps", @@ -452,6 +468,23 @@ class FastAgent(Workflow): # Check if complete() was called successfully if self.shared_state.finished: + # If there are pending external user messages, don't finish β€” + # reset completion state, format results so far, and continue + # the loop so the LLM sees its own complete() result alongside + # the user's new message. + if self.shared_state.pending_user_messages: + logger.info( + "⏸️ complete() called but external messages pending, continuing", + extra={"color": "cyan"}, + ) + self.shared_state.finished = False + self.shared_state.success = None + self.shared_state.answer = "" + results_xml = format_tool_results(results) + event = FastAgentOutputEvent(output=results_xml) + ctx.write_event_to_stream(event) + return event + logger.debug("βœ… Task marked as complete via complete() tool") success = ( @@ -494,33 +527,34 @@ class FastAgent(Workflow): """Add execution result to history, drain external user messages, and loop back.""" output = ev.output or "Tool executed, but produced no output." - # Add results as user message - self.shared_state.message_history.append( - ChatMessage(role="user", content=output) - ) - - # Drain any external user messages queued during tool execution + # Drain any external user messages queued during tool execution. + # Merged into the same user message as tool results so that + # handle_llm_input's ephemeral device-state injection (which targets + # the last user message) lands on the correct message. drained = self.shared_state.drain_user_messages() if drained: - for msg in drained: - self.shared_state.message_history.append( - ChatMessage( - role="user", - content=f"\n{msg}\n", - ) - ) + external_block = "\n".join( + f"\n{m.message}\n" + for m in drained + ) + output += "\n" + external_block logger.info( f"πŸ“© Applied {len(drained)} external user message(s)", extra={"color": "cyan"}, ) ctx.write_event_to_stream( ExternalUserMessageAppliedEvent( - count=len(drained), + message_ids=[m.id for m in drained], consumer="fast_agent", step_number=self.shared_state.step_number, ) ) + # Add results (+ any external messages) as a single user message + self.shared_state.message_history.append( + ChatMessage(role="user", content=output) + ) + return FastAgentInputEvent() @step diff --git a/droidrun/agent/droid/droid_agent.py b/droidrun/agent/droid/droid_agent.py index 395baf2..24909fd 100644 --- a/droidrun/agent/droid/droid_agent.py +++ b/droidrun/agent/droid/droid_agent.py @@ -27,6 +27,7 @@ from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent from droidrun.agent.droid.events import ( ExecutorInputEvent, ExecutorResultEvent, + ExternalUserMessageDroppedEvent, ExternalUserMessageEvent, ExternalUserMessageQueuedEvent, FastAgentExecuteEvent, @@ -607,15 +608,17 @@ class DroidAgent(Workflow): message_history directly β€” the active agent loop drains the queue at its next safe checkpoint. """ - queue_len = self.shared_state.queue_user_message(ev.message) + queued = self.shared_state.queue_user_message(ev.message) logger.info( - f"πŸ“© External user message queued (queue length: {queue_len})", + f"πŸ“© External user message queued [id={queued.id}] " + f"(queue length: {len(self.shared_state.pending_user_messages)})", extra={"color": "cyan"}, ) ctx.write_event_to_stream( ExternalUserMessageQueuedEvent( + message_id=queued.id, message=ev.message, - queue_length=queue_len, + queue_length=len(self.shared_state.pending_user_messages), step_number=self.shared_state.step_number, ) ) @@ -731,6 +734,19 @@ class DroidAgent(Workflow): """Run Manager planning phase.""" if self.shared_state.step_number >= self.config.agent.max_steps: logger.warning(f"⚠️ Reached maximum steps ({self.config.agent.max_steps})") + # Drop any pending external messages + pending = self.shared_state.drain_user_messages() + if pending: + logger.warning( + f"⚠️ Dropping {len(pending)} external user message(s) at max steps" + ) + ctx.write_event_to_stream( + ExternalUserMessageDroppedEvent( + message_ids=[m.id for m in pending], + reason="max_steps_reached", + step_number=self.shared_state.step_number, + ) + ) return FinalizeEvent( success=False, reason=f"Reached maximum steps ({self.config.agent.max_steps})", @@ -770,10 +786,20 @@ class DroidAgent(Workflow): | ScripterExecutorInputEvent | FinalizeEvent | TextManipulatorInputEvent + | ManagerInputEvent ): """Process Manager output and decide next step.""" # Check for answer-type termination if ev.answer.strip(): + # If there are pending external messages, don't finalize β€” loop back + # so Manager sees the new user guidance on the next cycle. + if self.shared_state.pending_user_messages: + logger.info( + "⏸️ Manager tried to finish but external messages pending, " + "looping back to Manager", + extra={"color": "cyan"}, + ) + return ManagerInputEvent() success = ev.success if ev.success is not None else True self.shared_state.progress_summary = f"Answer: {ev.answer}" return FinalizeEvent(success=success, reason=ev.answer) @@ -1026,6 +1052,7 @@ class DroidAgent(Workflow): @step async def finalize(self, ctx: Context, ev: FinalizeEvent) -> ResultEvent: + self.shared_state.workflow_completed = True ctx.write_event_to_stream(ev) capture( DroidAgentFinalizeEvent( diff --git a/droidrun/agent/droid/events.py b/droidrun/agent/droid/events.py index 60928fa..fc6dded 100644 --- a/droidrun/agent/droid/events.py +++ b/droidrun/agent/droid/events.py @@ -5,7 +5,7 @@ These events route between DroidAgent and child agents. For internal agent events, see each agent's events.py file. """ -from typing import Dict, Optional +from typing import Dict, List, Optional from llama_index.core.workflow import Event, StopEvent from pydantic import BaseModel @@ -126,6 +126,7 @@ class ExternalUserMessageEvent(Event): class ExternalUserMessageQueuedEvent(Event): """Streamed to the caller when an external message is accepted into the queue.""" + message_id: str message: str queue_length: int step_number: int @@ -134,11 +135,19 @@ class ExternalUserMessageQueuedEvent(Event): class ExternalUserMessageAppliedEvent(Event): """Streamed when queued external messages are drained into the agent loop.""" - count: int + message_ids: List[str] consumer: str # "fast_agent" or "manager" step_number: int +class ExternalUserMessageDroppedEvent(Event): + """Streamed when queued messages are dropped without being processed.""" + + message_ids: List[str] + reason: str # e.g. "max_steps_reached" + step_number: int + + # ============================================================================ # FINALIZATION EVENTS # ============================================================================ diff --git a/droidrun/agent/droid/state.py b/droidrun/agent/droid/state.py index 2257ec2..bcce85d 100644 --- a/droidrun/agent/droid/state.py +++ b/droidrun/agent/droid/state.py @@ -1,6 +1,7 @@ from __future__ import annotations from typing import Dict, List, Optional +from uuid import uuid4 from llama_index.core.base.llms.types import ChatMessage from pydantic import BaseModel, ConfigDict, Field @@ -8,6 +9,14 @@ from pydantic import BaseModel, ConfigDict, Field from droidrun.telemetry import PackageVisitEvent, capture +class QueuedUserMessage(BaseModel): + """A user message queued for injection into the running agent loop.""" + + id: str = Field(default_factory=lambda: str(uuid4())) + message: str + queued_at_step: int = 0 + + class DroidAgentState(BaseModel): """ State model for DroidAgent workflow - shared across parent and child workflows. @@ -114,7 +123,8 @@ class DroidAgentState(BaseModel): # ======================================================================== # External User Messages (mid-run injection queue) # ======================================================================== - pending_user_messages: List[str] = Field(default_factory=list) + pending_user_messages: List[QueuedUserMessage] = Field(default_factory=list) + workflow_completed: bool = False # ======================================================================== # Custom Variables (user-defined) @@ -154,20 +164,26 @@ class DroidAgentState(BaseModel): self.success = success self.answer = answer or "Task completed successfully." - def queue_user_message(self, message: str) -> int: + def queue_user_message(self, message: str) -> QueuedUserMessage: """Append an external user message to the pending queue. - Returns: - Current queue length after appending. - """ - self.pending_user_messages.append(message) - return len(self.pending_user_messages) + Raises: + RuntimeError: If the workflow has already completed. - def drain_user_messages(self) -> list[str]: + Returns: + The queued message object (includes generated ID). + """ + if self.workflow_completed: + raise RuntimeError("Cannot queue messages: agent has already finished.") + queued = QueuedUserMessage(message=message, queued_at_step=self.step_number) + self.pending_user_messages.append(queued) + return queued + + def drain_user_messages(self) -> list[QueuedUserMessage]: """Drain and return all pending user messages, clearing the queue. Returns: - List of messages in FIFO order (empty list if none queued). + List of QueuedUserMessage in FIFO order (empty list if none queued). """ if not self.pending_user_messages: return [] diff --git a/droidrun/agent/manager/manager_agent.py b/droidrun/agent/manager/manager_agent.py index de298d5..a4bd577 100644 --- a/droidrun/agent/manager/manager_agent.py +++ b/droidrun/agent/manager/manager_agent.py @@ -457,12 +457,15 @@ class ManagerAgent(Workflow): # Build user message and add to history user_content = self._build_user_message_content() - # Drain any external user messages and merge into this turn + # Drain any external user messages and merge into this turn. + # Merged into the same user message (not appended separately) so that + # _build_messages_with_context's "last user message gets device_state" + # logic targets the correct message. drained = self.shared_state.drain_user_messages() if drained: block_lines = [""] - for msg in drained: - block_lines.append(msg) + for m in drained: + block_lines.append(m.message) block_lines.append("") user_content += "\n" + "\n".join(block_lines) + "\n" logger.info( @@ -471,7 +474,7 @@ class ManagerAgent(Workflow): ) ctx.write_event_to_stream( ExternalUserMessageAppliedEvent( - count=len(drained), + message_ids=[m.id for m in drained], consumer="manager", step_number=self.shared_state.step_number, ) From cac887c8fd2d97fefefecbfb0446c02b7c536307 Mon Sep 17 00:00:00 2001 From: johnmalek312 Date: Mon, 9 Mar 2026 18:41:14 +1100 Subject: [PATCH 3/7] fix: replace workflow step with direct method for message injection WorkflowValidationError: llama-index validates that all consumed events are produced by some step. ExternalUserMessageEvent is external-only, so the @step approach fails validation. Replace with DroidAgent.send_user_message(text) -> QueuedUserMessage. Directly queues on shared state, no event routing needed. Drain points in FastAgent and Manager still emit Applied/Dropped stream events. --- droidrun/agent/droid/droid_agent.py | 40 +++++++++++----------- tests/test_external_message.py | 53 +++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 20 deletions(-) create mode 100644 tests/test_external_message.py diff --git a/droidrun/agent/droid/droid_agent.py b/droidrun/agent/droid/droid_agent.py index 24909fd..25aa894 100644 --- a/droidrun/agent/droid/droid_agent.py +++ b/droidrun/agent/droid/droid_agent.py @@ -28,8 +28,6 @@ from droidrun.agent.droid.events import ( ExecutorInputEvent, ExecutorResultEvent, ExternalUserMessageDroppedEvent, - ExternalUserMessageEvent, - ExternalUserMessageQueuedEvent, FastAgentExecuteEvent, FastAgentResultEvent, FinalizeEvent, @@ -595,33 +593,35 @@ class DroidAgent(Workflow): return event # ======================================================================== - # External user message ingestion + # External user message injection # ======================================================================== - @step - async def ingest_external_user_message( - self, ctx: Context, ev: ExternalUserMessageEvent - ) -> None: - """Accept an external user message and queue it in shared state. + def send_user_message(self, message: str) -> "QueuedUserMessage": + """Inject a user message into the running agent loop. - This step runs any time during the workflow. It does NOT touch - message_history directly β€” the active agent loop drains the queue - at its next safe checkpoint. + Thread-safe to call from any context while the agent is running. + The message is queued in shared state and drained at the next safe + checkpoint (after tool results in direct mode, or at Manager's + prepare_context in reasoning mode). + + Args: + message: The user's message text. + + Returns: + QueuedUserMessage with a unique ID for tracking. + + Raises: + RuntimeError: If the workflow has already completed. """ - queued = self.shared_state.queue_user_message(ev.message) + from droidrun.agent.droid.state import QueuedUserMessage + + queued = self.shared_state.queue_user_message(message) logger.info( f"πŸ“© External user message queued [id={queued.id}] " f"(queue length: {len(self.shared_state.pending_user_messages)})", extra={"color": "cyan"}, ) - ctx.write_event_to_stream( - ExternalUserMessageQueuedEvent( - message_id=queued.id, - message=ev.message, - queue_length=len(self.shared_state.pending_user_messages), - step_number=self.shared_state.step_number, - ) - ) + return queued # ======================================================================== # execute_task β€” FastAgent / CodeActAgent diff --git a/tests/test_external_message.py b/tests/test_external_message.py new file mode 100644 index 0000000..ece0247 --- /dev/null +++ b/tests/test_external_message.py @@ -0,0 +1,53 @@ +"""Test mid-run external user message injection. + +Run with: python tests/test_external_message.py +Requires a connected Android device and configured LLM in ~/.config/droidrun/config.yaml +""" + +import asyncio + +from droidrun.agent.droid import DroidAgent +from droidrun.agent.droid.events import ( + ExternalUserMessageAppliedEvent, + ExternalUserMessageDroppedEvent, +) +from droidrun.config_manager.loader import ConfigLoader + + +async def main(): + config = ConfigLoader.load() + agent = DroidAgent( + goal="Open the settings app and check the android version", config=config + ) + handler = agent.run() + + async def inject_after_delay(): + await asyncio.sleep(10) + print( + "\n>>> Sending: 'Actually open Chrome and go to google.com'\n" + ) + queued = agent.send_user_message( + "Actually open Chrome and go to google.com" + ) + print(f"[QUEUED] id={queued.id}") + + task = asyncio.create_task(inject_after_delay()) + + async for ev in handler.stream_events(): + if isinstance(ev, ExternalUserMessageAppliedEvent): + print( + f"[APPLIED] ids={ev.message_ids} consumer={ev.consumer} step={ev.step_number}" + ) + elif isinstance(ev, ExternalUserMessageDroppedEvent): + print(f"[DROPPED] ids={ev.message_ids} reason={ev.reason}") + + result = await handler + task.cancel() + + print( + f"\nResult: success={result.success} reason={result.reason} steps={result.steps}" + ) + + +if __name__ == "__main__": + asyncio.run(main()) From 1f9998752c501a01c244fe3285a5ac01c48f5ef3 Mon Sep 17 00:00:00 2001 From: johnmalek312 Date: Mon, 9 Mar 2026 18:56:57 +1100 Subject: [PATCH 4/7] chore: remove verbose comments and delete test_external_message.py --- droidrun/agent/codeact/tools_agent.py | 11 +---- droidrun/agent/droid/droid_agent.py | 19 --------- droidrun/agent/droid/events.py | 25 +----------- droidrun/agent/droid/state.py | 15 ------- droidrun/agent/manager/manager_agent.py | 4 -- tests/test_external_message.py | 53 ------------------------- 6 files changed, 3 insertions(+), 124 deletions(-) delete mode 100644 tests/test_external_message.py diff --git a/droidrun/agent/codeact/tools_agent.py b/droidrun/agent/codeact/tools_agent.py index a63ba86..cf417d9 100644 --- a/droidrun/agent/codeact/tools_agent.py +++ b/droidrun/agent/codeact/tools_agent.py @@ -215,7 +215,6 @@ class FastAgent(Workflow): # Check then bump step counter if self.shared_state.step_number >= self.max_steps: - # Drop any pending external messages pending = self.shared_state.drain_user_messages() if pending: logger.warning( @@ -468,10 +467,6 @@ class FastAgent(Workflow): # Check if complete() was called successfully if self.shared_state.finished: - # If there are pending external user messages, don't finish β€” - # reset completion state, format results so far, and continue - # the loop so the LLM sees its own complete() result alongside - # the user's new message. if self.shared_state.pending_user_messages: logger.info( "⏸️ complete() called but external messages pending, continuing", @@ -524,13 +519,9 @@ class FastAgent(Workflow): async def handle_execution_result( self, ctx: Context, ev: FastAgentOutputEvent ) -> FastAgentInputEvent: - """Add execution result to history, drain external user messages, and loop back.""" + """Add execution result to history and loop back.""" output = ev.output or "Tool executed, but produced no output." - # Drain any external user messages queued during tool execution. - # Merged into the same user message as tool results so that - # handle_llm_input's ephemeral device-state injection (which targets - # the last user message) lands on the correct message. drained = self.shared_state.drain_user_messages() if drained: external_block = "\n".join( diff --git a/droidrun/agent/droid/droid_agent.py b/droidrun/agent/droid/droid_agent.py index 25aa894..90ac5ba 100644 --- a/droidrun/agent/droid/droid_agent.py +++ b/droidrun/agent/droid/droid_agent.py @@ -597,22 +597,6 @@ class DroidAgent(Workflow): # ======================================================================== def send_user_message(self, message: str) -> "QueuedUserMessage": - """Inject a user message into the running agent loop. - - Thread-safe to call from any context while the agent is running. - The message is queued in shared state and drained at the next safe - checkpoint (after tool results in direct mode, or at Manager's - prepare_context in reasoning mode). - - Args: - message: The user's message text. - - Returns: - QueuedUserMessage with a unique ID for tracking. - - Raises: - RuntimeError: If the workflow has already completed. - """ from droidrun.agent.droid.state import QueuedUserMessage queued = self.shared_state.queue_user_message(message) @@ -734,7 +718,6 @@ class DroidAgent(Workflow): """Run Manager planning phase.""" if self.shared_state.step_number >= self.config.agent.max_steps: logger.warning(f"⚠️ Reached maximum steps ({self.config.agent.max_steps})") - # Drop any pending external messages pending = self.shared_state.drain_user_messages() if pending: logger.warning( @@ -791,8 +774,6 @@ class DroidAgent(Workflow): """Process Manager output and decide next step.""" # Check for answer-type termination if ev.answer.strip(): - # If there are pending external messages, don't finalize β€” loop back - # so Manager sees the new user guidance on the next cycle. if self.shared_state.pending_user_messages: logger.info( "⏸️ Manager tried to finish but external messages pending, " diff --git a/droidrun/agent/droid/events.py b/droidrun/agent/droid/events.py index fc6dded..c7b6dee 100644 --- a/droidrun/agent/droid/events.py +++ b/droidrun/agent/droid/events.py @@ -105,27 +105,10 @@ class TextManipulatorResultEvent(Event): class ExternalUserMessageEvent(Event): - """Sent by the caller to inject a user message into the running agent loop. - - Usage:: - - handler = agent.run() - await handler.send_event( - ExternalUserMessageEvent(message="Actually use Chrome"), - step="ingest_external_user_message", - ) - - The message is queued in shared state and drained at the next safe - checkpoint (after tool results in direct mode, or at Manager's - prepare_context in reasoning mode). - """ - message: str class ExternalUserMessageQueuedEvent(Event): - """Streamed to the caller when an external message is accepted into the queue.""" - message_id: str message: str queue_length: int @@ -133,18 +116,14 @@ class ExternalUserMessageQueuedEvent(Event): class ExternalUserMessageAppliedEvent(Event): - """Streamed when queued external messages are drained into the agent loop.""" - message_ids: List[str] - consumer: str # "fast_agent" or "manager" + consumer: str step_number: int class ExternalUserMessageDroppedEvent(Event): - """Streamed when queued messages are dropped without being processed.""" - message_ids: List[str] - reason: str # e.g. "max_steps_reached" + reason: str step_number: int diff --git a/droidrun/agent/droid/state.py b/droidrun/agent/droid/state.py index bcce85d..6a8b38d 100644 --- a/droidrun/agent/droid/state.py +++ b/droidrun/agent/droid/state.py @@ -10,8 +10,6 @@ from droidrun.telemetry import PackageVisitEvent, capture class QueuedUserMessage(BaseModel): - """A user message queued for injection into the running agent loop.""" - id: str = Field(default_factory=lambda: str(uuid4())) message: str queued_at_step: int = 0 @@ -165,14 +163,6 @@ class DroidAgentState(BaseModel): self.answer = answer or "Task completed successfully." def queue_user_message(self, message: str) -> QueuedUserMessage: - """Append an external user message to the pending queue. - - Raises: - RuntimeError: If the workflow has already completed. - - Returns: - The queued message object (includes generated ID). - """ if self.workflow_completed: raise RuntimeError("Cannot queue messages: agent has already finished.") queued = QueuedUserMessage(message=message, queued_at_step=self.step_number) @@ -180,11 +170,6 @@ class DroidAgentState(BaseModel): return queued def drain_user_messages(self) -> list[QueuedUserMessage]: - """Drain and return all pending user messages, clearing the queue. - - Returns: - List of QueuedUserMessage in FIFO order (empty list if none queued). - """ if not self.pending_user_messages: return [] messages = list(self.pending_user_messages) diff --git a/droidrun/agent/manager/manager_agent.py b/droidrun/agent/manager/manager_agent.py index a4bd577..661c567 100644 --- a/droidrun/agent/manager/manager_agent.py +++ b/droidrun/agent/manager/manager_agent.py @@ -457,10 +457,6 @@ class ManagerAgent(Workflow): # Build user message and add to history user_content = self._build_user_message_content() - # Drain any external user messages and merge into this turn. - # Merged into the same user message (not appended separately) so that - # _build_messages_with_context's "last user message gets device_state" - # logic targets the correct message. drained = self.shared_state.drain_user_messages() if drained: block_lines = [""] diff --git a/tests/test_external_message.py b/tests/test_external_message.py deleted file mode 100644 index ece0247..0000000 --- a/tests/test_external_message.py +++ /dev/null @@ -1,53 +0,0 @@ -"""Test mid-run external user message injection. - -Run with: python tests/test_external_message.py -Requires a connected Android device and configured LLM in ~/.config/droidrun/config.yaml -""" - -import asyncio - -from droidrun.agent.droid import DroidAgent -from droidrun.agent.droid.events import ( - ExternalUserMessageAppliedEvent, - ExternalUserMessageDroppedEvent, -) -from droidrun.config_manager.loader import ConfigLoader - - -async def main(): - config = ConfigLoader.load() - agent = DroidAgent( - goal="Open the settings app and check the android version", config=config - ) - handler = agent.run() - - async def inject_after_delay(): - await asyncio.sleep(10) - print( - "\n>>> Sending: 'Actually open Chrome and go to google.com'\n" - ) - queued = agent.send_user_message( - "Actually open Chrome and go to google.com" - ) - print(f"[QUEUED] id={queued.id}") - - task = asyncio.create_task(inject_after_delay()) - - async for ev in handler.stream_events(): - if isinstance(ev, ExternalUserMessageAppliedEvent): - print( - f"[APPLIED] ids={ev.message_ids} consumer={ev.consumer} step={ev.step_number}" - ) - elif isinstance(ev, ExternalUserMessageDroppedEvent): - print(f"[DROPPED] ids={ev.message_ids} reason={ev.reason}") - - result = await handler - task.cancel() - - print( - f"\nResult: success={result.success} reason={result.reason} steps={result.steps}" - ) - - -if __name__ == "__main__": - asyncio.run(main()) From 21e594673f71349668cc9c5e8cb31ccaff74a610 Mon Sep 17 00:00:00 2001 From: johnmalek312 Date: Mon, 9 Mar 2026 19:21:26 +1100 Subject: [PATCH 5/7] fix: unify external_user_message tag format across agents --- droidrun/agent/codeact/tools_agent.py | 6 +++--- droidrun/agent/manager/manager_agent.py | 10 +++++----- droidrun/config/prompts/manager/system.jinja2 | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/droidrun/agent/codeact/tools_agent.py b/droidrun/agent/codeact/tools_agent.py index cf417d9..f7b2b95 100644 --- a/droidrun/agent/codeact/tools_agent.py +++ b/droidrun/agent/codeact/tools_agent.py @@ -524,9 +524,9 @@ class FastAgent(Workflow): drained = self.shared_state.drain_user_messages() if drained: - external_block = "\n".join( - f"\n{m.message}\n" - for m in drained + inner = "\n".join(m.message for m in drained) + external_block = ( + f"\n{inner}\n" ) output += "\n" + external_block logger.info( diff --git a/droidrun/agent/manager/manager_agent.py b/droidrun/agent/manager/manager_agent.py index 661c567..3b86b01 100644 --- a/droidrun/agent/manager/manager_agent.py +++ b/droidrun/agent/manager/manager_agent.py @@ -459,11 +459,11 @@ class ManagerAgent(Workflow): drained = self.shared_state.drain_user_messages() if drained: - block_lines = [""] - for m in drained: - block_lines.append(m.message) - block_lines.append("") - user_content += "\n" + "\n".join(block_lines) + "\n" + inner = "\n".join(m.message for m in drained) + external_block = ( + f"\n{inner}\n" + ) + user_content += "\n" + external_block + "\n" logger.info( f"πŸ“© Applied {len(drained)} external user message(s)", extra={"color": "cyan"}, diff --git a/droidrun/config/prompts/manager/system.jinja2 b/droidrun/config/prompts/manager/system.jinja2 index cffd53d..cbc8752 100644 --- a/droidrun/config/prompts/manager/system.jinja2 +++ b/droidrun/config/prompts/manager/system.jinja2 @@ -156,7 +156,7 @@ You should: {% endif %} --- -If an `` block is present, it contains live corrections or new instructions from the user sent during execution. Treat these as the newest guidance: revise your plan accordingly, but preserve already-completed progress unless the message explicitly changes course. +If an `` block is present, it contains live corrections or new instructions from the user sent during execution. Treat these as the newest guidance: revise your plan accordingly, but preserve already-completed progress unless the message explicitly changes course. Carefully assess the current status and the provided screenshot. Check if the current plan needs to be revised. Determine if the user request has been fully completed. If you are confident that no further actions are required and the task succeeded, use the request_accomplished tag with success="true" and a confirmation message. If the task failed and cannot be completed, use success="false" with an explanation. If the user request is not finished, update the plan and don't use the request_accomplished tag. If you are stuck with errors, think step by step about whether the overall plan needs to be revised to address the error. From 8d0eea03499af22cc88198749777c5c2504b0ec0 Mon Sep 17 00:00:00 2001 From: johnmalek312 Date: Mon, 9 Mar 2026 21:16:15 +1100 Subject: [PATCH 6/7] fix: per-message XML tags and remove dead event classes --- droidrun/agent/codeact/tools_agent.py | 6 +++--- droidrun/agent/droid/events.py | 11 ----------- droidrun/agent/manager/manager_agent.py | 6 +++--- 3 files changed, 6 insertions(+), 17 deletions(-) diff --git a/droidrun/agent/codeact/tools_agent.py b/droidrun/agent/codeact/tools_agent.py index f7b2b95..cf417d9 100644 --- a/droidrun/agent/codeact/tools_agent.py +++ b/droidrun/agent/codeact/tools_agent.py @@ -524,9 +524,9 @@ class FastAgent(Workflow): drained = self.shared_state.drain_user_messages() if drained: - inner = "\n".join(m.message for m in drained) - external_block = ( - f"\n{inner}\n" + external_block = "\n".join( + f"\n{m.message}\n" + for m in drained ) output += "\n" + external_block logger.info( diff --git a/droidrun/agent/droid/events.py b/droidrun/agent/droid/events.py index c7b6dee..8877d03 100644 --- a/droidrun/agent/droid/events.py +++ b/droidrun/agent/droid/events.py @@ -104,17 +104,6 @@ class TextManipulatorResultEvent(Event): # ============================================================================ -class ExternalUserMessageEvent(Event): - message: str - - -class ExternalUserMessageQueuedEvent(Event): - message_id: str - message: str - queue_length: int - step_number: int - - class ExternalUserMessageAppliedEvent(Event): message_ids: List[str] consumer: str diff --git a/droidrun/agent/manager/manager_agent.py b/droidrun/agent/manager/manager_agent.py index 3b86b01..e9d0a54 100644 --- a/droidrun/agent/manager/manager_agent.py +++ b/droidrun/agent/manager/manager_agent.py @@ -459,9 +459,9 @@ class ManagerAgent(Workflow): drained = self.shared_state.drain_user_messages() if drained: - inner = "\n".join(m.message for m in drained) - external_block = ( - f"\n{inner}\n" + external_block = "\n".join( + f"\n{m.message}\n" + for m in drained ) user_content += "\n" + external_block + "\n" logger.info( From 6899d31bf3335c42b9d6908b01b18a756509c2f8 Mon Sep 17 00:00:00 2001 From: johnmalek312 Date: Tue, 10 Mar 2026 00:27:07 +1100 Subject: [PATCH 7/7] fix: validate empty messages and clean up QueuedUserMessage import --- droidrun/agent/droid/droid_agent.py | 6 ++---- droidrun/agent/droid/state.py | 2 ++ 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/droidrun/agent/droid/droid_agent.py b/droidrun/agent/droid/droid_agent.py index 90ac5ba..0f6c03b 100644 --- a/droidrun/agent/droid/droid_agent.py +++ b/droidrun/agent/droid/droid_agent.py @@ -39,7 +39,7 @@ from droidrun.agent.droid.events import ( TextManipulatorInputEvent, TextManipulatorResultEvent, ) -from droidrun.agent.droid.state import DroidAgentState +from droidrun.agent.droid.state import DroidAgentState, QueuedUserMessage from droidrun.agent.executor import ExecutorAgent from droidrun.agent.external import load_agent from droidrun.agent.manager import ManagerAgent, StatelessManagerAgent @@ -596,9 +596,7 @@ class DroidAgent(Workflow): # External user message injection # ======================================================================== - def send_user_message(self, message: str) -> "QueuedUserMessage": - from droidrun.agent.droid.state import QueuedUserMessage - + def send_user_message(self, message: str) -> QueuedUserMessage: queued = self.shared_state.queue_user_message(message) logger.info( f"πŸ“© External user message queued [id={queued.id}] " diff --git a/droidrun/agent/droid/state.py b/droidrun/agent/droid/state.py index 6a8b38d..c6507d0 100644 --- a/droidrun/agent/droid/state.py +++ b/droidrun/agent/droid/state.py @@ -163,6 +163,8 @@ class DroidAgentState(BaseModel): self.answer = answer or "Task completed successfully." def queue_user_message(self, message: str) -> QueuedUserMessage: + if not message or not message.strip(): + raise ValueError("Cannot queue an empty or whitespace-only message.") if self.workflow_completed: raise RuntimeError("Cannot queue messages: agent has already finished.") queued = QueuedUserMessage(message=message, queued_at_step=self.step_number)