Stream optional interfaces #11

Merged: 39 commits, merged on Oct 17, 2024
Changes from all commits
Commits
7d500f6
create stream-related types
ahouene Jul 26, 2022
f52f88c
implement ReaderStorage and WriterStorage for fs backend
ahouene Jul 26, 2022
f75ec98
implement ReaderStorage for S3 backend
ahouene Jul 26, 2022
7ced814
add tests for ReaderStorage and WriterStorage
ahouene Jul 26, 2022
0e680df
more robust implementation of general WriterStorage wrapper
ahouene Oct 25, 2022
c4e0889
fix WriterStorage doc typo
ahouene Oct 25, 2022
60bf026
Merge remote-tracking branch 'upstream/main' into stream-optional-int…
ahouene Nov 17, 2022
1c992f8
fix writer wrapper mutex usage
ahouene Nov 17, 2022
a356b79
show that param is name in ReaderStorage
ahouene Feb 15, 2023
6d5554e
replace reader wrapper struct with io.NopCloser
ahouene Feb 15, 2023
08a90c3
fix "mixed name and unnamed parameters" syntax error
ahouene Feb 15, 2023
51c1d60
replace errorWClosed with ErrClosed
ahouene Feb 15, 2023
61ffabb
express intent to check name validity on stream wrapper struct
ahouene Feb 15, 2023
ede6243
rename default `writer` wrapper struct to `fallbackWriter`
ahouene Feb 15, 2023
999a2b1
formatting
ahouene Feb 20, 2023
7676178
s3: use common method for Load and NewReader
ahouene Feb 20, 2023
feebd77
test presence of key after storing with NewWriter
ahouene Feb 20, 2023
cb57661
remove duplicate error check
ahouene Feb 21, 2023
9e6109b
s3 backend: implement StorageWriter
ahouene Feb 22, 2023
c823420
Merge remote-tracking branch 'upstream/main' into stream-optional-int…
ahouene Feb 22, 2023
1e1eea4
apply comment improvement suggestions
ahouene Oct 2, 2023
0cd9451
s3: simplify doStoreReader and doLoadReader
ahouene Oct 2, 2023
9ccf41a
tester: test atomicity of writer, and write/read after close
ahouene Oct 2, 2023
cb03f31
s3: close pipe, better context cancellation handling
ahouene Oct 2, 2023
9e99799
fs backend: write file atomically with NewWriter
ahouene Oct 2, 2023
b355158
Merge branch 'main' into stream-optional-interfaces
ahouene Oct 5, 2023
ce68e5d
s3: move prefix prepending and metrics calls to their correct places
ahouene Oct 5, 2023
39906d2
s3: rename ReaderStorage->StreamReader, WriterStorage->StreamWriter
ahouene Oct 5, 2023
43336e7
s3: revert to default NumThreads in PutObjects
ahouene Oct 5, 2023
9f60df8
s3: prepend global prefix where missing
ahouene Oct 5, 2023
842b09d
s3: make a comment about global prefix use with marker clearer
ahouene Oct 5, 2023
48db5a2
s3: introduce NumMinioThreads to configure NumThreads in minio client
ahouene Oct 5, 2023
fb2d113
s3: move stream logic to separate file, implement ReaderFrom
ahouene Aug 26, 2024
105e38f
fs: move stream logic to separate file
ahouene Aug 26, 2024
0c574cf
s3: remove unfitting io.ReaderFrom implementation
ahouene Aug 27, 2024
845ebd8
s3: stream: start pipe earlier, do not write marker twice
ahouene Aug 27, 2024
dea00a6
stream: catch cancelled ctx early in fs & s3
ahouene Aug 27, 2024
1aac573
stream: remove name validation TODO comment
ahouene Aug 27, 2024
effe5c1
io stream: update README and make a couple comments clearer
ahouene Oct 15, 2024
29 changes: 24 additions & 5 deletions README.md
@@ -1,4 +1,3 @@

# Simpleblob

[![Go Reference](https://pkg.go.dev/badge/github.com/PowerDNS/simpleblob.svg)](https://pkg.go.dev/github.com/PowerDNS/simpleblob)
@@ -10,6 +9,9 @@ Simpleblob is a Go module that simplifies the storage of arbitrary data by key f
- `fs`: File storage (one file per blob)
- `memory`: Memory storage (for tests)


## Usage

The interface implemented by the backends is:

```go
@@ -38,16 +40,33 @@ type Storage struct {
}
```


### `io` interfaces

Reading from or writing to a blob directly can be done using the `NewReader` and `NewWriter` functions.

```go
func NewReader(ctx context.Context, storage Interface, blobName string) (io.ReadCloser, error)
func NewWriter(ctx context.Context, storage Interface, blobName string) (io.WriteCloser, error)
```

The returned `io.ReadCloser` or `io.WriteCloser` is an optimized implementation when the backend in use implements the `StreamReader` or `StreamWriter` interface.
If not, a convenience wrapper around the storage is returned.

| Backend | StreamReader | StreamWriter |
| --- | --- | --- |
| S3 | ✔ | ✔ |
| Filesystem | ✔ | ✔ |
| Memory | ✖ | ✖ |
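
For example, here is a minimal end-to-end sketch of the `io` helpers. It assumes the in-memory backend exposes a `New()` constructor under `backends/memory` (constructor and import path are assumptions for illustration); with the S3 or filesystem backends, the same calls pick up the streaming implementations instead.

```go
package main

import (
	"context"
	"io"
	"log"

	"github.com/PowerDNS/simpleblob"
	"github.com/PowerDNS/simpleblob/backends/memory" // constructor assumed for illustration
)

func main() {
	ctx := context.Background()
	st := memory.New() // any backend implementing simpleblob.Interface works here

	// Write a blob through an io.WriteCloser.
	// With the fallback wrapper, the blob becomes visible once Close succeeds.
	w, err := simpleblob.NewWriter(ctx, st, "example.txt")
	if err != nil {
		log.Fatal(err)
	}
	if _, err := io.WriteString(w, "hello"); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Read the blob back through an io.ReadCloser.
	r, err := simpleblob.NewReader(ctx, st, "example.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	data, err := io.ReadAll(r)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("read back: %s", data)
}
```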


## Limitations

The interface currently does not support streaming of large blobs. In the future we may provide this by implementing `fs.FS` in the backend for reading, and a similar interface for writing new blobs.


## API Stability

We support the last two stable Go versions, currently 1.17 and 1.18.

From an API consumer's point of view, we do not plan any backward-incompatible changes before v1.0.

For storage backends, any future extensions will most likely be added as optional interfaces, similar to the `fs.FS` design. Where possible, utility functions that return a compatible implementation will be provided for backends that do not implement the new interface.


90 changes: 90 additions & 0 deletions backends/fs/atomic.go
@@ -0,0 +1,90 @@
package fs

Review comment:

This looks solid, I guess you have looked at other similar solutions like this one:
https://github.com/google/renameio

just to see we are not missing any corner cases?

Reply from the contributor (PR author):

Thanks for the recommendation. I gave it a look and while it looks like it can handle a broader set of situations (that we don't need here), I didn't see something that would benefit us here.


import (
"fmt"
"os"
"path/filepath"
)

// createAtomic creates a new atomicFile. The given fpath is the path of the final destination.
func createAtomic(fpath string) (*atomicFile, error) {
fpath, err := filepath.Abs(fpath)
if err != nil {
return nil, fmt.Errorf("absolute path for atomic file %q: %w", fpath, err)
}
// The PID is included under the assumption that the same program will not be
// writing to the same path at the same time. If a previous temp file was not
// cleaned up properly, overwriting it on a later retry is desired.
tmp := fmt.Sprintf("%s.%d%s", fpath, os.Getpid(), ignoreSuffix)
file, err := os.Create(tmp)
if err != nil {
return nil, fmt.Errorf("create atomic file %q: %w", fpath, err)
}
return &atomicFile{
file: file,
path: fpath,
tmp: tmp,
}, nil
}

// atomicFile implements an io.WriteCloser that writes to a temp file and moves it
// atomically into place on Close.
type atomicFile struct {
file *os.File // The underlying file being written to.
path string // The final path of the file.
tmp string // The path of the file during write.
}

// Write implements io.Writer
func (f *atomicFile) Write(data []byte) (int, error) {
return f.file.Write(data)
}

// Clean aborts the creation of the file if called before Close. If called
// after Close, it does nothing. This makes it useful in a defer.
func (f *atomicFile) Clean() {
_ = f.file.Close()
_ = os.Remove(f.tmp)
}

// Close closes the temp file and moves it to the final destination.
func (f *atomicFile) Close() error {
var err error
defer func() {
if err != nil {
// The rename did not happen, so remove the
// temporary file left behind.
_ = os.Remove(f.tmp)
}
}()

// Some of the file content may still be cached by the OS
// and is not guaranteed to be written to the physical device on close.
// Behaviour is inconsistent across devices and C standard libraries.
// Syncing the file (here) AND its parent directory (after the rename below)
// ensures the data and its directory entry are durable.
// See fsync(2) and open(2).
if err = f.file.Sync(); err != nil {
_ = f.file.Close()
return err
}
if err = f.file.Close(); err != nil {
return err
}

// Move into place
if err = os.Rename(f.tmp, f.path); err != nil {
return err
}

var dir *os.File
dir, err = os.Open(filepath.Dir(f.path))
if err != nil {
return err
}
err = dir.Sync()
if err != nil {
_ = dir.Close()
return err
}
return dir.Close()
}
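
For reference, a minimal usage sketch of the atomic writer above, inside this package (the helper function name is illustrative; the actual call site is the fs backend's `NewWriter` in `backends/fs/stream.go` below):

```go
// writeBlobAtomically is a hypothetical helper showing the intended
// Write/Clean/Close sequence so readers never observe a partial file.
func writeBlobAtomically(fullPath string, data []byte) error {
	f, err := createAtomic(fullPath)
	if err != nil {
		return err
	}
	// Clean removes the temp file if Close is never reached;
	// after a successful Close it is effectively a no-op.
	defer f.Clean()

	if _, err := f.Write(data); err != nil {
		return err
	}
	// Close syncs the file, renames it into place, and syncs the parent directory.
	return f.Close()
}
```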
8 changes: 6 additions & 2 deletions backends/fs/fs.go
@@ -11,6 +11,10 @@ import (
"github.com/PowerDNS/simpleblob"
)

// ignoreSuffix is the suffix to use internally
// to hide a file from (*Backend).List.
const ignoreSuffix = ".tmp"

// Options describes the storage options for the fs backend
type Options struct {
RootPath string `yaml:"root_path"`
@@ -71,7 +75,7 @@ func (b *Backend) Store(ctx context.Context, name string, data []byte) error {
return os.ErrPermission
}
fullPath := filepath.Join(b.rootPath, name)
tmpPath := fullPath + ".tmp" // ignored by List()
tmpPath := fullPath + ignoreSuffix // ignored by List()
if err := writeFile(tmpPath, data); err != nil {
return err
}
@@ -100,7 +104,7 @@ func allowedName(name string) bool {
if strings.HasPrefix(name, ".") {
return false
}
if strings.HasSuffix(name, ".tmp") {
if strings.HasSuffix(name, ignoreSuffix) {
return false // used for our temp files when writing
}
return true
32 changes: 32 additions & 0 deletions backends/fs/stream.go
@@ -0,0 +1,32 @@
package fs

import (
"context"
"io"
"os"
"path/filepath"
)

// NewReader provides an optimized way to read from the named file.
func (b *Backend) NewReader(ctx context.Context, name string) (io.ReadCloser, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
if !allowedName(name) {
return nil, os.ErrPermission
}
fullPath := filepath.Join(b.rootPath, name)
return os.Open(fullPath)
}

// NewWriter provides an optimized way to write to the named file.
func (b *Backend) NewWriter(ctx context.Context, name string) (io.WriteCloser, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
if !allowedName(name) {
return nil, os.ErrPermission
}
fullPath := filepath.Join(b.rootPath, name)
return createAtomic(fullPath)
}
1 change: 1 addition & 0 deletions backends/s3/marker.go
@@ -18,6 +18,7 @@ func (b *Backend) setMarker(ctx context.Context, name, etag string, isDel bool)
}
nanos := time.Now().UnixNano()
s := fmt.Sprintf("%s:%s:%d:%v", name, etag, nanos, isDel)
// Here, we're not using Store because markerName already has the global prefix.
_, err := b.doStore(ctx, b.markerName, []byte(s))
if err != nil {
return err
58 changes: 45 additions & 13 deletions backends/s3/s3.go
@@ -87,6 +87,10 @@ type Options struct {
// DisableContentMd5 defines whether to disable sending the Content-MD5 header
DisableContentMd5 bool `yaml:"disable_send_content_md5"`

// NumMinioThreads defines the number of threads that Minio uses for its workers.
// When left at zero, the Minio client's own default is used.
NumMinioThreads uint `yaml:"num_minio_threads"`

// TLS allows customising the TLS configuration
// See https://github.com/PowerDNS/go-tlsconfig for the available options
TLS tlsconfig.Config `yaml:"tls"`
@@ -152,6 +156,8 @@ func (b *Backend) List(ctx context.Context, prefix string) (blobList simpleblob.
return b.doList(ctx, combinedPrefix)
}

// Load will itself prepend the global prefix to the marker name,
// so we pass the raw marker name here.
m, err := b.Load(ctx, UpdateMarkerFilename)
exists := !errors.Is(err, os.ErrNotExist)
if err != nil && exists {
@@ -229,25 +235,44 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
// Load retrieves the content of the object identified by name from S3 Bucket
// configured in b.
func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
// Prepend global prefix
name = b.prependGlobalPrefix(name)

r, err := b.doLoadReader(ctx, name)
if err != nil {
return nil, err
}
defer r.Close()

p, err := io.ReadAll(r)
if err = convertMinioError(err, false); err != nil {
return nil, err
}
return p, nil
}

func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser, error) {
metricCalls.WithLabelValues("load").Inc()
metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime()

obj, err := b.client.GetObject(ctx, b.opt.Bucket, name, minio.GetObjectOptions{})
if err = convertMinioError(err, false); err != nil {
metricCallErrors.WithLabelValues("load").Inc()
return nil, err
- } else if obj == nil {
}
if obj == nil {
return nil, os.ErrNotExist
}
- defer obj.Close()

- p, err := io.ReadAll(obj)
info, err := obj.Stat()
if err = convertMinioError(err, false); err != nil {
metricCallErrors.WithLabelValues("load").Inc()
return nil, err
}
- return p, nil
if info.Key == "" {
// minio will return an object with empty fields when the name
// is not present in the bucket.
return nil, os.ErrNotExist
}
return obj, nil
}

// Store sets the content of the object identified by name to the content
@@ -263,18 +288,25 @@ func (b *Backend) Store(ctx context.Context, name string, data []byte) error {
return b.setMarker(ctx, name, info.ETag, false)
}

// doStore is a convenience wrapper around doStoreReader.
func (b *Backend) doStore(ctx context.Context, name string, data []byte) (minio.UploadInfo, error) {
return b.doStoreReader(ctx, name, bytes.NewReader(data), int64(len(data)))
}

// doStoreReader stores data with key name in S3, using r as a source for data.
// The value of size may be -1, in case the size is not known.
func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, size int64) (minio.UploadInfo, error) {
metricCalls.WithLabelValues("store").Inc()
metricLastCallTimestamp.WithLabelValues("store").SetToCurrentTime()

putObjectOptions := minio.PutObjectOptions{
- NumThreads: 3,
- }
- if !b.opt.DisableContentMd5 {
- putObjectOptions.SendContentMd5 = true
NumThreads: b.opt.NumMinioThreads,
SendContentMd5: !b.opt.DisableContentMd5,
}

- info, err := b.client.PutObject(ctx, b.opt.Bucket, name, bytes.NewReader(data), int64(len(data)), putObjectOptions)
// minio accepts size == -1, meaning the size is unknown.
info, err := b.client.PutObject(ctx, b.opt.Bucket, name, r, size, putObjectOptions)
err = convertMinioError(err, false)
if err != nil {
metricCallErrors.WithLabelValues("store").Inc()
}
@@ -371,9 +403,9 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
case "https":
useSSL = true
case "":
return nil, fmt.Errorf("no scheme provided for endpoint URL '%s', use http or https.", opt.EndpointURL)
return nil, fmt.Errorf("no scheme provided for endpoint URL %q, use http or https", opt.EndpointURL)
default:
return nil, fmt.Errorf("unsupported scheme for S3: '%s', use http or https.", u.Scheme)
return nil, fmt.Errorf("unsupported scheme for S3: %q, use http or https", u.Scheme)
}

creds := credentials.NewStaticV4(opt.AccessKey, opt.SecretKey, "")
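
To put the new `NumMinioThreads` option in context, here is a hedged configuration sketch. The field names come from the `Options` struct in this diff; the import path, package name, endpoint, credentials, and bucket are placeholders, and additional options may be required in practice.

```go
package main

import (
	"context"
	"io"
	"log"
	"os"

	"github.com/PowerDNS/simpleblob"
	"github.com/PowerDNS/simpleblob/backends/s3" // package name and path assumed
)

func main() {
	ctx := context.Background()

	// Values are placeholders; only the field names are taken from the Options struct above.
	backend, err := s3.New(ctx, s3.Options{
		EndpointURL:     "https://s3.example.com",
		AccessKey:       "ACCESS_KEY",
		SecretKey:       "SECRET_KEY",
		Bucket:          "my-bucket",
		NumMinioThreads: 8, // 0 keeps the minio client's default
	})
	if err != nil {
		log.Fatal(err)
	}

	src, err := os.Open("large.tar")
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	// NewWriter selects the backend's StreamWriter implementation,
	// so the blob can be streamed rather than fully buffered in memory.
	w, err := simpleblob.NewWriter(ctx, backend, "backups/large.tar")
	if err != nil {
		log.Fatal(err)
	}
	if _, err := io.Copy(w, src); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
}
```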