Initial implementation of unified export/import progress

Both export and import progress get updated over a unified channel.
Most importantly this allows updating the import total from latest
export results.
This commit is contained in:
Philipp Wolfer 2025-05-05 11:38:29 +02:00
parent 1f48abc284
commit b8e6ccffdb
No known key found for this signature in database
GPG key ID: 8FDF744D4919943B
18 changed files with 369 additions and 194 deletions

View file

@ -28,7 +28,18 @@ import (
"go.uploadedlobster.com/scotty/internal/models"
)
func progressBar(exportProgress chan models.Progress, importProgress chan models.Progress) *mpb.Progress {
type progressBarUpdater struct {
wg *sync.WaitGroup
progress *mpb.Progress
exportBar *mpb.Bar
importBar *mpb.Bar
updateChan chan models.TransferProgress
lastExportUpdate time.Time
totalItems int
importedItems int
}
func setupProgressBars(updateChan chan models.TransferProgress) progressBarUpdater {
wg := &sync.WaitGroup{}
p := mpb.New(
mpb.WithWaitGroup(wg),
@ -37,15 +48,71 @@ func progressBar(exportProgress chan models.Progress, importProgress chan models
mpb.WithAutoRefresh(),
)
exportBar := setupProgressBar(p, i18n.Tr("exporting"))
importBar := setupProgressBar(p, i18n.Tr("importing"))
go updateProgressBar(exportBar, wg, exportProgress)
go updateProgressBar(importBar, wg, importProgress)
u := progressBarUpdater{
wg: wg,
progress: p,
exportBar: initProgressBar(p, i18n.Tr("exporting")),
importBar: initProgressBar(p, i18n.Tr("importing")),
updateChan: updateChan,
}
return p
go u.update()
return u
}
func setupProgressBar(p *mpb.Progress, name string) *mpb.Bar {
func (u *progressBarUpdater) wait() {
// FIXME: This should probably be closed elsewhere
close(u.updateChan)
u.progress.Wait()
}
func (u *progressBarUpdater) update() {
u.wg.Add(1)
defer u.wg.Done()
u.lastExportUpdate = time.Now()
for progress := range u.updateChan {
if progress.Export != nil {
u.updateExportProgress(progress.Export)
}
if progress.Import != nil {
if int64(u.totalItems) > progress.Import.Total {
progress.Import.Total = int64(u.totalItems)
}
u.updateImportProgress(progress.Import)
}
}
}
func (u *progressBarUpdater) updateExportProgress(progress *models.Progress) {
bar := u.exportBar
u.totalItems = progress.TotalItems
if progress.Aborted {
bar.Abort(false)
return
}
oldIterTime := u.lastExportUpdate
u.lastExportUpdate = time.Now()
elapsedTime := u.lastExportUpdate.Sub(oldIterTime)
bar.EwmaSetCurrent(progress.Elapsed, elapsedTime)
bar.SetTotal(progress.Total, progress.Completed)
}
func (u *progressBarUpdater) updateImportProgress(progress *models.Progress) {
bar := u.importBar
if progress.Aborted {
bar.Abort(false)
return
}
bar.SetCurrent(progress.Elapsed)
bar.SetTotal(progress.Total, progress.Completed)
}
func initProgressBar(p *mpb.Progress, name string) *mpb.Bar {
green := color.New(color.FgGreen).SprintFunc()
return p.New(0,
mpb.BarStyle(),
@ -69,19 +136,3 @@ func setupProgressBar(p *mpb.Progress, name string) *mpb.Bar {
),
)
}
func updateProgressBar(bar *mpb.Bar, wg *sync.WaitGroup, progressChan chan models.Progress) {
wg.Add(1)
defer wg.Done()
lastIterTime := time.Now()
for progress := range progressChan {
if progress.Aborted {
bar.Abort(false)
return
}
oldIterTime := lastIterTime
lastIterTime = time.Now()
bar.EwmaSetCurrent(progress.Elapsed, lastIterTime.Sub(oldIterTime))
bar.SetTotal(progress.Total, progress.Completed)
}
}

View file

@ -109,19 +109,18 @@ func (c *TransferCmd[E, I, R]) Transfer(exp backends.ExportProcessor[R], imp bac
printTimestamp("From timestamp: %v (%v)", timestamp)
// Prepare progress bars
exportProgress := make(chan models.Progress)
importProgress := make(chan models.Progress)
progress := progressBar(exportProgress, importProgress)
progressChan := make(chan models.TransferProgress)
progress := setupProgressBars(progressChan)
// Export from source
exportChan := make(chan R, 1000)
go exp.Process(timestamp, exportChan, exportProgress)
go exp.Process(timestamp, exportChan, progressChan)
// Import into target
resultChan := make(chan models.ImportResult)
go imp.Process(exportChan, resultChan, importProgress)
go imp.Process(exportChan, resultChan, progressChan)
result := <-resultChan
progress.Wait()
progress.wait()
// Update timestamp
err = c.updateTimestamp(&result, timestamp)