Add worker pool to parallelize analyses

CrazyMax
2019-06-09 19:43:48 +02:00
parent 4742827709
commit 4614a7568d
10 changed files with 178 additions and 71 deletions


@@ -1,5 +1,9 @@
# Changelog
## 0.5.0 (2019/06/09)
* Add worker pool to parallelize analyses
## 0.4.1 (2019/06/08)
* Filter tags before returning them


@@ -22,6 +22,7 @@
* Allow watching a full Docker repository and reporting new tags
* Include and exclude filters with regular expression for tags
* Internal cron implementation through go routines
* Worker pool to parallelize analyses
* Allow overriding os and architecture when watching
* Beautiful email report
* Webhook notification
@@ -81,6 +82,7 @@ db:
path: diun.db
watch:
workers: 10
schedule: 0 0 * * * *
os: linux
arch: amd64
@@ -128,14 +130,14 @@ items:
-
image: quay.io/coreos/hyperkube
# Watch crazymax/swarm-cronjob image and assume docker.io registry and latest tag.
# Only include tags matching regexp ^1.2.*
# Only include tags matching regexp ^1\.2\..*
-
image: crazymax/swarm-cronjob
watch_repo: true
include_tags:
- ^1.2.*
- ^1\.2\..*
# Watch portainer/portainer image on docker.io (DockerHub) and assume latest tag
# Only watch latest 10 tags and include tags matching regexp ^(0|[1-9]\d*)\.*
# Only watch latest 10 tags and include tags matching regexp ^(0|[1-9]\d*)\..*
-
image: docker.io/portainer/portainer
watch_repo: true
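The `include_tags` / `exclude_tags` filters are plain Go regular expressions, which is why the corrected examples above escape the dots. A quick standalone check (illustrative only, not Diun's internal filtering code) shows the difference:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	loose := regexp.MustCompile(`^1.2.*`)     // "." matches any character, so "132-alpine" also matches
	strict := regexp.MustCompile(`^1\.2\..*`) // only tags starting with "1.2."

	for _, tag := range []string{"1.2.3", "132-alpine", "1.20.0"} {
		fmt.Printf("%-10s loose=%v strict=%v\n", tag, loose.MatchString(tag), strict.MatchString(tag))
	}
}
```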
@@ -147,6 +149,7 @@ items:
* `db`
* `path`: Path to the Bolt database file where image manifests are stored. The `--docker` flag forces this path to `/data/diun.db` (default: `diun.db`).
* `watch`
* `workers`: Maximum number of workers that will execute tasks concurrently. _Optional_. (default: `10`).
* `schedule`: [CRON expression](https://godoc.org/github.com/crazy-max/cron#hdr-CRON_Expression_Format) to schedule Diun watcher. _Optional_. (default: `0 0 * * * *`).
* `os`: OS to use for choosing images. _Optional_. (default: `linux`).
* `arch`: Architecture to use for choosing images. _Optional_. (default: `amd64`).
@@ -235,7 +238,6 @@ And here is an email sample if you add `mail` notification:
## TODO
* [ ] Create a worker pool to parallelize the analyses
* [ ] Watch images inside Dockerfile and Compose files
* [ ] Watch images from Docker daemon
* [ ] Watch starred repo on DockerHub and Quay


@@ -85,7 +85,7 @@ func main() {
// Start scheduler
c = cron.NewWithLocation(location)
log.Info().Msgf("Start watcher with schedule %s", cfg.Watch.Schedule)
log.Info().Msgf("Watcher initialized with schedule %s", cfg.Watch.Schedule)
if err := c.AddJob(cfg.Watch.Schedule, diun); err != nil {
log.Fatal().Err(err).Msg("Cannot create cron task")
}

go.mod

@@ -18,7 +18,6 @@ require (
github.com/go-gomail/gomail v0.0.0-20160411212932-81ebce5c23df
github.com/google/go-cmp v0.3.0 // indirect
github.com/gorilla/mux v1.7.2 // indirect
github.com/hako/durafmt v0.0.0-20180520121703-7b7ae1e72ead
github.com/imdario/mergo v0.3.7
github.com/matcornic/hermes/v2 v2.0.2
github.com/opencontainers/go-digest v1.0.0-rc1

go.sum

@@ -58,8 +58,6 @@ github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.7.2 h1:zoNxOV7WjqXptQOVngLmcSQgXmgk4NMz1HibBchjl/I=
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/hako/durafmt v0.0.0-20180520121703-7b7ae1e72ead h1:Y9WOGZY2nw5ksbEf5AIpk+vK52Tdg/VN/rHFRfEeeGQ=
github.com/hako/durafmt v0.0.0-20180520121703-7b7ae1e72ead/go.mod h1:5Scbynm8dF1XAPwIwkGPqzkM/shndPm79Jd1003hTjE=
github.com/huandu/xstrings v1.2.0 h1:yPeWdRnmynF7p+lLYz0H2tthW9lqhMJrQV/U7yy4wX0=
github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=


@@ -0,0 +1,48 @@
package app
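// Collector exposes the Job queue fed by Run and an end channel used to stop the worker pool.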
type Collector struct {
Job chan Job
end chan bool
}
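// workerChannel is the shared pool: each idle worker pushes its own job channel here.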
var workerChannel = make(chan chan Job)
func (di *Diun) StartDispatcher(workerCount int) Collector {
var i int
var workers []worker
input := make(chan Job)
end := make(chan bool)
collector := Collector{
Job: input,
end: end,
}
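	// Spawn workerCount workers, each with its own job channel registered against the shared pool.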
for i < workerCount {
i++
worker := worker{
diun: di,
workerPool: workerChannel,
jobChannel: make(chan Job),
end: make(chan bool),
}
worker.Start()
workers = append(workers, worker)
}
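	// Dispatcher loop: hand each incoming job to the next idle worker, or stop every worker once end is signalled.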
go func() {
for {
select {
case <-end:
for _, w := range workers {
w.Stop()
}
return
case work := <-input:
worker := <-workerChannel
worker <- work
}
}
}()
return collector
}
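The dispatcher above follows the common Go worker-pool pattern: workers advertise themselves by sending their job channel into a shared pool, and the dispatcher forwards each incoming job to whichever worker is idle. A minimal, self-contained sketch of the same pattern (hypothetical names, stop handling omitted for brevity, not Diun's API):

```go
package main

import (
	"fmt"
	"sync"
)

type job struct {
	id int
	wg *sync.WaitGroup
}

// runWorker registers itself as idle by pushing its job channel into pool,
// then processes whatever the dispatcher hands it.
func runWorker(pool chan chan job) {
	jobs := make(chan job)
	go func() {
		for {
			pool <- jobs
			j := <-jobs
			fmt.Println("processed job", j.id)
			j.wg.Done()
		}
	}()
}

func main() {
	var wg sync.WaitGroup
	pool := make(chan chan job)
	input := make(chan job)

	for i := 0; i < 3; i++ {
		runWorker(pool)
	}

	// dispatcher: forward each queued job to the next idle worker
	go func() {
		for j := range input {
			idle := <-pool
			idle <- j
		}
	}()

	for i := 0; i < 10; i++ {
		wg.Add(1)
		input <- job{id: i, wg: &wg}
	}
	wg.Wait()
}
```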


@@ -1,8 +1,8 @@
package app
import (
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
@@ -13,16 +13,16 @@ import (
"github.com/crazy-max/diun/internal/utl"
"github.com/crazy-max/diun/pkg/docker"
"github.com/crazy-max/diun/pkg/docker/registry"
"github.com/hako/durafmt"
"github.com/rs/zerolog/log"
)
// Diun represents an active diun object
type Diun struct {
cfg *config.Config
db *db.Client
notif *notif.Client
locker uint32
cfg *config.Config
db *db.Client
notif *notif.Client
locker uint32
collector Collector
}
// New creates new diun instance
@@ -53,7 +53,10 @@ func (di *Diun) Run() {
return
}
defer atomic.StoreUint32(&di.locker, 0)
defer di.trackTime(time.Now(), "Finished, total time spent: ")
log.Info().Msg("Running process")
var wg sync.WaitGroup
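	// Size the pool from watch.workers; every queued Job releases the WaitGroup once analyzed.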
di.collector = di.StartDispatcher(di.cfg.Watch.Workers)
// Iterate items
for _, item := range di.cfg.Items {
@@ -70,58 +73,96 @@ func (di *Diun) Run() {
continue
}
image, err := di.analyzeImage(item.Image, item, reg)
image, err := registry.ParseImage(item.Image)
if err != nil {
log.Error().Err(err).Str("image", item.Image).Msg("Cannot analyze image")
log.Error().Err(err).Str("image", item.Image).Msg("Cannot parse image")
continue
}
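		// One WaitGroup slot per queued job; the worker's analyze() call releases it.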
wg.Add(1)
di.collector.Job <- Job{
ImageStr: item.Image,
Item: item,
Reg: reg,
Wg: &wg,
}
if image.Domain != "" && item.WatchRepo {
di.analyzeRepo(image, item, reg)
tags, err := reg.Tags(docker.TagsOptions{
Image: image,
Max: item.MaxTags,
Include: item.IncludeTags,
Exclude: item.ExcludeTags,
})
if err != nil {
log.Error().Err(err).Str("image", image.String()).Msg("Cannot retrieve tags")
continue
}
log.Debug().Str("image", image.String()).Msgf("%d tag(s) found in repository. %d will be analyzed (%d max, %d not included, %d excluded).",
tags.Total,
len(tags.List),
item.MaxTags,
tags.NotIncluded,
tags.Excluded,
)
for _, tag := range tags.List {
wg.Add(1)
di.collector.Job <- Job{
ImageStr: fmt.Sprintf("%s/%s:%s", image.Domain, image.Path, tag),
Item: item,
Reg: reg,
Wg: &wg,
}
}
}
}
wg.Wait()
}
func (di *Diun) analyzeImage(imageStr string, item model.Item, reg *docker.RegistryClient) (registry.Image, error) {
image, err := registry.ParseImage(imageStr)
func (di *Diun) analyze(job Job) error {
defer job.Wg.Done()
image, err := registry.ParseImage(job.ImageStr)
if err != nil {
return registry.Image{}, fmt.Errorf("cannot parse image name %s: %v", item.Image, err)
return err
}
if !utl.IsIncluded(image.Tag, item.IncludeTags) {
log.Warn().Str("image", image.String()).Msgf("Tag %s not included", image.Tag)
return image, nil
} else if utl.IsExcluded(image.Tag, item.ExcludeTags) {
log.Warn().Str("image", image.String()).Msgf("Tag %s excluded", image.Tag)
return image, nil
if !utl.IsIncluded(image.Tag, job.Item.IncludeTags) {
log.Warn().Str("image", image.String()).Msg("Tag not included")
return nil
} else if utl.IsExcluded(image.Tag, job.Item.ExcludeTags) {
log.Warn().Str("image", image.String()).Msg("Tag excluded")
return nil
}
log.Debug().Str("image", image.String()).Msgf("Fetching manifest")
liveManifest, err := reg.Manifest(image)
liveManifest, err := job.Reg.Manifest(image)
if err != nil {
return image, err
return err
}
b, _ := json.MarshalIndent(liveManifest, "", " ")
log.Debug().Msg(string(b))
/*b, _ := json.MarshalIndent(liveManifest, "", " ")
log.Debug().Msg(string(b))*/
dbManifest, err := di.db.GetManifest(image)
if err != nil {
return image, err
return err
}
status := model.ImageStatusUnchange
if dbManifest.Name == "" {
status = model.ImageStatusNew
log.Info().Str("image", image.String()).Msgf("New image found")
log.Info().Str("image", image.String()).Msg("New image found")
} else if !liveManifest.Created.Equal(*dbManifest.Created) {
status = model.ImageStatusUpdate
log.Info().Str("image", image.String()).Msgf("Image update found")
log.Info().Str("image", image.String()).Msg("Image update found")
} else {
log.Debug().Str("image", image.String()).Msgf("No changes")
return image, nil
log.Debug().Str("image", image.String()).Msg("No changes")
return nil
}
if err := di.db.PutManifest(image, liveManifest); err != nil {
return image, err
return err
}
log.Debug().Str("image", image.String()).Msg("Manifest saved to database")
@@ -131,35 +172,7 @@ func (di *Diun) analyzeImage(imageStr string, item model.Item, reg *docker.Regis
Manifest: liveManifest,
})
return image, nil
}
func (di *Diun) analyzeRepo(image registry.Image, item model.Item, reg *docker.RegistryClient) {
tags, err := reg.Tags(docker.TagsOptions{
Image: image,
Max: item.MaxTags,
Include: item.IncludeTags,
Exclude: item.ExcludeTags,
})
if err != nil {
log.Error().Err(err).Str("image", image.String()).Msg("Cannot retrieve tags")
return
}
log.Debug().Str("image", image.String()).Msgf("%d tag(s) found in repository. %d will be analyzed (%d max, %d not included, %d excluded).",
tags.Total,
len(tags.List),
item.MaxTags,
tags.NotIncluded,
tags.Excluded,
)
for _, tag := range tags.List {
imageStr := fmt.Sprintf("%s/%s:%s", image.Domain, image.Path, tag)
if _, err := di.analyzeImage(imageStr, item, reg); err != nil {
log.Error().Err(err).Str("image", imageStr).Msg("Cannot analyze image")
continue
}
}
return nil
}
// Close closes diun
@@ -168,7 +181,3 @@ func (di *Diun) Close() {
log.Warn().Err(err).Msg("Cannot close database")
}
}
func (di *Diun) trackTime(start time.Time, prefix string) {
log.Info().Msgf("%s%s", prefix, durafmt.ParseShort(time.Since(start)).String())
}

internal/app/worker.go (new file)

@@ -0,0 +1,45 @@
package app
import (
"sync"
"github.com/crazy-max/diun/internal/model"
"github.com/crazy-max/diun/pkg/docker"
"github.com/rs/zerolog/log"
)
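// Job carries a single image analysis request through the worker pool.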
type Job struct {
ImageStr string
Item model.Item
Reg *docker.RegistryClient
Wg *sync.WaitGroup
}
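// worker runs analyses handed to it by the dispatcher and re-registers itself in the pool when idle.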
type worker struct {
diun *Diun
workerPool chan chan Job
jobChannel chan Job
end chan bool
}
// Start method starts the run loop for the worker
func (w *worker) Start() {
go func() {
for {
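			// Advertise availability by pushing our job channel back into the shared pool.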
w.workerPool <- w.jobChannel
select {
case job := <-w.jobChannel:
if err := w.diun.analyze(job); err != nil {
log.Error().Err(err).Str("image", job.ImageStr).Msg("Error analyzing image")
}
case <-w.end:
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w *worker) Stop() {
w.end <- true
}


@@ -44,6 +44,7 @@ func Load(fl model.Flags, version string) (*Config, error) {
Path: "diun.db",
},
Watch: model.Watch{
Workers: 10,
Schedule: "0 0 * * * *",
Os: "linux",
Arch: "amd64",


@@ -2,6 +2,7 @@ package model
// Watch holds data necessary for watch configuration
type Watch struct {
Workers int `yaml:"workers,omitempty"`
Schedule string `yaml:"schedule,omitempty"`
Os string `yaml:"os,omitempty"`
Arch string `yaml:"arch,omitempty"`
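The yaml tags on Watch must match the keys used in the `watch` section of the config file (`workers`, `schedule`, `os`, `arch`). A small sanity check (illustrative; gopkg.in/yaml.v2 is used here only for the example, and the struct is redeclared locally):

```go
package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v2"
)

type Watch struct {
	Workers  int    `yaml:"workers,omitempty"`
	Schedule string `yaml:"schedule,omitempty"`
	Os       string `yaml:"os,omitempty"`
	Arch     string `yaml:"arch,omitempty"`
}

func main() {
	data := []byte("workers: 10\nschedule: \"0 0 * * * *\"\nos: linux\narch: amd64\n")

	var w Watch
	if err := yaml.Unmarshal(data, &w); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", w) // {Workers:10 Schedule:0 0 * * * * Os:linux Arch:amd64}
}
```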