From 4a66e3d43285ecf4179eb146f3346122b18e99e3 Mon Sep 17 00:00:00 2001 From: Philipp Wolfer Date: Thu, 22 May 2025 11:20:09 +0200 Subject: [PATCH] Pass context to import backends --- internal/backends/dump/dump.go | 39 ++++++++++++------- internal/backends/import.go | 12 +++--- internal/backends/jspf/jspf.go | 30 +++++++++----- internal/backends/lastfm/lastfm.go | 16 +++++++- .../backends/listenbrainz/listenbrainz.go | 6 +-- internal/backends/maloja/maloja.go | 4 +- .../backends/scrobblerlog/scrobblerlog.go | 2 +- internal/models/interfaces.go | 4 +- 8 files changed, 71 insertions(+), 42 deletions(-) diff --git a/internal/backends/dump/dump.go b/internal/backends/dump/dump.go index add8711..14583f6 100644 --- a/internal/backends/dump/dump.go +++ b/internal/backends/dump/dump.go @@ -17,6 +17,7 @@ Scotty. If not, see . package dump import ( + "context" "fmt" "go.uploadedlobster.com/scotty/internal/config" @@ -36,27 +37,37 @@ func (b *DumpBackend) InitConfig(config *config.ServiceConfig) error { func (b *DumpBackend) StartImport() error { return nil } func (b *DumpBackend) FinishImport() error { return nil } -func (b *DumpBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { +func (b *DumpBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { for _, listen := range export.Items { - importResult.UpdateTimestamp(listen.ListenedAt) - importResult.ImportCount += 1 - msg := fmt.Sprintf("🎶 %v: \"%v\" by %v (%v)", - listen.ListenedAt, listen.TrackName, listen.ArtistName(), listen.RecordingMBID) - importResult.Log(models.Info, msg) - progress <- models.TransferProgress{}.FromImportResult(importResult, false) + select { + case <-ctx.Done(): + return importResult, ctx.Err() + default: + importResult.UpdateTimestamp(listen.ListenedAt) + importResult.ImportCount += 1 + msg := fmt.Sprintf("🎶 %v: \"%v\" by %v (%v)", + listen.ListenedAt, listen.TrackName, listen.ArtistName(), listen.RecordingMBID) + importResult.Log(models.Info, msg) + progress <- models.TransferProgress{}.FromImportResult(importResult, false) + } } return importResult, nil } -func (b *DumpBackend) ImportLoves(export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { +func (b *DumpBackend) ImportLoves(ctx context.Context, export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { for _, love := range export.Items { - importResult.UpdateTimestamp(love.Created) - importResult.ImportCount += 1 - msg := fmt.Sprintf("❤️ %v: \"%v\" by %v (%v)", - love.Created, love.TrackName, love.ArtistName(), love.RecordingMBID) - importResult.Log(models.Info, msg) - progress <- models.TransferProgress{}.FromImportResult(importResult, false) + select { + case <-ctx.Done(): + return importResult, ctx.Err() + default: + importResult.UpdateTimestamp(love.Created) + importResult.ImportCount += 1 + msg := fmt.Sprintf("❤️ %v: \"%v\" by %v (%v)", + love.Created, love.TrackName, love.ArtistName(), love.RecordingMBID) + importResult.Log(models.Info, msg) + progress <- models.TransferProgress{}.FromImportResult(importResult, false) + } } return importResult, nil diff --git a/internal/backends/import.go b/internal/backends/import.go index e7006bd..3d77b44 100644 --- a/internal/backends/import.go +++ b/internal/backends/import.go @@ -27,7 +27,7 @@ import ( type ImportProcessor[T models.ListensResult | models.LovesResult] interface { ImportBackend() models.ImportBackend Process(ctx context.Context, wg *sync.WaitGroup, results chan T, out chan models.ImportResult, progress chan models.TransferProgress) - Import(export T, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) + Import(ctx context.Context, export T, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) } type ListensImportProcessor struct { @@ -42,7 +42,7 @@ func (p ListensImportProcessor) Process(ctx context.Context, wg *sync.WaitGroup, process(ctx, wg, p, results, out, progress) } -func (p ListensImportProcessor) Import(export models.ListensResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { +func (p ListensImportProcessor) Import(ctx context.Context, export models.ListensResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { if export.Error != nil { return result, export.Error } @@ -52,7 +52,7 @@ func (p ListensImportProcessor) Import(export models.ListensResult, result model } else { result.TotalCount += len(export.Items) } - importResult, err := p.Backend.ImportListens(export, result, progress) + importResult, err := p.Backend.ImportListens(ctx, export, result, progress) if err != nil { return importResult, err } @@ -71,7 +71,7 @@ func (p LovesImportProcessor) Process(ctx context.Context, wg *sync.WaitGroup, r process(ctx, wg, p, results, out, progress) } -func (p LovesImportProcessor) Import(export models.LovesResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { +func (p LovesImportProcessor) Import(ctx context.Context, export models.LovesResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { if export.Error != nil { return result, export.Error } @@ -81,7 +81,7 @@ func (p LovesImportProcessor) Import(export models.LovesResult, result models.Im } else { result.TotalCount += len(export.Items) } - importResult, err := p.Backend.ImportLoves(export, result, progress) + importResult, err := p.Backend.ImportLoves(ctx, export, result, progress) if err != nil { return importResult, err } @@ -112,7 +112,7 @@ func process[R models.LovesResult | models.ListensResult, P ImportProcessor[R]]( out <- handleError(result, ctx.Err(), progress) return default: - importResult, err := processor.Import(exportResult, result, out, progress) + importResult, err := processor.Import(ctx, exportResult, result, out, progress) result.Update(importResult) if err != nil { processor.ImportBackend().FinishImport() diff --git a/internal/backends/jspf/jspf.go b/internal/backends/jspf/jspf.go index 77fed4b..a8a1929 100644 --- a/internal/backends/jspf/jspf.go +++ b/internal/backends/jspf/jspf.go @@ -121,12 +121,17 @@ func (b *JSPFBackend) ExportListens(ctx context.Context, oldestTimestamp time.Ti results <- models.ListensResult{Items: listens} } -func (b *JSPFBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { +func (b *JSPFBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { for _, listen := range export.Items { - track := listenAsTrack(listen) - b.playlist.Tracks = append(b.playlist.Tracks, track) - importResult.ImportCount += 1 - importResult.UpdateTimestamp(listen.ListenedAt) + select { + case <-ctx.Done(): + return importResult, ctx.Err() + default: + track := listenAsTrack(listen) + b.playlist.Tracks = append(b.playlist.Tracks, track) + importResult.ImportCount += 1 + importResult.UpdateTimestamp(listen.ListenedAt) + } } progress <- models.TransferProgress{}.FromImportResult(importResult, false) @@ -160,12 +165,17 @@ func (b *JSPFBackend) ExportLoves(ctx context.Context, oldestTimestamp time.Time results <- models.LovesResult{Items: loves} } -func (b *JSPFBackend) ImportLoves(export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { +func (b *JSPFBackend) ImportLoves(ctx context.Context, export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { for _, love := range export.Items { - track := loveAsTrack(love) - b.playlist.Tracks = append(b.playlist.Tracks, track) - importResult.ImportCount += 1 - importResult.UpdateTimestamp(love.Created) + select { + case <-ctx.Done(): + return importResult, ctx.Err() + default: + track := loveAsTrack(love) + b.playlist.Tracks = append(b.playlist.Tracks, track) + importResult.ImportCount += 1 + importResult.UpdateTimestamp(love.Created) + } } progress <- models.TransferProgress{}.FromImportResult(importResult, false) diff --git a/internal/backends/lastfm/lastfm.go b/internal/backends/lastfm/lastfm.go index 3de75d1..afc1fa3 100644 --- a/internal/backends/lastfm/lastfm.go +++ b/internal/backends/lastfm/lastfm.go @@ -192,9 +192,15 @@ out: progress <- p } -func (b *LastfmApiBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { +func (b *LastfmApiBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { total := len(export.Items) for i := 0; i < total; i += MaxListensPerSubmission { + select { + case <-ctx.Done(): + return importResult, ctx.Err() + default: + } + listens := export.Items[i:min(i+MaxListensPerSubmission, total)] count := len(listens) if count == 0 { @@ -354,8 +360,14 @@ out: results <- models.LovesResult{Items: loves, Total: totalCount} } -func (b *LastfmApiBackend) ImportLoves(export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { +func (b *LastfmApiBackend) ImportLoves(ctx context.Context, export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { for _, love := range export.Items { + select { + case <-ctx.Done(): + return importResult, ctx.Err() + default: + } + err := b.client.Track.Love(lastfm.P{ "track": love.TrackName, "artist": love.ArtistName(), diff --git a/internal/backends/listenbrainz/listenbrainz.go b/internal/backends/listenbrainz/listenbrainz.go index ca1c0f0..bf46c22 100644 --- a/internal/backends/listenbrainz/listenbrainz.go +++ b/internal/backends/listenbrainz/listenbrainz.go @@ -135,8 +135,7 @@ func (b *ListenBrainzApiBackend) ExportListens(ctx context.Context, oldestTimest progress <- p } -func (b *ListenBrainzApiBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { - ctx := context.TODO() +func (b *ListenBrainzApiBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { total := len(export.Items) p := models.TransferProgress{}.FromImportResult(importResult, false) for i := 0; i < total; i += MaxListensPerRequest { @@ -273,8 +272,7 @@ out: } } -func (b *ListenBrainzApiBackend) ImportLoves(export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { - ctx := context.TODO() +func (b *ListenBrainzApiBackend) ImportLoves(ctx context.Context, export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { if len(b.existingMBIDs) == 0 { existingLovesChan := make(chan models.LovesResult) go b.exportLoves(ctx, time.Unix(0, 0), existingLovesChan) diff --git a/internal/backends/maloja/maloja.go b/internal/backends/maloja/maloja.go index 8642924..f082d9b 100644 --- a/internal/backends/maloja/maloja.go +++ b/internal/backends/maloja/maloja.go @@ -112,9 +112,7 @@ out: results <- models.ListensResult{Items: listens} } -func (b *MalojaApiBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { - ctx := context.TODO() - +func (b *MalojaApiBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { p := models.TransferProgress{}.FromImportResult(importResult, false) for _, listen := range export.Items { scrobble := NewScrobble{ diff --git a/internal/backends/scrobblerlog/scrobblerlog.go b/internal/backends/scrobblerlog/scrobblerlog.go index c7eb636..6d331ce 100644 --- a/internal/backends/scrobblerlog/scrobblerlog.go +++ b/internal/backends/scrobblerlog/scrobblerlog.go @@ -167,7 +167,7 @@ func (b *ScrobblerLogBackend) ExportListens(ctx context.Context, oldestTimestamp results <- models.ListensResult{Items: listens} } -func (b *ScrobblerLogBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { +func (b *ScrobblerLogBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) { records := make([]scrobblerlog.Record, len(export.Items)) for i, listen := range export.Items { records[i] = listenToRecord(listen) diff --git a/internal/models/interfaces.go b/internal/models/interfaces.go index 2b45d18..2f4beaf 100644 --- a/internal/models/interfaces.go +++ b/internal/models/interfaces.go @@ -64,7 +64,7 @@ type ListensImport interface { ImportBackend // Imports the given list of listens. - ImportListens(export ListensResult, importResult ImportResult, progress chan TransferProgress) (ImportResult, error) + ImportListens(ctx context.Context, export ListensResult, importResult ImportResult, progress chan TransferProgress) (ImportResult, error) } // Must be implemented by services supporting the export of loves. @@ -82,5 +82,5 @@ type LovesImport interface { ImportBackend // Imports the given list of loves. - ImportLoves(export LovesResult, importResult ImportResult, progress chan TransferProgress) (ImportResult, error) + ImportLoves(ctx context.Context, export LovesResult, importResult ImportResult, progress chan TransferProgress) (ImportResult, error) }