/* Copyright © 2023 Philipp Wolfer Scotty is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. Scotty is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Scotty. If not, see . */ package cli import ( "errors" "fmt" "sync" "time" "github.com/spf13/cobra" "github.com/spf13/viper" "go.uploadedlobster.com/scotty/internal/backends" "go.uploadedlobster.com/scotty/internal/models" "go.uploadedlobster.com/scotty/internal/storage" ) func NewTransferCmd[ E models.Backend, I models.ImportBackend, R models.ListensResult | models.LovesResult, ]( cmd *cobra.Command, db *storage.Database, entity string, ) (TransferCmd[E, I, R], error) { c := TransferCmd[E, I, R]{ cmd: cmd, db: db, entity: entity, } err := c.resolveBackends() if err != nil { return c, err } return c, nil } type TransferCmd[E models.Backend, I models.ImportBackend, R models.ListensResult | models.LovesResult] struct { cmd *cobra.Command db *storage.Database entity string sourceName string targetName string ExpBackend E ImpBackend I } func (c *TransferCmd[E, I, R]) resolveBackends() error { sourceConfig := GetServiceConfigFromFlag(c.cmd, "from") if sourceConfig == nil { cobra.CheckErr(errors.New("failed loading service configuration")) } targetConfig := GetServiceConfigFromFlag(c.cmd, "to") if targetConfig == nil { cobra.CheckErr(errors.New("failed loading service configuration")) } // Initialize backends expBackend, err := backends.ResolveBackend[E](sourceConfig) if err != nil { return err } impBackend, err := backends.ResolveBackend[I](targetConfig) if err != nil { return err } c.sourceName = sourceConfig.Name c.targetName = targetConfig.Name c.ExpBackend = expBackend c.ImpBackend = impBackend return nil } func (c *TransferCmd[E, I, R]) Transfer(exp backends.ExportProcessor[R], imp backends.ImportProcessor[R]) error { fmt.Printf("Transferring %s from %s to %s...\n", c.entity, c.sourceName, c.targetName) // Authenticate backends, if needed config := viper.GetViper() _, err := backends.Authenticate(c.sourceName, c.ExpBackend, *c.db, config) if err != nil { return err } _, err = backends.Authenticate(c.targetName, c.ImpBackend, *c.db, config) if err != nil { return err } // Read timestamp timestamp, err := c.timestamp() if err != nil { return err } fmt.Printf("From timestamp: %v (%v)\n", timestamp, timestamp.Unix()) // Prepare progress bars exportProgress := make(chan models.Progress) importProgress := make(chan models.Progress) var wg sync.WaitGroup progress := progressBar(&wg, exportProgress, importProgress) // Export from source exportChan := make(chan R, 1000) go exp.Process(timestamp, exportChan, exportProgress) // Import into target resultChan := make(chan models.ImportResult) go imp.Process(exportChan, resultChan, importProgress) result := <-resultChan close(exportProgress) wg.Wait() progress.Wait() if result.Error != nil { fmt.Printf("Import failed, last reported timestamp was %v (%v)\n", result.LastTimestamp, result.LastTimestamp.Unix()) return result.Error } fmt.Printf("Imported %v of %v %s into %v.\n", result.ImportCount, result.TotalCount, c.entity, c.targetName) // Update timestamp err = c.updateTimestamp(result, timestamp) if err != nil { return err } // Print errors if len(result.ImportErrors) > 0 { fmt.Printf("\nDuring the import the following errors occurred:\n") for _, err := range result.ImportErrors { fmt.Printf("Error: %v\n", err) } } return nil } func (c *TransferCmd[E, I, R]) timestamp() (time.Time, error) { timestamp := time.Unix(getInt64FromFlag(c.cmd, "timestamp"), 0) if timestamp == time.Unix(0, 0) { timestamp, err := c.db.GetImportTimestamp(c.sourceName, c.targetName, c.entity) return timestamp, err } return timestamp, nil } func (c *TransferCmd[E, I, R]) updateTimestamp(result models.ImportResult, oldTimestamp time.Time) error { if result.LastTimestamp.Unix() < oldTimestamp.Unix() { result.LastTimestamp = oldTimestamp } fmt.Printf("Latest timestamp: %v (%v)\n", result.LastTimestamp, result.LastTimestamp.Unix()) err := c.db.SetImportTimestamp(c.sourceName, c.targetName, c.entity, result.LastTimestamp) return err }