rest: add context propagation to rest library #3257

This fixes up the calling and propagates the contexts for the backends
which use lib/rest.
This commit is contained in:
Nick Craig-Wood
2019-09-04 20:00:37 +01:00
parent ba1daea072
commit 58a531a203
19 changed files with 448 additions and 439 deletions

View File

@@ -273,11 +273,11 @@ func (f *Fs) shouldRetryNoReauth(resp *http.Response, err error) (bool, error) {
// shouldRetry returns a boolean as to whether this resp and err
// deserve to be retried. It returns the err as a convenience
func (f *Fs) shouldRetry(resp *http.Response, err error) (bool, error) {
func (f *Fs) shouldRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
if resp != nil && resp.StatusCode == 401 {
fs.Debugf(f, "Unauthorized: %v", err)
// Reauth
authErr := f.authorizeAccount()
authErr := f.authorizeAccount(ctx)
if authErr != nil {
err = authErr
}
@@ -393,7 +393,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
fs.Debugf(f, "Setting test header \"%s: %s\"", testModeHeader, testMode)
}
f.fillBufferTokens()
err = f.authorizeAccount()
err = f.authorizeAccount(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to authorize account")
}
@@ -431,7 +431,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
// authorizeAccount gets the API endpoint and auth token. Can be used
// for reauthentication too.
func (f *Fs) authorizeAccount() error {
func (f *Fs) authorizeAccount(ctx context.Context) error {
f.authMu.Lock()
defer f.authMu.Unlock()
opts := rest.Opts{
@@ -443,7 +443,7 @@ func (f *Fs) authorizeAccount() error {
ExtraHeaders: map[string]string{"Authorization": ""}, // unset the Authorization for this request
}
err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, nil, &f.info)
resp, err := f.srv.CallJSON(ctx, &opts, nil, &f.info)
return f.shouldRetryNoReauth(resp, err)
})
if err != nil {
@@ -466,10 +466,10 @@ func (f *Fs) hasPermission(permission string) bool {
// getUploadURL returns the upload info with the UploadURL and the AuthorizationToken
//
// This should be returned with returnUploadURL when finished
func (f *Fs) getUploadURL(bucket string) (upload *api.GetUploadURLResponse, err error) {
func (f *Fs) getUploadURL(ctx context.Context, bucket string) (upload *api.GetUploadURLResponse, err error) {
f.uploadMu.Lock()
defer f.uploadMu.Unlock()
bucketID, err := f.getBucketID(bucket)
bucketID, err := f.getBucketID(ctx, bucket)
if err != nil {
return nil, err
}
@@ -489,8 +489,8 @@ func (f *Fs) getUploadURL(bucket string) (upload *api.GetUploadURLResponse, err
BucketID: bucketID,
}
err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &upload)
return f.shouldRetry(resp, err)
resp, err := f.srv.CallJSON(ctx, &opts, &request, &upload)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
return nil, errors.Wrap(err, "failed to get upload URL")
@@ -609,7 +609,7 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
if !recurse {
delimiter = "/"
}
bucketID, err := f.getBucketID(bucket)
bucketID, err := f.getBucketID(ctx, bucket)
if err != nil {
return err
}
@@ -636,8 +636,8 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
for {
var response api.ListFileNamesResponse
err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
return err
@@ -727,7 +727,7 @@ func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addB
// listBuckets returns all the buckets to out
func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) {
err = f.listBucketsToFn(func(bucket *api.Bucket) error {
err = f.listBucketsToFn(ctx, func(bucket *api.Bucket) error {
d := fs.NewDir(bucket.Name, time.Time{})
entries = append(entries, d)
return nil
@@ -820,7 +820,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
type listBucketFn func(*api.Bucket) error
// listBucketsToFn lists the buckets to the function supplied
func (f *Fs) listBucketsToFn(fn listBucketFn) error {
func (f *Fs) listBucketsToFn(ctx context.Context, fn listBucketFn) error {
var account = api.ListBucketsRequest{
AccountID: f.info.AccountID,
BucketID: f.info.Allowed.BucketID,
@@ -832,8 +832,8 @@ func (f *Fs) listBucketsToFn(fn listBucketFn) error {
Path: "/b2_list_buckets",
}
err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &account, &response)
return f.shouldRetry(resp, err)
resp, err := f.srv.CallJSON(ctx, &opts, &account, &response)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
return err
@@ -862,14 +862,14 @@ func (f *Fs) listBucketsToFn(fn listBucketFn) error {
// getbucketType finds the bucketType for the current bucket name
// can be one of allPublic. allPrivate, or snapshot
func (f *Fs) getbucketType(bucket string) (bucketType string, err error) {
func (f *Fs) getbucketType(ctx context.Context, bucket string) (bucketType string, err error) {
f.bucketTypeMutex.Lock()
bucketType = f._bucketType[bucket]
f.bucketTypeMutex.Unlock()
if bucketType != "" {
return bucketType, nil
}
err = f.listBucketsToFn(func(bucket *api.Bucket) error {
err = f.listBucketsToFn(ctx, func(bucket *api.Bucket) error {
// listBucketsToFn reads bucket Types
return nil
})
@@ -897,14 +897,14 @@ func (f *Fs) clearBucketType(bucket string) {
}
// getBucketID finds the ID for the current bucket name
func (f *Fs) getBucketID(bucket string) (bucketID string, err error) {
func (f *Fs) getBucketID(ctx context.Context, bucket string) (bucketID string, err error) {
f.bucketIDMutex.Lock()
bucketID = f._bucketID[bucket]
f.bucketIDMutex.Unlock()
if bucketID != "" {
return bucketID, nil
}
err = f.listBucketsToFn(func(bucket *api.Bucket) error {
err = f.listBucketsToFn(ctx, func(bucket *api.Bucket) error {
// listBucketsToFn sets IDs
return nil
})
@@ -970,15 +970,15 @@ func (f *Fs) makeBucket(ctx context.Context, bucket string) error {
}
var response api.Bucket
err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
if apiErr, ok := err.(*api.Error); ok {
if apiErr.Code == "duplicate_bucket_name" {
// Check this is our bucket - buckets are globally unique and this
// might be someone elses.
_, getBucketErr := f.getBucketID(bucket)
_, getBucketErr := f.getBucketID(ctx, bucket)
if getBucketErr == nil {
// found so it is our bucket
return nil
@@ -1009,7 +1009,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
Method: "POST",
Path: "/b2_delete_bucket",
}
bucketID, err := f.getBucketID(bucket)
bucketID, err := f.getBucketID(ctx, bucket)
if err != nil {
return err
}
@@ -1019,8 +1019,8 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
}
var response api.Bucket
err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
return errors.Wrap(err, "failed to delete bucket")
@@ -1038,8 +1038,8 @@ func (f *Fs) Precision() time.Duration {
}
// hide hides a file on the remote
func (f *Fs) hide(bucket, bucketPath string) error {
bucketID, err := f.getBucketID(bucket)
func (f *Fs) hide(ctx context.Context, bucket, bucketPath string) error {
bucketID, err := f.getBucketID(ctx, bucket)
if err != nil {
return err
}
@@ -1053,8 +1053,8 @@ func (f *Fs) hide(bucket, bucketPath string) error {
}
var response api.File
err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
if apiErr, ok := err.(*api.Error); ok {
@@ -1070,7 +1070,7 @@ func (f *Fs) hide(bucket, bucketPath string) error {
}
// deleteByID deletes a file version given Name and ID
func (f *Fs) deleteByID(ID, Name string) error {
func (f *Fs) deleteByID(ctx context.Context, ID, Name string) error {
opts := rest.Opts{
Method: "POST",
Path: "/b2_delete_file_version",
@@ -1081,8 +1081,8 @@ func (f *Fs) deleteByID(ID, Name string) error {
}
var response api.File
err := f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
return errors.Wrapf(err, "failed to delete %q", Name)
@@ -1132,7 +1132,7 @@ func (f *Fs) purge(ctx context.Context, bucket, directory string, oldOnly bool)
continue
}
tr := accounting.Stats(ctx).NewCheckingTransfer(oi)
err = f.deleteByID(object.ID, object.Name)
err = f.deleteByID(ctx, object.ID, object.Name)
checkErr(err)
tr.Done(err)
}
@@ -1205,7 +1205,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
destBucketID, err := f.getBucketID(dstBucket)
destBucketID, err := f.getBucketID(ctx, dstBucket)
if err != nil {
return nil, err
}
@@ -1221,8 +1221,8 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
}
var response api.FileInfo
err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
return nil, err
@@ -1245,7 +1245,7 @@ func (f *Fs) Hashes() hash.Set {
// getDownloadAuthorization returns authorization token for downloading
// without account.
func (f *Fs) getDownloadAuthorization(bucket, remote string) (authorization string, err error) {
func (f *Fs) getDownloadAuthorization(ctx context.Context, bucket, remote string) (authorization string, err error) {
validDurationInSeconds := time.Duration(f.opt.DownloadAuthorizationDuration).Nanoseconds() / 1e9
if validDurationInSeconds <= 0 || validDurationInSeconds > 604800 {
return "", errors.New("--b2-download-auth-duration must be between 1 sec and 1 week")
@@ -1253,7 +1253,7 @@ func (f *Fs) getDownloadAuthorization(bucket, remote string) (authorization stri
if !f.hasPermission("shareFiles") {
return "", errors.New("sharing a file link requires the shareFiles permission")
}
bucketID, err := f.getBucketID(bucket)
bucketID, err := f.getBucketID(ctx, bucket)
if err != nil {
return "", err
}
@@ -1268,8 +1268,8 @@ func (f *Fs) getDownloadAuthorization(bucket, remote string) (authorization stri
}
var response api.GetDownloadAuthorizationResponse
err = f.pacer.Call(func() (bool, error) {
resp, err := f.srv.CallJSON(&opts, &request, &response)
return f.shouldRetry(resp, err)
resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
return f.shouldRetry(ctx, resp, err)
})
if err != nil {
return "", errors.Wrap(err, "failed to get download authorization")
@@ -1301,12 +1301,12 @@ func (f *Fs) PublicLink(ctx context.Context, remote string) (link string, err er
}
absPath := "/" + bucketPath
link = RootURL + "/file/" + urlEncode(bucket) + absPath
bucketType, err := f.getbucketType(bucket)
bucketType, err := f.getbucketType(ctx, bucket)
if err != nil {
return "", err
}
if bucketType == "allPrivate" || bucketType == "snapshot" {
AuthorizationToken, err := f.getDownloadAuthorization(bucket, remote)
AuthorizationToken, err := f.getDownloadAuthorization(ctx, bucket, remote)
if err != nil {
return "", err
}
@@ -1505,8 +1505,8 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
}
var response api.FileInfo
err = o.fs.pacer.Call(func() (bool, error) {
resp, err := o.fs.srv.CallJSON(&opts, &request, &response)
return o.fs.shouldRetry(resp, err)
resp, err := o.fs.srv.CallJSON(ctx, &opts, &request, &response)
return o.fs.shouldRetry(ctx, resp, err)
})
if err != nil {
return err
@@ -1604,8 +1604,8 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
}
var resp *http.Response
err = o.fs.pacer.Call(func() (bool, error) {
resp, err = o.fs.srv.Call(&opts)
return o.fs.shouldRetry(resp, err)
resp, err = o.fs.srv.Call(ctx, &opts)
return o.fs.shouldRetry(ctx, resp, err)
})
if err != nil {
return nil, errors.Wrap(err, "failed to open for download")
@@ -1701,7 +1701,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
o.fs.putUploadBlock(buf)
return err
}
return up.Stream(buf)
return up.Stream(ctx, buf)
} else if err == io.EOF || err == io.ErrUnexpectedEOF {
fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
defer o.fs.putUploadBlock(buf)
@@ -1715,7 +1715,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err != nil {
return err
}
return up.Upload()
return up.Upload(ctx)
}
modTime := src.ModTime(ctx)
@@ -1729,7 +1729,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
}
// Get upload URL
upload, err := o.fs.getUploadURL(bucket)
upload, err := o.fs.getUploadURL(ctx, bucket)
if err != nil {
return err
}
@@ -1807,8 +1807,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
var response api.FileInfo
// Don't retry, return a retry error instead
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
resp, err := o.fs.srv.CallJSON(&opts, nil, &response)
retry, err := o.fs.shouldRetry(resp, err)
resp, err := o.fs.srv.CallJSON(ctx, &opts, nil, &response)
retry, err := o.fs.shouldRetry(ctx, resp, err)
// On retryable error clear UploadURL
if retry {
fs.Debugf(o, "Clearing upload URL because of error: %v", err)
@@ -1829,9 +1829,9 @@ func (o *Object) Remove(ctx context.Context) error {
return errNotWithVersions
}
if o.fs.opt.HardDelete {
return o.fs.deleteByID(o.id, bucketPath)
return o.fs.deleteByID(ctx, o.id, bucketPath)
}
return o.fs.hide(bucket, bucketPath)
return o.fs.hide(ctx, bucket, bucketPath)
}
// MimeType of an Object if known, "" otherwise