-
Notifications
You must be signed in to change notification settings - Fork 7
Stream optional interfaces #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
39 commits
Select commit
Hold shift + click to select a range
7d500f6
create stream-related types
ahouene f52f88c
implement ReaderStorage and WriterStorage for fs backend
ahouene f75ec98
implement ReaderStorage for S3 backend
ahouene 7ced814
add tests for ReaderStorage and WriterStorage
ahouene 0e680df
more robust implementation of general WriterStorage wrapper
ahouene c4e0889
fix WriterStorage doc typo
ahouene 60bf026
Merge remote-tracking branch 'upstream/main' into stream-optional-int…
ahouene 1c992f8
fix writer wrapper mutex usage
ahouene a356b79
show that param is name in ReaderStorage
ahouene 6d5554e
replace reader wrapper struct with io.NopCloser
ahouene 08a90c3
fix "mixed name and unnamed parameters" syntax error
ahouene 51c1d60
replace errorWClosed with ErrClosed
ahouene 61ffabb
express intent to check name validity on stream wrapper struct
ahouene ede6243
rename default `writer` wrapper struct to `fallbackWriter`
ahouene 999a2b1
formatting
ahouene 7676178
s3: use common method for Load and NewReader
ahouene feebd77
test presence of key after storing with NewWriter
ahouene cb57661
remove duplicate error check
ahouene 9e6109b
s3 backend: implement StorageWriter
ahouene c823420
Merge remote-tracking branch 'upstream/main' into stream-optional-int…
ahouene 1e1eea4
apply comment improvement suggestions
ahouene 0cd9451
s3: simplify doStoreReader and doLoadReader
ahouene 9ccf41a
tester: test atomicity of writer, and write/read after close
ahouene cb03f31
s3: close pipe, better context cancellation handling
ahouene 9e99799
fs backend: write file atomically with NewWriter
ahouene b355158
Merge branch 'main' into stream-optional-interfaces
ahouene ce68e5d
s3: move prefix prepending and metrics calls to their correct places
ahouene 39906d2
s3: rename ReaderStorage->StreamReader, WriterStorage->StreamWriter
ahouene 43336e7
s3: revert to default NumThreads in PutObjects
ahouene 9f60df8
s3: prepend global prefix where missing
ahouene 842b09d
s3: make a comment about global prefix use with marker clearer
ahouene 48db5a2
s3: introduce NumMinioThreads to configure NumThreads in minio client
ahouene fb2d113
s3: move stream logic to separate file, implement ReaderFrom
ahouene 105e38f
fs: move stream logic to separate file
ahouene 0c574cf
s3: remove unfitting io.ReaderFrom implementation
ahouene 845ebd8
s3: stream: start pipe earlier, do not write marker twice
ahouene dea00a6
stream: catch cancelled ctx early in fs & s3
ahouene 1aac573
stream: remove name validation TODO comment
ahouene effe5c1
io stream: update README and make a couple comments clearer
ahouene File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
package fs | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
) | ||
|
||
// createAtomic creates a new File. The given fpath is the file path of the final destination. | ||
func createAtomic(fpath string) (*atomicFile, error) { | ||
fpath, err := filepath.Abs(fpath) | ||
if err != nil { | ||
return nil, fmt.Errorf("absolute path for atomic file %q: %w", fpath, err) | ||
} | ||
// Using the PID under the assumption that the same program will not be writing to | ||
// the same path at the same time. An overwrite later on retry is desired, if | ||
// not cleaned properly. | ||
tmp := fmt.Sprintf("%s.%d%s", fpath, os.Getpid(), ignoreSuffix) | ||
file, err := os.Create(tmp) | ||
if err != nil { | ||
return nil, fmt.Errorf("create atomic file %q: %w", fpath, err) | ||
} | ||
return &atomicFile{ | ||
file: file, | ||
path: fpath, | ||
tmp: tmp, | ||
}, nil | ||
} | ||
|
||
// atomicFile implements an io.WriteCloser that writes to a temp file and moves it | ||
// atomically into place on Close. | ||
type atomicFile struct { | ||
file *os.File // The underlying file being written to. | ||
path string // The final path of the file. | ||
tmp string // The path of the file during write. | ||
} | ||
|
||
// Write implements io.Writer | ||
func (f *atomicFile) Write(data []byte) (int, error) { | ||
return f.file.Write(data) | ||
} | ||
|
||
// Clean aborts the creation of the file if called before Close. If called | ||
// after Close, it does nothing. This makes it useful in a defer. | ||
func (f *atomicFile) Clean() { | ||
_ = f.file.Close() | ||
_ = os.Remove(f.tmp) | ||
} | ||
|
||
// Close closes the temp file and moves it to the final destination. | ||
func (f *atomicFile) Close() error { | ||
var err error | ||
defer func() { | ||
if err != nil { | ||
// The rename did not happen and we're left with | ||
// the temporary file hanging. | ||
_ = os.Remove(f.tmp) | ||
} | ||
}() | ||
|
||
// Some of the file content may still be cached by the OS, | ||
// and is not guaranteed to be written to physical device on close. | ||
// Behaviour is inconsistent across devices and C standard libraries. | ||
// Syncing file AND its parent directory (here) ensure this. | ||
// See fsync(2) and open(2). | ||
if err = f.file.Sync(); err != nil { | ||
_ = f.file.Close() | ||
return err | ||
} | ||
if err = f.file.Close(); err != nil { | ||
return err | ||
} | ||
|
||
// Move into place | ||
if err = os.Rename(f.tmp, f.path); err != nil { | ||
return err | ||
} | ||
|
||
var dir *os.File | ||
dir, err = os.Open(filepath.Dir(f.path)) | ||
if err != nil { | ||
return err | ||
} | ||
err = dir.Sync() | ||
if err != nil { | ||
_ = dir.Close() | ||
return err | ||
} | ||
return dir.Close() | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
package fs | ||
|
||
import ( | ||
"context" | ||
"io" | ||
"os" | ||
"path/filepath" | ||
) | ||
|
||
// NewReader provides an optimized way to read from named file. | ||
func (b *Backend) NewReader(ctx context.Context, name string) (io.ReadCloser, error) { | ||
if err := ctx.Err(); err != nil { | ||
return nil, err | ||
} | ||
if !allowedName(name) { | ||
return nil, os.ErrPermission | ||
} | ||
fullPath := filepath.Join(b.rootPath, name) | ||
return os.Open(fullPath) | ||
} | ||
|
||
// NewWriter provides an optimized way to write to a file. | ||
func (b *Backend) NewWriter(ctx context.Context, name string) (io.WriteCloser, error) { | ||
if err := ctx.Err(); err != nil { | ||
return nil, err | ||
} | ||
if !allowedName(name) { | ||
return nil, os.ErrPermission | ||
} | ||
fullPath := filepath.Join(b.rootPath, name) | ||
return createAtomic(fullPath) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks solid, I guess you have looked at other similar solutions like this one:
https://github.com/google/renameio
just to see we are not missing any corner cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the recommendation. I gave it a look and while it looks like it can handle a broader set of situations (that we don't need here), I didn't see something that would benefit us here.