diff --git a/pkg/agentic/message.go b/pkg/agentic/message.go index be1e152..9f5eae0 100644 --- a/pkg/agentic/message.go +++ b/pkg/agentic/message.go @@ -7,6 +7,7 @@ import ( "sort" "time" + "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" "github.com/modelcontextprotocol/go-sdk/mcp" ) @@ -56,6 +57,7 @@ type MessageSendOutput struct { // out := agentic.MessageListOutput{Success: true, Count: 1, Messages: []agentic.AgentMessage{{ID: "msg-1"}}} type MessageListOutput struct { Success bool `json:"success"` + New int `json:"new,omitempty"` Count int `json:"count"` Messages []AgentMessage `json:"messages"` } @@ -91,6 +93,12 @@ func (s *PrepSubsystem) handleMessageInbox(ctx context.Context, options core.Opt if err != nil { return core.Result{Value: err, OK: false} } + if s.Core() != nil { + s.Core().ACTION(messages.InboxMessage{ + New: output.New, + Total: output.Count, + }) + } return core.Result{Value: output, OK: true} } @@ -140,11 +148,11 @@ func (s *PrepSubsystem) messageSend(_ context.Context, _ *mcp.CallToolRequest, i } func (s *PrepSubsystem) messageInbox(_ context.Context, _ *mcp.CallToolRequest, input MessageInboxInput) (*mcp.CallToolResult, MessageListOutput, error) { - messages, err := messageStoreInbox(input.Workspace, input.Agent, input.Limit) + messages, newCount, err := messageStoreInbox(input.Workspace, input.Agent, input.Limit) if err != nil { return nil, MessageListOutput{}, err } - return nil, MessageListOutput{Success: true, Count: len(messages), Messages: messages}, nil + return nil, MessageListOutput{Success: true, New: newCount, Count: len(messages), Messages: messages}, nil } func (s *PrepSubsystem) messageConversation(_ context.Context, _ *mcp.CallToolRequest, input MessageConversationInput) (*mcp.CallToolResult, MessageListOutput, error) { @@ -193,17 +201,54 @@ func messageStoreSend(input MessageSendInput) (AgentMessage, error) { return message, nil } -func messageStoreInbox(workspace, agent string, limit int) ([]AgentMessage, error) { +func messageStoreInbox(workspace, agent string, limit int) ([]AgentMessage, int, error) { if workspace == "" { - return nil, core.E("messageInbox", "workspace is required", nil) + return nil, 0, core.E("messageInbox", "workspace is required", nil) } if agent == "" { - return nil, core.E("messageInbox", "agent is required", nil) + return nil, 0, core.E("messageInbox", "agent is required", nil) } - return messageStoreFilter(workspace, limit, func(message AgentMessage) bool { - return message.ToAgent == agent - }) + messages, err := readWorkspaceMessages(workspace) + if err != nil { + return nil, 0, err + } + + if limit <= 0 { + limit = 50 + } + + now := time.Now().Format(time.RFC3339) + inbox := make([]AgentMessage, 0, len(messages)) + newCount := 0 + changed := false + + for i := range messages { + message := normaliseAgentMessage(messages[i]) + if message.ToAgent != agent { + messages[i] = message + continue + } + if message.ReadAt == "" { + message.ReadAt = now + newCount++ + changed = true + } + messages[i] = message + inbox = append(inbox, message) + } + + if changed { + if err := writeWorkspaceMessages(workspace, messages); err != nil { + return nil, 0, err + } + } + + if len(inbox) > limit { + inbox = inbox[len(inbox)-limit:] + } + + return inbox, newCount, nil } func messageStoreConversation(workspace, agent, withAgent string, limit int) ([]AgentMessage, error) { diff --git a/pkg/agentic/message_test.go b/pkg/agentic/message_test.go index d627450..4b753d8 100644 --- a/pkg/agentic/message_test.go +++ b/pkg/agentic/message_test.go @@ -5,7 +5,9 @@ package agentic import ( "context" "testing" + "time" + "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -65,6 +67,62 @@ func TestMessage_MessageSend_Good_PersistsAndReadsBack(t *testing.T) { assert.Equal(t, output.Message.ID, conversation.Messages[0].ID) } +func TestMessage_MessageInbox_Good_MarksReadAndEmitsCounts(t *testing.T) { + dir := t.TempDir() + t.Setenv("CORE_WORKSPACE", dir) + + c := core.New() + var inboxEvents []messages.InboxMessage + c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { + if ev, ok := msg.(messages.InboxMessage); ok { + inboxEvents = append(inboxEvents, ev) + } + return core.Result{OK: true} + }) + + s := &PrepSubsystem{ + ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}), + backoff: make(map[string]time.Time), + failCount: make(map[string]int), + } + + sendResult := s.handleMessageSend(context.Background(), core.NewOptions( + core.Option{Key: "workspace", Value: "core/go-io/task-5"}, + core.Option{Key: "from_agent", Value: "codex"}, + core.Option{Key: "to_agent", Value: "claude"}, + core.Option{Key: "content", Value: "Please review this."}, + )) + require.True(t, sendResult.OK) + + inboxResult := s.handleMessageInbox(context.Background(), core.NewOptions( + core.Option{Key: "workspace", Value: "core/go-io/task-5"}, + core.Option{Key: "agent", Value: "claude"}, + )) + require.True(t, inboxResult.OK) + + inbox, ok := inboxResult.Value.(MessageListOutput) + require.True(t, ok) + assert.Equal(t, 1, inbox.New) + assert.Equal(t, 1, inbox.Count) + require.Len(t, inbox.Messages, 1) + assert.NotEmpty(t, inbox.Messages[0].ReadAt) + + secondResult := s.handleMessageInbox(context.Background(), core.NewOptions( + core.Option{Key: "workspace", Value: "core/go-io/task-5"}, + core.Option{Key: "agent", Value: "claude"}, + )) + require.True(t, secondResult.OK) + + secondInbox, ok := secondResult.Value.(MessageListOutput) + require.True(t, ok) + assert.Equal(t, 0, secondInbox.New) + assert.Len(t, inboxEvents, 2) + assert.Equal(t, 1, inboxEvents[0].New) + assert.Equal(t, 1, inboxEvents[0].Total) + assert.Equal(t, 0, inboxEvents[1].New) + assert.Equal(t, 1, inboxEvents[1].Total) +} + func TestMessage_MessageSend_Bad_MissingRequiredFields(t *testing.T) { s := newTestPrep(t)