mirror of
https://git.sr.ht/~phw/scotty
synced 2025-04-16 01:59:29 +02:00
refactor: generic common cmd processing
This commit is contained in:
parent
a87686af57
commit
7c85ba05ab
5 changed files with 182 additions and 149 deletions
116
cmd/common.go
116
cmd/common.go
|
@ -18,9 +18,14 @@ package cmd
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"go.uploadedlobster.com/scotty/internal/backends"
|
||||
"go.uploadedlobster.com/scotty/internal/models"
|
||||
"go.uploadedlobster.com/scotty/internal/storage"
|
||||
)
|
||||
|
||||
func getConfigFromFlag(cmd *cobra.Command, flagName string) (string, *viper.Viper) {
|
||||
|
@ -43,3 +48,114 @@ func getInt64FromFlag(cmd *cobra.Command, flagName string) (result int64) {
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
type backendInfo[T models.Backend, R models.ListensResult | models.LovesResult] struct {
|
||||
configName string
|
||||
backend T
|
||||
}
|
||||
|
||||
type exportBackendInfo[T models.Backend, R models.ListensResult | models.LovesResult] struct {
|
||||
backendInfo[T, R]
|
||||
processor backends.ExportProcessor[R]
|
||||
}
|
||||
|
||||
type importBackendInfo[T models.Backend, R models.ListensResult | models.LovesResult] struct {
|
||||
backendInfo[T, R]
|
||||
processor backends.ImportProcessor[R]
|
||||
}
|
||||
|
||||
func resolveBackends[E models.Backend, I models.ImportBackend, R models.ListensResult | models.LovesResult](cmd *cobra.Command) (*exportBackendInfo[E, R], *importBackendInfo[I, R], error) {
|
||||
sourceName, sourceConfig := getConfigFromFlag(cmd, "from")
|
||||
targetName, targetConfig := getConfigFromFlag(cmd, "to")
|
||||
// Initialize backends
|
||||
exportBackend, err := backends.ResolveBackend[E](sourceConfig)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
importBackend, err := backends.ResolveBackend[I](targetConfig)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
exportInfo := exportBackendInfo[E, R]{
|
||||
backendInfo: backendInfo[E, R]{
|
||||
configName: sourceName,
|
||||
backend: exportBackend,
|
||||
},
|
||||
}
|
||||
|
||||
importInfo := importBackendInfo[I, R]{
|
||||
backendInfo: backendInfo[I, R]{
|
||||
configName: targetName,
|
||||
backend: importBackend,
|
||||
},
|
||||
}
|
||||
|
||||
return &exportInfo, &importInfo, nil
|
||||
}
|
||||
|
||||
func cmdExportImport[E models.Backend, I models.ImportBackend, R models.ListensResult | models.LovesResult](cmd *cobra.Command, entity string, exp *exportBackendInfo[E, R], imp *importBackendInfo[I, R]) {
|
||||
sourceName := exp.configName
|
||||
targetName := imp.configName
|
||||
fmt.Printf("Transferring %s from %s to %s...\n", entity, sourceName, targetName)
|
||||
|
||||
// Setup database
|
||||
db, err := storage.New(viper.GetString("database"))
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Authenticate backends, if needed
|
||||
config := viper.GetViper()
|
||||
_, err = backends.Authenticate(sourceName, exp.backend, db, config)
|
||||
cobra.CheckErr(err)
|
||||
|
||||
_, err = backends.Authenticate(targetName, imp.backend, db, config)
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Read timestamp
|
||||
timestamp := time.Unix(getInt64FromFlag(cmd, "timestamp"), 0)
|
||||
if timestamp == time.Unix(0, 0) {
|
||||
timestamp, err = db.GetImportTimestamp(sourceName, targetName, entity)
|
||||
cobra.CheckErr(err)
|
||||
}
|
||||
fmt.Printf("From timestamp: %v (%v)\n", timestamp, timestamp.Unix())
|
||||
|
||||
// Prepare progress bars
|
||||
exportProgress := make(chan models.Progress)
|
||||
importProgress := make(chan models.Progress)
|
||||
var wg sync.WaitGroup
|
||||
progress := progressBar(&wg, exportProgress, importProgress)
|
||||
|
||||
// Export from source
|
||||
exportChan := make(chan R, 1000)
|
||||
go exp.processor.Process(timestamp, exportChan, exportProgress)
|
||||
|
||||
// Import into target
|
||||
resultChan := make(chan models.ImportResult)
|
||||
go imp.processor.Process(exportChan, resultChan, importProgress)
|
||||
result := <-resultChan
|
||||
close(exportProgress)
|
||||
wg.Wait()
|
||||
progress.Wait()
|
||||
if result.Error != nil {
|
||||
fmt.Printf("Import failed, last reported timestamp was %v (%v)\n", result.LastTimestamp, result.LastTimestamp.Unix())
|
||||
cobra.CheckErr(result.Error)
|
||||
}
|
||||
fmt.Printf("Imported %v of %v %s into %v.\n",
|
||||
result.ImportCount, result.TotalCount, entity, targetName)
|
||||
|
||||
// Update timestamp
|
||||
if result.LastTimestamp.Unix() < timestamp.Unix() {
|
||||
result.LastTimestamp = timestamp
|
||||
}
|
||||
fmt.Printf("Latest timestamp: %v (%v)\n", result.LastTimestamp, result.LastTimestamp.Unix())
|
||||
err = db.SetImportTimestamp(sourceName, targetName, entity, result.LastTimestamp)
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Print errors
|
||||
if len(result.ImportErrors) > 0 {
|
||||
fmt.Printf("\nDuring the import the following errors occurred:\n")
|
||||
for _, err := range result.ImportErrors {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,15 +17,9 @@ Scotty. If not, see <https://www.gnu.org/licenses/>.
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"go.uploadedlobster.com/scotty/internal/backends"
|
||||
"go.uploadedlobster.com/scotty/internal/models"
|
||||
"go.uploadedlobster.com/scotty/internal/storage"
|
||||
)
|
||||
|
||||
// listensCmd represents the listens command
|
||||
|
@ -34,77 +28,15 @@ var listensCmd = &cobra.Command{
|
|||
Short: "Transfer listens between two services",
|
||||
Long: `Transfers listens between two configured services.`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
sourceName, sourceConfig := getConfigFromFlag(cmd, "from")
|
||||
targetName, targetConfig := getConfigFromFlag(cmd, "to")
|
||||
fmt.Printf("Transferring listens from %s to %s...\n", sourceName, targetName)
|
||||
|
||||
// Setup database
|
||||
db, err := storage.New(viper.GetString("database"))
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Initialize backends
|
||||
exportBackend, err := backends.ResolveBackend[models.ListensExport](sourceConfig)
|
||||
cobra.CheckErr(err)
|
||||
importBackend, err := backends.ResolveBackend[models.ListensImport](targetConfig)
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Authenticate backends, if needed
|
||||
_, err = backends.Authenticate(sourceName, exportBackend, db, viper.GetViper())
|
||||
cobra.CheckErr(err)
|
||||
|
||||
_, err = backends.Authenticate(targetName, importBackend, db, viper.GetViper())
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Read timestamp
|
||||
timestamp := time.Unix(getInt64FromFlag(cmd, "timestamp"), 0)
|
||||
if timestamp == time.Unix(0, 0) {
|
||||
timestamp, err = db.GetImportTimestamp(sourceName, targetName, "listens")
|
||||
exp, imp, err := resolveBackends[models.ListensExport, models.ListensImport, models.ListensResult](cmd)
|
||||
cobra.CheckErr(err)
|
||||
exp.processor = backends.ListensExportProcessor{
|
||||
Backend: exp.backend,
|
||||
}
|
||||
fmt.Printf("From timestamp: %v (%v)\n", timestamp, timestamp.Unix())
|
||||
|
||||
// Prepare progress bars
|
||||
exportProgress := make(chan models.Progress)
|
||||
importProgress := make(chan models.Progress)
|
||||
var wg sync.WaitGroup
|
||||
progress := progressBar(&wg, exportProgress, importProgress)
|
||||
|
||||
// Export from source
|
||||
exportChan := make(chan models.ListensResult, 1000)
|
||||
go exportBackend.ExportListens(timestamp, exportChan, exportProgress)
|
||||
|
||||
// Import into target
|
||||
resultChan := make(chan models.ImportResult)
|
||||
var processor = backends.ListensImportProcessor{
|
||||
Backend: importBackend,
|
||||
}
|
||||
go processor.Process(exportChan, resultChan, importProgress)
|
||||
result := <-resultChan
|
||||
close(exportProgress)
|
||||
wg.Wait()
|
||||
progress.Wait()
|
||||
if result.Error != nil {
|
||||
fmt.Printf("Import failed, last reported timestamp was %v (%v)\n", result.LastTimestamp, result.LastTimestamp.Unix())
|
||||
cobra.CheckErr(result.Error)
|
||||
}
|
||||
fmt.Printf("Imported %v of %v listens into %v.\n",
|
||||
result.ImportCount, result.TotalCount, targetName)
|
||||
|
||||
// Update timestamp
|
||||
if result.LastTimestamp.Unix() < timestamp.Unix() {
|
||||
result.LastTimestamp = timestamp
|
||||
}
|
||||
fmt.Printf("Latest timestamp: %v (%v)\n", result.LastTimestamp, result.LastTimestamp.Unix())
|
||||
err = db.SetImportTimestamp(sourceName, targetName, "listens", result.LastTimestamp)
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Print errors
|
||||
if len(result.ImportErrors) > 0 {
|
||||
fmt.Printf("\nDuring the import the following errors occurred:\n")
|
||||
for _, err := range result.ImportErrors {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
}
|
||||
imp.processor = backends.ListensImportProcessor{
|
||||
Backend: imp.backend,
|
||||
}
|
||||
cmdExportImport(cmd, "listens", exp, imp)
|
||||
},
|
||||
}
|
||||
|
||||
|
|
80
cmd/loves.go
80
cmd/loves.go
|
@ -17,15 +17,9 @@ Scotty. If not, see <https://www.gnu.org/licenses/>.
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"go.uploadedlobster.com/scotty/internal/backends"
|
||||
"go.uploadedlobster.com/scotty/internal/models"
|
||||
"go.uploadedlobster.com/scotty/internal/storage"
|
||||
)
|
||||
|
||||
// lovesCmd represents the loves command
|
||||
|
@ -34,77 +28,15 @@ var lovesCmd = &cobra.Command{
|
|||
Short: "Transfer loves between two services",
|
||||
Long: `Transfers loves between two configured services.`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
sourceName, sourceConfig := getConfigFromFlag(cmd, "from")
|
||||
targetName, targetConfig := getConfigFromFlag(cmd, "to")
|
||||
fmt.Printf("Transferring loves from %s to %s...\n", sourceName, targetName)
|
||||
|
||||
// Setup database
|
||||
db, err := storage.New(viper.GetString("database"))
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Initialize backends
|
||||
exportBackend, err := backends.ResolveBackend[models.LovesExport](sourceConfig)
|
||||
cobra.CheckErr(err)
|
||||
importBackend, err := backends.ResolveBackend[models.LovesImport](targetConfig)
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Authenticate backends, if needed
|
||||
_, err = backends.Authenticate(sourceName, exportBackend, db, viper.GetViper())
|
||||
cobra.CheckErr(err)
|
||||
|
||||
_, err = backends.Authenticate(targetName, importBackend, db, viper.GetViper())
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Read timestamp
|
||||
timestamp := time.Unix(getInt64FromFlag(cmd, "timestamp"), 0)
|
||||
if timestamp == time.Unix(0, 0) {
|
||||
timestamp, err = db.GetImportTimestamp(sourceName, targetName, "loves")
|
||||
exp, imp, err := resolveBackends[models.LovesExport, models.LovesImport, models.LovesResult](cmd)
|
||||
cobra.CheckErr(err)
|
||||
exp.processor = backends.LovesExportProcessor{
|
||||
Backend: exp.backend,
|
||||
}
|
||||
fmt.Printf("From timestamp: %v (%v)\n", timestamp, timestamp.Unix())
|
||||
|
||||
// Prepare progress bars
|
||||
exportProgress := make(chan models.Progress)
|
||||
importProgress := make(chan models.Progress)
|
||||
var wg sync.WaitGroup
|
||||
progress := progressBar(&wg, exportProgress, importProgress)
|
||||
|
||||
// Export from source
|
||||
exportChan := make(chan models.LovesResult, 1000)
|
||||
go exportBackend.ExportLoves(timestamp, exportChan, exportProgress)
|
||||
|
||||
// Import into target
|
||||
resultChan := make(chan models.ImportResult)
|
||||
var processor = backends.LovesImportProcessor{
|
||||
Backend: importBackend,
|
||||
}
|
||||
go processor.Process(exportChan, resultChan, importProgress)
|
||||
result := <-resultChan
|
||||
close(exportProgress)
|
||||
wg.Wait()
|
||||
progress.Wait()
|
||||
if result.Error != nil {
|
||||
fmt.Printf("Import failed, last reported timestamp was %v (%v)\n", result.LastTimestamp, result.LastTimestamp.Unix())
|
||||
cobra.CheckErr(result.Error)
|
||||
}
|
||||
fmt.Printf("Imported %v of %v loves into %v.\n",
|
||||
result.ImportCount, result.TotalCount, targetName)
|
||||
|
||||
// Update timestamp
|
||||
if result.LastTimestamp.Unix() < timestamp.Unix() {
|
||||
result.LastTimestamp = timestamp
|
||||
}
|
||||
fmt.Printf("Latest timestamp: %v (%v)\n", result.LastTimestamp, result.LastTimestamp.Unix())
|
||||
err = db.SetImportTimestamp(sourceName, targetName, "loves", result.LastTimestamp)
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Print errors
|
||||
if len(result.ImportErrors) > 0 {
|
||||
fmt.Printf("\nDuring the import the following errors occurred:\n")
|
||||
for _, err := range result.ImportErrors {
|
||||
fmt.Printf("Error: %v\n", err)
|
||||
}
|
||||
imp.processor = backends.LovesImportProcessor{
|
||||
Backend: imp.backend,
|
||||
}
|
||||
cmdExportImport(cmd, "loves", exp, imp)
|
||||
},
|
||||
}
|
||||
|
||||
|
|
51
internal/backends/export.go
Normal file
51
internal/backends/export.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
Copyright © 2023 Philipp Wolfer <phw@uploadedlobster.com>
|
||||
|
||||
Scotty is free software: you can redistribute it and/or modify it under the
|
||||
terms of the GNU General Public License as published by the Free Software
|
||||
Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
|
||||
Scotty is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License along with
|
||||
Scotty. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package backends
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go.uploadedlobster.com/scotty/internal/models"
|
||||
)
|
||||
|
||||
type ExportProcessor[T models.ListensResult | models.LovesResult] interface {
|
||||
ExportBackend() models.Backend
|
||||
Process(oldestTimestamp time.Time, results chan T, progress chan models.Progress)
|
||||
}
|
||||
|
||||
type ListensExportProcessor struct {
|
||||
Backend models.ListensExport
|
||||
}
|
||||
|
||||
func (p ListensExportProcessor) ExportBackend() models.Backend {
|
||||
return p.Backend
|
||||
}
|
||||
|
||||
func (p ListensExportProcessor) Process(oldestTimestamp time.Time, results chan models.ListensResult, progress chan models.Progress) {
|
||||
p.Backend.ExportListens(oldestTimestamp, results, progress)
|
||||
}
|
||||
|
||||
type LovesExportProcessor struct {
|
||||
Backend models.LovesExport
|
||||
}
|
||||
|
||||
func (p LovesExportProcessor) ExportBackend() models.Backend {
|
||||
return p.Backend
|
||||
}
|
||||
|
||||
func (p LovesExportProcessor) Process(oldestTimestamp time.Time, results chan models.LovesResult, progress chan models.Progress) {
|
||||
p.Backend.ExportLoves(oldestTimestamp, results, progress)
|
||||
}
|
|
@ -17,7 +17,9 @@ Scotty. If not, see <https://www.gnu.org/licenses/>.
|
|||
|
||||
package backends
|
||||
|
||||
import "go.uploadedlobster.com/scotty/internal/models"
|
||||
import (
|
||||
"go.uploadedlobster.com/scotty/internal/models"
|
||||
)
|
||||
|
||||
type ImportProcessor[T models.ListensResult | models.LovesResult] interface {
|
||||
ImportBackend() models.ImportBackend
|
||||
|
|
Loading…
Add table
Reference in a new issue