dropbox: add polling support - fixes #2949

This implements polling support for the Dropbox backend. The Dropbox SDK dependency had to be updated due to an auth issue, which was fixed on Jan 12 2021. A secondary internal Dropbox service was created to handle unauthorized SDK requests, as is necessary when using the ListFolderLongpoll function/endpoint. The config variable was renamed to cfg to avoid potential conflicts with the imported config package.
This commit is contained in:
Robert Thomas
2021-02-24 09:33:31 +00:00
committed by GitHub
parent f6265fbeff
commit e5aa92c922
3 changed files with 163 additions and 14 deletions

View File

@@ -219,11 +219,11 @@ shared folder.`,
// as invalid characters.
// Testing revealed names with trailing spaces and the DEL character don't work.
// Also encode invalid UTF-8 bytes as json doesn't handle them properly.
Default: (encoder.Base |
Default: encoder.Base |
encoder.EncodeBackSlash |
encoder.EncodeDel |
encoder.EncodeRightSpace |
encoder.EncodeInvalidUtf8),
encoder.EncodeInvalidUtf8,
}}...),
})
}
@@ -242,8 +242,10 @@ type Fs struct {
name string // name of this remote
root string // the path we are working on
opt Options // parsed options
ci *fs.ConfigInfo // global config
features *fs.Features // optional features
srv files.Client // the connection to the dropbox server
svc files.Client // the connection to the dropbox server (unauthorized)
sharing sharing.Client // as above, but for generating sharing links
users users.Client // as above, but for accessing user information
team team.Client // for the Teams API
@@ -367,22 +369,29 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
return nil, errors.Wrap(err, "failed to configure dropbox")
}
ci := fs.GetConfig(ctx)
f := &Fs{
name: name,
opt: *opt,
ci: ci,
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
}
config := dropbox.Config{
cfg := dropbox.Config{
LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo
Client: oAuthClient, // maybe???
HeaderGenerator: f.headerGenerator,
}
// unauthorized config for endpoints that fail with auth
ucfg := dropbox.Config{
LogLevel: dropbox.LogOff, // logging in the SDK: LogOff, LogDebug, LogInfo
}
// NOTE: needs to be created pre-impersonation so we can look up the impersonated user
f.team = team.New(config)
f.team = team.New(cfg)
if opt.Impersonate != "" {
user := team.UserSelectorArg{
Email: opt.Impersonate,
}
@@ -397,12 +406,13 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
return nil, errors.Wrapf(err, "invalid dropbox team member: %q", opt.Impersonate)
}
config.AsMemberID = memberIds[0].MemberInfo.Profile.MemberProfile.TeamMemberId
cfg.AsMemberID = memberIds[0].MemberInfo.Profile.MemberProfile.TeamMemberId
}
f.srv = files.New(config)
f.sharing = sharing.New(config)
f.users = users.New(config)
f.srv = files.New(cfg)
f.svc = files.New(ucfg)
f.sharing = sharing.New(cfg)
f.users = users.New(cfg)
f.features = (&fs.Features{
CaseInsensitive: true,
ReadMimeType: false,
@@ -661,7 +671,7 @@ func (f *Fs) findSharedFolder(name string) (id string, err error) {
return "", fs.ErrorDirNotFound
}
// mountSharedFolders mount a shared folder to the root namespace
// mountSharedFolder mount a shared folder to the root namespace
func (f *Fs) mountSharedFolder(id string) error {
arg := sharing.MountFolderArg{
SharedFolderId: id,
@@ -673,7 +683,7 @@ func (f *Fs) mountSharedFolder(id string) error {
return err
}
// listSharedFolders lists shared the user as access to (note this means individual
// listReceivedFiles lists shared the user as access to (note this means individual
// files not files contained in shared folders)
func (f *Fs) listReceivedFiles() (entries fs.DirEntries, err error) {
started := false
@@ -1191,6 +1201,144 @@ func (f *Fs) About(ctx context.Context) (usage *fs.Usage, err error) {
return usage, nil
}
// ChangeNotify calls the passed function with a path that has had changes.
// If the implementation uses polling, it should adhere to the given interval.
//
// Automatically restarts itself in case of unexpected behavior of the remote.
//
// Close the returned channel to stop being notified.
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
go func() {
// get the StartCursor early so all changes from now on get processed
startCursor, err := f.changeNotifyCursor()
if err != nil {
fs.Infof(f, "Failed to get StartCursor: %s", err)
}
var ticker *time.Ticker
var tickerC <-chan time.Time
for {
select {
case pollInterval, ok := <-pollIntervalChan:
if !ok {
if ticker != nil {
ticker.Stop()
}
return
}
if ticker != nil {
ticker.Stop()
ticker, tickerC = nil, nil
}
if pollInterval != 0 {
ticker = time.NewTicker(pollInterval)
tickerC = ticker.C
}
case <-tickerC:
if startCursor == "" {
startCursor, err = f.changeNotifyCursor()
if err != nil {
fs.Infof(f, "Failed to get StartCursor: %s", err)
continue
}
}
fs.Debugf(f, "Checking for changes on remote")
startCursor, err = f.changeNotifyRunner(ctx, notifyFunc, startCursor)
if err != nil {
fs.Infof(f, "Change notify listener failure: %s", err)
}
}
}
}()
}
func (f *Fs) changeNotifyCursor() (cursor string, err error) {
var startCursor *files.ListFolderGetLatestCursorResult
err = f.pacer.Call(func() (bool, error) {
startCursor, err = f.srv.ListFolderGetLatestCursor(&files.ListFolderArg{Path: f.opt.Enc.FromStandardPath(f.slashRoot), Recursive: true})
return shouldRetry(err)
})
if err != nil {
return
}
return startCursor.Cursor, nil
}
func (f *Fs) changeNotifyRunner(ctx context.Context, notifyFunc func(string, fs.EntryType), startCursor string) (newCursor string, err error) {
cursor := startCursor
var res *files.ListFolderLongpollResult
// Dropbox sets a timeout range of 30 - 480
timeout := uint64(f.ci.Timeout / time.Second)
if timeout > 480 {
timeout = 480
}
err = f.pacer.Call(func() (bool, error) {
args := files.ListFolderLongpollArg{
Cursor: cursor,
Timeout: timeout,
}
res, err = f.svc.ListFolderLongpoll(&args)
return shouldRetry(err)
})
if err != nil {
return
}
if !res.Changes {
return cursor, nil
}
if res.Backoff != 0 {
fs.Debugf(f, "Waiting to poll for %d seconds", res.Backoff)
time.Sleep(time.Duration(res.Backoff) * time.Second)
}
for {
var changeList *files.ListFolderResult
arg := files.ListFolderContinueArg{
Cursor: cursor,
}
err = f.pacer.Call(func() (bool, error) {
changeList, err = f.srv.ListFolderContinue(&arg)
return shouldRetry(err)
})
if err != nil {
return "", errors.Wrap(err, "list continue")
}
cursor = changeList.Cursor
var entryType fs.EntryType
for _, entry := range changeList.Entries {
entryPath := ""
switch info := entry.(type) {
case *files.FolderMetadata:
entryType = fs.EntryDirectory
entryPath = strings.TrimLeft(info.PathDisplay, f.slashRootSlash)
case *files.FileMetadata:
entryType = fs.EntryObject
entryPath = strings.TrimLeft(info.PathDisplay, f.slashRootSlash)
case *files.DeletedMetadata:
entryType = fs.EntryObject
entryPath = strings.TrimLeft(info.PathDisplay, f.slashRootSlash)
default:
fs.Errorf(entry, "dropbox ChangeNotify: ignoring unknown EntryType %T", entry)
continue
}
if entryPath != "" {
notifyFunc(entryPath, entryType)
}
}
if !changeList.HasMore {
break
}
}
return cursor, nil
}
// Hashes returns the supported hash sets.
func (f *Fs) Hashes() hash.Set {
return hash.Set(DbHashType)