Use a WaitGroup to wait for both export and import goroutine to finish

This commit is contained in:
Philipp Wolfer 2025-05-05 17:49:44 +02:00
parent 17cee9cb8b
commit a87c42059f
No known key found for this signature in database
GPG key ID: 8FDF744D4919943B
4 changed files with 26 additions and 14 deletions

View file

@ -16,6 +16,7 @@ Scotty. If not, see <https://www.gnu.org/licenses/>.
package backends package backends
import ( import (
"sync"
"time" "time"
"go.uploadedlobster.com/scotty/internal/models" "go.uploadedlobster.com/scotty/internal/models"
@ -23,7 +24,7 @@ import (
type ExportProcessor[T models.ListensResult | models.LovesResult] interface { type ExportProcessor[T models.ListensResult | models.LovesResult] interface {
ExportBackend() models.Backend ExportBackend() models.Backend
Process(oldestTimestamp time.Time, results chan T, progress chan models.TransferProgress) Process(wg *sync.WaitGroup, oldestTimestamp time.Time, results chan T, progress chan models.TransferProgress)
} }
type ListensExportProcessor struct { type ListensExportProcessor struct {
@ -34,7 +35,9 @@ func (p ListensExportProcessor) ExportBackend() models.Backend {
return p.Backend return p.Backend
} }
func (p ListensExportProcessor) Process(oldestTimestamp time.Time, results chan models.ListensResult, progress chan models.TransferProgress) { func (p ListensExportProcessor) Process(wg *sync.WaitGroup, oldestTimestamp time.Time, results chan models.ListensResult, progress chan models.TransferProgress) {
wg.Add(1)
defer wg.Done()
defer close(results) defer close(results)
p.Backend.ExportListens(oldestTimestamp, results, progress) p.Backend.ExportListens(oldestTimestamp, results, progress)
} }
@ -47,7 +50,9 @@ func (p LovesExportProcessor) ExportBackend() models.Backend {
return p.Backend return p.Backend
} }
func (p LovesExportProcessor) Process(oldestTimestamp time.Time, results chan models.LovesResult, progress chan models.TransferProgress) { func (p LovesExportProcessor) Process(wg *sync.WaitGroup, oldestTimestamp time.Time, results chan models.LovesResult, progress chan models.TransferProgress) {
wg.Add(1)
defer wg.Done()
defer close(results) defer close(results)
p.Backend.ExportLoves(oldestTimestamp, results, progress) p.Backend.ExportLoves(oldestTimestamp, results, progress)
} }

View file

@ -18,12 +18,14 @@ Scotty. If not, see <https://www.gnu.org/licenses/>.
package backends package backends
import ( import (
"sync"
"go.uploadedlobster.com/scotty/internal/models" "go.uploadedlobster.com/scotty/internal/models"
) )
type ImportProcessor[T models.ListensResult | models.LovesResult] interface { type ImportProcessor[T models.ListensResult | models.LovesResult] interface {
ImportBackend() models.ImportBackend ImportBackend() models.ImportBackend
Process(results chan T, out chan models.ImportResult, progress chan models.TransferProgress) Process(wg *sync.WaitGroup, results chan T, out chan models.ImportResult, progress chan models.TransferProgress)
Import(export T, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) Import(export T, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error)
} }
@ -35,8 +37,8 @@ func (p ListensImportProcessor) ImportBackend() models.ImportBackend {
return p.Backend return p.Backend
} }
func (p ListensImportProcessor) Process(results chan models.ListensResult, out chan models.ImportResult, progress chan models.TransferProgress) { func (p ListensImportProcessor) Process(wg *sync.WaitGroup, results chan models.ListensResult, out chan models.ImportResult, progress chan models.TransferProgress) {
process(p, results, out, progress) process(wg, p, results, out, progress)
} }
func (p ListensImportProcessor) Import(export models.ListensResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { func (p ListensImportProcessor) Import(export models.ListensResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
@ -64,8 +66,8 @@ func (p LovesImportProcessor) ImportBackend() models.ImportBackend {
return p.Backend return p.Backend
} }
func (p LovesImportProcessor) Process(results chan models.LovesResult, out chan models.ImportResult, progress chan models.TransferProgress) { func (p LovesImportProcessor) Process(wg *sync.WaitGroup, results chan models.LovesResult, out chan models.ImportResult, progress chan models.TransferProgress) {
process(p, results, out, progress) process(wg, p, results, out, progress)
} }
func (p LovesImportProcessor) Import(export models.LovesResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { func (p LovesImportProcessor) Import(export models.LovesResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
@ -85,7 +87,9 @@ func (p LovesImportProcessor) Import(export models.LovesResult, result models.Im
return importResult, nil return importResult, nil
} }
func process[R models.LovesResult | models.ListensResult, P ImportProcessor[R]](processor P, results chan R, out chan models.ImportResult, progress chan models.TransferProgress) { func process[R models.LovesResult | models.ListensResult, P ImportProcessor[R]](wg *sync.WaitGroup, processor P, results chan R, out chan models.ImportResult, progress chan models.TransferProgress) {
wg.Add(1)
defer wg.Done()
defer close(out) defer close(out)
result := models.ImportResult{} result := models.ImportResult{}
p := models.TransferProgress{} p := models.TransferProgress{}

View file

@ -60,8 +60,7 @@ func setupProgressBars(updateChan chan models.TransferProgress) progressBarUpdat
return u return u
} }
func (u *progressBarUpdater) wait() { func (u *progressBarUpdater) close() {
// FIXME: This should probably be closed elsewhere
close(u.updateChan) close(u.updateChan)
u.progress.Wait() u.progress.Wait()
} }

View file

@ -19,6 +19,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -112,15 +113,18 @@ func (c *TransferCmd[E, I, R]) Transfer(exp backends.ExportProcessor[R], imp bac
progressChan := make(chan models.TransferProgress) progressChan := make(chan models.TransferProgress)
progress := setupProgressBars(progressChan) progress := setupProgressBars(progressChan)
wg := &sync.WaitGroup{}
// Export from source // Export from source
exportChan := make(chan R, 1000) exportChan := make(chan R, 1000)
go exp.Process(timestamp, exportChan, progressChan) go exp.Process(wg, timestamp, exportChan, progressChan)
// Import into target // Import into target
resultChan := make(chan models.ImportResult) resultChan := make(chan models.ImportResult)
go imp.Process(exportChan, resultChan, progressChan) go imp.Process(wg, exportChan, resultChan, progressChan)
result := <-resultChan result := <-resultChan
progress.wait() wg.Wait()
progress.close()
// Update timestamp // Update timestamp
err = c.updateTimestamp(&result, timestamp) err = c.updateTimestamp(&result, timestamp)