Skip to content

Stream optional interfaces #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
7d500f6
create stream-related types
ahouene Jul 26, 2022
f52f88c
implement ReaderStorage and WriterStorage for fs backend
ahouene Jul 26, 2022
f75ec98
implement ReaderStorage for S3 backend
ahouene Jul 26, 2022
7ced814
add tests for ReaderStorage and WriterStorage
ahouene Jul 26, 2022
0e680df
more robust implementation of general WriterStorage wrapper
ahouene Oct 25, 2022
c4e0889
fix WriterStorage doc typo
ahouene Oct 25, 2022
60bf026
Merge remote-tracking branch 'upstream/main' into stream-optional-int…
ahouene Nov 17, 2022
1c992f8
fix writer wrapper mutex usage
ahouene Nov 17, 2022
a356b79
show that param is name in ReaderStorage
ahouene Feb 15, 2023
6d5554e
replace reader wrapper struct with io.NopCloser
ahouene Feb 15, 2023
08a90c3
fix "mixed name and unnamed parameters" syntax error
ahouene Feb 15, 2023
51c1d60
replace errorWClosed with ErrClosed
ahouene Feb 15, 2023
61ffabb
express intent to check name validity on stream wrapper struct
ahouene Feb 15, 2023
ede6243
rename default `writer` wrapper struct to `fallbackWriter`
ahouene Feb 15, 2023
999a2b1
formatting
ahouene Feb 20, 2023
7676178
s3: use common method for Load and NewReader
ahouene Feb 20, 2023
feebd77
test presence of key after storing with NewWriter
ahouene Feb 20, 2023
cb57661
remove duplicate error check
ahouene Feb 21, 2023
9e6109b
s3 backend: implement StorageWriter
ahouene Feb 22, 2023
c823420
Merge remote-tracking branch 'upstream/main' into stream-optional-int…
ahouene Feb 22, 2023
1e1eea4
apply comment improvement suggestions
ahouene Oct 2, 2023
0cd9451
s3: simplify doStoreReader and doLoadReader
ahouene Oct 2, 2023
9ccf41a
tester: test atomicity of writer, and write/read after close
ahouene Oct 2, 2023
cb03f31
s3: close pipe, better context cancellation handling
ahouene Oct 2, 2023
9e99799
fs backend: write file atomically with NewWriter
ahouene Oct 2, 2023
b355158
Merge branch 'main' into stream-optional-interfaces
ahouene Oct 5, 2023
ce68e5d
s3: move prefix prepending and metrics calls to their correct places
ahouene Oct 5, 2023
39906d2
s3: rename ReaderStorage->StreamReader, WriterStorage->StreamWriter
ahouene Oct 5, 2023
43336e7
s3: revert to default NumThreads in PutObjects
ahouene Oct 5, 2023
9f60df8
s3: prepend global prefix where missing
ahouene Oct 5, 2023
842b09d
s3: make a comment about global prefix use with marker clearer
ahouene Oct 5, 2023
48db5a2
s3: introduce NumMinioThreads to configure NumThreads in minio client
ahouene Oct 5, 2023
fb2d113
s3: move stream logic to separate file, implement ReaderFrom
ahouene Aug 26, 2024
105e38f
fs: move stream logic to separate file
ahouene Aug 26, 2024
0c574cf
s3: remove unfitting io.ReaderFrom implementation
ahouene Aug 27, 2024
845ebd8
s3: stream: start pipe earlier, do not write marker twice
ahouene Aug 27, 2024
dea00a6
stream: catch cancelled ctx early in fs & s3
ahouene Aug 27, 2024
1aac573
stream: remove name validation TODO comment
ahouene Aug 27, 2024
effe5c1
io stream: update README and make a couple comments clearer
ahouene Oct 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions backends/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fs
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -89,6 +90,25 @@ func (b *Backend) Delete(ctx context.Context, name string) error {
return err
}

// NewReader opens the file stored under name for streamed reading.
//
// It returns ctx's error when ctx is already done, os.ErrPermission when name
// is rejected by allowedName, and otherwise the error reported by os.Open.
// On any error the returned reader is a true nil interface.
// The caller is responsible for closing the returned reader.
func (b *Backend) NewReader(ctx context.Context, name string) (io.ReadCloser, error) {
	// Opening a local file is not interruptible, so the only use for ctx
	// here is refusing to start once it is already done.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if !allowedName(name) {
		return nil, os.ErrPermission
	}
	// Do not return os.Open's results directly: on failure its nil *os.File
	// would be wrapped in a non-nil io.ReadCloser, defeating `r != nil` checks.
	f, err := os.Open(filepath.Join(b.rootPath, name))
	if err != nil {
		return nil, err
	}
	return f, nil
}


// NewWriter opens the file stored under name for streamed writing, creating
// it with mode 0644 when it does not exist.
//
// The destination file is truncated as soon as it is opened and written in
// place, so it holds empty or partial content until the caller has finished
// writing and closed the returned writer.
//
// It returns ctx's error when ctx is already done, os.ErrPermission when name
// is rejected by allowedName, and otherwise the error reported by os.OpenFile.
// On any error the returned writer is a true nil interface.
func (b *Backend) NewWriter(ctx context.Context, name string) (io.WriteCloser, error) {
	// Opening a local file is not interruptible, so the only use for ctx
	// here is refusing to start once it is already done.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if !allowedName(name) {
		return nil, os.ErrPermission
	}
	// Do not return os.OpenFile's results directly: on failure its nil *os.File
	// would be wrapped in a non-nil io.WriteCloser, defeating `w != nil` checks.
	f, err := os.OpenFile(filepath.Join(b.rootPath, name), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return nil, err
	}
	return f, nil
}

func allowedName(name string) bool {
// TODO: Make shared and test for rejection
if strings.Contains(name, "/") {
Expand Down
21 changes: 21 additions & 0 deletions backends/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,27 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
return b, nil
}

// NewReader satisfies ReaderStorage and provides an optimized, streamed way
// to fetch a blob from S3 server.
//
// Errors coming from the minio client are passed through convertMinioError.
// os.ErrNotExist is returned when the client yields no object, or when the
// key reported by Stat differs from name.
// Whenever a failure happens after an object has been obtained, that object
// is closed before returning so it is not leaked.
// The caller is responsible for closing the returned reader.
func (b *Backend) NewReader(ctx context.Context, name string) (io.ReadCloser, error) {
	obj, err := b.client.GetObject(ctx, b.opt.Bucket, name, minio.GetObjectOptions{})
	if err = convertMinioError(err); err != nil {
		return nil, err
	}
	if obj == nil {
		return nil, os.ErrNotExist
	}

	info, err := obj.Stat()
	if err = convertMinioError(err); err != nil {
		// The Stat error is the one worth reporting; Close is best effort.
		_ = obj.Close()
		return nil, err
	}
	if info.Key != name {
		// Same here: the caller only needs to know the blob is not there.
		_ = obj.Close()
		return nil, os.ErrNotExist
	}
	return obj, nil
}

// convertMinioError takes an error, possibly a minio.ErrorResponse
// and turns it into a well known error when possible.
// If error is not well known, it is returned as is.
Expand Down
99 changes: 99 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package simpleblob

import (
"bytes"
"context"
"errors"
"io"
"sync"
)

// ReaderStorage is an Interface that can additionally hand out an
// io.ReadCloser for a stored value.
type ReaderStorage interface {
	Interface

	// NewReader returns an io.ReadCloser that streams the value stored
	// under name in the underlying backend.
	NewReader(ctx context.Context, name string) (io.ReadCloser, error)
}

// WriterStorage is an Interface that can additionally hand out an
// io.WriteCloser for a key.
type WriterStorage interface {
	Interface

	// NewWriter returns an io.WriteCloser that streams what is written to
	// it to the key called name in the underlying backend.
	NewWriter(ctx context.Context, name string) (io.WriteCloser, error)
}

// NewReader returns an io.ReadCloser for the value stored under name in st.
//
// When st implements ReaderStorage, the call is delegated to its NewReader.
// Otherwise the whole value is fetched with Load and served from memory;
// closing that fallback reader does nothing and always returns nil.
//
// On error, the returned reader is nil.
func NewReader(ctx context.Context, st Interface, name string) (io.ReadCloser, error) {
	if sst, ok := st.(ReaderStorage); ok {
		return sst.NewReader(ctx, name)
	}
	b, err := st.Load(ctx, name)
	if err != nil {
		return nil, err
	}
	// io.NopCloser is all the fallback needs: the data is already in memory,
	// so there is nothing to release and no reason to keep a handle on st.
	return io.NopCloser(bytes.NewReader(b)), nil
}

// writer adapts a plain backend to io.WriteCloser.
// Everything written is accumulated in memory and handed over to the backend
// in a single Store call when the writer is closed.
type writer struct {
	st   Interface       // backend receiving the data on Close
	ctx  context.Context // passed to Store on Close, as Close takes no context
	name string          // key the data is stored under

	mu     sync.Mutex   // held by Write and Close while they touch closed and buf
	closed bool         // set by the first call to Close
	buf    bytes.Buffer // data written so far
}

// errWClosed is returned by Write and Close once the writer has been closed.
var errWClosed = errors.New("WriterStorage closed")

// Write adds p to the pending data.
//
// Nothing reaches the backend until Close is called.
// Once the writer is closed, Write stores nothing and returns errWClosed.
func (fw *writer) Write(p []byte) (int, error) {
	fw.mu.Lock()
	defer fw.mu.Unlock()

	if !fw.closed {
		return fw.buf.Write(p)
	}
	return 0, errWClosed
}

// Close marks the writer as closed and sends the pending data to the backend,
// returning the result of Store.
//
// Only the first call stores anything; later calls return errWClosed.
func (fw *writer) Close() error {
	fw.mu.Lock()
	defer fw.mu.Unlock()

	wasClosed := fw.closed
	fw.closed = true
	if wasClosed {
		return errWClosed
	}
	return fw.st.Store(fw.ctx, fw.name, fw.buf.Bytes())
}

// NewWriter returns an io.WriteCloser writing to the key called name in st.
//
// A backend implementing WriterStorage provides the writer itself.
// Any other backend gets a writer value, which keeps st, ctx and name
// around for later use.
func NewWriter(ctx context.Context, st Interface, name string) (io.WriteCloser, error) {
	switch backend := st.(type) {
	case WriterStorage:
		return backend.NewWriter(ctx, name)
	default:
		return &writer{st: st, ctx: ctx, name: name}, nil
	}
}
26 changes: 26 additions & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tester

import (
"context"
"io"
"os"
"testing"

Expand Down Expand Up @@ -51,6 +52,14 @@ func DoBackendTests(t *testing.T, b simpleblob.Interface) {
assert.NoError(t, err)
assert.Equal(t, data, []byte("foo"))

// Reader should get the same data as Load
r, err := simpleblob.NewReader(ctx, b, "foo-1")
assert.NoError(t, err)
p, err := io.ReadAll(r)
assert.NoError(t, err)
assert.Equal(t, data, p)
assert.NoError(t, r.Close())

// Check overwritten data
data, err = b.Load(ctx, "bar-1")
assert.NoError(t, err)
Expand All @@ -68,9 +77,26 @@ func DoBackendTests(t *testing.T, b simpleblob.Interface) {
assert.NoError(t, err)
assert.Equal(t, data, []byte("foo"))

// Writer stores data
w, err := simpleblob.NewWriter(ctx, b, "fizz")
assert.NoError(t, err)
assert.NotNil(t, w)
buzz := []byte("buzz")
n, err := w.Write(buzz)
assert.NoError(t, err)
assert.NoError(t, w.Close())
assert.EqualValues(t, len(buzz), n)
data, err = b.Load(ctx, "fizz")
assert.NoError(t, err)
assert.Equal(t, buzz, data)

// Load non-existing
_, err = b.Load(ctx, "does-not-exist")
assert.ErrorIs(t, err, os.ErrNotExist)
// With Reader
r, err = simpleblob.NewReader(ctx, b, "does-not-exist")
assert.ErrorIs(t, err, os.ErrNotExist)
assert.Nil(t, r)

// Delete existing
err = b.Delete(ctx, "foo-1")
Expand Down