feat(agentic): emit inbox counts and mark messages read
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
cc552ed9dd
commit
129cf5575e
2 changed files with 111 additions and 8 deletions
|
|
@ -7,6 +7,7 @@ import (
|
|||
"sort"
|
||||
"time"
|
||||
|
||||
"dappco.re/go/agent/pkg/messages"
|
||||
core "dappco.re/go/core"
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
)
|
||||
|
|
@ -56,6 +57,7 @@ type MessageSendOutput struct {
|
|||
// out := agentic.MessageListOutput{Success: true, Count: 1, Messages: []agentic.AgentMessage{{ID: "msg-1"}}}
|
||||
type MessageListOutput struct {
|
||||
Success bool `json:"success"`
|
||||
New int `json:"new,omitempty"`
|
||||
Count int `json:"count"`
|
||||
Messages []AgentMessage `json:"messages"`
|
||||
}
|
||||
|
|
@ -91,6 +93,12 @@ func (s *PrepSubsystem) handleMessageInbox(ctx context.Context, options core.Opt
|
|||
if err != nil {
|
||||
return core.Result{Value: err, OK: false}
|
||||
}
|
||||
if s.Core() != nil {
|
||||
s.Core().ACTION(messages.InboxMessage{
|
||||
New: output.New,
|
||||
Total: output.Count,
|
||||
})
|
||||
}
|
||||
return core.Result{Value: output, OK: true}
|
||||
}
|
||||
|
||||
|
|
@ -140,11 +148,11 @@ func (s *PrepSubsystem) messageSend(_ context.Context, _ *mcp.CallToolRequest, i
|
|||
}
|
||||
|
||||
func (s *PrepSubsystem) messageInbox(_ context.Context, _ *mcp.CallToolRequest, input MessageInboxInput) (*mcp.CallToolResult, MessageListOutput, error) {
|
||||
messages, err := messageStoreInbox(input.Workspace, input.Agent, input.Limit)
|
||||
messages, newCount, err := messageStoreInbox(input.Workspace, input.Agent, input.Limit)
|
||||
if err != nil {
|
||||
return nil, MessageListOutput{}, err
|
||||
}
|
||||
return nil, MessageListOutput{Success: true, Count: len(messages), Messages: messages}, nil
|
||||
return nil, MessageListOutput{Success: true, New: newCount, Count: len(messages), Messages: messages}, nil
|
||||
}
|
||||
|
||||
func (s *PrepSubsystem) messageConversation(_ context.Context, _ *mcp.CallToolRequest, input MessageConversationInput) (*mcp.CallToolResult, MessageListOutput, error) {
|
||||
|
|
@ -193,17 +201,54 @@ func messageStoreSend(input MessageSendInput) (AgentMessage, error) {
|
|||
return message, nil
|
||||
}
|
||||
|
||||
func messageStoreInbox(workspace, agent string, limit int) ([]AgentMessage, error) {
|
||||
func messageStoreInbox(workspace, agent string, limit int) ([]AgentMessage, int, error) {
|
||||
if workspace == "" {
|
||||
return nil, core.E("messageInbox", "workspace is required", nil)
|
||||
return nil, 0, core.E("messageInbox", "workspace is required", nil)
|
||||
}
|
||||
if agent == "" {
|
||||
return nil, core.E("messageInbox", "agent is required", nil)
|
||||
return nil, 0, core.E("messageInbox", "agent is required", nil)
|
||||
}
|
||||
|
||||
return messageStoreFilter(workspace, limit, func(message AgentMessage) bool {
|
||||
return message.ToAgent == agent
|
||||
})
|
||||
messages, err := readWorkspaceMessages(workspace)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if limit <= 0 {
|
||||
limit = 50
|
||||
}
|
||||
|
||||
now := time.Now().Format(time.RFC3339)
|
||||
inbox := make([]AgentMessage, 0, len(messages))
|
||||
newCount := 0
|
||||
changed := false
|
||||
|
||||
for i := range messages {
|
||||
message := normaliseAgentMessage(messages[i])
|
||||
if message.ToAgent != agent {
|
||||
messages[i] = message
|
||||
continue
|
||||
}
|
||||
if message.ReadAt == "" {
|
||||
message.ReadAt = now
|
||||
newCount++
|
||||
changed = true
|
||||
}
|
||||
messages[i] = message
|
||||
inbox = append(inbox, message)
|
||||
}
|
||||
|
||||
if changed {
|
||||
if err := writeWorkspaceMessages(workspace, messages); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
if len(inbox) > limit {
|
||||
inbox = inbox[len(inbox)-limit:]
|
||||
}
|
||||
|
||||
return inbox, newCount, nil
|
||||
}
|
||||
|
||||
func messageStoreConversation(workspace, agent, withAgent string, limit int) ([]AgentMessage, error) {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,9 @@ package agentic
|
|||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"dappco.re/go/agent/pkg/messages"
|
||||
core "dappco.re/go/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
|
@ -65,6 +67,62 @@ func TestMessage_MessageSend_Good_PersistsAndReadsBack(t *testing.T) {
|
|||
assert.Equal(t, output.Message.ID, conversation.Messages[0].ID)
|
||||
}
|
||||
|
||||
func TestMessage_MessageInbox_Good_MarksReadAndEmitsCounts(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", dir)
|
||||
|
||||
c := core.New()
|
||||
var inboxEvents []messages.InboxMessage
|
||||
c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result {
|
||||
if ev, ok := msg.(messages.InboxMessage); ok {
|
||||
inboxEvents = append(inboxEvents, ev)
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
})
|
||||
|
||||
s := &PrepSubsystem{
|
||||
ServiceRuntime: core.NewServiceRuntime(c, AgentOptions{}),
|
||||
backoff: make(map[string]time.Time),
|
||||
failCount: make(map[string]int),
|
||||
}
|
||||
|
||||
sendResult := s.handleMessageSend(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "workspace", Value: "core/go-io/task-5"},
|
||||
core.Option{Key: "from_agent", Value: "codex"},
|
||||
core.Option{Key: "to_agent", Value: "claude"},
|
||||
core.Option{Key: "content", Value: "Please review this."},
|
||||
))
|
||||
require.True(t, sendResult.OK)
|
||||
|
||||
inboxResult := s.handleMessageInbox(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "workspace", Value: "core/go-io/task-5"},
|
||||
core.Option{Key: "agent", Value: "claude"},
|
||||
))
|
||||
require.True(t, inboxResult.OK)
|
||||
|
||||
inbox, ok := inboxResult.Value.(MessageListOutput)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, 1, inbox.New)
|
||||
assert.Equal(t, 1, inbox.Count)
|
||||
require.Len(t, inbox.Messages, 1)
|
||||
assert.NotEmpty(t, inbox.Messages[0].ReadAt)
|
||||
|
||||
secondResult := s.handleMessageInbox(context.Background(), core.NewOptions(
|
||||
core.Option{Key: "workspace", Value: "core/go-io/task-5"},
|
||||
core.Option{Key: "agent", Value: "claude"},
|
||||
))
|
||||
require.True(t, secondResult.OK)
|
||||
|
||||
secondInbox, ok := secondResult.Value.(MessageListOutput)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, 0, secondInbox.New)
|
||||
assert.Len(t, inboxEvents, 2)
|
||||
assert.Equal(t, 1, inboxEvents[0].New)
|
||||
assert.Equal(t, 1, inboxEvents[0].Total)
|
||||
assert.Equal(t, 0, inboxEvents[1].New)
|
||||
assert.Equal(t, 1, inboxEvents[1].Total)
|
||||
}
|
||||
|
||||
func TestMessage_MessageSend_Bad_MissingRequiredFields(t *testing.T) {
|
||||
s := newTestPrep(t)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue