mirror of
https://github.com/crazy-max/diun.git
synced 2025-12-24 06:28:13 +01:00
Review config file structure and improve worker pool
This commit is contained in:
@@ -1,5 +1,10 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 1.0.0 (2019/06/28)
|
||||||
|
|
||||||
|
* Review config file structure
|
||||||
|
* Improve worker pool
|
||||||
|
|
||||||
## 0.5.0 (2019/06/09)
|
## 0.5.0 (2019/06/09)
|
||||||
|
|
||||||
* Add worker pool to parallelize analyses
|
* Add worker pool to parallelize analyses
|
||||||
|
|||||||
@@ -67,10 +67,6 @@ func main() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal().Err(err).Msg("Cannot load configuration")
|
log.Fatal().Err(err).Msg("Cannot load configuration")
|
||||||
}
|
}
|
||||||
if err := cfg.Check(); err != nil {
|
|
||||||
cfg.Display()
|
|
||||||
log.Fatal().Err(err).Msg("Improper configuration")
|
|
||||||
}
|
|
||||||
cfg.Display()
|
cfg.Display()
|
||||||
|
|
||||||
// Init
|
// Init
|
||||||
@@ -85,7 +81,7 @@ func main() {
|
|||||||
|
|
||||||
// Start scheduler
|
// Start scheduler
|
||||||
c = cron.NewWithLocation(location)
|
c = cron.NewWithLocation(location)
|
||||||
log.Info().Msgf("Watcher initialized with schedule %s", cfg.Watch.Schedule)
|
log.Info().Msgf("Cron initialized with schedule %s", cfg.Watch.Schedule)
|
||||||
if err := c.AddJob(cfg.Watch.Schedule, diun); err != nil {
|
if err := c.AddJob(cfg.Watch.Schedule, diun); err != nil {
|
||||||
log.Fatal().Err(err).Msg("Cannot create cron task")
|
log.Fatal().Err(err).Msg("Cannot create cron task")
|
||||||
}
|
}
|
||||||
|
|||||||
5
go.mod
5
go.mod
@@ -4,7 +4,7 @@ require (
|
|||||||
github.com/BurntSushi/toml v0.3.1 // indirect
|
github.com/BurntSushi/toml v0.3.1 // indirect
|
||||||
github.com/Microsoft/go-winio v0.4.12 // indirect
|
github.com/Microsoft/go-winio v0.4.12 // indirect
|
||||||
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc // indirect
|
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc // indirect
|
||||||
github.com/containers/image v1.5.1
|
github.com/containers/image v2.0.0+incompatible
|
||||||
github.com/containers/storage v1.12.9 // indirect
|
github.com/containers/storage v1.12.9 // indirect
|
||||||
github.com/crazy-max/cron v1.2.2
|
github.com/crazy-max/cron v1.2.2
|
||||||
github.com/docker/distribution v2.7.1+incompatible // indirect
|
github.com/docker/distribution v2.7.1+incompatible // indirect
|
||||||
@@ -18,14 +18,17 @@ require (
|
|||||||
github.com/go-gomail/gomail v0.0.0-20160411212932-81ebce5c23df
|
github.com/go-gomail/gomail v0.0.0-20160411212932-81ebce5c23df
|
||||||
github.com/google/go-cmp v0.3.0 // indirect
|
github.com/google/go-cmp v0.3.0 // indirect
|
||||||
github.com/gorilla/mux v1.7.2 // indirect
|
github.com/gorilla/mux v1.7.2 // indirect
|
||||||
|
github.com/hako/durafmt v0.0.0-20190612201238-650ed9f29a84
|
||||||
github.com/imdario/mergo v0.3.7
|
github.com/imdario/mergo v0.3.7
|
||||||
github.com/matcornic/hermes/v2 v2.0.2
|
github.com/matcornic/hermes/v2 v2.0.2
|
||||||
github.com/opencontainers/go-digest v1.0.0-rc1
|
github.com/opencontainers/go-digest v1.0.0-rc1
|
||||||
github.com/opencontainers/image-spec v1.0.1 // indirect
|
github.com/opencontainers/image-spec v1.0.1 // indirect
|
||||||
github.com/opencontainers/runc v0.1.1 // indirect
|
github.com/opencontainers/runc v0.1.1 // indirect
|
||||||
|
github.com/panjf2000/ants v1.0.0
|
||||||
github.com/prometheus/client_golang v0.9.4 // indirect
|
github.com/prometheus/client_golang v0.9.4 // indirect
|
||||||
github.com/rs/zerolog v1.14.3
|
github.com/rs/zerolog v1.14.3
|
||||||
github.com/sirupsen/logrus v1.4.2 // indirect
|
github.com/sirupsen/logrus v1.4.2 // indirect
|
||||||
|
github.com/stretchr/testify v1.3.0
|
||||||
go.etcd.io/bbolt v1.3.2
|
go.etcd.io/bbolt v1.3.2
|
||||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6
|
gopkg.in/alecthomas/kingpin.v2 v2.2.6
|
||||||
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
|
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
|
||||||
|
|||||||
8
go.sum
8
go.sum
@@ -17,8 +17,8 @@ github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
|
|||||||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||||
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc h1:TP+534wVlf61smEIq1nwLLAjQVEK2EADoW3CX9AuT+8=
|
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc h1:TP+534wVlf61smEIq1nwLLAjQVEK2EADoW3CX9AuT+8=
|
||||||
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
|
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
|
||||||
github.com/containers/image v1.5.1 h1:ssEuj1c24uJvdMkUa2IrawuEFZBP12p6WzrjNBTQxE0=
|
github.com/containers/image v2.0.0+incompatible h1:FTr6Br7jlIKNCKMjSOMbAxKp2keQ0//jzJaYNTVhauk=
|
||||||
github.com/containers/image v1.5.1/go.mod h1:8Vtij257IWSanUQKe1tAeNOm2sRVkSqQTVQ1IlwI3+M=
|
github.com/containers/image v2.0.0+incompatible/go.mod h1:8Vtij257IWSanUQKe1tAeNOm2sRVkSqQTVQ1IlwI3+M=
|
||||||
github.com/containers/storage v1.12.9 h1:kYUE0EBpYv9zwW+MEgXBDsoY5FzmHE5PNKXhWbAkLKg=
|
github.com/containers/storage v1.12.9 h1:kYUE0EBpYv9zwW+MEgXBDsoY5FzmHE5PNKXhWbAkLKg=
|
||||||
github.com/containers/storage v1.12.9/go.mod h1:+RirK6VQAqskQlaTBrOG6ulDvn4si2QjFE1NZCn06MM=
|
github.com/containers/storage v1.12.9/go.mod h1:+RirK6VQAqskQlaTBrOG6ulDvn4si2QjFE1NZCn06MM=
|
||||||
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||||
@@ -58,6 +58,8 @@ github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
|
|||||||
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/gorilla/mux v1.7.2 h1:zoNxOV7WjqXptQOVngLmcSQgXmgk4NMz1HibBchjl/I=
|
github.com/gorilla/mux v1.7.2 h1:zoNxOV7WjqXptQOVngLmcSQgXmgk4NMz1HibBchjl/I=
|
||||||
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||||
|
github.com/hako/durafmt v0.0.0-20190612201238-650ed9f29a84 h1:RvcDqcKLua4b/jtXez7ZVe9s6Iq5N6ujVevqY4FBQmM=
|
||||||
|
github.com/hako/durafmt v0.0.0-20190612201238-650ed9f29a84/go.mod h1:5Scbynm8dF1XAPwIwkGPqzkM/shndPm79Jd1003hTjE=
|
||||||
github.com/huandu/xstrings v1.2.0 h1:yPeWdRnmynF7p+lLYz0H2tthW9lqhMJrQV/U7yy4wX0=
|
github.com/huandu/xstrings v1.2.0 h1:yPeWdRnmynF7p+lLYz0H2tthW9lqhMJrQV/U7yy4wX0=
|
||||||
github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4=
|
github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4=
|
||||||
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
|
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
|
||||||
@@ -92,6 +94,8 @@ github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVo
|
|||||||
github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
|
github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
|
||||||
github.com/opencontainers/runc v0.1.1 h1:GlxAyO6x8rfZYN9Tt0Kti5a/cP41iuiO2yYT0IJGY8Y=
|
github.com/opencontainers/runc v0.1.1 h1:GlxAyO6x8rfZYN9Tt0Kti5a/cP41iuiO2yYT0IJGY8Y=
|
||||||
github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U=
|
github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U=
|
||||||
|
github.com/panjf2000/ants v1.0.0 h1:MZBsUt8W6ktQfhIswUZpw17IJlXY6ly2+U5b9jxwad4=
|
||||||
|
github.com/panjf2000/ants v1.0.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
|
||||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
|
|||||||
@@ -1,49 +0,0 @@
|
|||||||
package app
|
|
||||||
|
|
||||||
type Collector struct {
|
|
||||||
Job chan Job
|
|
||||||
end chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
var workerChannel = make(chan chan Job)
|
|
||||||
|
|
||||||
func (di *Diun) StartDispatcher(workerCount int) Collector {
|
|
||||||
var i int
|
|
||||||
var workers []worker
|
|
||||||
input := make(chan Job)
|
|
||||||
end := make(chan bool)
|
|
||||||
collector := Collector{
|
|
||||||
Job: input,
|
|
||||||
end: end,
|
|
||||||
}
|
|
||||||
|
|
||||||
for i < workerCount {
|
|
||||||
i++
|
|
||||||
worker := worker{
|
|
||||||
id: i,
|
|
||||||
diun: di,
|
|
||||||
workerPool: workerChannel,
|
|
||||||
jobChannel: make(chan Job),
|
|
||||||
end: make(chan bool),
|
|
||||||
}
|
|
||||||
worker.Start()
|
|
||||||
workers = append(workers, worker)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-end:
|
|
||||||
for _, w := range workers {
|
|
||||||
w.Stop()
|
|
||||||
}
|
|
||||||
return
|
|
||||||
case work := <-input:
|
|
||||||
worker := <-workerChannel
|
|
||||||
worker <- work
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return collector
|
|
||||||
}
|
|
||||||
@@ -1,18 +1,15 @@
|
|||||||
package app
|
package app
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/crazy-max/diun/internal/config"
|
"github.com/crazy-max/diun/internal/config"
|
||||||
"github.com/crazy-max/diun/internal/db"
|
"github.com/crazy-max/diun/internal/db"
|
||||||
"github.com/crazy-max/diun/internal/model"
|
|
||||||
"github.com/crazy-max/diun/internal/notif"
|
"github.com/crazy-max/diun/internal/notif"
|
||||||
"github.com/crazy-max/diun/internal/utl"
|
"github.com/hako/durafmt"
|
||||||
"github.com/crazy-max/diun/pkg/docker"
|
"github.com/panjf2000/ants"
|
||||||
"github.com/crazy-max/diun/pkg/docker/registry"
|
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -22,7 +19,8 @@ type Diun struct {
|
|||||||
db *db.Client
|
db *db.Client
|
||||||
notif *notif.Client
|
notif *notif.Client
|
||||||
locker uint32
|
locker uint32
|
||||||
collector Collector
|
pool *ants.PoolWithFunc
|
||||||
|
wg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates new diun instance
|
// New creates new diun instance
|
||||||
@@ -54,125 +52,34 @@ func (di *Diun) Run() {
|
|||||||
}
|
}
|
||||||
defer atomic.StoreUint32(&di.locker, 0)
|
defer atomic.StoreUint32(&di.locker, 0)
|
||||||
|
|
||||||
log.Info().Msg("Running process")
|
start := time.Now()
|
||||||
var wg sync.WaitGroup
|
defer di.trackTime(start, "Finished, total time spent: ")
|
||||||
di.collector = di.StartDispatcher(di.cfg.Watch.Workers)
|
|
||||||
|
|
||||||
// Iterate items
|
log.Info().Msg("Starting Diun...")
|
||||||
for _, item := range di.cfg.Items {
|
di.wg = new(sync.WaitGroup)
|
||||||
reg, err := docker.NewRegistryClient(docker.RegistryOptions{
|
di.pool, _ = ants.NewPoolWithFunc(di.cfg.Watch.Workers, func(i interface{}) {
|
||||||
Os: di.cfg.Watch.Os,
|
var err error
|
||||||
Arch: di.cfg.Watch.Arch,
|
switch t := i.(type) {
|
||||||
Username: item.Registry.Username,
|
case imageJob:
|
||||||
Password: item.Registry.Password,
|
err = di.imageJob(t)
|
||||||
Timeout: time.Duration(item.Registry.Timeout) * time.Second,
|
if err != nil {
|
||||||
InsecureTLS: item.Registry.InsecureTLS,
|
log.Error().Err(err).Msg("Job image error")
|
||||||
|
}
|
||||||
|
err = di.imageRepoJob(t)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msg("Job image repo error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
di.wg.Done()
|
||||||
})
|
})
|
||||||
if err != nil {
|
defer func() {
|
||||||
log.Error().Err(err).Str("image", item.Image).Msg("Cannot create registry client")
|
if err := di.pool.Release(); err != nil {
|
||||||
continue
|
log.Warn().Err(err).Msg("Cannot release pool")
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
image, err := registry.ParseImage(item.Image)
|
di.procImages()
|
||||||
if err != nil {
|
di.wg.Wait()
|
||||||
log.Error().Err(err).Str("image", item.Image).Msg("Cannot parse image")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
di.collector.Job <- Job{
|
|
||||||
ImageStr: item.Image,
|
|
||||||
Item: item,
|
|
||||||
Reg: reg,
|
|
||||||
Wg: &wg,
|
|
||||||
}
|
|
||||||
|
|
||||||
if image.Domain != "" && item.WatchRepo {
|
|
||||||
tags, err := reg.Tags(docker.TagsOptions{
|
|
||||||
Image: image,
|
|
||||||
Max: item.MaxTags,
|
|
||||||
Include: item.IncludeTags,
|
|
||||||
Exclude: item.ExcludeTags,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Str("image", image.String()).Msg("Cannot retrieve tags")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debug().Str("image", image.String()).Msgf("%d tag(s) found in repository. %d will be analyzed (%d max, %d not included, %d excluded).",
|
|
||||||
tags.Total,
|
|
||||||
len(tags.List),
|
|
||||||
item.MaxTags,
|
|
||||||
tags.NotIncluded,
|
|
||||||
tags.Excluded,
|
|
||||||
)
|
|
||||||
|
|
||||||
for _, tag := range tags.List {
|
|
||||||
wg.Add(1)
|
|
||||||
di.collector.Job <- Job{
|
|
||||||
ImageStr: fmt.Sprintf("%s/%s:%s", image.Domain, image.Path, tag),
|
|
||||||
Item: item,
|
|
||||||
Reg: reg,
|
|
||||||
Wg: &wg,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (di *Diun) analyze(job Job, workerID int) error {
|
|
||||||
defer job.Wg.Done()
|
|
||||||
image, err := registry.ParseImage(job.ImageStr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !utl.IsIncluded(image.Tag, job.Item.IncludeTags) {
|
|
||||||
log.Warn().Str("image", image.String()).Int("worker_id", workerID).Msg("Tag not included")
|
|
||||||
return nil
|
|
||||||
} else if utl.IsExcluded(image.Tag, job.Item.ExcludeTags) {
|
|
||||||
log.Warn().Str("image", image.String()).Int("worker_id", workerID).Msg("Tag excluded")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
liveManifest, err := job.Reg.Manifest(image)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
/*b, _ := json.MarshalIndent(liveManifest, "", " ")
|
|
||||||
log.Debug().Msg(string(b))*/
|
|
||||||
|
|
||||||
dbManifest, err := di.db.GetManifest(image)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
status := model.ImageStatusUnchange
|
|
||||||
if dbManifest.Name == "" {
|
|
||||||
status = model.ImageStatusNew
|
|
||||||
log.Info().Str("image", image.String()).Int("worker_id", workerID).Msg("New image found")
|
|
||||||
} else if !liveManifest.Created.Equal(*dbManifest.Created) {
|
|
||||||
status = model.ImageStatusUpdate
|
|
||||||
log.Info().Str("image", image.String()).Int("worker_id", workerID).Msg("Image update found")
|
|
||||||
} else {
|
|
||||||
log.Debug().Str("image", image.String()).Int("worker_id", workerID).Msg("No changes")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := di.db.PutManifest(image, liveManifest); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Debug().Str("image", image.String()).Int("worker_id", workerID).Msg("Manifest saved to database")
|
|
||||||
|
|
||||||
di.notif.Send(model.NotifEntry{
|
|
||||||
Status: status,
|
|
||||||
Image: image,
|
|
||||||
Manifest: liveManifest,
|
|
||||||
})
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes diun
|
// Close closes diun
|
||||||
@@ -181,3 +88,7 @@ func (di *Diun) Close() {
|
|||||||
log.Warn().Err(err).Msg("Cannot close database")
|
log.Warn().Err(err).Msg("Cannot close database")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (di *Diun) trackTime(start time.Time, prefix string) {
|
||||||
|
log.Info().Msgf("%s%s", prefix, durafmt.ParseShort(time.Since(start)).String())
|
||||||
|
}
|
||||||
|
|||||||
139
internal/app/image.go
Normal file
139
internal/app/image.go
Normal file
@@ -0,0 +1,139 @@
|
|||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/crazy-max/diun/internal/model"
|
||||||
|
"github.com/crazy-max/diun/internal/utl"
|
||||||
|
"github.com/crazy-max/diun/pkg/docker"
|
||||||
|
"github.com/crazy-max/diun/pkg/docker/registry"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type imageJob struct {
|
||||||
|
origin bool
|
||||||
|
image model.Image
|
||||||
|
registry *docker.RegistryClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (di *Diun) procImages() {
|
||||||
|
// Iterate images
|
||||||
|
for _, img := range di.cfg.Image {
|
||||||
|
reg, err := docker.NewRegistryClient(docker.RegistryOptions{
|
||||||
|
Os: di.cfg.Watch.Os,
|
||||||
|
Arch: di.cfg.Watch.Arch,
|
||||||
|
Username: img.RegOpts.Username,
|
||||||
|
Password: img.RegOpts.Password,
|
||||||
|
Timeout: time.Duration(img.RegOpts.Timeout) * time.Second,
|
||||||
|
InsecureTLS: img.RegOpts.InsecureTLS,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Str("image", img.Name).Msg("Cannot create registry client")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
di.wg.Add(1)
|
||||||
|
err = di.pool.Invoke(imageJob{
|
||||||
|
origin: true,
|
||||||
|
image: img,
|
||||||
|
registry: reg,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msgf("Invoking image job")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (di *Diun) imageJob(job imageJob) error {
|
||||||
|
image, err := registry.ParseImage(job.image.Name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !utl.IsIncluded(image.Tag, job.image.IncludeTags) {
|
||||||
|
log.Warn().Str("image", image.String()).Msg("Tag not included")
|
||||||
|
return nil
|
||||||
|
} else if utl.IsExcluded(image.Tag, job.image.ExcludeTags) {
|
||||||
|
log.Warn().Str("image", image.String()).Msg("Tag excluded")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
liveManifest, err := job.registry.Manifest(image)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
/*b, _ := json.MarshalIndent(liveManifest, "", " ")
|
||||||
|
log.Debug().Msg(string(b))*/
|
||||||
|
|
||||||
|
dbManifest, err := di.db.GetManifest(image)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
status := model.ImageStatusUnchange
|
||||||
|
if dbManifest.Name == "" {
|
||||||
|
status = model.ImageStatusNew
|
||||||
|
log.Info().Str("image", image.String()).Msg("New image found")
|
||||||
|
} else if !liveManifest.Created.Equal(*dbManifest.Created) {
|
||||||
|
status = model.ImageStatusUpdate
|
||||||
|
log.Info().Str("image", image.String()).Msg("Image update found")
|
||||||
|
} else {
|
||||||
|
log.Debug().Str("image", image.String()).Msg("No changes")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := di.db.PutManifest(image, liveManifest); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
log.Debug().Str("image", image.String()).Msg("Manifest saved to database")
|
||||||
|
|
||||||
|
di.notif.Send(model.NotifEntry{
|
||||||
|
Status: status,
|
||||||
|
Image: image,
|
||||||
|
Manifest: liveManifest,
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (di *Diun) imageRepoJob(job imageJob) error {
|
||||||
|
image, err := registry.ParseImage(job.image.Name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !job.origin || image.Domain == "" || !job.image.WatchRepo {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tags, err := job.registry.Tags(docker.TagsOptions{
|
||||||
|
Image: image,
|
||||||
|
Max: job.image.MaxTags,
|
||||||
|
Include: job.image.IncludeTags,
|
||||||
|
Exclude: job.image.ExcludeTags,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug().Str("image", image.String()).Msgf("%d tag(s) found in repository. %d will be analyzed (%d max, %d not included, %d excluded).",
|
||||||
|
tags.Total,
|
||||||
|
len(tags.List),
|
||||||
|
job.image.MaxTags,
|
||||||
|
tags.NotIncluded,
|
||||||
|
tags.Excluded,
|
||||||
|
)
|
||||||
|
|
||||||
|
job.origin = false
|
||||||
|
for _, tag := range tags.List {
|
||||||
|
job.image.Name = fmt.Sprintf("%s/%s:%s", image.Domain, image.Path, tag)
|
||||||
|
di.wg.Add(1)
|
||||||
|
err = di.pool.Invoke(job)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Msgf("Invoking repo image job")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -1,49 +0,0 @@
|
|||||||
package app
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/crazy-max/diun/internal/model"
|
|
||||||
"github.com/crazy-max/diun/pkg/docker"
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Job struct {
|
|
||||||
ImageStr string
|
|
||||||
Item model.Item
|
|
||||||
Reg *docker.RegistryClient
|
|
||||||
Wg *sync.WaitGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
type worker struct {
|
|
||||||
id int
|
|
||||||
diun *Diun
|
|
||||||
workerPool chan chan Job
|
|
||||||
jobChannel chan Job
|
|
||||||
end chan bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start method starts the run loop for the worker
|
|
||||||
func (w *worker) Start() {
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
w.workerPool <- w.jobChannel
|
|
||||||
select {
|
|
||||||
case job := <-w.jobChannel:
|
|
||||||
if err := w.diun.analyze(job, w.id); err != nil {
|
|
||||||
log.Error().Err(err).
|
|
||||||
Str("image", job.ImageStr).
|
|
||||||
Int("worker_id", w.id).
|
|
||||||
Msg("Error analyzing image")
|
|
||||||
}
|
|
||||||
case <-w.end:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop signals the worker to stop listening for work requests.
|
|
||||||
func (w *worker) Stop() {
|
|
||||||
w.end <- true
|
|
||||||
}
|
|
||||||
@@ -23,15 +23,15 @@ type Config struct {
|
|||||||
Db model.Db `yaml:"db,omitempty"`
|
Db model.Db `yaml:"db,omitempty"`
|
||||||
Watch model.Watch `yaml:"watch,omitempty"`
|
Watch model.Watch `yaml:"watch,omitempty"`
|
||||||
Notif model.Notif `yaml:"notif,omitempty"`
|
Notif model.Notif `yaml:"notif,omitempty"`
|
||||||
Registries map[string]model.Registry `yaml:"registries,omitempty"`
|
RegOpts map[string]model.RegOpts `yaml:"regopts,omitempty"`
|
||||||
Items []model.Item `yaml:"items,omitempty"`
|
Image []model.Image `yaml:"image,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load returns Configuration struct
|
// Load returns Configuration struct
|
||||||
func Load(fl model.Flags, version string) (*Config, error) {
|
func Load(flags model.Flags, version string) (*Config, error) {
|
||||||
var err error
|
var err error
|
||||||
var cfg = Config{
|
var cfg = Config{
|
||||||
Flags: fl,
|
Flags: flags,
|
||||||
App: model.App{
|
App: model.App{
|
||||||
ID: "diun",
|
ID: "diun",
|
||||||
Name: "Diun",
|
Name: "Diun",
|
||||||
@@ -65,24 +65,27 @@ func Load(fl model.Flags, version string) (*Config, error) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = os.Lstat(fl.Cfgfile); err != nil {
|
if _, err = os.Lstat(flags.Cfgfile); err != nil {
|
||||||
return nil, fmt.Errorf("unable to open config file, %s", err)
|
return nil, fmt.Errorf("unable to open config file, %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
bytes, err := ioutil.ReadFile(fl.Cfgfile)
|
bytes, err := ioutil.ReadFile(flags.Cfgfile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to read config file, %s", err)
|
return nil, fmt.Errorf("unable to read config file, %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := yaml.Unmarshal(bytes, &cfg); err != nil {
|
if err := yaml.UnmarshalStrict(bytes, &cfg); err != nil {
|
||||||
return nil, fmt.Errorf("unable to decode into struct, %v", err)
|
return nil, fmt.Errorf("unable to decode into struct, %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := cfg.validate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &cfg, nil
|
return &cfg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check verifies Config values
|
func (cfg *Config) validate() error {
|
||||||
func (cfg *Config) Check() error {
|
|
||||||
if cfg.Flags.Docker {
|
if cfg.Flags.Docker {
|
||||||
cfg.Db.Path = "/data/diun.db"
|
cfg.Db.Path = "/data/diun.db"
|
||||||
}
|
}
|
||||||
@@ -92,49 +95,14 @@ func (cfg *Config) Check() error {
|
|||||||
}
|
}
|
||||||
cfg.Db.Path = path.Clean(cfg.Db.Path)
|
cfg.Db.Path = path.Clean(cfg.Db.Path)
|
||||||
|
|
||||||
for id, reg := range cfg.Registries {
|
for id, regopts := range cfg.RegOpts {
|
||||||
if err := mergo.Merge(®, model.Registry{
|
if err := cfg.validateRegOpts(id, regopts); err != nil {
|
||||||
InsecureTLS: false,
|
return err
|
||||||
Timeout: 10,
|
|
||||||
}); err != nil {
|
|
||||||
return fmt.Errorf("cannot set default registry values for %s: %v", id, err)
|
|
||||||
}
|
|
||||||
cfg.Registries[id] = reg
|
|
||||||
}
|
|
||||||
|
|
||||||
for key, item := range cfg.Items {
|
|
||||||
if item.Image == "" {
|
|
||||||
return fmt.Errorf("image is required for item %d", key)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := mergo.Merge(&item, model.Item{
|
|
||||||
WatchRepo: false,
|
|
||||||
MaxTags: 25,
|
|
||||||
}); err != nil {
|
|
||||||
return fmt.Errorf("cannot set default item values for %s: %v", item.Image, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if item.RegistryID != "" {
|
|
||||||
reg, found := cfg.Registries[item.RegistryID]
|
|
||||||
if !found {
|
|
||||||
return fmt.Errorf("registry ID '%s' not found", item.RegistryID)
|
|
||||||
}
|
|
||||||
cfg.Items[key].Registry = reg
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, includeTag := range item.IncludeTags {
|
|
||||||
if _, err := regexp.Compile(includeTag); err != nil {
|
|
||||||
return fmt.Errorf("include tag regex '%s' for '%s' image cannot compile, %v", item.Image, includeTag, err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, excludeTag := range item.ExcludeTags {
|
for key, img := range cfg.Image {
|
||||||
if _, err := regexp.Compile(excludeTag); err != nil {
|
if err := cfg.validateImage(key, img); err != nil {
|
||||||
return fmt.Errorf("exclude tag regex '%s' for '%s' image cannot compile, %v", item.Image, excludeTag, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := mergo.Merge(&cfg.Items[key], item); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -151,6 +119,59 @@ func (cfg *Config) Check() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cfg *Config) validateRegOpts(id string, regopts model.RegOpts) error {
|
||||||
|
defTimeout := 10
|
||||||
|
if regopts.Timeout <= 0 {
|
||||||
|
defTimeout = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := mergo.Merge(®opts, model.RegOpts{
|
||||||
|
InsecureTLS: false,
|
||||||
|
Timeout: defTimeout,
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("cannot set default registry options values for %s: %v", id, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.RegOpts[id] = regopts
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cfg *Config) validateImage(key int, img model.Image) error {
|
||||||
|
if img.Name == "" {
|
||||||
|
return fmt.Errorf("name is required for image %d", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := mergo.Merge(&img, model.Image{
|
||||||
|
WatchRepo: false,
|
||||||
|
MaxTags: 0,
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("cannot set default image values for %s: %v", img.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if img.RegOptsID != "" {
|
||||||
|
regopts, found := cfg.RegOpts[img.RegOptsID]
|
||||||
|
if !found {
|
||||||
|
return fmt.Errorf("registry options %s not found for %s", img.RegOptsID, img.Name)
|
||||||
|
}
|
||||||
|
cfg.Image[key].RegOpts = regopts
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, includeTag := range img.IncludeTags {
|
||||||
|
if _, err := regexp.Compile(includeTag); err != nil {
|
||||||
|
return fmt.Errorf("include tag regex '%s' for %s cannot compile, %v", includeTag, img.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, excludeTag := range img.ExcludeTags {
|
||||||
|
if _, err := regexp.Compile(excludeTag); err != nil {
|
||||||
|
return fmt.Errorf("exclude tag regex '%s' for '%s' image cannot compile, %v", img.Name, excludeTag, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.Image[key] = img
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Display logs configuration in a pretty JSON format
|
// Display logs configuration in a pretty JSON format
|
||||||
func (cfg *Config) Display() {
|
func (cfg *Config) Display() {
|
||||||
b, _ := json.MarshalIndent(cfg, "", " ")
|
b, _ := json.MarshalIndent(cfg, "", " ")
|
||||||
|
|||||||
20
internal/model/image.go
Normal file
20
internal/model/image.go
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
package model
|
||||||
|
|
||||||
|
// RegOpts holds registry options configuration
|
||||||
|
type RegOpts struct {
|
||||||
|
Username string `yaml:"username,omitempty" json:",omitempty"`
|
||||||
|
Password string `yaml:"password,omitempty" json:",omitempty"`
|
||||||
|
InsecureTLS bool `yaml:"insecure_tls,omitempty" json:",omitempty"`
|
||||||
|
Timeout int `yaml:"timeout,omitempty" json:",omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Image holds image configuration
|
||||||
|
type Image struct {
|
||||||
|
Name string `yaml:"name,omitempty" json:",omitempty"`
|
||||||
|
RegOptsID string `yaml:"regopts_id,omitempty" json:",omitempty"`
|
||||||
|
WatchRepo bool `yaml:"watch_repo,omitempty" json:",omitempty"`
|
||||||
|
MaxTags int `yaml:"max_tags,omitempty" json:",omitempty"`
|
||||||
|
IncludeTags []string `yaml:"include_tags,omitempty" json:",omitempty"`
|
||||||
|
ExcludeTags []string `yaml:"exclude_tags,omitempty" json:",omitempty"`
|
||||||
|
RegOpts RegOpts `yaml:"-" json:"-"`
|
||||||
|
}
|
||||||
@@ -1,12 +0,0 @@
|
|||||||
package model
|
|
||||||
|
|
||||||
// Item holds item configuration for a Docker image
|
|
||||||
type Item struct {
|
|
||||||
Image string `yaml:"image,omitempty" json:",omitempty"`
|
|
||||||
RegistryID string `yaml:"registry_id,omitempty" json:",omitempty"`
|
|
||||||
WatchRepo bool `yaml:"watch_repo,omitempty" json:",omitempty"`
|
|
||||||
MaxTags int `yaml:"max_tags,omitempty" json:",omitempty"`
|
|
||||||
IncludeTags []string `yaml:"include_tags,omitempty" json:",omitempty"`
|
|
||||||
ExcludeTags []string `yaml:"exclude_tags,omitempty" json:",omitempty"`
|
|
||||||
Registry Registry `yaml:"-" json:"-"`
|
|
||||||
}
|
|
||||||
@@ -1,9 +0,0 @@
|
|||||||
package model
|
|
||||||
|
|
||||||
// Registry holds registry configuration
|
|
||||||
type Registry struct {
|
|
||||||
Username string `yaml:"username,omitempty" json:",omitempty"`
|
|
||||||
Password string `yaml:"password,omitempty" json:",omitempty"`
|
|
||||||
InsecureTLS bool `yaml:"insecure_tls,omitempty" json:",omitempty"`
|
|
||||||
Timeout int `yaml:"timeout,omitempty" json:",omitempty"`
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user