mirror of
https://github.com/rclone/rclone.git
synced 2025-12-11 22:14:05 +01:00
union: add :writback to act as a simple cache
This adds a :writeback tag to upstreams. If set on a single upstream then it writes back objects not found into that upstream. Fixes #6934
This commit is contained in:
@@ -17,8 +17,9 @@ import (
|
||||
// This is a wrapped object which returns the Union Fs as its parent
|
||||
type Object struct {
|
||||
*upstream.Object
|
||||
fs *Fs // what this object is part of
|
||||
co []upstream.Entry
|
||||
fs *Fs // what this object is part of
|
||||
co []upstream.Entry
|
||||
writebackMu sync.Mutex
|
||||
}
|
||||
|
||||
// Directory describes a union Directory
|
||||
@@ -34,6 +35,13 @@ type entry interface {
|
||||
candidates() []upstream.Entry
|
||||
}
|
||||
|
||||
// Update o with the contents of newO excluding the lock
|
||||
func (o *Object) update(newO *Object) {
|
||||
o.Object = newO.Object
|
||||
o.fs = newO.fs
|
||||
o.co = newO.co
|
||||
}
|
||||
|
||||
// UnWrapUpstream returns the upstream Object that this Object is wrapping
|
||||
func (o *Object) UnWrapUpstream() *upstream.Object {
|
||||
return o.Object
|
||||
@@ -67,7 +75,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||
return err
|
||||
}
|
||||
// Update current object
|
||||
*o = *newO.(*Object)
|
||||
o.update(newO.(*Object))
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
@@ -175,6 +183,25 @@ func (o *Object) SetTier(tier string) error {
|
||||
return do.SetTier(tier)
|
||||
}
|
||||
|
||||
// Open opens the file for read. Call Close() on the returned io.ReadCloser
|
||||
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
||||
// Need some sort of locking to prevent multiple downloads
|
||||
o.writebackMu.Lock()
|
||||
defer o.writebackMu.Unlock()
|
||||
|
||||
// FIXME what if correct object is already in o.co
|
||||
|
||||
newObj, err := o.Object.Writeback(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if newObj != nil {
|
||||
o.Object = newObj
|
||||
o.co = append(o.co, newObj) // FIXME should this append or overwrite or update?
|
||||
}
|
||||
return o.Object.Object.Open(ctx, options...)
|
||||
}
|
||||
|
||||
// ModTime returns the modification date of the directory
|
||||
// It returns the latest ModTime of all candidates
|
||||
func (d *Directory) ModTime(ctx context.Context) (t time.Time) {
|
||||
|
||||
@@ -877,6 +877,10 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
||||
opt: *opt,
|
||||
upstreams: usedUpstreams,
|
||||
}
|
||||
err = upstream.Prepare(f.upstreams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.actionPolicy, err = policy.Get(opt.ActionPolicy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/cache"
|
||||
"github.com/rclone/rclone/fs/fspath"
|
||||
"github.com/rclone/rclone/fs/operations"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -37,6 +38,8 @@ type Fs struct {
|
||||
cacheMutex sync.RWMutex
|
||||
cacheOnce sync.Once
|
||||
cacheUpdate bool // if the cache is updating
|
||||
writeback bool // writeback to this upstream
|
||||
writebackFs *Fs // if non zero, writeback to this upstream
|
||||
}
|
||||
|
||||
// Directory describes a wrapped Directory
|
||||
@@ -86,6 +89,9 @@ func New(ctx context.Context, remote, root string, opt *common.Options) (*Fs, er
|
||||
f.writable = true
|
||||
f.creatable = false
|
||||
fsPath = fsPath[0 : len(fsPath)-3]
|
||||
} else if strings.HasSuffix(fsPath, ":writeback") {
|
||||
f.writeback = true
|
||||
fsPath = fsPath[0 : len(fsPath)-len(":writeback")]
|
||||
}
|
||||
remote = configName + fsPath
|
||||
rFs, err := cache.Get(ctx, remote)
|
||||
@@ -103,6 +109,29 @@ func New(ctx context.Context, remote, root string, opt *common.Options) (*Fs, er
|
||||
return f, err
|
||||
}
|
||||
|
||||
// Prepare the configured upstreams as a group
|
||||
func Prepare(fses []*Fs) error {
|
||||
writebacks := 0
|
||||
var writebackFs *Fs
|
||||
for _, f := range fses {
|
||||
if f.writeback {
|
||||
writebackFs = f
|
||||
writebacks++
|
||||
}
|
||||
}
|
||||
if writebacks == 0 {
|
||||
return nil
|
||||
} else if writebacks > 1 {
|
||||
return fmt.Errorf("can only have 1 :writeback not %d", writebacks)
|
||||
}
|
||||
for _, f := range fses {
|
||||
if !f.writeback {
|
||||
f.writebackFs = writebackFs
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WrapDirectory wraps an fs.Directory to include the info
|
||||
// of the upstream Fs
|
||||
func (f *Fs) WrapDirectory(e fs.Directory) *Directory {
|
||||
@@ -293,6 +322,28 @@ func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) {
|
||||
return do.Metadata(ctx)
|
||||
}
|
||||
|
||||
// Writeback writes the object back and returns a new object
|
||||
//
|
||||
// If it returns nil, nil then the original object is OK
|
||||
func (o *Object) Writeback(ctx context.Context) (*Object, error) {
|
||||
if o.f.writebackFs == nil {
|
||||
return nil, nil
|
||||
}
|
||||
newObj, err := operations.Copy(ctx, o.f.writebackFs.Fs, nil, o.Object.Remote(), o.Object)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// newObj could be nil here
|
||||
if newObj == nil {
|
||||
fs.Errorf(o, "nil Object returned from operations.Copy")
|
||||
return nil, nil
|
||||
}
|
||||
return &Object{
|
||||
Object: newObj,
|
||||
f: o.f,
|
||||
}, err
|
||||
}
|
||||
|
||||
// About gets quota information from the Fs
|
||||
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
||||
if f.cacheExpiry.Load() <= time.Now().Unix() {
|
||||
|
||||
Reference in New Issue
Block a user