chunker: prevent chunk corruption, survive meta-like input

Ivan Andreev
2019-10-09 12:21:45 +03:00
committed by Nick Craig-Wood
parent 7aa2b4191c
commit 9049bb62ca
4 changed files with 506 additions and 79 deletions


@@ -98,6 +98,14 @@ const optimizeFirstChunk = false
// revealHidden is a stub until chunker lands the `reveal hidden` option.
const revealHidden = false
// Prevent memory overflow due to specially crafted chunk name
const maxSafeChunkNumber = 10000000
// standard chunker errors
var (
ErrChunkOverflow = errors.New("chunk number overflow")
)
// Note: metadata logic is tightly coupled with chunker code in many
// places, eg. in checks whether a file should have meta object or is
// eligible for chunking.
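The cap works together with the name parser: any decoded chunk number above it is treated as hostile input. A minimal standalone sketch of that guard (checkChunkNumber is a hypothetical helper, not part of this commit; the constant and error are):

package main

import (
	"errors"
	"fmt"
)

// Values introduced by this commit.
const maxSafeChunkNumber = 10000000

var ErrChunkOverflow = errors.New("chunk number overflow")

// checkChunkNumber is a hypothetical helper showing the intent: reject
// absurd chunk numbers parsed from file names before they can drive a
// huge slice allocation in addChunk.
func checkChunkNumber(chunkNo int) error {
	if chunkNo < 0 || chunkNo > maxSafeChunkNumber {
		return ErrChunkOverflow
	}
	return nil
}

func main() {
	fmt.Println(checkChunkNumber(42))        // <nil>
	fmt.Println(checkChunkNumber(999999999)) // chunk number overflow
}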
@@ -176,18 +184,17 @@ falling back to SHA1 if unsupported. Requires "simplejson".`,
Help: `Similar to "md5quick" but prefers SHA1 over MD5. Requires "simplejson".`,
}},
}, {
Name: "fail_on_bad_chunks",
Name: "fail_hard",
Advanced: true,
Default: false,
-Help: `The list command might encounter files with missing or invalid chunks.
-This boolean flag tells what rclone should do in such cases.`,
+Help: `Choose how chunker should handle files with missing or invalid chunks.`,
Examples: []fs.OptionExample{
{
Value: "true",
Help: "Fail with error.",
Help: "Report errors and abort current command.",
}, {
Value: "false",
Help: "Silently ignore invalid object.",
Help: "Warn user, skip incomplete file and proceed.",
},
},
}},
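As a usage sketch, assuming a chunker remote named overlay wrapping a hypothetical remote:bucket, the renamed option would appear in rclone.conf as:

[overlay]
type = chunker
remote = remote:bucket
fail_hard = true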
@@ -231,6 +238,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
root: rpath,
opt: *opt,
}
f.dirSort = true // processEntries currently requires that meta objects precede their data chunks
switch opt.MetaFormat {
case "none":
@@ -298,13 +306,13 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) {
// Options defines the configuration for this backend
type Options struct {
-Remote string `config:"remote"`
-ChunkSize fs.SizeSuffix `config:"chunk_size"`
-NameFormat string `config:"name_format"`
-StartFrom int `config:"start_from"`
-MetaFormat string `config:"meta_format"`
-HashType string `config:"hash_type"`
-FailOnBadChunks bool `config:"fail_on_bad_chunks"`
+Remote string `config:"remote"`
+ChunkSize fs.SizeSuffix `config:"chunk_size"`
+NameFormat string `config:"name_format"`
+StartFrom int `config:"start_from"`
+MetaFormat string `config:"meta_format"`
+HashType string `config:"hash_type"`
+FailHard bool `config:"fail_hard"`
}
// Fs represents a wrapped fs.Fs
@@ -322,6 +330,7 @@ type Fs struct {
nameRegexp *regexp.Regexp // regular expression to match chunk names
opt Options // copy of Options
features *fs.Features // optional features
dirSort bool // reserved for future, ignored
}
// setChunkNameFormat converts pattern based chunk name format
@@ -454,6 +463,20 @@ func (f *Fs) parseChunkName(filePath string) (mainPath string, chunkNo int, ctrl
return
}
// forbidChunk prints an error message or returns an error if the file is a chunk.
// The first argument sets the log prefix; pass `false` to suppress the message.
func (f *Fs) forbidChunk(o interface{}, filePath string) error {
if mainPath, _, _, _ := f.parseChunkName(filePath); mainPath != "" {
if f.opt.FailHard {
return fmt.Errorf("chunk overlap with %q", mainPath)
}
if boolVal, isBool := o.(bool); !isBool || boolVal {
fs.Errorf(o, "chunk overlap with %q", mainPath)
}
}
return nil
}
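The o.(bool) type assertion is the only subtle part: a literal false argument means "no log prefix, stay quiet". A standalone sketch of just that check (suppressed is a hypothetical name):

package main

import "fmt"

// suppressed mirrors the o.(bool) check in forbidChunk above: a bare
// `false` argument suppresses the log message, anything else (an
// object, a path, `true`) lets it through.
func suppressed(o interface{}) bool {
	boolVal, isBool := o.(bool)
	return isBool && !boolVal
}

func main() {
	fmt.Println(suppressed(false))      // true: message suppressed
	fmt.Println(suppressed("dir/file")) // false: message printed
}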
// List the objects and directories in dir into entries.
// The entries can be returned in any order but should be
// for a complete directory.
@@ -480,7 +503,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
if err != nil {
return nil, err
}
-return f.chunkEntries(ctx, entries, f.opt.FailOnBadChunks)
+return f.processEntries(ctx, entries, dir)
}
// ListR lists the objects and directories of the Fs starting
@@ -498,11 +521,11 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
// immediately.
//
// Don't implement this unless you have a more efficient way
-// of listing recursively that doing a directory traversal.
+// of listing recursively than doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
do := f.base.Features().ListR
return do(ctx, dir, func(entries fs.DirEntries) error {
-newEntries, err := f.chunkEntries(ctx, entries, f.opt.FailOnBadChunks)
+newEntries, err := f.processEntries(ctx, entries, dir)
if err != nil {
return err
}
@@ -510,13 +533,15 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
})
}
-// chunkEntries is called by List(R). It assembles chunk entries from
-// wrapped remote into composite directory entries.
-func (f *Fs) chunkEntries(ctx context.Context, origEntries fs.DirEntries, hardErrors bool) (chunkedEntries fs.DirEntries, err error) {
+// processEntries assembles chunk entries into composite entries
+func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirPath string) (newEntries fs.DirEntries, err error) {
// sort entries, so that meta objects (if any) appear before their chunks
-sortedEntries := make(fs.DirEntries, len(origEntries))
-copy(sortedEntries, origEntries)
-sort.Sort(sortedEntries)
+sortedEntries := origEntries
+if f.dirSort {
+// plain assignment here: a `:=` would shadow sortedEntries and discard the sorted copy
+sortedEntries = make(fs.DirEntries, len(origEntries))
+copy(sortedEntries, origEntries)
+sort.Sort(sortedEntries)
+}
byRemote := make(map[string]*Object)
badEntry := make(map[string]bool)
@@ -554,7 +579,7 @@ func (f *Fs) chunkEntries(ctx context.Context, origEntries fs.DirEntries, hardEr
}
}
if err := mainObject.addChunk(entry, chunkNo); err != nil {
-if hardErrors {
+if f.opt.FailHard {
return nil, err
}
badEntry[mainRemote] = true
@@ -570,7 +595,7 @@ func (f *Fs) chunkEntries(ctx context.Context, origEntries fs.DirEntries, hardEr
wrapDir.SetRemote(entry.Remote())
tempEntries = append(tempEntries, wrapDir)
default:
-if hardErrors {
+if f.opt.FailHard {
return nil, fmt.Errorf("Unknown object type %T", entry)
}
fs.Debugf(f, "unknown object type %T", entry)
@@ -581,7 +606,7 @@ func (f *Fs) chunkEntries(ctx context.Context, origEntries fs.DirEntries, hardEr
if object, ok := entry.(*Object); ok {
remote := object.Remote()
if isSubdir[remote] {
-if hardErrors {
+if f.opt.FailHard {
return nil, fmt.Errorf("%q is both meta object and directory", remote)
}
badEntry[remote] = true // fall thru
@@ -591,17 +616,20 @@ func (f *Fs) chunkEntries(ctx context.Context, origEntries fs.DirEntries, hardEr
continue
}
if err := object.validate(); err != nil {
-if hardErrors {
+if f.opt.FailHard {
return nil, err
}
fs.Debugf(f, "invalid chunks in object %q", remote)
continue
}
}
-chunkedEntries = append(chunkedEntries, entry)
+newEntries = append(newEntries, entry)
}
-return chunkedEntries, nil
+if f.dirSort {
+sort.Sort(newEntries)
+}
+return newEntries, nil
}
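With chunker's default name format this is where data chunks such as big.bin.rclone_chunk.001 and big.bin.rclone_chunk.002 disappear from the listing and a single composite big.bin entry is returned in their place.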
// NewObject finds the Object at remote.
@@ -615,8 +643,8 @@ func (f *Fs) chunkEntries(ctx context.Context, origEntries fs.DirEntries, hardEr
// but opening even a small file can be slow on some backends.
//
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
-if mainRemote, _, _, _ := f.parseChunkName(remote); mainRemote != "" {
-return nil, fmt.Errorf("%q should be meta object, not a chunk", remote)
+if err := f.forbidChunk(false, remote); err != nil {
+return nil, errors.Wrap(err, "can't access")
}
var (
@@ -734,12 +762,12 @@ func (o *Object) readMetadata(ctx context.Context) error {
if err != nil {
return err
}
_ = reader.Close() // ensure file handle is freed on windows
switch o.f.opt.MetaFormat {
case "simplejson":
-metaInfo, err := unmarshalSimpleJSON(ctx, metaObject, metadata)
+metaInfo, err := unmarshalSimpleJSON(ctx, metaObject, metadata, true)
if err != nil {
// TODO: in a rare case we might mistake a small file for metadata
return errors.Wrap(err, "invalid metadata")
}
if o.size != metaInfo.Size() || len(o.chunks) != metaInfo.nChunks {
@@ -775,8 +803,12 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote st
baseRemote := remote
// Transfer chunks data
-for chunkNo := 0; !c.done; chunkNo++ {
-tempRemote := f.makeChunkName(baseRemote, chunkNo, "", xactNo)
+for c.chunkNo = 0; !c.done; c.chunkNo++ {
+if c.chunkNo > maxSafeChunkNumber {
+return nil, ErrChunkOverflow
+}
+tempRemote := f.makeChunkName(baseRemote, c.chunkNo, "", xactNo)
size := c.sizeLeft
if size > c.chunkSize {
size = c.chunkSize
@@ -785,7 +817,7 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote st
// If a single chunk is expected, avoid the extra rename operation
chunkRemote := tempRemote
-if c.expectSingle && chunkNo == 0 && optimizeFirstChunk {
+if c.expectSingle && c.chunkNo == 0 && optimizeFirstChunk {
chunkRemote = baseRemote
}
info := f.wrapInfo(src, chunkRemote, size)
@@ -836,8 +868,17 @@ func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, remote st
return nil, fmt.Errorf("Incorrect upload size %d != %d", c.readCount, c.sizeTotal)
}
-// Finalize the non-chunked object
-if len(c.chunks) == 1 {
+// Check for input that looks like valid metadata
+needMeta := len(c.chunks) > 1
+if c.readCount <= maxMetadataSize && len(c.chunks) == 1 {
+_, err := unmarshalSimpleJSON(ctx, c.chunks[0], c.smallHead, false)
+needMeta = err == nil
+}
+// Finalize small object as non-chunked.
+// This check can be bypassed: for unsafe (meta-like) input a single
+// chunk plus a metadata object will be created instead.
+if !needMeta && f.useMeta {
// If previous object was chunked, remove its chunks
f.removeOldChunks(ctx, baseRemote)
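Restated in one place, the decision above could read as follows; wantMeta is a hypothetical in-package helper, while the fields and functions it calls are the real ones from this diff:

// wantMeta is a hypothetical restatement of the logic above: a small
// single-chunk upload whose content parses as simplejson metadata must
// still get a real meta object, or reading the file back would mistake
// the data chunk for metadata.
func (f *Fs) wantMeta(ctx context.Context, c *chunkingReader) bool {
	if len(c.chunks) > 1 {
		return true
	}
	if c.readCount > maxMetadataSize || len(c.chunks) != 1 {
		return false
	}
	_, err := unmarshalSimpleJSON(ctx, c.chunks[0], c.smallHead, false)
	return err == nil // meta-like input forces a meta object
}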
@@ -918,10 +959,12 @@ type chunkingReader struct {
readCount int64
chunkSize int64
chunkLimit int64
chunkNo int
err error
done bool
chunks []fs.Object
expectSingle bool
smallHead []byte
fs *Fs
hasher gohash.Hash
md5 string
@@ -1001,6 +1044,9 @@ func (c *chunkingReader) Read(buf []byte) (bytesRead int, err error) {
return
}
c.accountBytes(int64(bytesRead))
if c.chunkNo == 0 && c.expectSingle && bytesRead > 0 && c.readCount <= maxMetadataSize {
c.smallHead = append(c.smallHead, buf[:bytesRead]...)
}
if bytesRead == 0 && c.sizeLeft == 0 {
err = io.EOF // Force EOF when no data left.
}
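A standalone sketch of the buffer-the-head-while-passing-through pattern that smallHead implements; headReader and the 1023-byte cap are stand-ins, not chunker code:

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

const maxHead = 1023 // stand-in for chunker's maxMetadataSize

// headReader passes data through unchanged while keeping a copy of the
// first maxHead bytes, mirroring how chunkingReader.Read collects
// c.smallHead for the later metadata probe.
type headReader struct {
	src  io.Reader
	head []byte
}

func (h *headReader) Read(buf []byte) (int, error) {
	n, err := h.src.Read(buf)
	if n > 0 && len(h.head) < maxHead {
		keep := n
		if len(h.head)+keep > maxHead {
			keep = maxHead - len(h.head)
		}
		h.head = append(h.head, buf[:keep]...)
	}
	return n, err
}

func main() {
	hr := &headReader{src: strings.NewReader(`{"ver":1,"size":9}`)}
	var sink bytes.Buffer
	_, _ = io.Copy(&sink, hr)
	fmt.Printf("head=%q passed=%d bytes\n", hr.head, sink.Len())
}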
@@ -1048,16 +1094,25 @@ func (f *Fs) removeOldChunks(ctx context.Context, remote string) {
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
if err := f.forbidChunk(src, src.Remote()); err != nil {
return nil, errors.Wrap(err, "refusing to put")
}
return f.put(ctx, in, src, src.Remote(), options, f.base.Put)
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
if err := f.forbidChunk(src, src.Remote()); err != nil {
return nil, errors.Wrap(err, "refusing to upload")
}
return f.put(ctx, in, src, src.Remote(), options, f.base.Features().PutStream)
}
// Update in to the object with the modTime given of the given size
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
if err := o.f.forbidChunk(o, o.Remote()); err != nil {
return errors.Wrap(err, "update refused")
}
if err := o.readMetadata(ctx); err != nil {
// refuse to update a file of unsupported format
return errors.Wrap(err, "refusing to update")
@@ -1080,13 +1135,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
//
// This will create a duplicate if we upload a new file without
// checking to see if there is one already - use Put() for that.
-// TODO: really split stream here
func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
do := f.base.Features().PutUnchecked
if do == nil {
return nil, errors.New("can't PutUnchecked")
}
-// TODO: handle options and chunking!
+// TODO: handle range/limit options and really chunk stream here!
o, err := do(ctx, in, f.wrapInfo(src, "", -1))
if err != nil {
return nil, err
@@ -1117,6 +1171,9 @@ func (f *Fs) Hashes() hash.Set {
//
// Shouldn't return an error if it already exists
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
if err := f.forbidChunk(dir, dir); err != nil {
return errors.Wrap(err, "can't mkdir")
}
return f.base.Mkdir(ctx, dir)
}
@@ -1181,6 +1238,11 @@ func (f *Fs) Purge(ctx context.Context) error {
// the `delete hidden` flag above or at least the user has been warned.
//
func (o *Object) Remove(ctx context.Context) (err error) {
if err := o.f.forbidChunk(o, o.Remote()); err != nil {
// operations.Move can still call Remove if chunker's Move refuses
// to corrupt file in hard mode. Hence, refuse to Remove, too.
return errors.Wrap(err, "refuse to corrupt")
}
if err := o.readMetadata(ctx); err != nil {
// Proceed but warn user that unexpected things can happen.
fs.Errorf(o, "Removing a file with unsupported metadata: %v", err)
@@ -1206,6 +1268,9 @@ func (o *Object) Remove(ctx context.Context) (err error) {
// copyOrMove implements copy or move
func (f *Fs) copyOrMove(ctx context.Context, o *Object, remote string, do copyMoveFn, md5, sha1, opName string) (fs.Object, error) {
if err := f.forbidChunk(o, remote); err != nil {
return nil, errors.Wrapf(err, "can't %s", opName)
}
if !o.isComposite() {
fs.Debugf(o, "%s non-chunked object...", opName)
oResult, err := do(ctx, o.mainChunk(), remote) // chain operation to a single wrapped chunk
@@ -1493,6 +1558,9 @@ func (o *Object) addChunk(chunk fs.Object, chunkNo int) error {
o.chunks = append(o.chunks, chunk)
return nil
}
if chunkNo > maxSafeChunkNumber {
return ErrChunkOverflow
}
if chunkNo > len(o.chunks) {
newChunks := make([]fs.Object, (chunkNo + 1), (chunkNo+1)*2)
copy(newChunks, o.chunks)
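This check is the heart of the overflow fix: addChunk grows a sparse slice to capacity (chunkNo+1)*2, so an unchecked chunk number parsed from a crafted name like file.rclone_chunk.999999999 would request an allocation of roughly two billion entries. A standalone sketch with simplified types:

package main

import (
	"errors"
	"fmt"
)

const maxSafeChunkNumber = 10000000

var ErrChunkOverflow = errors.New("chunk number overflow")

// grow mirrors addChunk's sparse-slice growth with simplified types:
// the capacity (chunkNo+1)*2 is why an unchecked chunk number from a
// crafted file name could exhaust memory.
func grow(chunks []int, chunkNo int) ([]int, error) {
	if chunkNo > maxSafeChunkNumber {
		return nil, ErrChunkOverflow
	}
	if chunkNo > len(chunks) {
		newChunks := make([]int, chunkNo+1, (chunkNo+1)*2)
		copy(newChunks, chunks)
		chunks = newChunks
	}
	return chunks, nil
}

func main() {
	_, err := grow(nil, 999999999)
	fmt.Println(err) // chunk number overflow — rejected before allocating
}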
@@ -1897,20 +1965,31 @@ func (o *Object) ID() string {
// Meta format `simplejson`
type metaSimpleJSON struct {
-Version int `json:"ver"`
-Size int64 `json:"size"` // total size of data chunks
-NChunks int `json:"nchunks"` // number of data chunks
-MD5 string `json:"md5"`
-SHA1 string `json:"sha1"`
+// required core fields
+Version *int `json:"ver"`
+Size *int64 `json:"size"` // total size of data chunks
+ChunkNum *int `json:"nchunks"` // number of data chunks
+// optional extra fields
+MD5 string `json:"md5,omitempty"`
+SHA1 string `json:"sha1,omitempty"`
}
// marshalSimpleJSON
//
// Current implementation creates metadata in two cases:
// - for files larger than chunk size
// - if file contents can be mistaken for a meta object
//
func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1 string) ([]byte, error) {
+version := metadataVersion
metadata := metaSimpleJSON{
-Version: metadataVersion,
-Size: size,
-NChunks: nChunks,
-MD5: md5,
-SHA1: sha1,
+// required core fields
+Version: &version,
+Size: &size,
+ChunkNum: &nChunks,
+// optional extra fields
+MD5: md5,
+SHA1: sha1,
}
data, err := json.Marshal(&metadata)
if err == nil && data != nil && len(data) >= maxMetadataSize {
@@ -1920,6 +1999,7 @@ func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1 s
return data, err
}
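The move to pointer fields is what makes "required" enforceable: encoding/json leaves a pointer nil when the key is absent, whereas a plain int cannot distinguish a missing ver from "ver":0. A standalone sketch:

package main

import (
	"encoding/json"
	"fmt"
)

// meta is a cut-down copy of metaSimpleJSON for illustration.
type meta struct {
	Version *int   `json:"ver"`
	Size    *int64 `json:"size"`
	MD5     string `json:"md5,omitempty"`
}

func main() {
	var m meta
	_ = json.Unmarshal([]byte(`{"ver":1}`), &m)
	// ver is present, size is missing: only the absent key stays nil.
	fmt.Println(m.Version != nil, m.Size != nil) // true false
}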
// unmarshalSimpleJSON
// Note: only metadata format version 1 is supported atm.
//
// Current implementation creates metadata only for files larger than
@@ -1931,22 +2011,37 @@ func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1 s
// handled by current implementation.
// The version check below will then explicitly ask user to upgrade rclone.
//
-func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte) (info *ObjectInfo, err error) {
-if len(data) > maxMetadataSize {
+func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte, strictChecks bool) (info *ObjectInfo, err error) {
+// Be strict about the JSON format
+// to reduce the possibility that a random small file resembles metadata.
+if data != nil && len(data) > maxMetadataSize {
return nil, errors.New("too big")
}
+if data == nil || len(data) < 2 || data[0] != '{' || data[len(data)-1] != '}' {
+return nil, errors.New("invalid json")
+}
var metadata metaSimpleJSON
err = json.Unmarshal(data, &metadata)
if err != nil {
return nil, err
}
// Basic fields are strictly required
// to reduce the possibility that a random small file resembles metadata.
if metadata.Version == nil || metadata.Size == nil || metadata.ChunkNum == nil {
return nil, errors.New("missing required field")
}
// Perform strict checks, avoid corruption of future metadata formats.
-if metadata.Size < 0 {
+if *metadata.Version < 1 {
+return nil, errors.New("wrong version")
+}
+if *metadata.Size < 0 {
return nil, errors.New("negative file size")
}
-if metadata.NChunks <= 0 {
-return nil, errors.New("wrong number of chunks")
+if *metadata.ChunkNum < 0 {
+return nil, errors.New("negative number of chunks")
}
+if *metadata.ChunkNum > maxSafeChunkNumber {
+return nil, ErrChunkOverflow
+}
if metadata.MD5 != "" {
_, err = hex.DecodeString(metadata.MD5)
@@ -1960,18 +2055,20 @@ func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte)
return nil, errors.New("wrong sha1 hash")
}
}
-if metadata.Version <= 0 {
-return nil, errors.New("wrong version number")
+// ChunkNum is allowed to be 0 in future versions
+if *metadata.ChunkNum < 1 && *metadata.Version <= metadataVersion {
+return nil, errors.New("wrong number of chunks")
}
-if metadata.Version != metadataVersion {
-return nil, errors.Errorf("version %d is not supported, please upgrade rclone", metadata.Version)
+// Non-strict mode also accepts future metadata versions
+if *metadata.Version > metadataVersion && strictChecks {
+return nil, fmt.Errorf("version %d is not supported, please upgrade rclone", *metadata.Version)
}
var nilFs *Fs // nil object triggers appropriate type method
-info = nilFs.wrapInfo(metaObject, "", metadata.Size)
+info = nilFs.wrapInfo(metaObject, "", *metadata.Size)
+info.nChunks = *metadata.ChunkNum
info.md5 = metadata.MD5
info.sha1 = metadata.SHA1
-info.nChunks = metadata.NChunks
return info, nil
}
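Taken together, the strictChecks flag splits the two call sites above: readMetadata passes true and rejects unknown future versions outright, while put passes false so that any plausibly valid metadata, even from a newer rclone, is treated as meta-like input and forces a real meta object.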