Prepare using a context for export / import

This will allow cancelling the export if the import fails
before the export finished.

For now the context isn't passed on to the actual export functions,
hence there is not yet any cancellation happening.
This commit is contained in:
Philipp Wolfer 2025-05-22 08:48:37 +02:00
parent 536fae6a46
commit 3b545a0fd6
No known key found for this signature in database
GPG key ID: 8FDF744D4919943B
3 changed files with 37 additions and 16 deletions

View file

@ -16,6 +16,7 @@ Scotty. If not, see <https://www.gnu.org/licenses/>.
package backends package backends
import ( import (
"context"
"sync" "sync"
"time" "time"
@ -24,7 +25,7 @@ import (
type ExportProcessor[T models.ListensResult | models.LovesResult] interface { type ExportProcessor[T models.ListensResult | models.LovesResult] interface {
ExportBackend() models.Backend ExportBackend() models.Backend
Process(wg *sync.WaitGroup, oldestTimestamp time.Time, results chan T, progress chan models.TransferProgress) Process(ctx context.Context, wg *sync.WaitGroup, oldestTimestamp time.Time, results chan T, progress chan models.TransferProgress)
} }
type ListensExportProcessor struct { type ListensExportProcessor struct {
@ -35,7 +36,7 @@ func (p ListensExportProcessor) ExportBackend() models.Backend {
return p.Backend return p.Backend
} }
func (p ListensExportProcessor) Process(wg *sync.WaitGroup, oldestTimestamp time.Time, results chan models.ListensResult, progress chan models.TransferProgress) { func (p ListensExportProcessor) Process(ctx context.Context, wg *sync.WaitGroup, oldestTimestamp time.Time, results chan models.ListensResult, progress chan models.TransferProgress) {
wg.Add(1) wg.Add(1)
defer wg.Done() defer wg.Done()
defer close(results) defer close(results)
@ -50,7 +51,7 @@ func (p LovesExportProcessor) ExportBackend() models.Backend {
return p.Backend return p.Backend
} }
func (p LovesExportProcessor) Process(wg *sync.WaitGroup, oldestTimestamp time.Time, results chan models.LovesResult, progress chan models.TransferProgress) { func (p LovesExportProcessor) Process(ctx context.Context, wg *sync.WaitGroup, oldestTimestamp time.Time, results chan models.LovesResult, progress chan models.TransferProgress) {
wg.Add(1) wg.Add(1)
defer wg.Done() defer wg.Done()
defer close(results) defer close(results)

View file

@ -18,6 +18,7 @@ Scotty. If not, see <https://www.gnu.org/licenses/>.
package backends package backends
import ( import (
"context"
"sync" "sync"
"go.uploadedlobster.com/scotty/internal/models" "go.uploadedlobster.com/scotty/internal/models"
@ -25,7 +26,7 @@ import (
type ImportProcessor[T models.ListensResult | models.LovesResult] interface { type ImportProcessor[T models.ListensResult | models.LovesResult] interface {
ImportBackend() models.ImportBackend ImportBackend() models.ImportBackend
Process(wg *sync.WaitGroup, results chan T, out chan models.ImportResult, progress chan models.TransferProgress) Process(ctx context.Context, wg *sync.WaitGroup, results chan T, out chan models.ImportResult, progress chan models.TransferProgress)
Import(export T, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) Import(export T, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error)
} }
@ -37,8 +38,8 @@ func (p ListensImportProcessor) ImportBackend() models.ImportBackend {
return p.Backend return p.Backend
} }
func (p ListensImportProcessor) Process(wg *sync.WaitGroup, results chan models.ListensResult, out chan models.ImportResult, progress chan models.TransferProgress) { func (p ListensImportProcessor) Process(ctx context.Context, wg *sync.WaitGroup, results chan models.ListensResult, out chan models.ImportResult, progress chan models.TransferProgress) {
process(wg, p, results, out, progress) process(ctx, wg, p, results, out, progress)
} }
func (p ListensImportProcessor) Import(export models.ListensResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { func (p ListensImportProcessor) Import(export models.ListensResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
@ -66,8 +67,8 @@ func (p LovesImportProcessor) ImportBackend() models.ImportBackend {
return p.Backend return p.Backend
} }
func (p LovesImportProcessor) Process(wg *sync.WaitGroup, results chan models.LovesResult, out chan models.ImportResult, progress chan models.TransferProgress) { func (p LovesImportProcessor) Process(ctx context.Context, wg *sync.WaitGroup, results chan models.LovesResult, out chan models.ImportResult, progress chan models.TransferProgress) {
process(wg, p, results, out, progress) process(ctx, wg, p, results, out, progress)
} }
func (p LovesImportProcessor) Import(export models.LovesResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { func (p LovesImportProcessor) Import(export models.LovesResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
@ -87,7 +88,12 @@ func (p LovesImportProcessor) Import(export models.LovesResult, result models.Im
return importResult, nil return importResult, nil
} }
func process[R models.LovesResult | models.ListensResult, P ImportProcessor[R]](wg *sync.WaitGroup, processor P, results chan R, out chan models.ImportResult, progress chan models.TransferProgress) { func process[R models.LovesResult | models.ListensResult, P ImportProcessor[R]](
ctx context.Context, wg *sync.WaitGroup,
processor P, results chan R,
out chan models.ImportResult,
progress chan models.TransferProgress,
) {
wg.Add(1) wg.Add(1)
defer wg.Done() defer wg.Done()
defer close(out) defer close(out)
@ -100,14 +106,21 @@ func process[R models.LovesResult | models.ListensResult, P ImportProcessor[R]](
} }
for exportResult := range results { for exportResult := range results {
importResult, err := processor.Import(exportResult, result, out, progress) select {
result.Update(importResult) case <-ctx.Done():
if err != nil {
processor.ImportBackend().FinishImport() processor.ImportBackend().FinishImport()
out <- handleError(result, err, progress) out <- handleError(result, ctx.Err(), progress)
return return
default:
importResult, err := processor.Import(exportResult, result, out, progress)
result.Update(importResult)
if err != nil {
processor.ImportBackend().FinishImport()
out <- handleError(result, err, progress)
return
}
progress <- p.FromImportResult(result, false)
} }
progress <- p.FromImportResult(result, false)
} }
if err := processor.ImportBackend().FinishImport(); err != nil { if err := processor.ImportBackend().FinishImport(); err != nil {

View file

@ -16,6 +16,7 @@ Scotty. If not, see <https://www.gnu.org/licenses/>.
package cli package cli
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"strconv" "strconv"
@ -113,16 +114,22 @@ func (c *TransferCmd[E, I, R]) Transfer(exp backends.ExportProcessor[R], imp bac
progressChan := make(chan models.TransferProgress) progressChan := make(chan models.TransferProgress)
progress := setupProgressBars(progressChan) progress := setupProgressBars(progressChan)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
// Export from source // Export from source
exportChan := make(chan R, 1000) exportChan := make(chan R, 1000)
go exp.Process(wg, timestamp, exportChan, progressChan) go exp.Process(ctx, wg, timestamp, exportChan, progressChan)
// Import into target // Import into target
resultChan := make(chan models.ImportResult) resultChan := make(chan models.ImportResult)
go imp.Process(wg, exportChan, resultChan, progressChan) go imp.Process(ctx, wg, exportChan, resultChan, progressChan)
result := <-resultChan result := <-resultChan
// Once import is done, the context can be cancelled
cancel()
// Wait for all goroutines to finish
wg.Wait() wg.Wait()
progress.close() progress.close()