Commit 223250b

ahouene and wojas authored
io-based stream optional interfaces (#11)

* create stream-related types

  Created simpleblob.NewReader and simpleblob.NewWriter functions, along with ReaderStorage and WriterStorage interface types. Those functions provide an optimized way to write to and read from a backend if those interfaces are satisfied. If an interface is not satisfied by the backend, a fallback implementation is used, akin to `fs.GlobFS`.

* implement ReaderStorage and WriterStorage for fs backend

  Provide io.ReadCloser and io.WriteCloser to interact with files the way the `os` package does it. The `os` package has byte-slice based functions that rely on streams under the hood; we're doing the same here.

* implement ReaderStorage for S3 backend

  The Object returned by the MinIO client satisfies io.ReadCloser, so this implementation is very straightforward. For writing, the MinIO client takes a reader and needs the full size of the stored object. Thus, we'd need to exhaust the writer into a buffer, take its size and provide it to the client, which is basically what the fallback implementation of NewWriter does.

* add tests for ReaderStorage and WriterStorage

  Compare the results from simpleblob.NewReader and simpleblob.NewWriter with simpleblob.Interface.

* more robust implementation of general WriterStorage wrapper
* fix WriterStorage doc typo
* fix writer wrapper mutex usage
* show that param is name in ReaderStorage
  Co-authored-by: wojas <[email protected]>
* replace reader wrapper struct with io.NopCloser
* fix "mixed name and unnamed parameters" syntax error
* replace errorWClosed with ErrClosed
* express intent to check name validity on stream wrapper struct
* rename default `writer` wrapper struct to `fallbackWriter`
* formatting
* s3: use common method for Load and NewReader
* test presence of key after storing with NewWriter
* remove duplicate error check
* s3 backend: implement StorageWriter
* apply comment improvement suggestions
  Co-authored-by: wojas <[email protected]>
* s3: simplify doStoreReader and doLoadReader
* tester: test atomicity of writer, and write/read after close
* s3: close pipe, better context cancellation handling
* fs backend: write file atomically with NewWriter
* s3: move prefix prepending and metrics calls to their correct places
* s3: rename ReaderStorage->StreamReader, WriterStorage->StreamWriter
* s3: revert to default NumThreads in PutObjects
* s3: prepend global prefix where missing
* s3: make a comment about global prefix use with marker clearer
* s3: introduce NumMinioThreads to configure NumThreads in minio client
* s3: move stream logic to separate file, implement ReaderFrom
* fs: move stream logic to separate file
* s3: remove unfitting io.ReaderFrom implementation
* s3: stream: start pipe earlier, do not write marker twice
* stream: catch cancelled ctx early in fs & s3
* stream: remove name validation TODO comment

  General name validation was dropped in favour of documenting safe names.

* io stream: update README and make a couple comments clearer

---------

Co-authored-by: wojas <[email protected]>
1 parent 2d5a347 commit 223250b
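
Reviewer note: the commit message describes an optional-interface pattern akin to `fs.GlobFS`, where a helper uses the backend's own streaming method when present and otherwise falls back to a wrapper. The sketch below illustrates only the reader side. `Loader` and `memStore` are hypothetical stand-ins (the real `simpleblob.Interface` is larger), and the fallback follows the "replace reader wrapper struct with io.NopCloser" bullet rather than the actual source, which is not part of this excerpt.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
)

// Loader is a reduced, hypothetical stand-in for simpleblob.Interface.
type Loader interface {
	Load(ctx context.Context, name string) ([]byte, error)
}

// StreamReader is the optional interface. Its method shape mirrors the
// NewReader method added to the fs backend in this commit.
type StreamReader interface {
	NewReader(ctx context.Context, name string) (io.ReadCloser, error)
}

// NewReader prefers the backend's own streaming reader and otherwise
// wraps the bytes returned by Load.
func NewReader(ctx context.Context, st Loader, name string) (io.ReadCloser, error) {
	if sr, ok := st.(StreamReader); ok {
		return sr.NewReader(ctx, name)
	}
	data, err := st.Load(ctx, name)
	if err != nil {
		return nil, err
	}
	return io.NopCloser(bytes.NewReader(data)), nil
}

// memStore implements Load only, so NewReader takes the fallback path.
type memStore map[string][]byte

func (m memStore) Load(_ context.Context, name string) ([]byte, error) {
	data, ok := m[name]
	if !ok {
		return nil, fmt.Errorf("blob %q: %w", name, os.ErrNotExist)
	}
	return data, nil
}

// readAll opens a blob through NewReader and drains it.
func readAll(st Loader, name string) (string, error) {
	r, err := NewReader(context.Background(), st, name)
	if err != nil {
		return "", err
	}
	defer r.Close()
	data, err := io.ReadAll(r)
	return string(data), err
}

func main() {
	s, err := readAll(memStore{"greeting": []byte("hello")}, "greeting")
	fmt.Println(s, err)
}
```

A backend that later gains a `NewReader` method is picked up by the type assertion without any change to callers.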

9 files changed, +399 -20 lines changed

README.md
+24 -5

@@ -1,4 +1,3 @@
-
 # Simpleblob
 
 [![Go Reference](https://pkg.go.dev/badge/github.com/PowerDNS/simpleblob.svg)](https://pkg.go.dev/github.com/PowerDNS/simpleblob)
@@ -10,6 +9,9 @@ Simpleblob is a Go module that simplifies the storage of arbitrary data by key f
 - `fs`: File storage (one file per blob)
 - `memory`: Memory storage (for tests)
 
+
+## Usage
+
 The interface implemented by the backends is:
 
 ```go
@@ -38,16 +40,33 @@ type Storage struct {
 }
 ```
 
+
+### `io` interfaces
+
+Reading from or writing to a blob directly can be done using the `NewReader` and `NewWriter` functions.
+
+```go
+func NewReader(ctx context.Context, storage Interface, blobName string) (io.ReadCloser, error)
+func NewWriter(ctx context.Context, storage Interface, blobName string) (io.WriteCloser, error)
+```
+
+The returned ReadCloser or WriteCloser is an optimized implementation if the backend being used implements the `StreamReader` or `StreamWriter` interfaces.
+If not, a convenience wrapper for the storage is returned.
+
+| Backend | StreamReader | StreamWriter |
+| --- | --- | --- |
+| S3 | ✔ | ✔ |
+| Filesystem | ✔ | ✔ |
+| Memory | ✖ | ✖ |
+
+
 ## Limitations
 
 The interface currently does not support streaming of large blobs. In the future we may provide this by implementing `fs.FS` in the backend for reading, and a similar interface for writing new blobs.
 
+
 ## API Stability
 
 We support the last two stable Go versions, currently 1.17 and 1.18.
 
 From a API consumer point of view, we do not plan any backward incompatible changes before a v1.0.
-
-For storage backends, any future extensions most likely be added with optional interface, similar to the `fs.FS` design. Utility functions that return a compatible implementation will be used for backends that do not implement the interface, when possible.
-
-
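
Reviewer note: the README says a "convenience wrapper" is returned when a backend has no `StreamWriter`, and the commit message mentions a `fallbackWriter`, a mutex, and `ErrClosed`. The wrapper's source is not in this excerpt, so the following is only a sketch of how such a writer could behave: it buffers writes and stores the blob on `Close`. `Storer`, `memStore`, `newFallbackWriter`, and the `ErrClosed` value defined here are all assumptions made for illustration.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"sync"
)

// ErrClosed is a hypothetical sentinel returned after Close.
var ErrClosed = errors.New("writer is closed")

// Storer is a reduced, hypothetical stand-in for simpleblob.Interface.
type Storer interface {
	Store(ctx context.Context, name string, data []byte) error
}

// fallbackWriter buffers everything in memory and calls Store on Close.
type fallbackWriter struct {
	ctx    context.Context
	st     Storer
	name   string
	mu     sync.Mutex
	buf    bytes.Buffer
	closed bool
}

func newFallbackWriter(st Storer, name string) *fallbackWriter {
	return &fallbackWriter{ctx: context.Background(), st: st, name: name}
}

// Write appends to the buffer; nothing reaches the backend yet.
func (w *fallbackWriter) Write(p []byte) (int, error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return 0, ErrClosed
	}
	return w.buf.Write(p)
}

// Close stores the buffered bytes in one call, so readers never
// observe a partially written blob.
func (w *fallbackWriter) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return ErrClosed
	}
	w.closed = true
	return w.st.Store(w.ctx, w.name, w.buf.Bytes())
}

// memStore is a toy backend that only supports Store.
type memStore map[string][]byte

func (m memStore) Store(_ context.Context, name string, data []byte) error {
	m[name] = append([]byte(nil), data...) // copy, the buffer may be reused
	return nil
}

func main() {
	st := memStore{}
	w := newFallbackWriter(st, "greeting")
	fmt.Fprint(w, "hello ")
	fmt.Fprint(w, "world")
	fmt.Println(len(st)) // still empty: nothing is stored before Close
	fmt.Println(w.Close(), string(st["greeting"]))
	_, err := w.Write([]byte("late"))
	fmt.Println(errors.Is(err, ErrClosed))
}
```

The trade-off is memory: the whole blob sits in the buffer until `Close`, which is why backends with a native streaming path are preferred when available.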

backends/fs/atomic.go
+90

@@ -0,0 +1,90 @@
+package fs
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+)
+
+// createAtomic creates a new File. The given fpath is the file path of the final destination.
+func createAtomic(fpath string) (*atomicFile, error) {
+	fpath, err := filepath.Abs(fpath)
+	if err != nil {
+		return nil, fmt.Errorf("absolute path for atomic file %q: %w", fpath, err)
+	}
+	// Using the PID under the assumption that the same program will not be writing to
+	// the same path at the same time. An overwrite later on retry is desired, if
+	// not cleaned properly.
+	tmp := fmt.Sprintf("%s.%d%s", fpath, os.Getpid(), ignoreSuffix)
+	file, err := os.Create(tmp)
+	if err != nil {
+		return nil, fmt.Errorf("create atomic file %q: %w", fpath, err)
+	}
+	return &atomicFile{
+		file: file,
+		path: fpath,
+		tmp:  tmp,
+	}, nil
+}
+
+// atomicFile implements an io.WriteCloser that writes to a temp file and moves it
+// atomically into place on Close.
+type atomicFile struct {
+	file *os.File // The underlying file being written to.
+	path string   // The final path of the file.
+	tmp  string   // The path of the file during write.
+}
+
+// Write implements io.Writer
+func (f *atomicFile) Write(data []byte) (int, error) {
+	return f.file.Write(data)
+}
+
+// Clean aborts the creation of the file if called before Close. If called
+// after Close, it does nothing. This makes it useful in a defer.
+func (f *atomicFile) Clean() {
+	_ = f.file.Close()
+	_ = os.Remove(f.tmp)
+}
+
+// Close closes the temp file and moves it to the final destination.
+func (f *atomicFile) Close() error {
+	var err error
+	defer func() {
+		if err != nil {
+			// The rename did not happen and we're left with
+			// the temporary file hanging.
+			_ = os.Remove(f.tmp)
+		}
+	}()
+
+	// Some of the file content may still be cached by the OS,
+	// and is not guaranteed to be written to physical device on close.
+	// Behaviour is inconsistent across devices and C standard libraries.
+	// Syncing file AND its parent directory (here) ensure this.
+	// See fsync(2) and open(2).
+	if err = f.file.Sync(); err != nil {
+		_ = f.file.Close()
+		return err
+	}
+	if err = f.file.Close(); err != nil {
+		return err
+	}
+
+	// Move into place
+	if err = os.Rename(f.tmp, f.path); err != nil {
+		return err
+	}
+
+	var dir *os.File
+	dir, err = os.Open(filepath.Dir(f.path))
+	if err != nil {
+		return err
+	}
+	err = dir.Sync()
+	if err != nil {
+		_ = dir.Close()
+		return err
+	}
+	return dir.Close()
+}
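
Reviewer note: the file above follows the write-to-temp, sync, then rename pattern. As a rough, standalone illustration of why a reader never sees a half-written file, the sketch below condenses the same steps into one hypothetical helper, `writeAtomic`. It omits the parent-directory sync that `atomicFile.Close` performs, and `demoAtomic` exists only to exercise it in a throwaway directory.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeAtomic writes data to a temporary sibling of path, flushes it to
// disk, and renames it over path. The final name only ever points at a
// complete file.
func writeAtomic(path string, data []byte) (err error) {
	tmp := fmt.Sprintf("%s.%d.tmp", path, os.Getpid())
	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	defer func() {
		if err != nil {
			// The rename did not happen: drop the temporary file.
			_ = os.Remove(tmp)
		}
	}()
	if _, err = f.Write(data); err != nil {
		_ = f.Close()
		return err
	}
	if err = f.Sync(); err != nil {
		_ = f.Close()
		return err
	}
	if err = f.Close(); err != nil {
		return err
	}
	return os.Rename(tmp, path)
}

// demoAtomic overwrites the same blob twice and reports the final content
// and how many entries remain in the directory.
func demoAtomic() (string, int, error) {
	dir, err := os.MkdirTemp("", "atomic-sketch")
	if err != nil {
		return "", 0, err
	}
	defer os.RemoveAll(dir)

	target := filepath.Join(dir, "blob")
	for _, v := range []string{"v1", "v2"} {
		if err := writeAtomic(target, []byte(v)); err != nil {
			return "", 0, err
		}
	}
	data, err := os.ReadFile(target)
	if err != nil {
		return "", 0, err
	}
	entries, err := os.ReadDir(dir)
	if err != nil {
		return "", 0, err
	}
	return string(data), len(entries), nil
}

func main() {
	fmt.Println(demoAtomic())
}
```

After both writes the directory holds a single entry, since each temporary file is consumed by the rename.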

backends/fs/fs.go
+6 -2

@@ -11,6 +11,10 @@ import (
 	"github.com/PowerDNS/simpleblob"
 )
 
+// ignoreSuffix is the suffix to use internally
+// to hide a file from (*Backend).List.
+const ignoreSuffix = ".tmp"
+
 // Options describes the storage options for the fs backend
 type Options struct {
 	RootPath string `yaml:"root_path"`
@@ -71,7 +75,7 @@ func (b *Backend) Store(ctx context.Context, name string, data []byte) error {
 		return os.ErrPermission
 	}
 	fullPath := filepath.Join(b.rootPath, name)
-	tmpPath := fullPath + ".tmp" // ignored by List()
+	tmpPath := fullPath + ignoreSuffix // ignored by List()
 	if err := writeFile(tmpPath, data); err != nil {
 		return err
 	}
@@ -100,7 +104,7 @@ func allowedName(name string) bool {
 	if strings.HasPrefix(name, ".") {
 		return false
 	}
-	if strings.HasSuffix(name, ".tmp") {
+	if strings.HasSuffix(name, ignoreSuffix) {
 		return false // used for our temp files when writing
 	}
 	return true

backends/fs/stream.go
+32

@@ -0,0 +1,32 @@
+package fs
+
+import (
+	"context"
+	"io"
+	"os"
+	"path/filepath"
+)
+
+// NewReader provides an optimized way to read from named file.
+func (b *Backend) NewReader(ctx context.Context, name string) (io.ReadCloser, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+	if !allowedName(name) {
+		return nil, os.ErrPermission
+	}
+	fullPath := filepath.Join(b.rootPath, name)
+	return os.Open(fullPath)
+}
+
+// NewWriter provides an optimized way to write to a file.
+func (b *Backend) NewWriter(ctx context.Context, name string) (io.WriteCloser, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
+	if !allowedName(name) {
+		return nil, os.ErrPermission
+	}
+	fullPath := filepath.Join(b.rootPath, name)
+	return createAtomic(fullPath)
+}
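
Reviewer note: both methods above check `ctx.Err()` and the blob name before touching the filesystem, so a cancelled context or a reserved name fails fast with a recognizable error. The standalone sketch below reproduces that ordering with a hypothetical `openBlob` function. The name rules are copied from the `allowedName` lines visible in the `fs.go` diff; the rest of that function is not shown there, so this may be incomplete.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

// openBlob is a hypothetical free-function version of the fs NewReader.
func openBlob(ctx context.Context, root, name string) (io.ReadCloser, error) {
	// A cancelled or expired context is reported before any I/O happens.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	// Hidden files and ".tmp" files are reserved for internal use.
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".tmp") {
		return nil, os.ErrPermission
	}
	return os.Open(filepath.Join(root, name))
}

// demoChecks reports whether each guard produced the expected error.
func demoChecks() (cancelled, rejected bool) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := openBlob(ctx, os.TempDir(), "blob")
	cancelled = errors.Is(err, context.Canceled)

	_, err = openBlob(context.Background(), os.TempDir(), "blob.tmp")
	rejected = errors.Is(err, os.ErrPermission)
	return cancelled, rejected
}

func main() {
	fmt.Println(demoChecks())
}
```

Callers can distinguish the two cases with `errors.Is`, without inspecting error strings.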

backends/s3/marker.go
+1

@@ -18,6 +18,7 @@ func (b *Backend) setMarker(ctx context.Context, name, etag string, isDel bool)
 	}
 	nanos := time.Now().UnixNano()
 	s := fmt.Sprintf("%s:%s:%d:%v", name, etag, nanos, isDel)
+	// Here, we're not using Store because markerName already has the global prefix.
 	_, err := b.doStore(ctx, b.markerName, []byte(s))
 	if err != nil {
 		return err

backends/s3/s3.go
+45 -13

@@ -87,6 +87,10 @@ type Options struct {
 	// DisableContentMd5 defines whether to disable sending the Content-MD5 header
 	DisableContentMd5 bool `yaml:"disable_send_content_md5"`
 
+	// NumMinioThreads defines the number of threads that Minio uses for its workers.
+	// It defaults to the using the default value defined by the Minio client.
+	NumMinioThreads uint `yaml:"num_minio_threads"`
+
 	// TLS allows customising the TLS configuration
 	// See https://github.com/PowerDNS/go-tlsconfig for the available options
 	TLS tlsconfig.Config `yaml:"tls"`
@@ -152,6 +156,8 @@ func (b *Backend) List(ctx context.Context, prefix string) (blobList simpleblob.
 		return b.doList(ctx, combinedPrefix)
 	}
 
+	// Using Load, that will itself prepend the global prefix to the marker name.
+	// So we're using the raw marker name here.
 	m, err := b.Load(ctx, UpdateMarkerFilename)
 	exists := !errors.Is(err, os.ErrNotExist)
 	if err != nil && exists {
@@ -229,25 +235,44 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
 // Load retrieves the content of the object identified by name from S3 Bucket
 // configured in b.
 func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
-	// Prepend global prefix
 	name = b.prependGlobalPrefix(name)
 
+	r, err := b.doLoadReader(ctx, name)
+	if err != nil {
+		return nil, err
+	}
+	defer r.Close()
+
+	p, err := io.ReadAll(r)
+	if err = convertMinioError(err, false); err != nil {
+		return nil, err
+	}
+	return p, nil
+}
+
+func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser, error) {
 	metricCalls.WithLabelValues("load").Inc()
 	metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime()
 
 	obj, err := b.client.GetObject(ctx, b.opt.Bucket, name, minio.GetObjectOptions{})
 	if err = convertMinioError(err, false); err != nil {
+		metricCallErrors.WithLabelValues("load").Inc()
 		return nil, err
-	} else if obj == nil {
+	}
+	if obj == nil {
 		return nil, os.ErrNotExist
 	}
-	defer obj.Close()
-
-	p, err := io.ReadAll(obj)
+	info, err := obj.Stat()
 	if err = convertMinioError(err, false); err != nil {
+		metricCallErrors.WithLabelValues("load").Inc()
 		return nil, err
 	}
-	return p, nil
+	if info.Key == "" {
+		// minio will return an object with empty fields when name
+		// is not present in bucket.
+		return nil, os.ErrNotExist
+	}
+	return obj, nil
 }
 
 // Store sets the content of the object identified by name to the content
@@ -263,18 +288,25 @@ func (b *Backend) Store(ctx context.Context, name string, data []byte) error {
 	return b.setMarker(ctx, name, info.ETag, false)
 }
 
+// doStore is a convenience wrapper around doStoreReader.
 func (b *Backend) doStore(ctx context.Context, name string, data []byte) (minio.UploadInfo, error) {
+	return b.doStoreReader(ctx, name, bytes.NewReader(data), int64(len(data)))
+}
+
+// doStoreReader stores data with key name in S3, using r as a source for data.
+// The value of size may be -1, in case the size is not known.
+func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, size int64) (minio.UploadInfo, error) {
 	metricCalls.WithLabelValues("store").Inc()
 	metricLastCallTimestamp.WithLabelValues("store").SetToCurrentTime()
 
 	putObjectOptions := minio.PutObjectOptions{
-		NumThreads: 3,
-	}
-	if !b.opt.DisableContentMd5 {
-		putObjectOptions.SendContentMd5 = true
+		NumThreads:     b.opt.NumMinioThreads,
+		SendContentMd5: !b.opt.DisableContentMd5,
 	}
 
-	info, err := b.client.PutObject(ctx, b.opt.Bucket, name, bytes.NewReader(data), int64(len(data)), putObjectOptions)
+	// minio accepts size == -1, meaning the size is unknown.
+	info, err := b.client.PutObject(ctx, b.opt.Bucket, name, r, size, putObjectOptions)
+	err = convertMinioError(err, false)
 	if err != nil {
 		metricCallErrors.WithLabelValues("store").Inc()
 	}
@@ -371,9 +403,9 @@ func New(ctx context.Context, opt Options) (*Backend, error) {
 	case "https":
 		useSSL = true
 	case "":
-		return nil, fmt.Errorf("no scheme provided for endpoint URL '%s', use http or https.", opt.EndpointURL)
+		return nil, fmt.Errorf("no scheme provided for endpoint URL %q, use http or https", opt.EndpointURL)
 	default:
-		return nil, fmt.Errorf("unsupported scheme for S3: '%s', use http or https.", u.Scheme)
+		return nil, fmt.Errorf("unsupported scheme for S3: %q, use http or https", u.Scheme)
 	}
 
 	creds := credentials.NewStaticV4(opt.AccessKey, opt.SecretKey, "")
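
Reviewer note: `doStoreReader` now accepts an `io.Reader` with a size of -1, and the commit message mentions a pipe in the S3 stream writer. The S3 stream file itself is not part of this excerpt, so the following is only a guess at the general shape: an `io.Pipe` connects a `WriteCloser` handed to the caller with a reader consumed by the upload in a goroutine. `uploadFunc`, `pipeWriter`, and `demoUpload` are hypothetical names; `uploadFunc` stands in for a call like `doStoreReader`.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// uploadFunc is a hypothetical stand-in for an upload that consumes a
// reader of unknown length (size == -1).
type uploadFunc func(r io.Reader, size int64) error

// pipeWriter forwards writes to an upload running in a goroutine.
// This sketch expects Close to be called exactly once.
type pipeWriter struct {
	pw   *io.PipeWriter
	done chan error
}

func newPipeWriter(upload uploadFunc) *pipeWriter {
	pr, pw := io.Pipe()
	w := &pipeWriter{pw: pw, done: make(chan error, 1)}
	go func() {
		err := upload(pr, -1)
		// If the upload stopped early, unblock any pending Write.
		_ = pr.CloseWithError(err)
		w.done <- err
	}()
	return w
}

// Write blocks until the upload side has consumed p.
func (w *pipeWriter) Write(p []byte) (int, error) {
	return w.pw.Write(p)
}

// Close signals EOF to the upload and waits for its result.
func (w *pipeWriter) Close() error {
	if err := w.pw.Close(); err != nil {
		return err
	}
	return <-w.done
}

// demoUpload streams parts through a pipeWriter into an in-memory
// "bucket" and returns what the upload side received.
func demoUpload(parts ...string) (string, error) {
	var stored bytes.Buffer
	w := newPipeWriter(func(r io.Reader, _ int64) error {
		_, err := io.Copy(&stored, r)
		return err
	})
	for _, p := range parts {
		if _, err := io.WriteString(w, p); err != nil {
			return "", err
		}
	}
	if err := w.Close(); err != nil {
		return "", err
	}
	return stored.String(), nil
}

func main() {
	fmt.Println(demoUpload("hello ", "world"))
}
```

Unlike a buffering fallback, this shape never holds the whole blob in memory, at the cost of reporting upload errors only on a later `Write` or on `Close`.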
