mirror of
https://git.sr.ht/~phw/scotty
synced 2025-05-31 10:58:35 +02:00
Pass context to import backends
This commit is contained in:
parent
26d9f5e840
commit
4a66e3d432
8 changed files with 71 additions and 42 deletions
|
@ -17,6 +17,7 @@ Scotty. If not, see <https://www.gnu.org/licenses/>.
|
|||
package dump
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.uploadedlobster.com/scotty/internal/config"
|
||||
|
@ -36,27 +37,37 @@ func (b *DumpBackend) InitConfig(config *config.ServiceConfig) error {
|
|||
func (b *DumpBackend) StartImport() error { return nil }
|
||||
func (b *DumpBackend) FinishImport() error { return nil }
|
||||
|
||||
func (b *DumpBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
func (b *DumpBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
for _, listen := range export.Items {
|
||||
importResult.UpdateTimestamp(listen.ListenedAt)
|
||||
importResult.ImportCount += 1
|
||||
msg := fmt.Sprintf("🎶 %v: \"%v\" by %v (%v)",
|
||||
listen.ListenedAt, listen.TrackName, listen.ArtistName(), listen.RecordingMBID)
|
||||
importResult.Log(models.Info, msg)
|
||||
progress <- models.TransferProgress{}.FromImportResult(importResult, false)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return importResult, ctx.Err()
|
||||
default:
|
||||
importResult.UpdateTimestamp(listen.ListenedAt)
|
||||
importResult.ImportCount += 1
|
||||
msg := fmt.Sprintf("🎶 %v: \"%v\" by %v (%v)",
|
||||
listen.ListenedAt, listen.TrackName, listen.ArtistName(), listen.RecordingMBID)
|
||||
importResult.Log(models.Info, msg)
|
||||
progress <- models.TransferProgress{}.FromImportResult(importResult, false)
|
||||
}
|
||||
}
|
||||
|
||||
return importResult, nil
|
||||
}
|
||||
|
||||
func (b *DumpBackend) ImportLoves(export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
func (b *DumpBackend) ImportLoves(ctx context.Context, export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
for _, love := range export.Items {
|
||||
importResult.UpdateTimestamp(love.Created)
|
||||
importResult.ImportCount += 1
|
||||
msg := fmt.Sprintf("❤️ %v: \"%v\" by %v (%v)",
|
||||
love.Created, love.TrackName, love.ArtistName(), love.RecordingMBID)
|
||||
importResult.Log(models.Info, msg)
|
||||
progress <- models.TransferProgress{}.FromImportResult(importResult, false)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return importResult, ctx.Err()
|
||||
default:
|
||||
importResult.UpdateTimestamp(love.Created)
|
||||
importResult.ImportCount += 1
|
||||
msg := fmt.Sprintf("❤️ %v: \"%v\" by %v (%v)",
|
||||
love.Created, love.TrackName, love.ArtistName(), love.RecordingMBID)
|
||||
importResult.Log(models.Info, msg)
|
||||
progress <- models.TransferProgress{}.FromImportResult(importResult, false)
|
||||
}
|
||||
}
|
||||
|
||||
return importResult, nil
|
||||
|
|
|
@ -27,7 +27,7 @@ import (
|
|||
type ImportProcessor[T models.ListensResult | models.LovesResult] interface {
|
||||
ImportBackend() models.ImportBackend
|
||||
Process(ctx context.Context, wg *sync.WaitGroup, results chan T, out chan models.ImportResult, progress chan models.TransferProgress)
|
||||
Import(export T, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error)
|
||||
Import(ctx context.Context, export T, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error)
|
||||
}
|
||||
|
||||
type ListensImportProcessor struct {
|
||||
|
@ -42,7 +42,7 @@ func (p ListensImportProcessor) Process(ctx context.Context, wg *sync.WaitGroup,
|
|||
process(ctx, wg, p, results, out, progress)
|
||||
}
|
||||
|
||||
func (p ListensImportProcessor) Import(export models.ListensResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
func (p ListensImportProcessor) Import(ctx context.Context, export models.ListensResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
if export.Error != nil {
|
||||
return result, export.Error
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ func (p ListensImportProcessor) Import(export models.ListensResult, result model
|
|||
} else {
|
||||
result.TotalCount += len(export.Items)
|
||||
}
|
||||
importResult, err := p.Backend.ImportListens(export, result, progress)
|
||||
importResult, err := p.Backend.ImportListens(ctx, export, result, progress)
|
||||
if err != nil {
|
||||
return importResult, err
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ func (p LovesImportProcessor) Process(ctx context.Context, wg *sync.WaitGroup, r
|
|||
process(ctx, wg, p, results, out, progress)
|
||||
}
|
||||
|
||||
func (p LovesImportProcessor) Import(export models.LovesResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
func (p LovesImportProcessor) Import(ctx context.Context, export models.LovesResult, result models.ImportResult, out chan models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
if export.Error != nil {
|
||||
return result, export.Error
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ func (p LovesImportProcessor) Import(export models.LovesResult, result models.Im
|
|||
} else {
|
||||
result.TotalCount += len(export.Items)
|
||||
}
|
||||
importResult, err := p.Backend.ImportLoves(export, result, progress)
|
||||
importResult, err := p.Backend.ImportLoves(ctx, export, result, progress)
|
||||
if err != nil {
|
||||
return importResult, err
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ func process[R models.LovesResult | models.ListensResult, P ImportProcessor[R]](
|
|||
out <- handleError(result, ctx.Err(), progress)
|
||||
return
|
||||
default:
|
||||
importResult, err := processor.Import(exportResult, result, out, progress)
|
||||
importResult, err := processor.Import(ctx, exportResult, result, out, progress)
|
||||
result.Update(importResult)
|
||||
if err != nil {
|
||||
processor.ImportBackend().FinishImport()
|
||||
|
|
|
@ -121,12 +121,17 @@ func (b *JSPFBackend) ExportListens(ctx context.Context, oldestTimestamp time.Ti
|
|||
results <- models.ListensResult{Items: listens}
|
||||
}
|
||||
|
||||
func (b *JSPFBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
func (b *JSPFBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
for _, listen := range export.Items {
|
||||
track := listenAsTrack(listen)
|
||||
b.playlist.Tracks = append(b.playlist.Tracks, track)
|
||||
importResult.ImportCount += 1
|
||||
importResult.UpdateTimestamp(listen.ListenedAt)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return importResult, ctx.Err()
|
||||
default:
|
||||
track := listenAsTrack(listen)
|
||||
b.playlist.Tracks = append(b.playlist.Tracks, track)
|
||||
importResult.ImportCount += 1
|
||||
importResult.UpdateTimestamp(listen.ListenedAt)
|
||||
}
|
||||
}
|
||||
|
||||
progress <- models.TransferProgress{}.FromImportResult(importResult, false)
|
||||
|
@ -160,12 +165,17 @@ func (b *JSPFBackend) ExportLoves(ctx context.Context, oldestTimestamp time.Time
|
|||
results <- models.LovesResult{Items: loves}
|
||||
}
|
||||
|
||||
func (b *JSPFBackend) ImportLoves(export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
func (b *JSPFBackend) ImportLoves(ctx context.Context, export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
for _, love := range export.Items {
|
||||
track := loveAsTrack(love)
|
||||
b.playlist.Tracks = append(b.playlist.Tracks, track)
|
||||
importResult.ImportCount += 1
|
||||
importResult.UpdateTimestamp(love.Created)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return importResult, ctx.Err()
|
||||
default:
|
||||
track := loveAsTrack(love)
|
||||
b.playlist.Tracks = append(b.playlist.Tracks, track)
|
||||
importResult.ImportCount += 1
|
||||
importResult.UpdateTimestamp(love.Created)
|
||||
}
|
||||
}
|
||||
|
||||
progress <- models.TransferProgress{}.FromImportResult(importResult, false)
|
||||
|
|
|
@ -192,9 +192,15 @@ out:
|
|||
progress <- p
|
||||
}
|
||||
|
||||
func (b *LastfmApiBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
func (b *LastfmApiBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
total := len(export.Items)
|
||||
for i := 0; i < total; i += MaxListensPerSubmission {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return importResult, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
listens := export.Items[i:min(i+MaxListensPerSubmission, total)]
|
||||
count := len(listens)
|
||||
if count == 0 {
|
||||
|
@ -354,8 +360,14 @@ out:
|
|||
results <- models.LovesResult{Items: loves, Total: totalCount}
|
||||
}
|
||||
|
||||
func (b *LastfmApiBackend) ImportLoves(export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
func (b *LastfmApiBackend) ImportLoves(ctx context.Context, export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
for _, love := range export.Items {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return importResult, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
err := b.client.Track.Love(lastfm.P{
|
||||
"track": love.TrackName,
|
||||
"artist": love.ArtistName(),
|
||||
|
|
|
@ -135,8 +135,7 @@ func (b *ListenBrainzApiBackend) ExportListens(ctx context.Context, oldestTimest
|
|||
progress <- p
|
||||
}
|
||||
|
||||
func (b *ListenBrainzApiBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
ctx := context.TODO()
|
||||
func (b *ListenBrainzApiBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
total := len(export.Items)
|
||||
p := models.TransferProgress{}.FromImportResult(importResult, false)
|
||||
for i := 0; i < total; i += MaxListensPerRequest {
|
||||
|
@ -273,8 +272,7 @@ out:
|
|||
}
|
||||
}
|
||||
|
||||
func (b *ListenBrainzApiBackend) ImportLoves(export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
ctx := context.TODO()
|
||||
func (b *ListenBrainzApiBackend) ImportLoves(ctx context.Context, export models.LovesResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
if len(b.existingMBIDs) == 0 {
|
||||
existingLovesChan := make(chan models.LovesResult)
|
||||
go b.exportLoves(ctx, time.Unix(0, 0), existingLovesChan)
|
||||
|
|
|
@ -112,9 +112,7 @@ out:
|
|||
results <- models.ListensResult{Items: listens}
|
||||
}
|
||||
|
||||
func (b *MalojaApiBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
ctx := context.TODO()
|
||||
|
||||
func (b *MalojaApiBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
p := models.TransferProgress{}.FromImportResult(importResult, false)
|
||||
for _, listen := range export.Items {
|
||||
scrobble := NewScrobble{
|
||||
|
|
|
@ -167,7 +167,7 @@ func (b *ScrobblerLogBackend) ExportListens(ctx context.Context, oldestTimestamp
|
|||
results <- models.ListensResult{Items: listens}
|
||||
}
|
||||
|
||||
func (b *ScrobblerLogBackend) ImportListens(export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
func (b *ScrobblerLogBackend) ImportListens(ctx context.Context, export models.ListensResult, importResult models.ImportResult, progress chan models.TransferProgress) (models.ImportResult, error) {
|
||||
records := make([]scrobblerlog.Record, len(export.Items))
|
||||
for i, listen := range export.Items {
|
||||
records[i] = listenToRecord(listen)
|
||||
|
|
|
@ -64,7 +64,7 @@ type ListensImport interface {
|
|||
ImportBackend
|
||||
|
||||
// Imports the given list of listens.
|
||||
ImportListens(export ListensResult, importResult ImportResult, progress chan TransferProgress) (ImportResult, error)
|
||||
ImportListens(ctx context.Context, export ListensResult, importResult ImportResult, progress chan TransferProgress) (ImportResult, error)
|
||||
}
|
||||
|
||||
// Must be implemented by services supporting the export of loves.
|
||||
|
@ -82,5 +82,5 @@ type LovesImport interface {
|
|||
ImportBackend
|
||||
|
||||
// Imports the given list of loves.
|
||||
ImportLoves(export LovesResult, importResult ImportResult, progress chan TransferProgress) (ImportResult, error)
|
||||
ImportLoves(ctx context.Context, export LovesResult, importResult ImportResult, progress chan TransferProgress) (ImportResult, error)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue