Backend: Add groom worker and test stubs #154

Signed-off-by: Michael Mayer <michael@liquidbytes.net>
This commit is contained in:
Michael Mayer
2020-05-26 15:15:14 +02:00
parent 24cc8089fa
commit 968cd71f34
21 changed files with 273 additions and 107 deletions

View File

@@ -193,7 +193,7 @@ func LikePhoto(router *gin.RouterGroup, conf *config.Config) {
return return
} }
if err := m.SetFavorite(true); err != nil{ if err := m.SetFavorite(true); err != nil {
log.Errorf("photo: %s", err.Error()) log.Errorf("photo: %s", err.Error())
c.AbortWithStatusJSON(http.StatusInternalServerError, ErrSaveFailed) c.AbortWithStatusJSON(http.StatusInternalServerError, ErrSaveFailed)
return return
@@ -226,7 +226,7 @@ func DislikePhoto(router *gin.RouterGroup, conf *config.Config) {
return return
} }
if err := m.SetFavorite(false); err != nil{ if err := m.SetFavorite(false); err != nil {
log.Errorf("photo: %s", err.Error()) log.Errorf("photo: %s", err.Error())
c.AbortWithStatusJSON(http.StatusInternalServerError, ErrSaveFailed) c.AbortWithStatusJSON(http.StatusInternalServerError, ErrSaveFailed)
return return

View File

@@ -201,9 +201,10 @@ func (c *Config) Cache() *gc.Cache {
// Shutdown services and workers. // Shutdown services and workers.
func (c *Config) Shutdown() { func (c *Config) Shutdown() {
mutex.Worker.Cancel() mutex.MainWorker.Cancel()
mutex.Share.Cancel() mutex.ShareWorker.Cancel()
mutex.Sync.Cancel() mutex.SyncWorker.Cancel()
mutex.GroomWorker.Cancel()
if err := c.CloseDb(); err != nil { if err := c.CloseDb(); err != nil {
log.Errorf("could not close database connection: %s", err) log.Errorf("could not close database connection: %s", err)

View File

@@ -5,8 +5,14 @@ import (
) )
var ( var (
Db = sync.Mutex{} Db = sync.Mutex{}
Worker = Busy{} MainWorker = Busy{}
Sync = Busy{} SyncWorker = Busy{}
Share = Busy{} ShareWorker = Busy{}
GroomWorker = Busy{}
) )
// WorkersBusy returns true if any worker is busy.
func WorkersBusy() bool {
return MainWorker.Busy() || SyncWorker.Busy() || ShareWorker.Busy() || GroomWorker.Busy()
}

View File

@@ -32,11 +32,11 @@ func NewConvert(conf *config.Config) *Convert {
// Start converts all files in a directory to JPEG if possible. // Start converts all files in a directory to JPEG if possible.
func (c *Convert) Start(path string) error { func (c *Convert) Start(path string) error {
if err := mutex.Worker.Start(); err != nil { if err := mutex.MainWorker.Start(); err != nil {
return err return err
} }
defer mutex.Worker.Stop() defer mutex.MainWorker.Stop()
jobs := make(chan ConvertJob) jobs := make(chan ConvertJob)
@@ -70,7 +70,7 @@ func (c *Convert) Start(path string) error {
} }
}() }()
if mutex.Worker.Canceled() { if mutex.MainWorker.Canceled() {
return errors.New("convert: canceled") return errors.New("convert: canceled")
} }

View File

@@ -58,12 +58,12 @@ func (imp *Import) Start(opt ImportOptions) map[string]bool {
return done return done
} }
if err := mutex.Worker.Start(); err != nil { if err := mutex.MainWorker.Start(); err != nil {
event.Error(fmt.Sprintf("import: %s", err.Error())) event.Error(fmt.Sprintf("import: %s", err.Error()))
return done return done
} }
defer mutex.Worker.Stop() defer mutex.MainWorker.Stop()
if err := ind.tensorFlow.Init(); err != nil { if err := ind.tensorFlow.Init(); err != nil {
log.Errorf("import: %s", err.Error()) log.Errorf("import: %s", err.Error())
@@ -102,7 +102,7 @@ func (imp *Import) Start(opt ImportOptions) map[string]bool {
} }
}() }()
if mutex.Worker.Canceled() { if mutex.MainWorker.Canceled() {
return errors.New("import canceled") return errors.New("import canceled")
} }
@@ -220,7 +220,7 @@ func (imp *Import) Start(opt ImportOptions) map[string]bool {
// Cancel stops the current import operation. // Cancel stops the current import operation.
func (imp *Import) Cancel() { func (imp *Import) Cancel() {
mutex.Worker.Cancel() mutex.MainWorker.Cancel()
} }
// DestinationFilename returns the destination filename of a MediaFile to be imported. // DestinationFilename returns the destination filename of a MediaFile to be imported.

View File

@@ -54,7 +54,7 @@ func (ind *Index) thumbPath() string {
// Cancel stops the current indexing operation. // Cancel stops the current indexing operation.
func (ind *Index) Cancel() { func (ind *Index) Cancel() {
mutex.Worker.Cancel() mutex.MainWorker.Cancel()
} }
// Start indexes media files in the originals directory. // Start indexes media files in the originals directory.
@@ -68,12 +68,12 @@ func (ind *Index) Start(opt IndexOptions) map[string]bool {
return done return done
} }
if err := mutex.Worker.Start(); err != nil { if err := mutex.MainWorker.Start(); err != nil {
event.Error(fmt.Sprintf("index: %s", err.Error())) event.Error(fmt.Sprintf("index: %s", err.Error()))
return done return done
} }
defer mutex.Worker.Stop() defer mutex.MainWorker.Stop()
if err := ind.tensorFlow.Init(); err != nil { if err := ind.tensorFlow.Init(); err != nil {
log.Errorf("index: %s", err.Error()) log.Errorf("index: %s", err.Error())
@@ -112,7 +112,7 @@ func (ind *Index) Start(opt IndexOptions) map[string]bool {
} }
}() }()
if mutex.Worker.Canceled() { if mutex.MainWorker.Canceled() {
return errors.New("indexing canceled") return errors.New("indexing canceled")
} }

View File

@@ -55,14 +55,14 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh
purgedPhotos = make(map[string]bool) purgedPhotos = make(map[string]bool)
originalsPath := prg.originalsPath() originalsPath := prg.originalsPath()
if err := mutex.Worker.Start(); err != nil { if err := mutex.MainWorker.Start(); err != nil {
err = fmt.Errorf("purge: %s", err.Error()) err = fmt.Errorf("purge: %s", err.Error())
event.Error(err.Error()) event.Error(err.Error())
return purgedFiles, purgedPhotos, err return purgedFiles, purgedPhotos, err
} }
defer func() { defer func() {
mutex.Worker.Stop() mutex.MainWorker.Stop()
runtime.GC() runtime.GC()
}() }()
@@ -81,7 +81,7 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh
} }
for _, file := range files { for _, file := range files {
if mutex.Worker.Canceled() { if mutex.MainWorker.Canceled() {
return purgedFiles, purgedPhotos, errors.New("purge canceled") return purgedFiles, purgedPhotos, errors.New("purge canceled")
} }
@@ -107,7 +107,7 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh
} }
} }
if mutex.Worker.Canceled() { if mutex.MainWorker.Canceled() {
return purgedFiles, purgedPhotos, errors.New("purge canceled") return purgedFiles, purgedPhotos, errors.New("purge canceled")
} }
@@ -131,7 +131,7 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh
} }
for _, photo := range photos { for _, photo := range photos {
if mutex.Worker.Canceled() { if mutex.MainWorker.Canceled() {
return purgedFiles, purgedPhotos, errors.New("purge canceled") return purgedFiles, purgedPhotos, errors.New("purge canceled")
} }
@@ -158,7 +158,7 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh
} }
} }
if mutex.Worker.Canceled() { if mutex.MainWorker.Canceled() {
return purgedFiles, purgedPhotos, errors.New("purge canceled") return purgedFiles, purgedPhotos, errors.New("purge canceled")
} }
@@ -182,5 +182,5 @@ func (prg *Purge) Start(opt PurgeOptions) (purgedFiles map[string]bool, purgedPh
// Cancel stops the current purge operation. // Cancel stops the current purge operation.
func (prg *Purge) Cancel() { func (prg *Purge) Cancel() {
mutex.Worker.Cancel() mutex.MainWorker.Cancel()
} }

View File

@@ -24,11 +24,11 @@ func NewResample(conf *config.Config) *Resample {
// Start creates default thumbnails for all files in originalsPath. // Start creates default thumbnails for all files in originalsPath.
func (rs *Resample) Start(force bool) error { func (rs *Resample) Start(force bool) error {
if err := mutex.Worker.Start(); err != nil { if err := mutex.MainWorker.Start(); err != nil {
return err return err
} }
defer mutex.Worker.Stop() defer mutex.MainWorker.Stop()
originalsPath := rs.conf.OriginalsPath() originalsPath := rs.conf.OriginalsPath()
thumbnailsPath := rs.conf.ThumbPath() thumbnailsPath := rs.conf.ThumbPath()
@@ -65,7 +65,7 @@ func (rs *Resample) Start(force bool) error {
} }
}() }()
if mutex.Worker.Canceled() { if mutex.MainWorker.Canceled() {
return errors.New("resample: canceled") return errors.New("resample: canceled")
} }

View File

@@ -1,5 +1,5 @@
/* /*
This package encapsulates JPEG thumbnail generation. This package contains video related types and functions.
Additional information can be found in our Developer Guide: Additional information can be found in our Developer Guide:
@@ -8,12 +8,9 @@ https://github.com/photoprism/photoprism/wiki
package video package video
import ( import (
"github.com/photoprism/photoprism/internal/event"
"github.com/photoprism/photoprism/pkg/fs" "github.com/photoprism/photoprism/pkg/fs"
) )
var log = event.Log
type Type struct { type Type struct {
Format fs.FileType Format fs.FileType
Width int Width int

View File

@@ -0,0 +1,13 @@
package video
import "testing"
func TestTypes(t *testing.T) {
if val := Types[""]; val != TypeMP4 {
t.Fatal("default type should be TypeMP4")
}
if val := Types["mp4"]; val != TypeMP4 {
t.Fatal("mp4 type should be TypeMP4")
}
}

42
internal/workers/groom.go Normal file
View File

@@ -0,0 +1,42 @@
package workers
import (
"github.com/photoprism/photoprism/internal/config"
"github.com/photoprism/photoprism/internal/mutex"
)
// Groom represents a groom worker.
type Groom struct {
conf *config.Config
}
// NewGroom returns a new groom worker.
func NewGroom(conf *config.Config) *Groom {
return &Groom{conf: conf}
}
// logError logs an error message if err is not nil.
func (worker *Groom) logError(err error) {
if err != nil {
log.Errorf("groom: %s", err.Error())
}
}
// logWarn logs a warning message if err is not nil.
func (worker *Groom) logWarn(err error) {
if err != nil {
log.Warnf("groom: %s", err.Error())
}
}
// Start starts the groom worker.
func (worker *Groom) Start() (err error) {
if err := mutex.GroomWorker.Start(); err != nil {
worker.logWarn(err)
return err
}
defer mutex.GroomWorker.Stop()
return err
}

View File

@@ -0,0 +1,31 @@
package workers
import (
"testing"
"github.com/photoprism/photoprism/internal/config"
"github.com/photoprism/photoprism/internal/mutex"
"github.com/stretchr/testify/assert"
)
func TestGroom_Start(t *testing.T) {
conf := config.TestConfig()
worker := NewGroom(conf)
assert.IsType(t, &Groom{}, worker)
if err := mutex.GroomWorker.Start(); err != nil {
t.Fatal(err)
}
if err := worker.Start(); err == nil {
t.Fatal("error expected")
}
mutex.GroomWorker.Stop()
if err := worker.Start(); err != nil {
t.Fatal(err)
}
}

View File

@@ -21,19 +21,26 @@ type Share struct {
conf *config.Config conf *config.Config
} }
// NewShare returns a new service share worker. // NewShare returns a new share worker.
func NewShare(conf *config.Config) *Share { func NewShare(conf *config.Config) *Share {
return &Share{conf: conf} return &Share{conf: conf}
} }
// logError logs an error message if err is not nil.
func (worker *Share) logError(err error) {
if err != nil {
log.Errorf("share: %s", err.Error())
}
}
// Start starts the share worker. // Start starts the share worker.
func (s *Share) Start() (err error) { func (worker *Share) Start() (err error) {
if err := mutex.Share.Start(); err != nil { if err := mutex.ShareWorker.Start(); err != nil {
event.Error(fmt.Sprintf("share: %s", err.Error())) event.Error(fmt.Sprintf("share: %s", err.Error()))
return err return err
} }
defer mutex.Share.Stop() defer mutex.ShareWorker.Stop()
f := form.AccountSearch{ f := form.AccountSearch{
Share: true, Share: true,
@@ -44,7 +51,7 @@ func (s *Share) Start() (err error) {
// Upload newly shared files // Upload newly shared files
for _, a := range accounts { for _, a := range accounts {
if mutex.Share.Canceled() { if mutex.ShareWorker.Canceled() {
return nil return nil
} }
@@ -55,7 +62,7 @@ func (s *Share) Start() (err error) {
files, err := query.FileShares(a.ID, entity.FileShareNew) files, err := query.FileShares(a.ID, entity.FileShareNew)
if err != nil { if err != nil {
log.Errorf("share: %s", err.Error()) worker.logError(err)
continue continue
} }
@@ -68,7 +75,7 @@ func (s *Share) Start() (err error) {
existingDirs := make(map[string]string) existingDirs := make(map[string]string)
for _, file := range files { for _, file := range files {
if mutex.Share.Canceled() { if mutex.ShareWorker.Canceled() {
return nil return nil
} }
@@ -81,7 +88,7 @@ func (s *Share) Start() (err error) {
} }
} }
srcFileName := path.Join(s.conf.OriginalsPath(), file.File.FileName) srcFileName := path.Join(worker.conf.OriginalsPath(), file.File.FileName)
if a.ShareSize != "" { if a.ShareSize != "" {
thumbType, ok := thumb.Types[a.ShareSize] thumbType, ok := thumb.Types[a.ShareSize]
@@ -91,16 +98,16 @@ func (s *Share) Start() (err error) {
continue continue
} }
srcFileName, err = thumb.FromFile(srcFileName, file.File.FileHash, s.conf.ThumbPath(), thumbType.Width, thumbType.Height, thumbType.Options...) srcFileName, err = thumb.FromFile(srcFileName, file.File.FileHash, worker.conf.ThumbPath(), thumbType.Width, thumbType.Height, thumbType.Options...)
if err != nil { if err != nil {
log.Errorf("share: %s", err) worker.logError(err)
continue continue
} }
} }
if err := client.Upload(srcFileName, file.RemoteName); err != nil { if err := client.Upload(srcFileName, file.RemoteName); err != nil {
log.Errorf("share: %s", err.Error()) worker.logError(err)
file.Errors++ file.Errors++
file.Error = err.Error() file.Error = err.Error()
} else { } else {
@@ -114,19 +121,17 @@ func (s *Share) Start() (err error) {
file.Status = entity.FileShareError file.Status = entity.FileShareError
} }
if mutex.Share.Canceled() { if mutex.ShareWorker.Canceled() {
return nil return nil
} }
if err := entity.Db().Save(&file).Error; err != nil { worker.logError(entity.Db().Save(&file).Error)
log.Errorf("share: %s", err.Error())
}
} }
} }
// Remove previously shared files if expired // Remove previously shared files if expired
for _, a := range accounts { for _, a := range accounts {
if mutex.Share.Canceled() { if mutex.ShareWorker.Canceled() {
return nil return nil
} }
@@ -137,7 +142,7 @@ func (s *Share) Start() (err error) {
files, err := query.ExpiredFileShares(a) files, err := query.ExpiredFileShares(a)
if err != nil { if err != nil {
log.Errorf("share: %s", err.Error()) worker.logError(err)
continue continue
} }
@@ -149,7 +154,7 @@ func (s *Share) Start() (err error) {
client := webdav.New(a.AccURL, a.AccUser, a.AccPass) client := webdav.New(a.AccURL, a.AccUser, a.AccPass)
for _, file := range files { for _, file := range files {
if mutex.Share.Canceled() { if mutex.ShareWorker.Canceled() {
return nil return nil
} }
@@ -164,7 +169,7 @@ func (s *Share) Start() (err error) {
} }
if err := entity.Db().Save(&file).Error; err != nil { if err := entity.Db().Save(&file).Error; err != nil {
log.Errorf("share: %s", err.Error()) worker.logError(err)
} }
} }
} }

View File

@@ -0,0 +1,16 @@
package workers
import (
"testing"
"github.com/photoprism/photoprism/internal/config"
"github.com/stretchr/testify/assert"
)
func TestNewShare(t *testing.T) {
conf := config.TestConfig()
worker := NewShare(conf)
assert.IsType(t, &Share{}, worker)
}

View File

@@ -18,28 +18,35 @@ type Sync struct {
conf *config.Config conf *config.Config
} }
// NewSync returns a new service sync worker. // NewSync returns a new sync worker.
func NewSync(conf *config.Config) *Sync { func NewSync(conf *config.Config) *Sync {
return &Sync{ return &Sync{
conf: conf, conf: conf,
} }
} }
// Report logs an error message if err is not nil. // logError logs an error message if err is not nil.
func (s *Sync) report(err error) { func (worker *Sync) logError(err error) {
if err != nil { if err != nil {
log.Errorf("sync: %s", err.Error()) log.Errorf("sync: %s", err.Error())
} }
} }
// logWarn logs a warning message if err is not nil.
func (worker *Sync) logWarn(err error) {
if err != nil {
log.Warnf("sync: %s", err.Error())
}
}
// Start starts the sync worker. // Start starts the sync worker.
func (s *Sync) Start() (err error) { func (worker *Sync) Start() (err error) {
if err := mutex.Sync.Start(); err != nil { if err := mutex.SyncWorker.Start(); err != nil {
event.Error(fmt.Sprintf("sync: %s", err.Error())) event.Error(fmt.Sprintf("sync: %s", err.Error()))
return err return err
} }
defer mutex.Sync.Stop() defer mutex.SyncWorker.Stop()
f := form.AccountSearch{ f := form.AccountSearch{
Sync: true, Sync: true,
@@ -57,7 +64,7 @@ func (s *Sync) Start() (err error) {
a.AccSync = false a.AccSync = false
if err := entity.Db().Save(&a).Error; err != nil { if err := entity.Db().Save(&a).Error; err != nil {
log.Errorf("sync: %s", err.Error()) worker.logError(err)
} else { } else {
log.Warnf("sync: disabled sync, %s failed more than %d times", a.AccName, a.RetryLimit) log.Warnf("sync: disabled sync, %s failed more than %d times", a.AccName, a.RetryLimit)
} }
@@ -74,7 +81,7 @@ func (s *Sync) Start() (err error) {
switch a.SyncStatus { switch a.SyncStatus {
case entity.AccountSyncStatusRefresh: case entity.AccountSyncStatusRefresh:
if complete, err := s.refresh(a); err != nil { if complete, err := worker.refresh(a); err != nil {
accErrors++ accErrors++
accError = err.Error() accError = err.Error()
} else if complete { } else if complete {
@@ -92,7 +99,7 @@ func (s *Sync) Start() (err error) {
} }
} }
case entity.AccountSyncStatusDownload: case entity.AccountSyncStatusDownload:
if complete, err := s.download(a); err != nil { if complete, err := worker.download(a); err != nil {
accErrors++ accErrors++
accError = err.Error() accError = err.Error()
} else if complete { } else if complete {
@@ -106,7 +113,7 @@ func (s *Sync) Start() (err error) {
} }
} }
case entity.AccountSyncStatusUpload: case entity.AccountSyncStatusUpload:
if complete, err := s.upload(a); err != nil { if complete, err := worker.upload(a); err != nil {
accErrors++ accErrors++
accError = err.Error() accError = err.Error()
} else if complete { } else if complete {
@@ -123,17 +130,17 @@ func (s *Sync) Start() (err error) {
syncStatus = entity.AccountSyncStatusRefresh syncStatus = entity.AccountSyncStatusRefresh
} }
if mutex.Sync.Canceled() { if mutex.SyncWorker.Canceled() {
return nil return nil
} }
// Only update the following fields to avoid overwriting other settings // Only update the following fields to avoid overwriting other settings
if err := a.Updates(map[string]interface{}{ if err := a.Updates(map[string]interface{}{
"AccError": accError, "AccError": accError,
"AccErrors": accErrors, "AccErrors": accErrors,
"SyncStatus": syncStatus, "SyncStatus": syncStatus,
"SyncDate": syncDate}); err != nil { "SyncDate": syncDate}); err != nil {
log.Errorf("sync: %s", err.Error()) worker.logError(err)
} else if synced { } else if synced {
event.Publish("sync.synced", event.Data{"account": a}) event.Publish("sync.synced", event.Data{"account": a})
} }

View File

@@ -17,12 +17,12 @@ import (
type Downloads map[string][]entity.FileSync type Downloads map[string][]entity.FileSync
// downloadPath returns a temporary download path. // downloadPath returns a temporary download path.
func (s *Sync) downloadPath() string { func (worker *Sync) downloadPath() string {
return s.conf.TempPath() + "/sync" return worker.conf.TempPath() + "/sync"
} }
// relatedDownloads returns files to be downloaded grouped by prefix. // relatedDownloads returns files to be downloaded grouped by prefix.
func (s *Sync) relatedDownloads(a entity.Account) (result Downloads, err error) { func (worker *Sync) relatedDownloads(a entity.Account) (result Downloads, err error) {
result = make(Downloads) result = make(Downloads)
maxResults := 1000 maxResults := 1000
@@ -35,7 +35,7 @@ func (s *Sync) relatedDownloads(a entity.Account) (result Downloads, err error)
// Group results by directory and base name // Group results by directory and base name
for i, file := range files { for i, file := range files {
k := fs.AbsBase(file.RemoteName, s.conf.Settings().Index.Group) k := fs.AbsBase(file.RemoteName, worker.conf.Settings().Index.Group)
result[k] = append(result[k], file) result[k] = append(result[k], file)
@@ -49,7 +49,7 @@ func (s *Sync) relatedDownloads(a entity.Account) (result Downloads, err error)
} }
// Downloads remote files in batches and imports / indexes them // Downloads remote files in batches and imports / indexes them
func (s *Sync) download(a entity.Account) (complete bool, err error) { func (worker *Sync) download(a entity.Account) (complete bool, err error) {
// Set up index worker // Set up index worker
indexJobs := make(chan photoprism.IndexJob) indexJobs := make(chan photoprism.IndexJob)
@@ -61,10 +61,10 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) {
go photoprism.ImportWorker(importJobs) go photoprism.ImportWorker(importJobs)
defer close(importJobs) defer close(importJobs)
relatedFiles, err := s.relatedDownloads(a) relatedFiles, err := worker.relatedDownloads(a)
if err != nil { if err != nil {
log.Errorf("sync: %s", err.Error()) worker.logError(err)
return false, err return false, err
} }
@@ -81,16 +81,16 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) {
var baseDir string var baseDir string
if a.SyncFilenames { if a.SyncFilenames {
baseDir = s.conf.OriginalsPath() baseDir = worker.conf.OriginalsPath()
} else { } else {
baseDir = fmt.Sprintf("%s/%d", s.downloadPath(), a.ID) baseDir = fmt.Sprintf("%s/%d", worker.downloadPath(), a.ID)
} }
done := make(map[string]bool) done := make(map[string]bool)
for _, files := range relatedFiles { for _, files := range relatedFiles {
for i, file := range files { for i, file := range files {
if mutex.Sync.Canceled() { if mutex.SyncWorker.Canceled() {
return false, nil return false, nil
} }
@@ -106,7 +106,7 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) {
file.Status = entity.FileSyncExists file.Status = entity.FileSyncExists
} else { } else {
if err := client.Download(file.RemoteName, localName, false); err != nil { if err := client.Download(file.RemoteName, localName, false); err != nil {
log.Errorf("sync: %s", err.Error()) worker.logError(err)
file.Errors++ file.Errors++
file.Error = err.Error() file.Error = err.Error()
} else { } else {
@@ -114,13 +114,13 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) {
file.Status = entity.FileSyncDownloaded file.Status = entity.FileSyncDownloaded
} }
if mutex.Sync.Canceled() { if mutex.SyncWorker.Canceled() {
return false, nil return false, nil
} }
} }
if err := entity.Db().Save(&file).Error; err != nil { if err := entity.Db().Save(&file).Error; err != nil {
log.Errorf("sync: %s", err.Error()) worker.logError(err)
} else { } else {
files[i] = file files[i] = file
} }
@@ -137,10 +137,10 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) {
continue continue
} }
related, err := mf.RelatedFiles(s.conf.Settings().Index.Group) related, err := mf.RelatedFiles(worker.conf.Settings().Index.Group)
if err != nil { if err != nil {
log.Warnf("sync: %s", err.Error()) worker.logWarn(err)
continue continue
} }
@@ -180,9 +180,7 @@ func (s *Sync) download(a entity.Account) (complete bool, err error) {
} }
if len(done) > 0 { if len(done) > 0 {
if err := entity.UpdatePhotoCounts(); err != nil { worker.logError(entity.UpdatePhotoCounts())
log.Errorf("sync: %s", err)
}
} }
return false, nil return false, nil

View File

@@ -9,7 +9,7 @@ import (
) )
// Updates the local list of remote files so that they can be downloaded in batches // Updates the local list of remote files so that they can be downloaded in batches
func (s *Sync) refresh(a entity.Account) (complete bool, err error) { func (worker *Sync) refresh(a entity.Account) (complete bool, err error) {
if a.AccType != remote.ServiceWebDAV { if a.AccType != remote.ServiceWebDAV {
return false, nil return false, nil
} }
@@ -26,7 +26,7 @@ func (s *Sync) refresh(a entity.Account) (complete bool, err error) {
dirs := append(subDirs.Abs(), a.SyncPath) dirs := append(subDirs.Abs(), a.SyncPath)
for _, dir := range dirs { for _, dir := range dirs {
if mutex.Sync.Canceled() { if mutex.SyncWorker.Canceled() {
return false, nil return false, nil
} }
@@ -38,7 +38,7 @@ func (s *Sync) refresh(a entity.Account) (complete bool, err error) {
} }
for _, file := range files { for _, file := range files {
if mutex.Sync.Canceled() { if mutex.SyncWorker.Canceled() {
return false, nil return false, nil
} }
@@ -69,11 +69,11 @@ func (s *Sync) refresh(a entity.Account) (complete bool, err error) {
} }
if f.Status == entity.FileSyncIgnore && mediaType == fs.MediaRaw && a.SyncRaw { if f.Status == entity.FileSyncIgnore && mediaType == fs.MediaRaw && a.SyncRaw {
s.report(f.Update("Status", entity.FileSyncNew)) worker.logError(f.Update("Status", entity.FileSyncNew))
} }
if f.Status == entity.FileSyncDownloaded && !f.RemoteDate.Equal(file.Date) { if f.Status == entity.FileSyncDownloaded && !f.RemoteDate.Equal(file.Date) {
s.report(f.Updates(map[string]interface{}{ worker.logError(f.Updates(map[string]interface{}{
"Status": entity.FileSyncNew, "Status": entity.FileSyncNew,
"RemoteDate": file.Date, "RemoteDate": file.Date,
"RemoteSize": file.Size, "RemoteSize": file.Size,

View File

@@ -0,0 +1,16 @@
package workers
import (
"testing"
"github.com/photoprism/photoprism/internal/config"
"github.com/stretchr/testify/assert"
)
func TestNewSync(t *testing.T) {
conf := config.TestConfig()
worker := NewSync(conf)
assert.IsType(t, &Sync{}, worker)
}

View File

@@ -13,7 +13,7 @@ import (
) )
// Uploads local files to a remote account // Uploads local files to a remote account
func (s *Sync) upload(a entity.Account) (complete bool, err error) { func (worker *Sync) upload(a entity.Account) (complete bool, err error) {
maxResults := 250 maxResults := 250
// Get upload file list from database // Get upload file list from database
@@ -33,11 +33,11 @@ func (s *Sync) upload(a entity.Account) (complete bool, err error) {
existingDirs := make(map[string]string) existingDirs := make(map[string]string)
for _, file := range files { for _, file := range files {
if mutex.Sync.Canceled() { if mutex.SyncWorker.Canceled() {
return false, nil return false, nil
} }
fileName := path.Join(s.conf.OriginalsPath(), file.FileName) fileName := path.Join(worker.conf.OriginalsPath(), file.FileName)
remoteName := path.Join(a.SyncPath, file.FileName) remoteName := path.Join(a.SyncPath, file.FileName)
remoteDir := filepath.Dir(remoteName) remoteDir := filepath.Dir(remoteName)
@@ -49,7 +49,7 @@ func (s *Sync) upload(a entity.Account) (complete bool, err error) {
} }
if err := client.Upload(fileName, remoteName); err != nil { if err := client.Upload(fileName, remoteName); err != nil {
log.Errorf("sync: %s", err.Error()) worker.logError(err)
continue // try again next time continue // try again next time
} }
@@ -63,13 +63,11 @@ func (s *Sync) upload(a entity.Account) (complete bool, err error) {
fileSync.Error = "" fileSync.Error = ""
fileSync.Errors = 0 fileSync.Errors = 0
if mutex.Sync.Canceled() { if mutex.SyncWorker.Canceled() {
return false, nil return false, nil
} }
if err := entity.Db().Save(&fileSync).Error; err != nil { worker.logError(entity.Db().Save(&fileSync).Error)
log.Errorf("sync: %s", err.Error())
}
} }
return false, nil return false, nil

View File

@@ -21,10 +21,12 @@ func Start(conf *config.Config) {
case <-stop: case <-stop:
log.Info("shutting down workers") log.Info("shutting down workers")
ticker.Stop() ticker.Stop()
mutex.Share.Cancel() mutex.GroomWorker.Cancel()
mutex.Sync.Cancel() mutex.ShareWorker.Cancel()
mutex.SyncWorker.Cancel()
return return
case <-ticker.C: case <-ticker.C:
StartGroom(conf)
StartShare(conf) StartShare(conf)
StartSync(conf) StartSync(conf)
} }
@@ -37,12 +39,24 @@ func Stop() {
stop <- true stop <- true
} }
// StartGroom runs the groom worker once.
func StartGroom(conf *config.Config) {
if !mutex.WorkersBusy() {
go func() {
worker := NewGroom(conf)
if err := worker.Start(); err != nil {
log.Error(err)
}
}()
}
}
// StartShare runs the share worker once. // StartShare runs the share worker once.
func StartShare(conf *config.Config) { func StartShare(conf *config.Config) {
if !mutex.Share.Busy() { if !mutex.ShareWorker.Busy() {
go func() { go func() {
s := NewShare(conf) worker := NewShare(conf)
if err := s.Start(); err != nil { if err := worker.Start(); err != nil {
log.Error(err) log.Error(err)
} }
}() }()
@@ -51,10 +65,10 @@ func StartShare(conf *config.Config) {
// StartShare runs the sync worker once. // StartShare runs the sync worker once.
func StartSync(conf *config.Config) { func StartSync(conf *config.Config) {
if !mutex.Sync.Busy() { if !mutex.SyncWorker.Busy() {
go func() { go func() {
s := NewSync(conf) worker := NewSync(conf)
if err := s.Start(); err != nil { if err := worker.Start(); err != nil {
log.Error(err) log.Error(err)
} }
}() }()

View File

@@ -0,0 +1,22 @@
package workers
import (
"os"
"testing"
"github.com/photoprism/photoprism/internal/config"
"github.com/sirupsen/logrus"
)
func TestMain(m *testing.M) {
log = logrus.StandardLogger()
log.SetLevel(logrus.DebugLevel)
c := config.TestConfig()
code := m.Run()
_ = c.CloseDb()
os.Exit(code)
}