Files
rclone/backend/shade/upload.go
jhasse-shade 175d4bc553
Some checks failed
build / windows (push) Has been cancelled
build / other_os (push) Has been cancelled
build / mac_amd64 (push) Has been cancelled
build / mac_arm64 (push) Has been cancelled
build / linux (push) Has been cancelled
build / go1.24 (push) Has been cancelled
build / linux_386 (push) Has been cancelled
build / lint (push) Has been cancelled
build / android-all (push) Has been cancelled
Build & Push Docker Images / Build Docker Image for linux/386 (push) Has been cancelled
Build & Push Docker Images / Build Docker Image for linux/amd64 (push) Has been cancelled
Build & Push Docker Images / Build Docker Image for linux/arm/v6 (push) Has been cancelled
Build & Push Docker Images / Build Docker Image for linux/arm/v7 (push) Has been cancelled
Build & Push Docker Images / Build Docker Image for linux/arm64 (push) Has been cancelled
Build & Push Docker Images / Merge & Push Final Docker Image (push) Has been cancelled
Add Shade backend
2025-12-09 17:08:57 +00:00

337 lines
8.4 KiB
Go

// Multipart upload support for the Shade backend.
package shade
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"path"
"sort"
"sync"
"github.com/rclone/rclone/backend/shade/api"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/chunksize"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/rest"
)
// warnStreamUpload makes sure the streaming-upload maximum file size warning
// issued by OpenChunkWriter is logged at most once.
var warnStreamUpload sync.Once
// shadeChunkWriter holds the state of one multipart upload. It is created by
// Fs.OpenChunkWriter and its WriteChunk, Close and Abort methods drive the
// upload.
type shadeChunkWriter struct {
// initToken is the token returned by the initiate request; it is sent as the
// "token" query parameter on the part, complete and abort requests.
initToken string
// chunkSize is the part size in bytes that was sent in the initiate request.
chunkSize int64
// size is the source size as reported by src.Size() (-1 when unknown).
size int64
// f is the Fs whose API client, pacer and token refresh are used.
f *Fs
// o is the temporary Object built for this upload.
o *Object
// completedParts records the part number and ETag of every uploaded part.
completedParts []api.CompletedPart
// completedPartsMu guards the append to completedParts in WriteChunk.
completedPartsMu sync.Mutex
}
// uploadMultipart uploads in as a multipart upload for larger files.
//
// The transfer itself is delegated to multipart.UploadMultipart, which is
// given o.fs to open the chunk writer and the caller's options. On success the
// size stored in the chunk writer is copied to o.size.
//
// NOTE(review): that size is the value src.Size() returned when the chunk
// writer was opened, so it is -1 for sources of unknown size - confirm the
// caller refreshes the object's metadata afterwards.
//
// It returns any error from the upload, or an error if the chunk writer handed
// back is not a *shadeChunkWriter.
func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) error {
	chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
		Open:        o.fs,
		OpenOptions: options,
	})
	if err != nil {
		return err
	}
	// Use the checked form of the assertion so an unexpected (or nil) writer
	// surfaces as an error instead of a panic.
	shadeWriter, ok := chunkWriter.(*shadeChunkWriter)
	if !ok {
		return fmt.Errorf("unexpected chunk writer type %T", chunkWriter)
	}
	o.size = shadeWriter.size
	return nil
}
// OpenChunkWriter initiates a multipart upload and returns the chunk writer
// info together with a ChunkWriter for uploading the parts.
//
// remote is the destination path relative to the Fs root and src describes
// the object being uploaded. The options are fixed up for the source size and
// passed along with the initiate request.
//
// An error is returned if the token cannot be refreshed, the parent
// directories cannot be ensured, or the initiate request fails.
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
	// Clamp the configured number of parts to [1, maxUploadParts].
	partLimit := f.opt.MaxUploadParts
	switch {
	case partLimit < 1:
		partLimit = 1
	case partLimit > maxUploadParts:
		partLimit = maxUploadParts
	}

	size := src.Size()
	fs.FixRangeOption(options, size)

	// Work out the part size. A size of -1 means the length of the incoming
	// data is unknown: keep the configured chunk size and warn, once, about
	// the maximum file size that this chunk size and part limit allow.
	chunkSize := f.opt.ChunkSize
	if size != -1 {
		chunkSize = chunksize.Calculator(src, size, partLimit, chunkSize)
	} else {
		warnStreamUpload.Do(func() {
			fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
				chunkSize, fs.SizeSuffix(int64(chunkSize)*int64(partLimit)))
		})
	}
	partSize := int64(chunkSize)

	token, err := f.refreshJWTToken(ctx)
	if err != nil {
		return info, nil, fmt.Errorf("failed to get token: %w", err)
	}
	if err = f.ensureParentDirectories(ctx, remote); err != nil {
		return info, nil, fmt.Errorf("failed to ensure parent directories: %w", err)
	}

	// The API is given the path including the Fs root, when there is one.
	fullPath := remote
	if f.root != "" {
		fullPath = path.Join(f.root, remote)
	}

	// Initiate the multipart upload; the response carries the token that
	// identifies the upload in the follow-up requests.
	initReq := struct {
		Path     string `json:"path"`
		PartSize int64  `json:"partSize"`
	}{
		Path:     fullPath,
		PartSize: partSize,
	}
	var initResp struct {
		Token string `json:"token"`
	}
	opts := rest.Opts{
		Method:  "POST",
		Path:    fmt.Sprintf("/%s/upload/multipart", f.drive),
		RootURL: f.endpoint,
		ExtraHeaders: map[string]string{
			"Authorization": "Bearer " + token,
		},
		Options: options,
	}
	err = f.pacer.Call(func() (bool, error) {
		res, callErr := f.srv.CallJSON(ctx, &opts, initReq, &initResp)
		if callErr == nil {
			return false, nil
		}
		// Only a 429 response is worth retrying.
		retry := res != nil && res.StatusCode == http.StatusTooManyRequests
		return retry, callErr
	})
	if err != nil {
		return info, nil, fmt.Errorf("failed to initiate multipart upload: %w", err)
	}

	info = fs.ChunkWriterInfo{
		ChunkSize:         partSize,
		Concurrency:       f.opt.Concurrency,
		LeavePartsOnError: false,
	}
	writer = &shadeChunkWriter{
		initToken: initResp.Token,
		chunkSize: partSize,
		size:      size,
		f:         f,
		// Temporary Object under construction.
		o: &Object{
			fs:     f,
			remote: remote,
		},
	}
	return info, writer, nil
}
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
//
// The chunk is read fully into memory, a presigned URL is requested for part
// chunkNumber+1, the data is PUT to that URL and the ETag from the response
// is recorded for Close.
//
// It returns the number of bytes uploaded. An empty chunk uploads nothing and
// returns 0 with a nil error. Errors from refreshing the token, reading the
// chunk, fetching the part URL or uploading the part are returned.
func (s *shadeChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error) {
	token, err := s.f.refreshJWTToken(ctx)
	if err != nil {
		return 0, err
	}

	// Buffer the chunk so its length is known and it can be replayed on retry.
	var chunk bytes.Buffer
	n, err := io.Copy(&chunk, reader)
	// Check the error before the length so a failed read of zero bytes is
	// not mistaken for an empty chunk.
	if err != nil {
		return 0, fmt.Errorf("failed to read chunk: %w", err)
	}
	if n == 0 {
		return 0, nil
	}

	// The API is called with 1-based part numbers.
	partNumber := chunkNumber + 1

	// Get presigned URL for this part
	var partURL api.PartURL
	partOpts := rest.Opts{
		Method:  "POST",
		Path:    fmt.Sprintf("/%s/upload/multipart/part/%d?token=%s", s.f.drive, partNumber, url.QueryEscape(s.initToken)),
		RootURL: s.f.endpoint,
		ExtraHeaders: map[string]string{
			"Authorization": "Bearer " + token,
		},
	}
	err = s.f.pacer.Call(func() (bool, error) {
		res, err := s.f.srv.CallJSON(ctx, &partOpts, nil, &partURL)
		if err != nil {
			return res != nil && res.StatusCode == http.StatusTooManyRequests, err
		}
		return false, nil
	})
	if err != nil {
		return 0, fmt.Errorf("failed to get part URL: %w", err)
	}

	opts := rest.Opts{
		Method:        "PUT",
		RootURL:       partURL.URL,
		ContentType:   "",
		ContentLength: &n,
	}
	// Add the headers returned with the presigned URL
	if len(partURL.Headers) > 0 {
		opts.ExtraHeaders = make(map[string]string)
		for k, v := range partURL.Headers {
			opts.ExtraHeaders[k] = v
		}
	}

	var uploadRes *http.Response
	err = s.f.pacer.Call(func() (bool, error) {
		// Use a fresh reader for every attempt: a reader consumed by a
		// previous attempt would make a retry send an empty body.
		opts.Body = bytes.NewReader(chunk.Bytes())
		uploadRes, err = s.f.srv.Call(ctx, &opts)
		if err != nil {
			return uploadRes != nil && uploadRes.StatusCode == http.StatusTooManyRequests, err
		}
		return false, nil
	})
	if err != nil {
		return 0, fmt.Errorf("failed to upload part %d: %w", partNumber, err)
	}
	if uploadRes.StatusCode != http.StatusOK && uploadRes.StatusCode != http.StatusCreated {
		body, _ := io.ReadAll(uploadRes.Body)
		fs.CheckClose(uploadRes.Body, &err)
		return 0, fmt.Errorf("part upload failed with status %d: %s", uploadRes.StatusCode, string(body))
	}

	// Get ETag from response
	etag := uploadRes.Header.Get("ETag")
	// A failure to close the body is not returned: nil is returned below.
	fs.CheckClose(uploadRes.Body, &err)

	s.completedPartsMu.Lock()
	defer s.completedPartsMu.Unlock()
	s.completedParts = append(s.completedParts, api.CompletedPart{
		PartNumber: int32(partNumber),
		ETag:       etag,
	})
	return n, nil
}
// Close complete chunked writer finalising the file.
//
// It sends the list of uploaded parts, ordered by part number, to the
// complete endpoint. When no parts were uploaded an empty list is sent. The
// request is retried when the server answers 429.
//
// It returns an error if the token cannot be refreshed, the request fails, or
// the response status is neither 200 nor 201.
func (s *shadeChunkWriter) Close(ctx context.Context) error {
	// WriteChunk appends parts as they finish, so put them in part number
	// order first. Hold the mutex that guards the slice in WriteChunk.
	s.completedPartsMu.Lock()
	sort.Slice(s.completedParts, func(i, j int) bool {
		return s.completedParts[i].PartNumber < s.completedParts[j].PartNumber
	})
	parts := s.completedParts
	s.completedPartsMu.Unlock()
	if parts == nil {
		// A nil slice would be encoded as JSON null; send [] instead.
		parts = []api.CompletedPart{}
	}

	type completeRequest struct {
		Parts []api.CompletedPart `json:"parts"`
	}
	completeBody := completeRequest{Parts: parts}

	token, err := s.f.refreshJWTToken(ctx)
	if err != nil {
		return err
	}

	completeOpts := rest.Opts{
		Method:  "POST",
		Path:    fmt.Sprintf("/%s/upload/multipart/complete?token=%s", s.f.drive, url.QueryEscape(s.initToken)),
		RootURL: s.f.endpoint,
		ExtraHeaders: map[string]string{
			"Authorization": "Bearer " + token,
		},
		// Nothing in the response body is used, so have the REST client
		// close it rather than decoding it.
		NoResponse: true,
	}
	err = s.f.pacer.Call(func() (bool, error) {
		res, err := s.f.srv.CallJSON(ctx, &completeOpts, completeBody, nil)
		if err != nil {
			// Retry on 429 only, and keep the underlying error for the caller.
			return res != nil && res.StatusCode == http.StatusTooManyRequests, err
		}
		if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated {
			return false, fmt.Errorf("complete multipart failed with status %d", res.StatusCode)
		}
		return false, nil
	})
	if err != nil {
		return fmt.Errorf("failed to complete multipart upload: %w", err)
	}
	return nil
}
// Abort chunk write
//
// You can and should call Abort without calling Close.
//
// The abort request is best effort: it is sent once, and a failed request or
// an unexpected status is only logged at debug level. An error is returned
// only if the token cannot be refreshed or the pacer itself reports one.
func (s *shadeChunkWriter) Abort(ctx context.Context) error {
	token, err := s.f.refreshJWTToken(ctx)
	if err != nil {
		return err
	}
	opts := rest.Opts{
		Method:  "POST",
		Path:    fmt.Sprintf("/%s/upload/abort/multipart?token=%s", s.f.drive, url.QueryEscape(s.initToken)),
		RootURL: s.f.endpoint,
		ExtraHeaders: map[string]string{
			"Authorization": "Bearer " + token,
		},
	}
	err = s.f.pacer.Call(func() (bool, error) {
		res, err := s.f.srv.Call(ctx, &opts)
		if err != nil {
			fs.Debugf(s.f, "Failed to abort multipart upload: %v", err)
			return false, nil // Don't retry abort
		}
		// The response body is not needed; close it so the connection is
		// released instead of leaked.
		if closeErr := res.Body.Close(); closeErr != nil {
			fs.Debugf(s.f, "Failed to close abort response body: %v", closeErr)
		}
		if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated {
			fs.Debugf(s.f, "Abort returned status %d", res.StatusCode)
		}
		return false, nil
	})
	if err != nil {
		return fmt.Errorf("failed to abort multipart upload: %w", err)
	}
	return nil
}