Transfer command refactoring

Make transfer command processing even more modular
This commit is contained in:
Philipp Wolfer 2023-12-05 08:10:38 +01:00
parent 7c85ba05ab
commit a9e07915ce
No known key found for this signature in database
GPG key ID: 8FDF744D4919943B
7 changed files with 236 additions and 179 deletions

44
internal/cli/common.go Normal file
View file

@ -0,0 +1,44 @@
/*
Copyright © 2023 Philipp Wolfer <phw@uploadedlobster.com>
Scotty is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later version.
Scotty is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
Scotty. If not, see <https://www.gnu.org/licenses/>.
*/
package cli
import (
"fmt"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
func GetConfigFromFlag(cmd *cobra.Command, flagName string) (string, *viper.Viper) {
configName := cmd.Flag(flagName).Value.String()
var config *viper.Viper
servicesConfig := viper.Sub("service")
if servicesConfig != nil {
config = servicesConfig.Sub(configName)
}
if config == nil {
cobra.CheckErr(fmt.Sprintf("Invalid source configuration \"%s\"", configName))
}
return configName, config
}
func getInt64FromFlag(cmd *cobra.Command, flagName string) (result int64) {
result, err := cmd.Flags().GetInt64(flagName)
if err != nil {
result = 0
}
return
}

79
internal/cli/progress.go Normal file
View file

@ -0,0 +1,79 @@
/*
Copyright © 2023 Philipp Wolfer <phw@uploadedlobster.com>
This file is part of Scotty.
Scotty is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later version.
Scotty is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
Scotty. If not, see <https://www.gnu.org/licenses/>.
*/
package cli
import (
"sync"
"time"
"github.com/fatih/color"
"github.com/vbauerster/mpb/v8"
"github.com/vbauerster/mpb/v8/decor"
"go.uploadedlobster.com/scotty/internal/models"
)
func progressBar(wg *sync.WaitGroup, exportProgress chan models.Progress, importProgress chan models.Progress) *mpb.Progress {
p := mpb.New(
mpb.WithWaitGroup(wg),
mpb.WithOutput(color.Output),
// mpb.WithWidth(64),
mpb.WithAutoRefresh(),
)
exportBar := setupProgressBar(p, "exporting")
importBar := setupProgressBar(p, "importing")
go updateProgressBar(exportBar, wg, exportProgress)
go updateProgressBar(importBar, wg, importProgress)
return p
}
func setupProgressBar(p *mpb.Progress, name string) *mpb.Bar {
green := color.New(color.FgGreen).SprintFunc()
return p.New(0,
mpb.BarStyle(),
mpb.PrependDecorators(
decor.Name(" "),
decor.OnComplete(
decor.Spinner(nil, decor.WC{W: 2, C: decor.DidentRight}),
green("✓ "),
),
decor.Name(name, decor.WCSyncWidthR),
),
mpb.AppendDecorators(
decor.OnComplete(
decor.EwmaETA(decor.ET_STYLE_GO, 0, decor.WC{C: decor.DSyncWidth}),
"done",
),
// decor.OnComplete(decor.Percentage(decor.WC{W: 5, C: decor.DSyncWidthR}), "done"),
decor.Name(" "),
),
)
}
func updateProgressBar(bar *mpb.Bar, wg *sync.WaitGroup, progressChan chan models.Progress) {
wg.Add(1)
defer wg.Done()
lastIterTime := time.Now()
for progress := range progressChan {
oldIterTime := lastIterTime
lastIterTime = time.Now()
bar.EwmaSetCurrent(progress.Elapsed, lastIterTime.Sub(oldIterTime))
bar.SetTotal(progress.Total, progress.Completed)
}
}

161
internal/cli/transfer.go Normal file
View file

@ -0,0 +1,161 @@
/*
Copyright © 2023 Philipp Wolfer <phw@uploadedlobster.com>
Scotty is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later version.
Scotty is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
Scotty. If not, see <https://www.gnu.org/licenses/>.
*/
package cli
import (
"fmt"
"sync"
"time"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uploadedlobster.com/scotty/internal/backends"
"go.uploadedlobster.com/scotty/internal/models"
"go.uploadedlobster.com/scotty/internal/storage"
)
func NewTransferCmd[
E models.Backend,
I models.ImportBackend,
R models.ListensResult | models.LovesResult,
](
cmd *cobra.Command,
db *storage.Database,
entity string,
) (TransferCmd[E, I, R], error) {
c := TransferCmd[E, I, R]{
cmd: cmd,
db: db,
entity: entity,
}
err := c.resolveBackends()
if err != nil {
return c, err
}
return c, nil
}
type TransferCmd[E models.Backend, I models.ImportBackend, R models.ListensResult | models.LovesResult] struct {
cmd *cobra.Command
db *storage.Database
entity string
sourceName string
targetName string
ExpBackend E
ImpBackend I
}
func (c *TransferCmd[E, I, R]) resolveBackends() error {
sourceName, sourceConfig := GetConfigFromFlag(c.cmd, "from")
targetName, targetConfig := GetConfigFromFlag(c.cmd, "to")
// Initialize backends
expBackend, err := backends.ResolveBackend[E](sourceConfig)
if err != nil {
return err
}
impBackend, err := backends.ResolveBackend[I](targetConfig)
if err != nil {
return err
}
c.sourceName = sourceName
c.targetName = targetName
c.ExpBackend = expBackend
c.ImpBackend = impBackend
return nil
}
func (c *TransferCmd[E, I, R]) Transfer(exp backends.ExportProcessor[R], imp backends.ImportProcessor[R]) error {
fmt.Printf("Transferring %s from %s to %s...\n", c.entity, c.sourceName, c.targetName)
// Authenticate backends, if needed
config := viper.GetViper()
_, err := backends.Authenticate(c.sourceName, c.ExpBackend, *c.db, config)
if err != nil {
return err
}
_, err = backends.Authenticate(c.targetName, c.ImpBackend, *c.db, config)
if err != nil {
return err
}
// Read timestamp
timestamp, err := c.timestamp()
if err != nil {
return err
}
fmt.Printf("From timestamp: %v (%v)\n", timestamp, timestamp.Unix())
// Prepare progress bars
exportProgress := make(chan models.Progress)
importProgress := make(chan models.Progress)
var wg sync.WaitGroup
progress := progressBar(&wg, exportProgress, importProgress)
// Export from source
exportChan := make(chan R, 1000)
go exp.Process(timestamp, exportChan, exportProgress)
// Import into target
resultChan := make(chan models.ImportResult)
go imp.Process(exportChan, resultChan, importProgress)
result := <-resultChan
close(exportProgress)
wg.Wait()
progress.Wait()
if result.Error != nil {
fmt.Printf("Import failed, last reported timestamp was %v (%v)\n", result.LastTimestamp, result.LastTimestamp.Unix())
return result.Error
}
fmt.Printf("Imported %v of %v %s into %v.\n",
result.ImportCount, result.TotalCount, c.entity, c.targetName)
// Update timestamp
err = c.updateTimestamp(result, timestamp)
if err != nil {
return err
}
// Print errors
if len(result.ImportErrors) > 0 {
fmt.Printf("\nDuring the import the following errors occurred:\n")
for _, err := range result.ImportErrors {
fmt.Printf("Error: %v\n", err)
}
}
return nil
}
func (c *TransferCmd[E, I, R]) timestamp() (time.Time, error) {
timestamp := time.Unix(getInt64FromFlag(c.cmd, "timestamp"), 0)
if timestamp == time.Unix(0, 0) {
timestamp, err := c.db.GetImportTimestamp(c.sourceName, c.targetName, c.entity)
return timestamp, err
}
return timestamp, nil
}
func (c *TransferCmd[E, I, R]) updateTimestamp(result models.ImportResult, oldTimestamp time.Time) error {
if result.LastTimestamp.Unix() < oldTimestamp.Unix() {
result.LastTimestamp = oldTimestamp
}
fmt.Printf("Latest timestamp: %v (%v)\n", result.LastTimestamp, result.LastTimestamp.Unix())
err := c.db.SetImportTimestamp(c.sourceName, c.targetName, c.entity, result.LastTimestamp)
return err
}