397 lines
13 KiB
Go
397 lines
13 KiB
Go
// SPDX-License-Identifier: EUPL-1.2
|
|
|
|
package agentic
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"time"
|
|
|
|
"dappco.re/go/agent/pkg/messages"
|
|
core "dappco.re/go/core"
|
|
"github.com/modelcontextprotocol/go-sdk/mcp"
|
|
)
|
|
|
|
// message := agentic.AgentMessage{Workspace: "core/go-io/task-5", FromAgent: "codex", ToAgent: "claude", Subject: "Review", Content: "Please check the prompt."}
|
|
type AgentMessage struct {
|
|
ID string `json:"id"`
|
|
WorkspaceID int `json:"workspace_id,omitempty"`
|
|
Workspace string `json:"workspace"`
|
|
FromAgent string `json:"from_agent"`
|
|
ToAgent string `json:"to_agent"`
|
|
Subject string `json:"subject,omitempty"`
|
|
Content string `json:"content"`
|
|
ReadAt string `json:"read_at,omitempty"`
|
|
CreatedAt string `json:"created_at,omitempty"`
|
|
}
|
|
|
|
// input := agentic.MessageSendInput{Workspace: "core/go-io/task-5", FromAgent: "codex", ToAgent: "claude", Subject: "Review", Content: "Please check the prompt."}
|
|
type MessageSendInput struct {
|
|
WorkspaceID int `json:"workspace_id,omitempty"`
|
|
Workspace string `json:"workspace"`
|
|
FromAgent string `json:"from_agent"`
|
|
ToAgent string `json:"to_agent"`
|
|
Subject string `json:"subject,omitempty"`
|
|
Content string `json:"content"`
|
|
}
|
|
|
|
// input := agentic.MessageInboxInput{Workspace: "core/go-io/task-5", Agent: "claude"}
|
|
type MessageInboxInput struct {
|
|
Workspace string `json:"workspace"`
|
|
Agent string `json:"agent"`
|
|
Limit int `json:"limit,omitempty"`
|
|
}
|
|
|
|
// input := agentic.MessageConversationInput{Workspace: "core/go-io/task-5", Agent: "codex", WithAgent: "claude"}
|
|
type MessageConversationInput struct {
|
|
Workspace string `json:"workspace"`
|
|
Agent string `json:"agent"`
|
|
WithAgent string `json:"with_agent"`
|
|
Limit int `json:"limit,omitempty"`
|
|
}
|
|
|
|
// out := agentic.MessageSendOutput{Success: true, Message: agentic.AgentMessage{ID: "msg-1"}}
|
|
type MessageSendOutput struct {
|
|
Success bool `json:"success"`
|
|
Message AgentMessage `json:"message"`
|
|
}
|
|
|
|
// out := agentic.MessageListOutput{Success: true, Count: 1, Messages: []agentic.AgentMessage{{ID: "msg-1"}}}
|
|
type MessageListOutput struct {
|
|
Success bool `json:"success"`
|
|
New int `json:"new,omitempty"`
|
|
Count int `json:"count"`
|
|
Messages []AgentMessage `json:"messages"`
|
|
}
|
|
|
|
// result := c.Action("agentic.message.send").Run(ctx, core.NewOptions(
|
|
//
|
|
// core.Option{Key: "workspace", Value: "core/go-io/task-5"},
|
|
// core.Option{Key: "from_agent", Value: "codex"},
|
|
// core.Option{Key: "to_agent", Value: "claude"},
|
|
//
|
|
// ))
|
|
func (s *PrepSubsystem) handleMessageSend(ctx context.Context, options core.Options) core.Result {
|
|
_, output, err := s.messageSend(ctx, nil, MessageSendInput{
|
|
WorkspaceID: optionIntValue(options, "workspace_id", "workspace-id"),
|
|
Workspace: optionStringValue(options, "workspace", "_arg"),
|
|
FromAgent: optionStringValue(options, "from_agent", "from"),
|
|
ToAgent: optionStringValue(options, "to_agent", "to"),
|
|
Subject: optionStringValue(options, "subject"),
|
|
Content: optionStringValue(options, "content", "body"),
|
|
})
|
|
if err != nil {
|
|
return core.Result{Value: err, OK: false}
|
|
}
|
|
return core.Result{Value: output, OK: true}
|
|
}
|
|
|
|
// result := c.Action("agentic.message.inbox").Run(ctx, core.NewOptions(core.Option{Key: "workspace", Value: "core/go-io/task-5"}))
|
|
func (s *PrepSubsystem) handleMessageInbox(ctx context.Context, options core.Options) core.Result {
|
|
_, output, err := s.messageInbox(ctx, nil, MessageInboxInput{
|
|
Workspace: optionStringValue(options, "workspace", "_arg"),
|
|
Agent: optionStringValue(options, "agent", "agent_id", "agent-id"),
|
|
Limit: optionIntValue(options, "limit"),
|
|
})
|
|
if err != nil {
|
|
return core.Result{Value: err, OK: false}
|
|
}
|
|
if s.Core() != nil {
|
|
s.Core().ACTION(messages.InboxMessage{
|
|
New: output.New,
|
|
Total: output.Count,
|
|
})
|
|
}
|
|
return core.Result{Value: output, OK: true}
|
|
}
|
|
|
|
// result := c.Action("agentic.message.conversation").Run(ctx, core.NewOptions(
|
|
//
|
|
// core.Option{Key: "workspace", Value: "core/go-io/task-5"},
|
|
// core.Option{Key: "agent", Value: "codex"},
|
|
// core.Option{Key: "with_agent", Value: "claude"},
|
|
//
|
|
// ))
|
|
func (s *PrepSubsystem) handleMessageConversation(ctx context.Context, options core.Options) core.Result {
|
|
_, output, err := s.messageConversation(ctx, nil, MessageConversationInput{
|
|
Workspace: optionStringValue(options, "workspace", "_arg"),
|
|
Agent: optionStringValue(options, "agent", "agent_id", "agent-id"),
|
|
WithAgent: optionStringValue(options, "with_agent", "with-agent", "with", "to_agent", "to-agent"),
|
|
Limit: optionIntValue(options, "limit"),
|
|
})
|
|
if err != nil {
|
|
return core.Result{Value: err, OK: false}
|
|
}
|
|
return core.Result{Value: output, OK: true}
|
|
}
|
|
|
|
func (s *PrepSubsystem) registerMessageTools(server *mcp.Server) {
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "agentic_message_send",
|
|
Description: "Send a direct message between two agents within a workspace.",
|
|
}, s.messageSend)
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "agent_send",
|
|
Description: "Send a direct message between two agents within a workspace.",
|
|
}, s.messageSend)
|
|
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "agentic_message_inbox",
|
|
Description: "List messages delivered to an agent within a workspace.",
|
|
}, s.messageInbox)
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "agent_inbox",
|
|
Description: "List messages delivered to an agent within a workspace.",
|
|
}, s.messageInbox)
|
|
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "agentic_message_conversation",
|
|
Description: "List the chronological conversation between two agents within a workspace.",
|
|
}, s.messageConversation)
|
|
mcp.AddTool(server, &mcp.Tool{
|
|
Name: "agent_conversation",
|
|
Description: "List the chronological conversation between two agents within a workspace.",
|
|
}, s.messageConversation)
|
|
}
|
|
|
|
func (s *PrepSubsystem) messageSend(_ context.Context, _ *mcp.CallToolRequest, input MessageSendInput) (*mcp.CallToolResult, MessageSendOutput, error) {
|
|
message, err := messageStoreSend(input)
|
|
if err != nil {
|
|
return nil, MessageSendOutput{}, err
|
|
}
|
|
return nil, MessageSendOutput{Success: true, Message: message}, nil
|
|
}
|
|
|
|
func (s *PrepSubsystem) messageInbox(_ context.Context, _ *mcp.CallToolRequest, input MessageInboxInput) (*mcp.CallToolResult, MessageListOutput, error) {
|
|
messages, newCount, err := messageStoreInbox(input.Workspace, input.Agent, input.Limit)
|
|
if err != nil {
|
|
return nil, MessageListOutput{}, err
|
|
}
|
|
return nil, MessageListOutput{Success: true, New: newCount, Count: len(messages), Messages: messages}, nil
|
|
}
|
|
|
|
func (s *PrepSubsystem) messageConversation(_ context.Context, _ *mcp.CallToolRequest, input MessageConversationInput) (*mcp.CallToolResult, MessageListOutput, error) {
|
|
messages, err := messageStoreConversation(input.Workspace, input.Agent, input.WithAgent, input.Limit)
|
|
if err != nil {
|
|
return nil, MessageListOutput{}, err
|
|
}
|
|
return nil, MessageListOutput{Success: true, Count: len(messages), Messages: messages}, nil
|
|
}
|
|
|
|
func messageStoreSend(input MessageSendInput) (AgentMessage, error) {
|
|
if input.Workspace == "" {
|
|
return AgentMessage{}, core.E("messageSend", "workspace is required", nil)
|
|
}
|
|
if input.FromAgent == "" {
|
|
return AgentMessage{}, core.E("messageSend", "from_agent is required", nil)
|
|
}
|
|
if input.ToAgent == "" {
|
|
return AgentMessage{}, core.E("messageSend", "to_agent is required", nil)
|
|
}
|
|
if core.Trim(input.Content) == "" {
|
|
return AgentMessage{}, core.E("messageSend", "content is required", nil)
|
|
}
|
|
|
|
messages, err := readWorkspaceMessages(input.Workspace)
|
|
if err != nil {
|
|
return AgentMessage{}, err
|
|
}
|
|
|
|
now := time.Now().Format(time.RFC3339)
|
|
message := AgentMessage{
|
|
ID: messageID(),
|
|
WorkspaceID: input.WorkspaceID,
|
|
Workspace: core.Trim(input.Workspace),
|
|
FromAgent: core.Trim(input.FromAgent),
|
|
ToAgent: core.Trim(input.ToAgent),
|
|
Subject: core.Trim(input.Subject),
|
|
Content: input.Content,
|
|
CreatedAt: now,
|
|
}
|
|
messages = append(messages, message)
|
|
|
|
if err := writeWorkspaceMessages(input.Workspace, messages); err != nil {
|
|
return AgentMessage{}, err
|
|
}
|
|
|
|
return message, nil
|
|
}
|
|
|
|
func messageStoreInbox(workspace, agent string, limit int) ([]AgentMessage, int, error) {
|
|
if workspace == "" {
|
|
return nil, 0, core.E("messageInbox", "workspace is required", nil)
|
|
}
|
|
if agent == "" {
|
|
return nil, 0, core.E("messageInbox", "agent is required", nil)
|
|
}
|
|
|
|
messages, err := readWorkspaceMessages(workspace)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
|
|
now := time.Now().Format(time.RFC3339)
|
|
inbox := make([]AgentMessage, 0, len(messages))
|
|
newCount := 0
|
|
changed := false
|
|
|
|
for i := range messages {
|
|
message := normaliseAgentMessage(messages[i])
|
|
if message.ToAgent != agent {
|
|
messages[i] = message
|
|
continue
|
|
}
|
|
if message.ReadAt == "" {
|
|
message.ReadAt = now
|
|
newCount++
|
|
changed = true
|
|
}
|
|
messages[i] = message
|
|
inbox = append(inbox, message)
|
|
}
|
|
|
|
if changed {
|
|
if err := writeWorkspaceMessages(workspace, messages); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
}
|
|
|
|
if len(inbox) > limit {
|
|
inbox = inbox[len(inbox)-limit:]
|
|
}
|
|
|
|
return inbox, newCount, nil
|
|
}
|
|
|
|
func messageStoreConversation(workspace, agent, withAgent string, limit int) ([]AgentMessage, error) {
|
|
if workspace == "" {
|
|
return nil, core.E("messageConversation", "workspace is required", nil)
|
|
}
|
|
if agent == "" {
|
|
return nil, core.E("messageConversation", "agent is required", nil)
|
|
}
|
|
if withAgent == "" {
|
|
return nil, core.E("messageConversation", "with_agent is required", nil)
|
|
}
|
|
|
|
return messageStoreFilter(workspace, limit, func(message AgentMessage) bool {
|
|
return (message.FromAgent == agent && message.ToAgent == withAgent) || (message.FromAgent == withAgent && message.ToAgent == agent)
|
|
})
|
|
}
|
|
|
|
func messageStoreFilter(workspace string, limit int, match func(AgentMessage) bool) ([]AgentMessage, error) {
|
|
messages, err := readWorkspaceMessages(workspace)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
filtered := make([]AgentMessage, 0, len(messages))
|
|
for _, message := range messages {
|
|
message = normaliseAgentMessage(message)
|
|
if match(message) {
|
|
filtered = append(filtered, message)
|
|
}
|
|
}
|
|
|
|
sort.SliceStable(filtered, func(i, j int) bool {
|
|
return filtered[i].CreatedAt < filtered[j].CreatedAt
|
|
})
|
|
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
if len(filtered) > limit {
|
|
filtered = filtered[len(filtered)-limit:]
|
|
}
|
|
|
|
return filtered, nil
|
|
}
|
|
|
|
func messageRoot() string {
|
|
return core.JoinPath(CoreRoot(), "messages")
|
|
}
|
|
|
|
func messagePath(workspace string) string {
|
|
return core.JoinPath(messageRoot(), core.Concat(pathKey(workspace), ".json"))
|
|
}
|
|
|
|
func readWorkspaceMessages(workspace string) ([]AgentMessage, error) {
|
|
if workspace == "" {
|
|
return []AgentMessage{}, nil
|
|
}
|
|
|
|
result := fs.Read(messagePath(workspace))
|
|
if !result.OK {
|
|
err, _ := result.Value.(error)
|
|
if err == nil || core.Contains(err.Error(), "no such file") {
|
|
return []AgentMessage{}, nil
|
|
}
|
|
return nil, core.E("readWorkspaceMessages", "failed to read message store", err)
|
|
}
|
|
|
|
content := core.Trim(result.Value.(string))
|
|
if content == "" {
|
|
return []AgentMessage{}, nil
|
|
}
|
|
|
|
var messages []AgentMessage
|
|
if parseResult := core.JSONUnmarshalString(content, &messages); !parseResult.OK {
|
|
err, _ := parseResult.Value.(error)
|
|
return nil, core.E("readWorkspaceMessages", "failed to parse message store", err)
|
|
}
|
|
|
|
for i := range messages {
|
|
messages[i] = normaliseAgentMessage(messages[i])
|
|
}
|
|
|
|
sort.SliceStable(messages, func(i, j int) bool {
|
|
return messages[i].CreatedAt < messages[j].CreatedAt
|
|
})
|
|
|
|
return messages, nil
|
|
}
|
|
|
|
func writeWorkspaceMessages(workspace string, messages []AgentMessage) error {
|
|
if workspace == "" {
|
|
return core.E("writeWorkspaceMessages", "workspace is required", nil)
|
|
}
|
|
|
|
normalised := make([]AgentMessage, 0, len(messages))
|
|
for _, message := range messages {
|
|
normalised = append(normalised, normaliseAgentMessage(message))
|
|
}
|
|
|
|
if ensureDirResult := fs.EnsureDir(messageRoot()); !ensureDirResult.OK {
|
|
err, _ := ensureDirResult.Value.(error)
|
|
return core.E("writeWorkspaceMessages", "failed to create message store directory", err)
|
|
}
|
|
|
|
if writeResult := fs.WriteAtomic(messagePath(workspace), core.JSONMarshalString(normalised)); !writeResult.OK {
|
|
err, _ := writeResult.Value.(error)
|
|
return core.E("writeWorkspaceMessages", "failed to write message store", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func normaliseAgentMessage(message AgentMessage) AgentMessage {
|
|
message.Workspace = core.Trim(message.Workspace)
|
|
message.FromAgent = core.Trim(message.FromAgent)
|
|
message.ToAgent = core.Trim(message.ToAgent)
|
|
message.Subject = core.Trim(message.Subject)
|
|
if message.ID == "" {
|
|
message.ID = messageID()
|
|
}
|
|
if message.CreatedAt == "" {
|
|
message.CreatedAt = time.Now().Format(time.RFC3339)
|
|
}
|
|
return message
|
|
}
|
|
|
|
func messageID() string {
|
|
return core.Concat("msg-", core.Sprint(time.Now().UnixNano()))
|
|
}
|