mirror of
https://github.com/droidrun/droidrun.git
synced 2026-05-23 07:40:37 +00:00
Merge pull request #283 from droidrun/feat/external-user-message-injection
Feat/external user message injection
This commit is contained in:
@@ -37,6 +37,10 @@ from droidrun.agent.codeact.xml_parser import (
|
||||
)
|
||||
from droidrun.agent.common.constants import LLM_HISTORY_LIMIT
|
||||
from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent
|
||||
from droidrun.agent.droid.events import (
|
||||
ExternalUserMessageAppliedEvent,
|
||||
ExternalUserMessageDroppedEvent,
|
||||
)
|
||||
from droidrun.agent.usage import get_usage_from_response
|
||||
from droidrun.agent.utils.chat_utils import limit_history
|
||||
from droidrun.agent.utils.inference import acall_with_retries
|
||||
@@ -211,6 +215,18 @@ class FastAgent(Workflow):
|
||||
|
||||
# Check then bump step counter
|
||||
if self.shared_state.step_number >= self.max_steps:
|
||||
pending = self.shared_state.drain_user_messages()
|
||||
if pending:
|
||||
logger.warning(
|
||||
f"⚠️ Dropping {len(pending)} external user message(s) at max steps"
|
||||
)
|
||||
ctx.write_event_to_stream(
|
||||
ExternalUserMessageDroppedEvent(
|
||||
message_ids=[m.id for m in pending],
|
||||
reason="max_steps_reached",
|
||||
step_number=self.shared_state.step_number,
|
||||
)
|
||||
)
|
||||
event = FastAgentEndEvent(
|
||||
success=False,
|
||||
reason=f"Reached max step count of {self.max_steps} steps",
|
||||
@@ -451,6 +467,19 @@ class FastAgent(Workflow):
|
||||
|
||||
# Check if complete() was called successfully
|
||||
if self.shared_state.finished:
|
||||
if self.shared_state.pending_user_messages:
|
||||
logger.info(
|
||||
"⏸️ complete() called but external messages pending, continuing",
|
||||
extra={"color": "cyan"},
|
||||
)
|
||||
self.shared_state.finished = False
|
||||
self.shared_state.success = None
|
||||
self.shared_state.answer = ""
|
||||
results_xml = format_tool_results(results)
|
||||
event = FastAgentOutputEvent(output=results_xml)
|
||||
ctx.write_event_to_stream(event)
|
||||
return event
|
||||
|
||||
logger.debug("✅ Task marked as complete via complete() tool")
|
||||
|
||||
success = (
|
||||
@@ -493,7 +522,26 @@ class FastAgent(Workflow):
|
||||
"""Add execution result to history and loop back."""
|
||||
output = ev.output or "Tool executed, but produced no output."
|
||||
|
||||
# Add results as user message
|
||||
drained = self.shared_state.drain_user_messages()
|
||||
if drained:
|
||||
external_block = "\n".join(
|
||||
f"<external_user_message>\n{m.message}\n</external_user_message>"
|
||||
for m in drained
|
||||
)
|
||||
output += "\n" + external_block
|
||||
logger.info(
|
||||
f"📩 Applied {len(drained)} external user message(s)",
|
||||
extra={"color": "cyan"},
|
||||
)
|
||||
ctx.write_event_to_stream(
|
||||
ExternalUserMessageAppliedEvent(
|
||||
message_ids=[m.id for m in drained],
|
||||
consumer="fast_agent",
|
||||
step_number=self.shared_state.step_number,
|
||||
)
|
||||
)
|
||||
|
||||
# Add results (+ any external messages) as a single user message
|
||||
self.shared_state.message_history.append(
|
||||
ChatMessage(role="user", content=output)
|
||||
)
|
||||
|
||||
@@ -27,6 +27,7 @@ from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent
|
||||
from droidrun.agent.droid.events import (
|
||||
ExecutorInputEvent,
|
||||
ExecutorResultEvent,
|
||||
ExternalUserMessageDroppedEvent,
|
||||
FastAgentExecuteEvent,
|
||||
FastAgentResultEvent,
|
||||
FinalizeEvent,
|
||||
@@ -38,7 +39,7 @@ from droidrun.agent.droid.events import (
|
||||
TextManipulatorInputEvent,
|
||||
TextManipulatorResultEvent,
|
||||
)
|
||||
from droidrun.agent.droid.state import DroidAgentState
|
||||
from droidrun.agent.droid.state import DroidAgentState, QueuedUserMessage
|
||||
from droidrun.agent.executor import ExecutorAgent
|
||||
from droidrun.agent.external import load_agent
|
||||
from droidrun.agent.manager import ManagerAgent, StatelessManagerAgent
|
||||
@@ -591,6 +592,19 @@ class DroidAgent(Workflow):
|
||||
ctx.write_event_to_stream(event)
|
||||
return event
|
||||
|
||||
# ========================================================================
|
||||
# External user message injection
|
||||
# ========================================================================
|
||||
|
||||
def send_user_message(self, message: str) -> QueuedUserMessage:
    """Queue an external user message for injection into the running agent.

    The message is handed to ``self.shared_state.queue_user_message`` and
    stays in the pending queue until something drains it. The queued entry's
    id and the resulting queue length are logged.

    Args:
        message: Text of the user message to inject.

    Returns:
        The ``QueuedUserMessage`` created for this message.

    Raises:
        Whatever ``queue_user_message`` raises propagates unchanged
        (nothing is caught here).
    """
    queued = self.shared_state.queue_user_message(message)
    queue_length = len(self.shared_state.pending_user_messages)
    logger.info(
        f"📩 External user message queued [id={queued.id}] "
        f"(queue length: {queue_length})",
        extra={"color": "cyan"},
    )
    return queued
|
||||
|
||||
# ========================================================================
|
||||
# execute_task — FastAgent / CodeActAgent
|
||||
# ========================================================================
|
||||
@@ -702,6 +716,18 @@ class DroidAgent(Workflow):
|
||||
"""Run Manager planning phase."""
|
||||
if self.shared_state.step_number >= self.config.agent.max_steps:
|
||||
logger.warning(f"⚠️ Reached maximum steps ({self.config.agent.max_steps})")
|
||||
pending = self.shared_state.drain_user_messages()
|
||||
if pending:
|
||||
logger.warning(
|
||||
f"⚠️ Dropping {len(pending)} external user message(s) at max steps"
|
||||
)
|
||||
ctx.write_event_to_stream(
|
||||
ExternalUserMessageDroppedEvent(
|
||||
message_ids=[m.id for m in pending],
|
||||
reason="max_steps_reached",
|
||||
step_number=self.shared_state.step_number,
|
||||
)
|
||||
)
|
||||
return FinalizeEvent(
|
||||
success=False,
|
||||
reason=f"Reached maximum steps ({self.config.agent.max_steps})",
|
||||
@@ -741,10 +767,18 @@ class DroidAgent(Workflow):
|
||||
| ScripterExecutorInputEvent
|
||||
| FinalizeEvent
|
||||
| TextManipulatorInputEvent
|
||||
| ManagerInputEvent
|
||||
):
|
||||
"""Process Manager output and decide next step."""
|
||||
# Check for answer-type termination
|
||||
if ev.answer.strip():
|
||||
if self.shared_state.pending_user_messages:
|
||||
logger.info(
|
||||
"⏸️ Manager tried to finish but external messages pending, "
|
||||
"looping back to Manager",
|
||||
extra={"color": "cyan"},
|
||||
)
|
||||
return ManagerInputEvent()
|
||||
success = ev.success if ev.success is not None else True
|
||||
self.shared_state.progress_summary = f"Answer: {ev.answer}"
|
||||
return FinalizeEvent(success=success, reason=ev.answer)
|
||||
@@ -997,6 +1031,7 @@ class DroidAgent(Workflow):
|
||||
|
||||
@step
|
||||
async def finalize(self, ctx: Context, ev: FinalizeEvent) -> ResultEvent:
|
||||
self.shared_state.workflow_completed = True
|
||||
ctx.write_event_to_stream(ev)
|
||||
capture(
|
||||
DroidAgentFinalizeEvent(
|
||||
|
||||
@@ -5,7 +5,7 @@ These events route between DroidAgent and child agents.
|
||||
For internal agent events, see each agent's events.py file.
|
||||
"""
|
||||
|
||||
from typing import Dict, Optional
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from llama_index.core.workflow import Event, StopEvent
|
||||
from pydantic import BaseModel
|
||||
@@ -99,6 +99,23 @@ class TextManipulatorResultEvent(Event):
|
||||
code_ran: str
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# EXTERNAL USER MESSAGE EVENTS
|
||||
# ============================================================================
|
||||
|
||||
|
||||
class ExternalUserMessageAppliedEvent(Event):
    """Stream event reporting that queued external user messages were applied.

    Written to the event stream after pending messages have been drained and
    appended to an agent's user message.
    """

    # Ids of the queued messages that were applied.
    message_ids: List[str]
    # Label of the agent that consumed them (e.g. "fast_agent", "manager").
    consumer: str
    # Value of the shared step counter when the messages were applied.
    step_number: int
|
||||
|
||||
|
||||
class ExternalUserMessageDroppedEvent(Event):
    """Stream event reporting that queued external user messages were discarded.

    Written to the event stream when pending messages are drained without
    being shown to any agent.
    """

    # Ids of the queued messages that were dropped.
    message_ids: List[str]
    # Machine-readable cause of the drop (e.g. "max_steps_reached").
    reason: str
    # Value of the shared step counter when the messages were dropped.
    step_number: int
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# FINALIZATION EVENTS
|
||||
# ============================================================================
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Dict, List, Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from llama_index.core.base.llms.types import ChatMessage
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
@@ -8,6 +9,12 @@ from pydantic import BaseModel, ConfigDict, Field
|
||||
from droidrun.telemetry import PackageVisitEvent, capture
|
||||
|
||||
|
||||
class QueuedUserMessage(BaseModel):
    """A single external user message waiting in the mid-run injection queue."""

    # Unique identifier; a fresh UUID4 string is generated when none is given.
    id: str = Field(default_factory=lambda: str(uuid4()))
    # Raw text of the user message.
    message: str
    # Step counter value recorded when the message was queued (defaults to 0).
    queued_at_step: int = 0
|
||||
|
||||
|
||||
class DroidAgentState(BaseModel):
|
||||
"""
|
||||
State model for DroidAgent workflow - shared across parent and child workflows.
|
||||
@@ -111,6 +118,12 @@ class DroidAgentState(BaseModel):
|
||||
text_manipulation_history: List[Dict] = Field(default_factory=list)
|
||||
last_text_manipulation_success: bool = False
|
||||
|
||||
# ========================================================================
|
||||
# External User Messages (mid-run injection queue)
|
||||
# ========================================================================
|
||||
pending_user_messages: List[QueuedUserMessage] = Field(default_factory=list)
|
||||
workflow_completed: bool = False
|
||||
|
||||
# ========================================================================
|
||||
# Custom Variables (user-defined)
|
||||
# ========================================================================
|
||||
@@ -149,6 +162,22 @@ class DroidAgentState(BaseModel):
|
||||
self.success = success
|
||||
self.answer = answer or "Task completed successfully."
|
||||
|
||||
def queue_user_message(self, message: str) -> QueuedUserMessage:
    """Append an external user message to the pending queue.

    Args:
        message: Text of the message; must contain at least one
            non-whitespace character.

    Returns:
        The newly created ``QueuedUserMessage``, stamped with the current
        ``step_number``.

    Raises:
        ValueError: If ``message`` is empty or whitespace-only.
        RuntimeError: If ``workflow_completed`` is already set.
    """
    # Validate the payload first so a blank message is always reported as a
    # ValueError, even when the workflow has also finished.
    if not message or not message.strip():
        raise ValueError("Cannot queue an empty or whitespace-only message.")
    if self.workflow_completed:
        raise RuntimeError("Cannot queue messages: agent has already finished.")

    queued = QueuedUserMessage(message=message, queued_at_step=self.step_number)
    self.pending_user_messages.append(queued)
    return queued
|
||||
|
||||
def drain_user_messages(self) -> list[QueuedUserMessage]:
    """Remove and return every pending external user message.

    Returns:
        A new list holding the queued messages in arrival order; an empty
        list when nothing is pending. The queue list object itself is kept
        and emptied in place.
    """
    drained = list(self.pending_user_messages)
    # Delete only the entries that were just copied, so a message appended
    # between the copy and the removal stays queued instead of being lost.
    del self.pending_user_messages[: len(drained)]
    return drained
|
||||
|
||||
def update_current_app(self, package_name: str, activity_name: str):
|
||||
"""
|
||||
Update package and activity together, capturing telemetry event only once.
|
||||
|
||||
@@ -28,6 +28,7 @@ from opentelemetry import trace
|
||||
from pydantic import BaseModel
|
||||
|
||||
from droidrun.agent.common.events import RecordUIStateEvent, ScreenshotEvent
|
||||
from droidrun.agent.droid.events import ExternalUserMessageAppliedEvent
|
||||
from droidrun.agent.manager.events import (
|
||||
ManagerContextEvent,
|
||||
ManagerPlanDetailsEvent,
|
||||
@@ -455,6 +456,26 @@ class ManagerAgent(Workflow):
|
||||
|
||||
# Build user message and add to history
|
||||
user_content = self._build_user_message_content()
|
||||
|
||||
drained = self.shared_state.drain_user_messages()
|
||||
if drained:
|
||||
external_block = "\n".join(
|
||||
f"<external_user_message>\n{m.message}\n</external_user_message>"
|
||||
for m in drained
|
||||
)
|
||||
user_content += "\n" + external_block + "\n"
|
||||
logger.info(
|
||||
f"📩 Applied {len(drained)} external user message(s)",
|
||||
extra={"color": "cyan"},
|
||||
)
|
||||
ctx.write_event_to_stream(
|
||||
ExternalUserMessageAppliedEvent(
|
||||
message_ids=[m.id for m in drained],
|
||||
consumer="manager",
|
||||
step_number=self.shared_state.step_number,
|
||||
)
|
||||
)
|
||||
|
||||
self.shared_state.message_history.append(
|
||||
ChatMessage(role="user", content=user_content)
|
||||
)
|
||||
|
||||
@@ -123,6 +123,7 @@ When done:
|
||||
{% endif %}
|
||||
- Use `complete` to signal task completion (success or failure)
|
||||
- Use `remember` to store important information for later steps
|
||||
- Messages in `<external_user_message>` tags are live corrections or new instructions from the user sent mid-execution. They override earlier assumptions — honor the most recent user guidance on your next step
|
||||
|
||||
{% if output_schema %}
|
||||
|
||||
|
||||
@@ -156,6 +156,8 @@ You should:
|
||||
{% endif %}
|
||||
|
||||
---
|
||||
If an `<external_user_message>` block is present, it contains live corrections or new instructions from the user sent during execution. Treat these as the newest guidance: revise your plan accordingly, but preserve already-completed progress unless the message explicitly changes course.
|
||||
|
||||
Carefully assess the current status and the provided screenshot. Check if the current plan needs to be revised.
|
||||
Determine if the user request has been fully completed. If you are confident that no further actions are required and the task succeeded, use the request_accomplished tag with success="true" and a confirmation message. If the task failed and cannot be completed, use success="false" with an explanation. If the user request is not finished, update the plan and don't use the request_accomplished tag. If you are stuck with errors, think step by step about whether the overall plan needs to be revised to address the error.
|
||||
NOTE: 1. If the current situation prevents proceeding with the original plan or requires clarification from the user, make reasonable assumptions and revise the plan accordingly. Act as though you are the user in such cases. 2. Please refer to the helpful information and steps in the Guidelines first for planning. 3. If the first subgoal in plan has been completed, please update the plan in time according to the screenshot and progress to ensure that the next subgoal is always the first item in the plan. 4. If the first subgoal is not completed, please copy the previous round's plan or update the plan based on the completion of the subgoal.
|
||||
|
||||
Reference in New Issue
Block a user