From a87c42059f48e750aa11adb28af586f341a9be7e Mon Sep 17 00:00:00 2001 From: Philipp Wolfer Date: Mon, 5 May 2025 17:49:44 +0200 Subject: [PATCH] Use a WaitGroup to wait for both export and import goroutine to finish --- internal/backends/export.go | 11 ++++++++--- internal/backends/import.go | 16 ++++++++++------ internal/cli/progress.go | 3 +-- internal/cli/transfer.go | 10 +++++++--- 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/internal/backends/export.go b/internal/backends/export.go index c7a1f58..54daafb 100644 --- a/internal/backends/export.go +++ b/internal/backends/export.go @@ -16,6 +16,7 @@ Scotty. If not, see . package backends import ( + "sync" "time" "go.uploadedlobster.com/scotty/internal/models" @@ -23,7 +24,7 @@ import ( type ExportProcessor[T models.ListensResult | models.LovesResult] interface { ExportBackend() models.Backend - Process(oldestTimestamp time.Time, results chan T, progress chan models.TransferProgress) + Process(wg *sync.WaitGroup, oldestTimestamp time.Time, results chan T, progress chan models.TransferProgress) } type ListensExportProcessor struct { @@ -34,7 +35,9 @@ func (p ListensExportProcessor) ExportBackend() models.Backend { return p.Backend } -func (p ListensExportProcessor) Process(oldestTimestamp time.Time, results chan models.ListensResult, progress chan models.TransferProgress) { +func (p ListensExportProcessor) Process(wg *sync.WaitGroup, oldestTimestamp time.Time, results chan models.ListensResult, progress chan models.TransferProgress) { + wg.Add(1) + defer wg.Done() defer close(results) p.Backend.ExportListens(oldestTimestamp, results, progress) } @@ -47,7 +50,9 @@ func (p LovesExportProcessor) ExportBackend() models.Backend { return p.Backend } -func (p LovesExportProcessor) Process(oldestTimestamp time.Time, results chan models.LovesResult, progress chan models.TransferProgress) { +func (p LovesExportProcessor) Process(wg *sync.WaitGroup, oldestTimestamp time.Time, results chan models.LovesResult, progress chan models.TransferProgress) { + wg.Add(1) + defer wg.Done() defer close(results) p.Backend.ExportLoves(oldestTimestamp, results, progress) } diff --git a/internal/backends/import.go b/internal/backends/import.go index d3b86ac..0a2e341 100644 --- a/internal/backends/import.go +++ b/internal/backends/import.go @@ -18,12 +18,14 @@ Scotty. If not, see . package backends import ( + "sync" + "go.uploadedlobster.com/scotty/internal/models" ) type ImportProcessor[T models.ListensResult | models.LovesResult] interface { ImportBackend() models.ImportBackend - Process(results chan T, out chan models.ImportResult, progress chan models.TransferProgress) + Process(wg *sync.WaitGroup, results chan T, out chan models.ImportResult, progress chan models.TransferProgress) Import(export T, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) } @@ -35,8 +37,8 @@ func (p ListensImportProcessor) ImportBackend() models.ImportBackend { return p.Backend } -func (p ListensImportProcessor) Process(results chan models.ListensResult, out chan models.ImportResult, progress chan models.TransferProgress) { - process(p, results, out, progress) +func (p ListensImportProcessor) Process(wg *sync.WaitGroup, results chan models.ListensResult, out chan models.ImportResult, progress chan models.TransferProgress) { + process(wg, p, results, out, progress) } func (p ListensImportProcessor) Import(export models.ListensResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { @@ -64,8 +66,8 @@ func (p LovesImportProcessor) ImportBackend() models.ImportBackend { return p.Backend } -func (p LovesImportProcessor) Process(results chan models.LovesResult, out chan models.ImportResult, progress chan models.TransferProgress) { - process(p, results, out, progress) +func (p LovesImportProcessor) Process(wg *sync.WaitGroup, results chan models.LovesResult, out chan models.ImportResult, progress chan models.TransferProgress) { + process(wg, p, results, out, progress) } func (p LovesImportProcessor) Import(export models.LovesResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { @@ -85,7 +87,9 @@ func (p LovesImportProcessor) Import(export models.LovesResult, result models.Im return importResult, nil } -func process[R models.LovesResult | models.ListensResult, P ImportProcessor[R]](processor P, results chan R, out chan models.ImportResult, progress chan models.TransferProgress) { +func process[R models.LovesResult | models.ListensResult, P ImportProcessor[R]](wg *sync.WaitGroup, processor P, results chan R, out chan models.ImportResult, progress chan models.TransferProgress) { + wg.Add(1) + defer wg.Done() defer close(out) result := models.ImportResult{} p := models.TransferProgress{} diff --git a/internal/cli/progress.go b/internal/cli/progress.go index 6696226..e93ec18 100644 --- a/internal/cli/progress.go +++ b/internal/cli/progress.go @@ -60,8 +60,7 @@ func setupProgressBars(updateChan chan models.TransferProgress) progressBarUpdat return u } -func (u *progressBarUpdater) wait() { - // FIXME: This should probably be closed elsewhere +func (u *progressBarUpdater) close() { close(u.updateChan) u.progress.Wait() } diff --git a/internal/cli/transfer.go b/internal/cli/transfer.go index b16c590..62dd079 100644 --- a/internal/cli/transfer.go +++ b/internal/cli/transfer.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "strconv" + "sync" "time" "github.com/spf13/cobra" @@ -112,15 +113,18 @@ func (c *TransferCmd[E, I, R]) Transfer(exp backends.ExportProcessor[R], imp bac progressChan := make(chan models.TransferProgress) progress := setupProgressBars(progressChan) + wg := &sync.WaitGroup{} + // Export from source exportChan := make(chan R, 1000) - go exp.Process(timestamp, exportChan, progressChan) + go exp.Process(wg, timestamp, exportChan, progressChan) // Import into target resultChan := make(chan models.ImportResult) - go imp.Process(exportChan, resultChan, progressChan) + go imp.Process(wg, exportChan, resultChan, progressChan) result := <-resultChan - progress.wait() + wg.Wait() + progress.close() // Update timestamp err = c.updateTimestamp(&result, timestamp)