From b6565263f304102c67344fee95e2de9c411ede2c Mon Sep 17 00:00:00 2001 From: Snider Date: Sat, 25 Apr 2026 18:04:55 +0100 Subject: [PATCH] fix(agent/brain): lock forget+supersede paths against late index writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cache::lock keyed by memory id wraps the delete path in BrainService:: forget(); supersede cleanup in remember() lifted to the same idiom. forget() now ALWAYS queues DeleteFromIndex on a successful delete (was previously skipped when indexed_at was null — left late writes from stale preloaded models a window to land entries after the underlying memory was gone). Index write paths (qdrantUpsert / elasticIndex) re-check that the memory row still exists before writing — defence-in-depth against any future caller that holds a stale model reference past a forget. Pest coverage extended in SupersedeForgetIndexCleanupTest: - never-indexed forget queues cleanup - late stale-model index writes are skipped after forget - never-indexed supersede cleanup queues deletion - late stale-model index writes are skipped after supersede Co-authored-by: Codex Closes tasks.lthn.sh/view.php?id=999 --- php/Services/BrainService.php | 173 +++++++++++++----- .../Brain/SupersedeForgetIndexCleanupTest.php | 92 +++++++++- 2 files changed, 213 insertions(+), 52 deletions(-) diff --git a/php/Services/BrainService.php b/php/Services/BrainService.php index 5bcee7b..edfbf83 100644 --- a/php/Services/BrainService.php +++ b/php/Services/BrainService.php @@ -9,9 +9,11 @@ namespace Core\Mod\Agentic\Services; use Core\Mod\Agentic\Jobs\DeleteFromIndex; use Core\Mod\Agentic\Jobs\EmbedMemory; use Core\Mod\Agentic\Models\BrainMemory; +use Illuminate\Contracts\Cache\LockTimeoutException; use Illuminate\Http\Client\ConnectionException; use Illuminate\Http\Client\PendingRequest; use Illuminate\Http\Client\Response; +use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Http; use Illuminate\Support\Facades\Log; @@ -26,6 +28,10 @@ class BrainService private const MAX_RETRY_DELAY_MS = 30000; + private const MEMORY_LOCK_TTL_SECONDS = 10; + + private const MEMORY_LOCK_WAIT_SECONDS = 5; + private string $qdrantApiKey; public function __construct( @@ -113,26 +119,22 @@ class BrainService { $attributes['indexed_at'] = null; $cleanupIds = []; + $supersededId = $this->normaliseMemoryId($attributes['supersedes_id'] ?? null); + $remember = function () use ($attributes, &$cleanupIds): BrainMemory { + return DB::connection('brain')->transaction(function () use ($attributes, &$cleanupIds): BrainMemory { + $memory = new BrainMemory; + $memory->fill($attributes); + $memory->save(); - $memory = DB::connection('brain')->transaction(function () use ($attributes, &$cleanupIds) { - $memory = new BrainMemory; - $memory->fill($attributes); - $memory->save(); + $this->deleteSupersededMemory($memory->supersedes_id, $cleanupIds); - if ($memory->supersedes_id) { - $superseded = BrainMemory::query()->find($memory->supersedes_id); + return $memory; + }); + }; - if ($superseded instanceof BrainMemory) { - if ($superseded->indexed_at !== null) { - $cleanupIds[] = $superseded->id; - } - - $superseded->delete(); - } - } - - return $memory; - }); + $memory = $supersededId !== null + ? $this->withMemoryIndexLock($supersededId, $remember) + : $remember(); foreach ($cleanupIds as $cleanupId) { DeleteFromIndex::dispatch($cleanupId); @@ -239,22 +241,9 @@ class BrainService */ public function forget(string $id): void { - $memory = null; - $deleted = DB::connection('brain')->transaction(function () use ($id, &$memory): int { - $memory = BrainMemory::query() - ->select(['id', 'indexed_at']) - ->find($id); + $deleted = $this->withMemoryIndexLock($id, fn (): bool => $this->deleteMemoryRecord($id)); - if (! $memory instanceof BrainMemory) { - return 0; - } - - $memory->delete(); - - return 1; - }); - - if ($deleted > 0 && $memory instanceof BrainMemory && $memory->indexed_at !== null) { + if ($deleted) { DeleteFromIndex::dispatch($id); } } @@ -313,13 +302,21 @@ class BrainService */ public function elasticIndex(BrainMemory $memory): void { - $response = $this->http(10) - ->put($this->elasticDocumentUrl($memory->id), $this->buildElasticDocument($memory)); + $this->withMemoryIndexLock($memory->id, function () use ($memory): void { + $freshMemory = BrainMemory::query()->find($memory->id); - if (! $response->successful()) { - Log::error("Elasticsearch index failed: {$response->status()}", ['id' => $memory->id, 'body' => $response->body()]); - throw new \RuntimeException("Elasticsearch index failed: {$response->status()}"); - } + if (! $freshMemory instanceof BrainMemory) { + return; + } + + $response = $this->http(10) + ->put($this->elasticDocumentUrl($freshMemory->id), $this->buildElasticDocument($freshMemory)); + + if (! $response->successful()) { + Log::error("Elasticsearch index failed: {$response->status()}", ['id' => $freshMemory->id, 'body' => $response->body()]); + throw new \RuntimeException("Elasticsearch index failed: {$response->status()}"); + } + }); } /** @@ -626,16 +623,98 @@ class BrainService */ public function qdrantUpsert(array $points): void { - $response = $this->retryableHttp(10, fn (PendingRequest $request): Response => $request->put( - "{$this->qdrantUrl}/collections/{$this->collection}/points", - [ - 'points' => $points, - ], - )); + foreach ($points as $point) { + $memoryId = $this->normaliseMemoryId($point['id'] ?? null); - if (! $response->successful()) { - Log::error("Qdrant upsert failed: {$response->status()}", ['body' => $response->body()]); - throw new \RuntimeException("Qdrant upsert failed: {$response->status()}"); + if ($memoryId === null) { + continue; + } + + $this->withMemoryIndexLock($memoryId, function () use ($memoryId, $point): void { + if (! $this->memoryExists($memoryId)) { + return; + } + + $response = $this->retryableHttp(10, fn (PendingRequest $request): Response => $request->put( + "{$this->qdrantUrl}/collections/{$this->collection}/points", + [ + 'points' => [$point], + ], + )); + + if (! $response->successful()) { + Log::error("Qdrant upsert failed: {$response->status()}", ['body' => $response->body()]); + throw new \RuntimeException("Qdrant upsert failed: {$response->status()}"); + } + }); + } + } + + private function deleteMemoryRecord(string $id): bool + { + return DB::connection('brain')->transaction(function () use ($id): bool { + $memory = BrainMemory::query() + ->select(['id']) + ->find($id); + + if (! $memory instanceof BrainMemory) { + return false; + } + + $memory->delete(); + + return true; + }); + } + + /** + * @param array $cleanupIds + */ + private function deleteSupersededMemory(?string $supersededId, array &$cleanupIds): void + { + if ($supersededId === null || $supersededId === '') { + return; + } + + $superseded = BrainMemory::query()->find($supersededId); + + if (! $superseded instanceof BrainMemory) { + return; + } + + $cleanupIds[] = $superseded->id; + $superseded->delete(); + } + + private function memoryExists(string $id): bool + { + return BrainMemory::query()->whereKey($id)->exists(); + } + + private function normaliseMemoryId(mixed $memoryId): ?string + { + return is_string($memoryId) && $memoryId !== '' + ? $memoryId + : null; + } + + private function memoryLockKey(string $memoryId): string + { + return 'brain:memory:index:'.$memoryId; + } + + private function withMemoryIndexLock(string $memoryId, callable $callback): mixed + { + $lock = Cache::lock($this->memoryLockKey($memoryId), self::MEMORY_LOCK_TTL_SECONDS); + + try { + $lock->block(self::MEMORY_LOCK_WAIT_SECONDS); + + return $callback(); + } catch (LockTimeoutException $exception) { + throw new \RuntimeException("Timed out acquiring memory index lock: {$memoryId}", 0, $exception); + } finally { + rescue(static fn (): mixed => $lock->release(), report: false); } } diff --git a/php/tests/Feature/Brain/SupersedeForgetIndexCleanupTest.php b/php/tests/Feature/Brain/SupersedeForgetIndexCleanupTest.php index 066cbe0..3150d00 100644 --- a/php/tests/Feature/Brain/SupersedeForgetIndexCleanupTest.php +++ b/php/tests/Feature/Brain/SupersedeForgetIndexCleanupTest.php @@ -8,11 +8,19 @@ use Core\Mod\Agentic\Jobs\DeleteFromIndex; use Core\Mod\Agentic\Jobs\EmbedMemory; use Core\Mod\Agentic\Models\BrainMemory; use Core\Mod\Agentic\Services\BrainService; +use Illuminate\Support\Facades\Http; use Illuminate\Support\Facades\Queue; function cleanupBrainService(): BrainService { - return new BrainService; + return new BrainService( + ollamaUrl: 'https://ollama.test', + qdrantUrl: 'https://qdrant.test', + collection: 'openbrain', + embeddingModel: 'embeddinggemma', + verifySsl: false, + elasticsearchUrl: 'https://elasticsearch.test', + ); } function cleanupMemory(array $attributes = []): BrainMemory @@ -28,6 +36,25 @@ function cleanupMemory(array $attributes = []): BrainMemory ], $attributes)); } +function cleanupQdrantPoint(BrainService $brain, BrainMemory $memory): array +{ + $point = $brain->buildQdrantPayload($memory->id, [ + 'workspace_id' => $memory->workspace_id, + 'org' => $memory->getAttribute('org'), + 'project' => $memory->project, + 'agent_id' => $memory->agent_id, + 'type' => $memory->type, + 'tags' => $memory->tags ?? [], + 'confidence' => $memory->confidence, + 'source' => $memory->source ?? 'manual', + 'content' => $memory->content, + 'created_at' => $memory->created_at?->toIso8601String(), + ]); + $point['vector'] = array_fill(0, 768, 0.125); + + return $point; +} + test('SupersedeForgetIndexCleanup_forget_Good_dispatches_delete_from_index', function (): void { Queue::fake(); $memory = cleanupMemory(['indexed_at' => now()]); @@ -40,7 +67,7 @@ test('SupersedeForgetIndexCleanup_forget_Good_dispatches_delete_from_index', fun Queue::assertPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $memory->id); }); -test('SupersedeForgetIndexCleanup_forget_Bad_skips_delete_from_index_for_never_indexed_memory', function (): void { +test('SupersedeForgetIndexCleanup_forget_Bad_dispatches_delete_from_index_for_never_indexed_memory', function (): void { Queue::fake(); $memory = cleanupMemory(['indexed_at' => null]); @@ -49,7 +76,26 @@ test('SupersedeForgetIndexCleanup_forget_Bad_skips_delete_from_index_for_never_i expect(BrainMemory::find($memory->id))->toBeNull() ->and(BrainMemory::withTrashed()->find($memory->id)?->trashed())->toBeTrue(); - Queue::assertNotPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $memory->id); + Queue::assertPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $memory->id); +}); + +test('SupersedeForgetIndexCleanup_forget_Ugly_skips_late_index_writes_for_deleted_memory', function (): void { + Queue::fake(); + Http::fake(); + $brain = cleanupBrainService(); + $memory = cleanupMemory(['indexed_at' => null]); + $staleMemory = BrainMemory::query()->findOrFail($memory->id); + + $brain->forget($memory->id); + $brain->qdrantUpsert([cleanupQdrantPoint($brain, $staleMemory)]); + $brain->elasticIndex($staleMemory); + + expect(BrainMemory::count())->toBe(0) + ->and(BrainMemory::withTrashed()->find($memory->id)?->trashed())->toBeTrue(); + + Queue::assertPushed(DeleteFromIndex::class, 1); + Queue::assertPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $memory->id); + Http::assertNothingSent(); }); test('SupersedeForgetIndexCleanup_supersede_Bad_dispatches_cleanup_for_old_indexed_memory', function (): void { @@ -80,7 +126,7 @@ test('SupersedeForgetIndexCleanup_supersede_Bad_dispatches_cleanup_for_old_index Queue::assertPushed(EmbedMemory::class, fn (EmbedMemory $job): bool => $job->memoryId === $newMemory->id); }); -test('SupersedeForgetIndexCleanup_supersede_Ugly_skips_cleanup_for_never_indexed_memory', function (): void { +test('SupersedeForgetIndexCleanup_supersede_Ugly_dispatches_cleanup_for_never_indexed_memory', function (): void { Queue::fake(); $workspace = createWorkspace(); $oldMemory = cleanupMemory([ @@ -104,6 +150,42 @@ test('SupersedeForgetIndexCleanup_supersede_Ugly_skips_cleanup_for_never_indexed ->and(BrainMemory::withTrashed()->find($oldMemory->id)?->trashed())->toBeTrue() ->and($newMemory->indexed_at)->toBeNull(); - Queue::assertNotPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $oldMemory->id); + Queue::assertPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $oldMemory->id); Queue::assertPushed(EmbedMemory::class, fn (EmbedMemory $job): bool => $job->memoryId === $newMemory->id); }); + +test('SupersedeForgetIndexCleanup_supersede_Ugly_skips_late_index_writes_for_deleted_memory', function (): void { + Queue::fake(); + Http::fake(); + $brain = cleanupBrainService(); + $workspace = createWorkspace(); + $oldMemory = cleanupMemory([ + 'workspace_id' => $workspace->id, + 'content' => 'Old unindexed memory for late write test.', + 'indexed_at' => null, + ]); + $staleOldMemory = BrainMemory::query()->findOrFail($oldMemory->id); + + $newMemory = $brain->remember([ + 'workspace_id' => $workspace->id, + 'agent_id' => 'virgil', + 'type' => 'observation', + 'content' => 'Superseding memory for late write test.', + 'confidence' => 0.9, + 'org' => 'core', + 'project' => 'agent', + 'supersedes_id' => $oldMemory->id, + ]); + + $brain->qdrantUpsert([cleanupQdrantPoint($brain, $staleOldMemory)]); + $brain->elasticIndex($staleOldMemory); + + expect(BrainMemory::count())->toBe(1) + ->and(BrainMemory::find($oldMemory->id))->toBeNull() + ->and(BrainMemory::withTrashed()->find($oldMemory->id)?->trashed())->toBeTrue() + ->and(BrainMemory::find($newMemory->id))->not->toBeNull(); + + Queue::assertPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $oldMemory->id); + Queue::assertPushed(EmbedMemory::class, fn (EmbedMemory $job): bool => $job->memoryId === $newMemory->id); + Http::assertNothingSent(); +});