fix(agent/brain): lock forget+supersede paths against late index writes
Cache::lock keyed by memory id wraps the delete path in BrainService:: forget(); supersede cleanup in remember() lifted to the same idiom. forget() now ALWAYS queues DeleteFromIndex on a successful delete (was previously skipped when indexed_at was null — left late writes from stale preloaded models a window to land entries after the underlying memory was gone). Index write paths (qdrantUpsert / elasticIndex) re-check that the memory row still exists before writing — defence-in-depth against any future caller that holds a stale model reference past a forget. Pest coverage extended in SupersedeForgetIndexCleanupTest: - never-indexed forget queues cleanup - late stale-model index writes are skipped after forget - never-indexed supersede cleanup queues deletion - late stale-model index writes are skipped after supersede Co-authored-by: Codex <noreply@openai.com> Closes tasks.lthn.sh/view.php?id=999
This commit is contained in:
parent
167be2f396
commit
b6565263f3
2 changed files with 213 additions and 52 deletions
|
|
@ -9,9 +9,11 @@ namespace Core\Mod\Agentic\Services;
|
|||
use Core\Mod\Agentic\Jobs\DeleteFromIndex;
|
||||
use Core\Mod\Agentic\Jobs\EmbedMemory;
|
||||
use Core\Mod\Agentic\Models\BrainMemory;
|
||||
use Illuminate\Contracts\Cache\LockTimeoutException;
|
||||
use Illuminate\Http\Client\ConnectionException;
|
||||
use Illuminate\Http\Client\PendingRequest;
|
||||
use Illuminate\Http\Client\Response;
|
||||
use Illuminate\Support\Facades\Cache;
|
||||
use Illuminate\Support\Facades\DB;
|
||||
use Illuminate\Support\Facades\Http;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
|
|
@ -26,6 +28,10 @@ class BrainService
|
|||
|
||||
private const MAX_RETRY_DELAY_MS = 30000;
|
||||
|
||||
private const MEMORY_LOCK_TTL_SECONDS = 10;
|
||||
|
||||
private const MEMORY_LOCK_WAIT_SECONDS = 5;
|
||||
|
||||
private string $qdrantApiKey;
|
||||
|
||||
public function __construct(
|
||||
|
|
@ -113,26 +119,22 @@ class BrainService
|
|||
{
|
||||
$attributes['indexed_at'] = null;
|
||||
$cleanupIds = [];
|
||||
$supersededId = $this->normaliseMemoryId($attributes['supersedes_id'] ?? null);
|
||||
$remember = function () use ($attributes, &$cleanupIds): BrainMemory {
|
||||
return DB::connection('brain')->transaction(function () use ($attributes, &$cleanupIds): BrainMemory {
|
||||
$memory = new BrainMemory;
|
||||
$memory->fill($attributes);
|
||||
$memory->save();
|
||||
|
||||
$memory = DB::connection('brain')->transaction(function () use ($attributes, &$cleanupIds) {
|
||||
$memory = new BrainMemory;
|
||||
$memory->fill($attributes);
|
||||
$memory->save();
|
||||
$this->deleteSupersededMemory($memory->supersedes_id, $cleanupIds);
|
||||
|
||||
if ($memory->supersedes_id) {
|
||||
$superseded = BrainMemory::query()->find($memory->supersedes_id);
|
||||
return $memory;
|
||||
});
|
||||
};
|
||||
|
||||
if ($superseded instanceof BrainMemory) {
|
||||
if ($superseded->indexed_at !== null) {
|
||||
$cleanupIds[] = $superseded->id;
|
||||
}
|
||||
|
||||
$superseded->delete();
|
||||
}
|
||||
}
|
||||
|
||||
return $memory;
|
||||
});
|
||||
$memory = $supersededId !== null
|
||||
? $this->withMemoryIndexLock($supersededId, $remember)
|
||||
: $remember();
|
||||
|
||||
foreach ($cleanupIds as $cleanupId) {
|
||||
DeleteFromIndex::dispatch($cleanupId);
|
||||
|
|
@ -239,22 +241,9 @@ class BrainService
|
|||
*/
|
||||
public function forget(string $id): void
|
||||
{
|
||||
$memory = null;
|
||||
$deleted = DB::connection('brain')->transaction(function () use ($id, &$memory): int {
|
||||
$memory = BrainMemory::query()
|
||||
->select(['id', 'indexed_at'])
|
||||
->find($id);
|
||||
$deleted = $this->withMemoryIndexLock($id, fn (): bool => $this->deleteMemoryRecord($id));
|
||||
|
||||
if (! $memory instanceof BrainMemory) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
$memory->delete();
|
||||
|
||||
return 1;
|
||||
});
|
||||
|
||||
if ($deleted > 0 && $memory instanceof BrainMemory && $memory->indexed_at !== null) {
|
||||
if ($deleted) {
|
||||
DeleteFromIndex::dispatch($id);
|
||||
}
|
||||
}
|
||||
|
|
@ -313,13 +302,21 @@ class BrainService
|
|||
*/
|
||||
public function elasticIndex(BrainMemory $memory): void
|
||||
{
|
||||
$response = $this->http(10)
|
||||
->put($this->elasticDocumentUrl($memory->id), $this->buildElasticDocument($memory));
|
||||
$this->withMemoryIndexLock($memory->id, function () use ($memory): void {
|
||||
$freshMemory = BrainMemory::query()->find($memory->id);
|
||||
|
||||
if (! $response->successful()) {
|
||||
Log::error("Elasticsearch index failed: {$response->status()}", ['id' => $memory->id, 'body' => $response->body()]);
|
||||
throw new \RuntimeException("Elasticsearch index failed: {$response->status()}");
|
||||
}
|
||||
if (! $freshMemory instanceof BrainMemory) {
|
||||
return;
|
||||
}
|
||||
|
||||
$response = $this->http(10)
|
||||
->put($this->elasticDocumentUrl($freshMemory->id), $this->buildElasticDocument($freshMemory));
|
||||
|
||||
if (! $response->successful()) {
|
||||
Log::error("Elasticsearch index failed: {$response->status()}", ['id' => $freshMemory->id, 'body' => $response->body()]);
|
||||
throw new \RuntimeException("Elasticsearch index failed: {$response->status()}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -626,16 +623,98 @@ class BrainService
|
|||
*/
|
||||
public function qdrantUpsert(array $points): void
|
||||
{
|
||||
$response = $this->retryableHttp(10, fn (PendingRequest $request): Response => $request->put(
|
||||
"{$this->qdrantUrl}/collections/{$this->collection}/points",
|
||||
[
|
||||
'points' => $points,
|
||||
],
|
||||
));
|
||||
foreach ($points as $point) {
|
||||
$memoryId = $this->normaliseMemoryId($point['id'] ?? null);
|
||||
|
||||
if (! $response->successful()) {
|
||||
Log::error("Qdrant upsert failed: {$response->status()}", ['body' => $response->body()]);
|
||||
throw new \RuntimeException("Qdrant upsert failed: {$response->status()}");
|
||||
if ($memoryId === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->withMemoryIndexLock($memoryId, function () use ($memoryId, $point): void {
|
||||
if (! $this->memoryExists($memoryId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$response = $this->retryableHttp(10, fn (PendingRequest $request): Response => $request->put(
|
||||
"{$this->qdrantUrl}/collections/{$this->collection}/points",
|
||||
[
|
||||
'points' => [$point],
|
||||
],
|
||||
));
|
||||
|
||||
if (! $response->successful()) {
|
||||
Log::error("Qdrant upsert failed: {$response->status()}", ['body' => $response->body()]);
|
||||
throw new \RuntimeException("Qdrant upsert failed: {$response->status()}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private function deleteMemoryRecord(string $id): bool
|
||||
{
|
||||
return DB::connection('brain')->transaction(function () use ($id): bool {
|
||||
$memory = BrainMemory::query()
|
||||
->select(['id'])
|
||||
->find($id);
|
||||
|
||||
if (! $memory instanceof BrainMemory) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$memory->delete();
|
||||
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, string> $cleanupIds
|
||||
*/
|
||||
private function deleteSupersededMemory(?string $supersededId, array &$cleanupIds): void
|
||||
{
|
||||
if ($supersededId === null || $supersededId === '') {
|
||||
return;
|
||||
}
|
||||
|
||||
$superseded = BrainMemory::query()->find($supersededId);
|
||||
|
||||
if (! $superseded instanceof BrainMemory) {
|
||||
return;
|
||||
}
|
||||
|
||||
$cleanupIds[] = $superseded->id;
|
||||
$superseded->delete();
|
||||
}
|
||||
|
||||
private function memoryExists(string $id): bool
|
||||
{
|
||||
return BrainMemory::query()->whereKey($id)->exists();
|
||||
}
|
||||
|
||||
private function normaliseMemoryId(mixed $memoryId): ?string
|
||||
{
|
||||
return is_string($memoryId) && $memoryId !== ''
|
||||
? $memoryId
|
||||
: null;
|
||||
}
|
||||
|
||||
private function memoryLockKey(string $memoryId): string
|
||||
{
|
||||
return 'brain:memory:index:'.$memoryId;
|
||||
}
|
||||
|
||||
private function withMemoryIndexLock(string $memoryId, callable $callback): mixed
|
||||
{
|
||||
$lock = Cache::lock($this->memoryLockKey($memoryId), self::MEMORY_LOCK_TTL_SECONDS);
|
||||
|
||||
try {
|
||||
$lock->block(self::MEMORY_LOCK_WAIT_SECONDS);
|
||||
|
||||
return $callback();
|
||||
} catch (LockTimeoutException $exception) {
|
||||
throw new \RuntimeException("Timed out acquiring memory index lock: {$memoryId}", 0, $exception);
|
||||
} finally {
|
||||
rescue(static fn (): mixed => $lock->release(), report: false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,11 +8,19 @@ use Core\Mod\Agentic\Jobs\DeleteFromIndex;
|
|||
use Core\Mod\Agentic\Jobs\EmbedMemory;
|
||||
use Core\Mod\Agentic\Models\BrainMemory;
|
||||
use Core\Mod\Agentic\Services\BrainService;
|
||||
use Illuminate\Support\Facades\Http;
|
||||
use Illuminate\Support\Facades\Queue;
|
||||
|
||||
function cleanupBrainService(): BrainService
|
||||
{
|
||||
return new BrainService;
|
||||
return new BrainService(
|
||||
ollamaUrl: 'https://ollama.test',
|
||||
qdrantUrl: 'https://qdrant.test',
|
||||
collection: 'openbrain',
|
||||
embeddingModel: 'embeddinggemma',
|
||||
verifySsl: false,
|
||||
elasticsearchUrl: 'https://elasticsearch.test',
|
||||
);
|
||||
}
|
||||
|
||||
function cleanupMemory(array $attributes = []): BrainMemory
|
||||
|
|
@ -28,6 +36,25 @@ function cleanupMemory(array $attributes = []): BrainMemory
|
|||
], $attributes));
|
||||
}
|
||||
|
||||
function cleanupQdrantPoint(BrainService $brain, BrainMemory $memory): array
|
||||
{
|
||||
$point = $brain->buildQdrantPayload($memory->id, [
|
||||
'workspace_id' => $memory->workspace_id,
|
||||
'org' => $memory->getAttribute('org'),
|
||||
'project' => $memory->project,
|
||||
'agent_id' => $memory->agent_id,
|
||||
'type' => $memory->type,
|
||||
'tags' => $memory->tags ?? [],
|
||||
'confidence' => $memory->confidence,
|
||||
'source' => $memory->source ?? 'manual',
|
||||
'content' => $memory->content,
|
||||
'created_at' => $memory->created_at?->toIso8601String(),
|
||||
]);
|
||||
$point['vector'] = array_fill(0, 768, 0.125);
|
||||
|
||||
return $point;
|
||||
}
|
||||
|
||||
test('SupersedeForgetIndexCleanup_forget_Good_dispatches_delete_from_index', function (): void {
|
||||
Queue::fake();
|
||||
$memory = cleanupMemory(['indexed_at' => now()]);
|
||||
|
|
@ -40,7 +67,7 @@ test('SupersedeForgetIndexCleanup_forget_Good_dispatches_delete_from_index', fun
|
|||
Queue::assertPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $memory->id);
|
||||
});
|
||||
|
||||
test('SupersedeForgetIndexCleanup_forget_Bad_skips_delete_from_index_for_never_indexed_memory', function (): void {
|
||||
test('SupersedeForgetIndexCleanup_forget_Bad_dispatches_delete_from_index_for_never_indexed_memory', function (): void {
|
||||
Queue::fake();
|
||||
$memory = cleanupMemory(['indexed_at' => null]);
|
||||
|
||||
|
|
@ -49,7 +76,26 @@ test('SupersedeForgetIndexCleanup_forget_Bad_skips_delete_from_index_for_never_i
|
|||
expect(BrainMemory::find($memory->id))->toBeNull()
|
||||
->and(BrainMemory::withTrashed()->find($memory->id)?->trashed())->toBeTrue();
|
||||
|
||||
Queue::assertNotPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $memory->id);
|
||||
Queue::assertPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $memory->id);
|
||||
});
|
||||
|
||||
test('SupersedeForgetIndexCleanup_forget_Ugly_skips_late_index_writes_for_deleted_memory', function (): void {
|
||||
Queue::fake();
|
||||
Http::fake();
|
||||
$brain = cleanupBrainService();
|
||||
$memory = cleanupMemory(['indexed_at' => null]);
|
||||
$staleMemory = BrainMemory::query()->findOrFail($memory->id);
|
||||
|
||||
$brain->forget($memory->id);
|
||||
$brain->qdrantUpsert([cleanupQdrantPoint($brain, $staleMemory)]);
|
||||
$brain->elasticIndex($staleMemory);
|
||||
|
||||
expect(BrainMemory::count())->toBe(0)
|
||||
->and(BrainMemory::withTrashed()->find($memory->id)?->trashed())->toBeTrue();
|
||||
|
||||
Queue::assertPushed(DeleteFromIndex::class, 1);
|
||||
Queue::assertPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $memory->id);
|
||||
Http::assertNothingSent();
|
||||
});
|
||||
|
||||
test('SupersedeForgetIndexCleanup_supersede_Bad_dispatches_cleanup_for_old_indexed_memory', function (): void {
|
||||
|
|
@ -80,7 +126,7 @@ test('SupersedeForgetIndexCleanup_supersede_Bad_dispatches_cleanup_for_old_index
|
|||
Queue::assertPushed(EmbedMemory::class, fn (EmbedMemory $job): bool => $job->memoryId === $newMemory->id);
|
||||
});
|
||||
|
||||
test('SupersedeForgetIndexCleanup_supersede_Ugly_skips_cleanup_for_never_indexed_memory', function (): void {
|
||||
test('SupersedeForgetIndexCleanup_supersede_Ugly_dispatches_cleanup_for_never_indexed_memory', function (): void {
|
||||
Queue::fake();
|
||||
$workspace = createWorkspace();
|
||||
$oldMemory = cleanupMemory([
|
||||
|
|
@ -104,6 +150,42 @@ test('SupersedeForgetIndexCleanup_supersede_Ugly_skips_cleanup_for_never_indexed
|
|||
->and(BrainMemory::withTrashed()->find($oldMemory->id)?->trashed())->toBeTrue()
|
||||
->and($newMemory->indexed_at)->toBeNull();
|
||||
|
||||
Queue::assertNotPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $oldMemory->id);
|
||||
Queue::assertPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $oldMemory->id);
|
||||
Queue::assertPushed(EmbedMemory::class, fn (EmbedMemory $job): bool => $job->memoryId === $newMemory->id);
|
||||
});
|
||||
|
||||
test('SupersedeForgetIndexCleanup_supersede_Ugly_skips_late_index_writes_for_deleted_memory', function (): void {
|
||||
Queue::fake();
|
||||
Http::fake();
|
||||
$brain = cleanupBrainService();
|
||||
$workspace = createWorkspace();
|
||||
$oldMemory = cleanupMemory([
|
||||
'workspace_id' => $workspace->id,
|
||||
'content' => 'Old unindexed memory for late write test.',
|
||||
'indexed_at' => null,
|
||||
]);
|
||||
$staleOldMemory = BrainMemory::query()->findOrFail($oldMemory->id);
|
||||
|
||||
$newMemory = $brain->remember([
|
||||
'workspace_id' => $workspace->id,
|
||||
'agent_id' => 'virgil',
|
||||
'type' => 'observation',
|
||||
'content' => 'Superseding memory for late write test.',
|
||||
'confidence' => 0.9,
|
||||
'org' => 'core',
|
||||
'project' => 'agent',
|
||||
'supersedes_id' => $oldMemory->id,
|
||||
]);
|
||||
|
||||
$brain->qdrantUpsert([cleanupQdrantPoint($brain, $staleOldMemory)]);
|
||||
$brain->elasticIndex($staleOldMemory);
|
||||
|
||||
expect(BrainMemory::count())->toBe(1)
|
||||
->and(BrainMemory::find($oldMemory->id))->toBeNull()
|
||||
->and(BrainMemory::withTrashed()->find($oldMemory->id)?->trashed())->toBeTrue()
|
||||
->and(BrainMemory::find($newMemory->id))->not->toBeNull();
|
||||
|
||||
Queue::assertPushed(DeleteFromIndex::class, fn (DeleteFromIndex $job): bool => $job->memoryId === $oldMemory->id);
|
||||
Queue::assertPushed(EmbedMemory::class, fn (EmbedMemory $job): bool => $job->memoryId === $newMemory->id);
|
||||
Http::assertNothingSent();
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue