// Package s3 provides an S3-backed implementation of the io.Medium interface.
package s3

import (
	"bytes"
	"context"
	"fmt"
	goio "io"
	"io/fs"
	"os"
	"path"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"

	coreerr "forge.lthn.ai/core/go/pkg/framework/core"
)

// s3API is the subset of the S3 client API used by this package.
// This allows for interface-based mocking in tests.
type s3API interface {
	GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
	PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
	DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
	DeleteObjects(ctx context.Context, params *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error)
	HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error)
	ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
	CopyObject(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error)
}

// Medium is an S3-backed storage backend implementing the io.Medium interface.
type Medium struct {
	client s3API
	bucket string
	prefix string
}

// Option configures a Medium.
type Option func(*Medium)

// WithPrefix sets an optional key prefix for all operations.
func WithPrefix(prefix string) Option {
	return func(m *Medium) {
		// Ensure prefix ends with "/" if non-empty
		if prefix != "" && !strings.HasSuffix(prefix, "/") {
			prefix += "/"
		}
		m.prefix = prefix
	}
}

// WithClient sets the S3 client for dependency injection.
func WithClient(client *s3.Client) Option {
	return func(m *Medium) {
		m.client = client
	}
}

// withAPI sets the s3API interface directly (for testing with mocks).
func withAPI(api s3API) Option {
	return func(m *Medium) {
		m.client = api
	}
}

// New creates a new S3 Medium for the given bucket.
func New(bucket string, opts ...Option) (*Medium, error) {
	if bucket == "" {
		return nil, coreerr.E("s3.New", "bucket name is required", nil)
	}
	m := &Medium{bucket: bucket}
	for _, opt := range opts {
		opt(m)
	}
	if m.client == nil {
		return nil, coreerr.E("s3.New", "S3 client is required (use WithClient option)", nil)
	}
	return m, nil
}

// key returns the full S3 object key for a given path.
func (m *Medium) key(p string) string {
	// Clean the path using a leading "/" to sandbox traversal attempts,
	// then strip the "/" prefix. This ensures ".." can't escape.
	clean := path.Clean("/" + p)
	if clean == "/" {
		clean = ""
	}
	clean = strings.TrimPrefix(clean, "/")
	if m.prefix == "" {
		return clean
	}
	if clean == "" {
		return m.prefix
	}
	return m.prefix + clean
}
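// Example (illustrative sketch only, not part of the package API): callers are
// expected to build the *s3.Client themselves and inject it via WithClient.
// The config loading below assumes the aws-sdk-go-v2 "config" package
// (github.com/aws/aws-sdk-go-v2/config), and the bucket and prefix are
// hypothetical. From outside this package, import it under an alias (here
// "s3medium") to avoid clashing with the AWS SDK's own s3 package.
//
//	cfg, err := config.LoadDefaultConfig(context.Background())
//	if err != nil {
//		return err
//	}
//	client := s3.NewFromConfig(cfg)
//	m, err := s3medium.New("my-bucket",
//		s3medium.WithClient(client),
//		s3medium.WithPrefix("tenant-a"), // stored keys become "tenant-a/<path>"
//	)
//	if err != nil {
//		return err
//	}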
// Read retrieves the content of a file as a string.
func (m *Medium) Read(p string) (string, error) {
	key := m.key(p)
	if key == "" {
		return "", coreerr.E("s3.Read", "path is required", os.ErrInvalid)
	}
	out, err := m.client.GetObject(context.Background(), &s3.GetObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return "", coreerr.E("s3.Read", "failed to get object: "+key, err)
	}
	defer out.Body.Close()
	data, err := goio.ReadAll(out.Body)
	if err != nil {
		return "", coreerr.E("s3.Read", "failed to read body: "+key, err)
	}
	return string(data), nil
}

// Write saves the given content to a file, overwriting it if it exists.
func (m *Medium) Write(p, content string) error {
	key := m.key(p)
	if key == "" {
		return coreerr.E("s3.Write", "path is required", os.ErrInvalid)
	}
	_, err := m.client.PutObject(context.Background(), &s3.PutObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(key),
		Body:   strings.NewReader(content),
	})
	if err != nil {
		return coreerr.E("s3.Write", "failed to put object: "+key, err)
	}
	return nil
}

// EnsureDir is a no-op for S3 (S3 has no real directories).
func (m *Medium) EnsureDir(_ string) error {
	return nil
}

// IsFile checks if a path exists and is a regular file (not a "directory" prefix).
func (m *Medium) IsFile(p string) bool {
	key := m.key(p)
	if key == "" {
		return false
	}
	// A "file" in S3 is an object whose key does not end with "/"
	if strings.HasSuffix(key, "/") {
		return false
	}
	_, err := m.client.HeadObject(context.Background(), &s3.HeadObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(key),
	})
	return err == nil
}

// FileGet is a convenience method that reads a file from the medium.
func (m *Medium) FileGet(p string) (string, error) {
	return m.Read(p)
}

// FileSet is a convenience method that writes a file to the medium.
func (m *Medium) FileSet(p, content string) error {
	return m.Write(p, content)
}

// Delete removes a single object.
func (m *Medium) Delete(p string) error {
	key := m.key(p)
	if key == "" {
		return coreerr.E("s3.Delete", "path is required", os.ErrInvalid)
	}
	_, err := m.client.DeleteObject(context.Background(), &s3.DeleteObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return coreerr.E("s3.Delete", "failed to delete object: "+key, err)
	}
	return nil
}
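// Example (illustrative sketch only): a basic round trip through the
// string-based helpers. The path and content are hypothetical.
//
//	if err := m.Write("notes/todo.txt", "ship it"); err != nil {
//		return err
//	}
//	content, err := m.Read("notes/todo.txt")
//	if err != nil {
//		return err
//	}
//	fmt.Println(content) // "ship it"
//	if err := m.Delete("notes/todo.txt"); err != nil {
//		return err
//	}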
// DeleteAll removes all objects under the given prefix.
func (m *Medium) DeleteAll(p string) error {
	key := m.key(p)
	if key == "" {
		return coreerr.E("s3.DeleteAll", "path is required", os.ErrInvalid)
	}

	// First, try deleting the exact key
	_, _ = m.client.DeleteObject(context.Background(), &s3.DeleteObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(key),
	})

	// Then delete all objects under the prefix
	prefix := key
	if !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}

	morePages := true
	var continuationToken *string
	for morePages {
		listOut, err := m.client.ListObjectsV2(context.Background(), &s3.ListObjectsV2Input{
			Bucket:            aws.String(m.bucket),
			Prefix:            aws.String(prefix),
			ContinuationToken: continuationToken,
		})
		if err != nil {
			return coreerr.E("s3.DeleteAll", "failed to list objects: "+prefix, err)
		}
		if len(listOut.Contents) == 0 {
			break
		}

		objects := make([]types.ObjectIdentifier, len(listOut.Contents))
		for i, obj := range listOut.Contents {
			objects[i] = types.ObjectIdentifier{Key: obj.Key}
		}
		_, err = m.client.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{
			Bucket: aws.String(m.bucket),
			Delete: &types.Delete{Objects: objects, Quiet: aws.Bool(true)},
		})
		if err != nil {
			return coreerr.E("s3.DeleteAll", "failed to delete objects", err)
		}

		if listOut.IsTruncated != nil && *listOut.IsTruncated {
			continuationToken = listOut.NextContinuationToken
		} else {
			morePages = false
		}
	}
	return nil
}

// Rename moves an object by copying then deleting the original.
func (m *Medium) Rename(oldPath, newPath string) error {
	oldKey := m.key(oldPath)
	newKey := m.key(newPath)
	if oldKey == "" || newKey == "" {
		return coreerr.E("s3.Rename", "both old and new paths are required", os.ErrInvalid)
	}

	copySource := m.bucket + "/" + oldKey
	_, err := m.client.CopyObject(context.Background(), &s3.CopyObjectInput{
		Bucket:     aws.String(m.bucket),
		CopySource: aws.String(copySource),
		Key:        aws.String(newKey),
	})
	if err != nil {
		return coreerr.E("s3.Rename", "failed to copy object: "+oldKey+" -> "+newKey, err)
	}

	_, err = m.client.DeleteObject(context.Background(), &s3.DeleteObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(oldKey),
	})
	if err != nil {
		return coreerr.E("s3.Rename", "failed to delete source object: "+oldKey, err)
	}
	return nil
}
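// Example (illustrative sketch only): moving a key and then clearing an entire
// prefix tree. Rename is copy-then-delete and therefore not atomic, and
// DeleteAll issues batched deletes page by page. Paths are hypothetical.
//
//	if err := m.Rename("reports/2024.csv", "archive/reports/2024.csv"); err != nil {
//		return err
//	}
//	if err := m.DeleteAll("tmp"); err != nil { // removes "tmp" and every key under "tmp/"
//		return err
//	}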
// List returns directory entries for the given path using ListObjectsV2 with delimiter.
func (m *Medium) List(p string) ([]fs.DirEntry, error) {
	prefix := m.key(p)
	if prefix != "" && !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}

	var entries []fs.DirEntry
	listOut, err := m.client.ListObjectsV2(context.Background(), &s3.ListObjectsV2Input{
		Bucket:    aws.String(m.bucket),
		Prefix:    aws.String(prefix),
		Delimiter: aws.String("/"),
	})
	if err != nil {
		return nil, coreerr.E("s3.List", "failed to list objects: "+prefix, err)
	}

	// Common prefixes are "directories"
	for _, cp := range listOut.CommonPrefixes {
		if cp.Prefix == nil {
			continue
		}
		name := strings.TrimPrefix(*cp.Prefix, prefix)
		name = strings.TrimSuffix(name, "/")
		if name == "" {
			continue
		}
		entries = append(entries, &dirEntry{
			name:  name,
			isDir: true,
			mode:  fs.ModeDir | 0755,
			info: &fileInfo{
				name:  name,
				isDir: true,
				mode:  fs.ModeDir | 0755,
			},
		})
	}

	// Contents are "files" (excluding the prefix itself)
	for _, obj := range listOut.Contents {
		if obj.Key == nil {
			continue
		}
		name := strings.TrimPrefix(*obj.Key, prefix)
		if name == "" || strings.Contains(name, "/") {
			continue
		}
		var size int64
		if obj.Size != nil {
			size = *obj.Size
		}
		var modTime time.Time
		if obj.LastModified != nil {
			modTime = *obj.LastModified
		}
		entries = append(entries, &dirEntry{
			name:  name,
			isDir: false,
			mode:  0644,
			info: &fileInfo{
				name:    name,
				size:    size,
				mode:    0644,
				modTime: modTime,
			},
		})
	}
	return entries, nil
}

// Stat returns file information for the given path using HeadObject.
func (m *Medium) Stat(p string) (fs.FileInfo, error) {
	key := m.key(p)
	if key == "" {
		return nil, coreerr.E("s3.Stat", "path is required", os.ErrInvalid)
	}
	out, err := m.client.HeadObject(context.Background(), &s3.HeadObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, coreerr.E("s3.Stat", "failed to head object: "+key, err)
	}
	var size int64
	if out.ContentLength != nil {
		size = *out.ContentLength
	}
	var modTime time.Time
	if out.LastModified != nil {
		modTime = *out.LastModified
	}
	name := path.Base(key)
	return &fileInfo{
		name:    name,
		size:    size,
		mode:    0644,
		modTime: modTime,
	}, nil
}

// Open opens the named file for reading.
func (m *Medium) Open(p string) (fs.File, error) {
	key := m.key(p)
	if key == "" {
		return nil, coreerr.E("s3.Open", "path is required", os.ErrInvalid)
	}
	out, err := m.client.GetObject(context.Background(), &s3.GetObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, coreerr.E("s3.Open", "failed to get object: "+key, err)
	}
	data, err := goio.ReadAll(out.Body)
	out.Body.Close()
	if err != nil {
		return nil, coreerr.E("s3.Open", "failed to read body: "+key, err)
	}
	var size int64
	if out.ContentLength != nil {
		size = *out.ContentLength
	}
	var modTime time.Time
	if out.LastModified != nil {
		modTime = *out.LastModified
	}
	return &s3File{
		name:    path.Base(key),
		content: data,
		size:    size,
		modTime: modTime,
	}, nil
}

// Create creates or truncates the named file. Returns a writer that
// uploads the content on Close.
func (m *Medium) Create(p string) (goio.WriteCloser, error) {
	key := m.key(p)
	if key == "" {
		return nil, coreerr.E("s3.Create", "path is required", os.ErrInvalid)
	}
	return &s3WriteCloser{
		medium: m,
		key:    key,
	}, nil
}
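// Example (illustrative sketch only): listing a "directory" and writing an
// object through the Create writer, which buffers in memory and uploads the
// whole payload when Close is called. Paths are hypothetical.
//
//	entries, err := m.List("reports")
//	if err != nil {
//		return err
//	}
//	for _, e := range entries {
//		fmt.Println(e.Name(), e.IsDir())
//	}
//
//	w, err := m.Create("reports/summary.txt")
//	if err != nil {
//		return err
//	}
//	if _, err := w.Write([]byte("q3 totals\n")); err != nil {
//		return err
//	}
//	if err := w.Close(); err != nil { // the PutObject happens here
//		return err
//	}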
// Append opens the named file for appending. It downloads the existing
// content (if any) and re-uploads the combined content on Close.
func (m *Medium) Append(p string) (goio.WriteCloser, error) {
	key := m.key(p)
	if key == "" {
		return nil, coreerr.E("s3.Append", "path is required", os.ErrInvalid)
	}
	var existing []byte
	out, err := m.client.GetObject(context.Background(), &s3.GetObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(key),
	})
	if err == nil {
		existing, _ = goio.ReadAll(out.Body)
		out.Body.Close()
	}
	return &s3WriteCloser{
		medium: m,
		key:    key,
		data:   existing,
	}, nil
}

// ReadStream returns a reader for the file content.
func (m *Medium) ReadStream(p string) (goio.ReadCloser, error) {
	key := m.key(p)
	if key == "" {
		return nil, coreerr.E("s3.ReadStream", "path is required", os.ErrInvalid)
	}
	out, err := m.client.GetObject(context.Background(), &s3.GetObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, coreerr.E("s3.ReadStream", "failed to get object: "+key, err)
	}
	return out.Body, nil
}

// WriteStream returns a writer for the file content. Content is uploaded on Close.
func (m *Medium) WriteStream(p string) (goio.WriteCloser, error) {
	return m.Create(p)
}

// Exists checks if a path exists (file or directory prefix).
func (m *Medium) Exists(p string) bool {
	key := m.key(p)
	if key == "" {
		return false
	}

	// Check as an exact object
	_, err := m.client.HeadObject(context.Background(), &s3.HeadObjectInput{
		Bucket: aws.String(m.bucket),
		Key:    aws.String(key),
	})
	if err == nil {
		return true
	}

	// Check as a "directory" prefix
	prefix := key
	if !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}
	listOut, err := m.client.ListObjectsV2(context.Background(), &s3.ListObjectsV2Input{
		Bucket:  aws.String(m.bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: aws.Int32(1),
	})
	if err != nil {
		return false
	}
	return len(listOut.Contents) > 0 || len(listOut.CommonPrefixes) > 0
}

// IsDir checks if a path exists and is a directory (has objects under it as a prefix).
func (m *Medium) IsDir(p string) bool {
	key := m.key(p)
	if key == "" {
		return false
	}
	prefix := key
	if !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}
	listOut, err := m.client.ListObjectsV2(context.Background(), &s3.ListObjectsV2Input{
		Bucket:  aws.String(m.bucket),
		Prefix:  aws.String(prefix),
		MaxKeys: aws.Int32(1),
	})
	if err != nil {
		return false
	}
	return len(listOut.Contents) > 0 || len(listOut.CommonPrefixes) > 0
}

// --- Internal types ---

// fileInfo implements fs.FileInfo for S3 objects.
type fileInfo struct {
	name    string
	size    int64
	mode    fs.FileMode
	modTime time.Time
	isDir   bool
}

func (fi *fileInfo) Name() string       { return fi.name }
func (fi *fileInfo) Size() int64        { return fi.size }
func (fi *fileInfo) Mode() fs.FileMode  { return fi.mode }
func (fi *fileInfo) ModTime() time.Time { return fi.modTime }
func (fi *fileInfo) IsDir() bool        { return fi.isDir }
func (fi *fileInfo) Sys() any           { return nil }

// dirEntry implements fs.DirEntry for S3 listings.
type dirEntry struct {
	name  string
	isDir bool
	mode  fs.FileMode
	info  fs.FileInfo
}

func (de *dirEntry) Name() string               { return de.name }
func (de *dirEntry) IsDir() bool                { return de.isDir }
func (de *dirEntry) Type() fs.FileMode          { return de.mode.Type() }
func (de *dirEntry) Info() (fs.FileInfo, error) { return de.info, nil }
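// Example (illustrative sketch only): because fileInfo and dirEntry satisfy
// fs.FileInfo and fs.DirEntry, values returned by Stat and List can be passed
// to generic io/fs helpers such as fs.FormatFileInfo and fs.FormatDirEntry
// (Go 1.21+), which is handy for debug output. The path is hypothetical.
//
//	info, err := m.Stat("reports/summary.txt")
//	if err != nil {
//		return err
//	}
//	fmt.Println(fs.FormatFileInfo(info))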
// s3File implements fs.File for S3 objects.
type s3File struct {
	name    string
	content []byte
	offset  int64
	size    int64
	modTime time.Time
}

func (f *s3File) Stat() (fs.FileInfo, error) {
	return &fileInfo{
		name:    f.name,
		size:    int64(len(f.content)),
		mode:    0644,
		modTime: f.modTime,
	}, nil
}

func (f *s3File) Read(b []byte) (int, error) {
	if f.offset >= int64(len(f.content)) {
		return 0, goio.EOF
	}
	n := copy(b, f.content[f.offset:])
	f.offset += int64(n)
	return n, nil
}

func (f *s3File) Close() error { return nil }

// s3WriteCloser buffers writes and uploads to S3 on Close.
type s3WriteCloser struct {
	medium *Medium
	key    string
	data   []byte
}

func (w *s3WriteCloser) Write(p []byte) (int, error) {
	w.data = append(w.data, p...)
	return len(p), nil
}

func (w *s3WriteCloser) Close() error {
	_, err := w.medium.client.PutObject(context.Background(), &s3.PutObjectInput{
		Bucket: aws.String(w.medium.bucket),
		Key:    aws.String(w.key),
		Body:   bytes.NewReader(w.data),
	})
	if err != nil {
		return fmt.Errorf("s3: failed to upload on close: %w", err)
	}
	return nil
}
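// Testing note (illustrative sketch only): because Medium depends on the
// narrow s3API interface, an in-package test (a *_test.go file in package s3)
// can inject a hand-rolled fake through the unexported withAPI option. The
// fake below embeds s3API so only the methods a test exercises need to be
// stubbed; all names here are hypothetical.
//
//	type fakeS3 struct {
//		s3API // calls to unstubbed methods panic on the nil embedded interface
//		getObject func(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
//	}
//
//	func (f *fakeS3) GetObject(ctx context.Context, in *s3.GetObjectInput, opts ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
//		return f.getObject(ctx, in, opts...)
//	}
//
//	m, _ := New("test-bucket", withAPI(&fakeS3{
//		getObject: func(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
//			return &s3.GetObjectOutput{Body: goio.NopCloser(strings.NewReader("hello"))}, nil
//		},
//	}))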