From 4614a7568df62ad5a602c6fdf0613c34bbcf552e Mon Sep 17 00:00:00 2001 From: CrazyMax Date: Sun, 9 Jun 2019 19:43:48 +0200 Subject: [PATCH] Add worker pool to parallelize analyses --- CHANGELOG.md | 4 ++ README.md | 10 +-- cmd/main.go | 2 +- go.mod | 1 - go.sum | 2 - internal/app/dispatcher.go | 48 +++++++++++++ internal/app/diun.go | 135 ++++++++++++++++++++----------------- internal/app/worker.go | 45 +++++++++++++ internal/config/config.go | 1 + internal/model/watch.go | 1 + 10 files changed, 178 insertions(+), 71 deletions(-) create mode 100644 internal/app/dispatcher.go create mode 100644 internal/app/worker.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e5f2ab5..c169ee80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.5.0 (2019/06/09) + +* Add worker pool to parallelize analyses + ## 0.4.1 (2019/06/08) * Filter tags before return them diff --git a/README.md b/README.md index 38ffea36..110147e9 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ * Allow to watch a full Docker repository and report new tags * Include and exclude filters with regular expression for tags * Internal cron implementation through go routines +* Worker pool to parallelize analyses * Allow overriding os and architecture when watching * Beautiful email report * Webhook notification @@ -81,6 +82,7 @@ db: path: diun.db watch: + workers: 10 schedule: 0 0 * * * * os: linux arch: amd64 @@ -128,14 +130,14 @@ items: - image: quay.io/coreos/hyperkube # Watch crazymax/swarm-cronjob image and assume docker.io regsitry and latest tag. - # Only include tags matching regexp ^1.2.* + # Only include tags matching regexp ^1\.2\..* - image: crazymax/swarm-cronjob watch_repo: true include_tags: - - ^1.2.* + - ^1\.2\..* # Watch portainer/portainer image on docker.io (DockerHub) and assume latest tag - # Only watch latest 10 tags and include tags matching regexp ^(0|[1-9]\d*)\.* + # Only watch latest 10 tags and include tags matching regexp ^(0|[1-9]\d*)\..* - image: docker.io/portainer/portainer watch_repo: true @@ -147,6 +149,7 @@ items: * `db` * `path`: Path to Bolt database file where images manifests are stored. Flag `--docker` force this path to `/data/diun.db` (default: `diun.db`). * `watch` + * `workers`: Maximum number of workers that will execute tasks concurrently. _Optional_. (default: `10`). * `schedule`: [CRON expression](https://godoc.org/github.com/crazy-max/cron#hdr-CRON_Expression_Format) to schedule Diun watcher. _Optional_. (default: `0 0 * * * *`). * `os`: OS to use for choosing images. _Optional_. (default: `linux`). * `arch`: Architecture to use for choosing images. _Optional_. (default: `amd64`). @@ -235,7 +238,6 @@ And here is an email sample if you add `mail` notification: ## TODO -* [ ] Create a worker pool to parallelize the analyses * [ ] Watch images inside Dockerfile and Compose files * [ ] Watch images from Docker daemon * [ ] Watch starred repo on DockerHub and Quay diff --git a/cmd/main.go b/cmd/main.go index f7413b92..09b55dec 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -85,7 +85,7 @@ func main() { // Start scheduler c = cron.NewWithLocation(location) - log.Info().Msgf("Start watcher with schedule %s", cfg.Watch.Schedule) + log.Info().Msgf("Watcher initialized with schedule %s", cfg.Watch.Schedule) if err := c.AddJob(cfg.Watch.Schedule, diun); err != nil { log.Fatal().Err(err).Msg("Cannot create cron task") } diff --git a/go.mod b/go.mod index c73918ed..6394734a 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/go-gomail/gomail v0.0.0-20160411212932-81ebce5c23df github.com/google/go-cmp v0.3.0 // indirect github.com/gorilla/mux v1.7.2 // indirect - github.com/hako/durafmt v0.0.0-20180520121703-7b7ae1e72ead github.com/imdario/mergo v0.3.7 github.com/matcornic/hermes/v2 v2.0.2 github.com/opencontainers/go-digest v1.0.0-rc1 diff --git a/go.sum b/go.sum index adeca058..f16fe994 100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,6 @@ github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.7.2 h1:zoNxOV7WjqXptQOVngLmcSQgXmgk4NMz1HibBchjl/I= github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= -github.com/hako/durafmt v0.0.0-20180520121703-7b7ae1e72ead h1:Y9WOGZY2nw5ksbEf5AIpk+vK52Tdg/VN/rHFRfEeeGQ= -github.com/hako/durafmt v0.0.0-20180520121703-7b7ae1e72ead/go.mod h1:5Scbynm8dF1XAPwIwkGPqzkM/shndPm79Jd1003hTjE= github.com/huandu/xstrings v1.2.0 h1:yPeWdRnmynF7p+lLYz0H2tthW9lqhMJrQV/U7yy4wX0= github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= diff --git a/internal/app/dispatcher.go b/internal/app/dispatcher.go new file mode 100644 index 00000000..710f1276 --- /dev/null +++ b/internal/app/dispatcher.go @@ -0,0 +1,48 @@ +package app + +type Collector struct { + Job chan Job + end chan bool +} + +var workerChannel = make(chan chan Job) + +func (di *Diun) StartDispatcher(workerCount int) Collector { + var i int + var workers []worker + input := make(chan Job) + end := make(chan bool) + collector := Collector{ + Job: input, + end: end, + } + + for i < workerCount { + i++ + worker := worker{ + diun: di, + workerPool: workerChannel, + jobChannel: make(chan Job), + end: make(chan bool), + } + worker.Start() + workers = append(workers, worker) + } + + go func() { + for { + select { + case <-end: + for _, w := range workers { + w.Stop() + } + return + case work := <-input: + worker := <-workerChannel + worker <- work + } + } + }() + + return collector +} diff --git a/internal/app/diun.go b/internal/app/diun.go index 3ed4a4c1..17c7f55a 100644 --- a/internal/app/diun.go +++ b/internal/app/diun.go @@ -1,8 +1,8 @@ package app import ( - "encoding/json" "fmt" + "sync" "sync/atomic" "time" @@ -13,16 +13,16 @@ import ( "github.com/crazy-max/diun/internal/utl" "github.com/crazy-max/diun/pkg/docker" "github.com/crazy-max/diun/pkg/docker/registry" - "github.com/hako/durafmt" "github.com/rs/zerolog/log" ) // Diun represents an active diun object type Diun struct { - cfg *config.Config - db *db.Client - notif *notif.Client - locker uint32 + cfg *config.Config + db *db.Client + notif *notif.Client + locker uint32 + collector Collector } // New creates new diun instance @@ -53,7 +53,10 @@ func (di *Diun) Run() { return } defer atomic.StoreUint32(&di.locker, 0) - defer di.trackTime(time.Now(), "Finished, total time spent: ") + + log.Info().Msg("Running process") + var wg sync.WaitGroup + di.collector = di.StartDispatcher(di.cfg.Watch.Workers) // Iterate items for _, item := range di.cfg.Items { @@ -70,58 +73,96 @@ func (di *Diun) Run() { continue } - image, err := di.analyzeImage(item.Image, item, reg) + image, err := registry.ParseImage(item.Image) if err != nil { - log.Error().Err(err).Str("image", item.Image).Msg("Cannot analyze image") + log.Error().Err(err).Str("image", item.Image).Msg("Cannot parse image") + continue + } + + wg.Add(1) + di.collector.Job <- Job{ + ImageStr: item.Image, + Item: item, + Reg: reg, + Wg: &wg, } if image.Domain != "" && item.WatchRepo { - di.analyzeRepo(image, item, reg) + tags, err := reg.Tags(docker.TagsOptions{ + Image: image, + Max: item.MaxTags, + Include: item.IncludeTags, + Exclude: item.ExcludeTags, + }) + if err != nil { + log.Error().Err(err).Str("image", image.String()).Msg("Cannot retrieve tags") + continue + } + + log.Debug().Str("image", image.String()).Msgf("%d tag(s) found in repository. %d will be analyzed (%d max, %d not included, %d excluded).", + tags.Total, + len(tags.List), + item.MaxTags, + tags.NotIncluded, + tags.Excluded, + ) + + for _, tag := range tags.List { + wg.Add(1) + di.collector.Job <- Job{ + ImageStr: fmt.Sprintf("%s/%s:%s", image.Domain, image.Path, tag), + Item: item, + Reg: reg, + Wg: &wg, + } + } } } + + wg.Wait() } -func (di *Diun) analyzeImage(imageStr string, item model.Item, reg *docker.RegistryClient) (registry.Image, error) { - image, err := registry.ParseImage(imageStr) +func (di *Diun) analyze(job Job) error { + defer job.Wg.Done() + image, err := registry.ParseImage(job.ImageStr) if err != nil { - return registry.Image{}, fmt.Errorf("cannot parse image name %s: %v", item.Image, err) + return err } - if !utl.IsIncluded(image.Tag, item.IncludeTags) { - log.Warn().Str("image", image.String()).Msgf("Tag %s not included", image.Tag) - return image, nil - } else if utl.IsExcluded(image.Tag, item.ExcludeTags) { - log.Warn().Str("image", image.String()).Msgf("Tag %s excluded", image.Tag) - return image, nil + if !utl.IsIncluded(image.Tag, job.Item.IncludeTags) { + log.Warn().Str("image", image.String()).Msg("Tag not included") + return nil + } else if utl.IsExcluded(image.Tag, job.Item.ExcludeTags) { + log.Warn().Str("image", image.String()).Msg("Tag excluded") + return nil } - log.Debug().Str("image", image.String()).Msgf("Fetching manifest") - liveManifest, err := reg.Manifest(image) + liveManifest, err := job.Reg.Manifest(image) if err != nil { - return image, err + return err } - b, _ := json.MarshalIndent(liveManifest, "", " ") - log.Debug().Msg(string(b)) + /*b, _ := json.MarshalIndent(liveManifest, "", " ") + log.Debug().Msg(string(b))*/ dbManifest, err := di.db.GetManifest(image) if err != nil { - return image, err + return err } status := model.ImageStatusUnchange if dbManifest.Name == "" { status = model.ImageStatusNew - log.Info().Str("image", image.String()).Msgf("New image found") + log.Info().Str("image", image.String()).Msg("New image found") } else if !liveManifest.Created.Equal(*dbManifest.Created) { status = model.ImageStatusUpdate - log.Info().Str("image", image.String()).Msgf("Image update found") + log.Info().Str("image", image.String()).Msg("Image update found") } else { - log.Debug().Str("image", image.String()).Msgf("No changes") - return image, nil + log.Debug().Str("image", image.String()).Msg("No changes") + return nil } if err := di.db.PutManifest(image, liveManifest); err != nil { - return image, err + return err } log.Debug().Str("image", image.String()).Msg("Manifest saved to database") @@ -131,35 +172,7 @@ func (di *Diun) analyzeImage(imageStr string, item model.Item, reg *docker.Regis Manifest: liveManifest, }) - return image, nil -} - -func (di *Diun) analyzeRepo(image registry.Image, item model.Item, reg *docker.RegistryClient) { - tags, err := reg.Tags(docker.TagsOptions{ - Image: image, - Max: item.MaxTags, - Include: item.IncludeTags, - Exclude: item.ExcludeTags, - }) - if err != nil { - log.Error().Err(err).Str("image", image.String()).Msg("Cannot retrieve tags") - return - } - log.Debug().Str("image", image.String()).Msgf("%d tag(s) found in repository. %d will be analyzed (%d max, %d not included, %d excluded).", - tags.Total, - len(tags.List), - item.MaxTags, - tags.NotIncluded, - tags.Excluded, - ) - - for _, tag := range tags.List { - imageStr := fmt.Sprintf("%s/%s:%s", image.Domain, image.Path, tag) - if _, err := di.analyzeImage(imageStr, item, reg); err != nil { - log.Error().Err(err).Str("image", imageStr).Msg("Cannot analyze image") - continue - } - } + return nil } // Close closes diun @@ -168,7 +181,3 @@ func (di *Diun) Close() { log.Warn().Err(err).Msg("Cannot close database") } } - -func (di *Diun) trackTime(start time.Time, prefix string) { - log.Info().Msgf("%s%s", prefix, durafmt.ParseShort(time.Since(start)).String()) -} diff --git a/internal/app/worker.go b/internal/app/worker.go new file mode 100644 index 00000000..cf21fc25 --- /dev/null +++ b/internal/app/worker.go @@ -0,0 +1,45 @@ +package app + +import ( + "sync" + + "github.com/crazy-max/diun/internal/model" + "github.com/crazy-max/diun/pkg/docker" + "github.com/rs/zerolog/log" +) + +type Job struct { + ImageStr string + Item model.Item + Reg *docker.RegistryClient + Wg *sync.WaitGroup +} + +type worker struct { + diun *Diun + workerPool chan chan Job + jobChannel chan Job + end chan bool +} + +// Start method starts the run loop for the worker +func (w *worker) Start() { + go func() { + for { + w.workerPool <- w.jobChannel + select { + case job := <-w.jobChannel: + if err := w.diun.analyze(job); err != nil { + log.Error().Err(err).Str("image", job.ImageStr).Msg("Error analyzing image") + } + case <-w.end: + return + } + } + }() +} + +// Stop signals the worker to stop listening for work requests. +func (w *worker) Stop() { + w.end <- true +} diff --git a/internal/config/config.go b/internal/config/config.go index 7e2d0aa9..5b1f8090 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -44,6 +44,7 @@ func Load(fl model.Flags, version string) (*Config, error) { Path: "diun.db", }, Watch: model.Watch{ + Workers: 10, Schedule: "0 0 * * * *", Os: "linux", Arch: "amd64", diff --git a/internal/model/watch.go b/internal/model/watch.go index aa6917f3..beeecf3f 100644 --- a/internal/model/watch.go +++ b/internal/model/watch.go @@ -2,6 +2,7 @@ package model // Watch holds data necessary for watch configuration type Watch struct { + Workers int `yaml:"int,omitempty"` Schedule string `yaml:"schedule,omitempty"` Os string `yaml:"os,omitempty"` Arch string `yaml:"arch,omitempty"`