mirror of
https://github.com/amir20/dozzle.git
synced 2025-12-27 15:41:45 +01:00
Cleans up events and js layer (#895)
This commit is contained in:
@@ -69,8 +69,8 @@ event: containers-changed
|
||||
data: []
|
||||
|
||||
|
||||
event: container-event
|
||||
data: {"Type":"","Action":"start","Actor":{"ID":"","Attributes":null}}
|
||||
event: container-start
|
||||
data: {"actorId":"1234","name":"start"}
|
||||
|
||||
/* snapshot: Test_handler_streamLogs_error_finding_container */
|
||||
HTTP/1.1 404 Not Found
|
||||
|
||||
@@ -20,10 +20,7 @@ const state = {
|
||||
|
||||
const mutations = {
|
||||
SET_CONTAINERS(state, containers) {
|
||||
const containersById = state.containers.reduce((map, obj) => {
|
||||
map[obj.id] = obj;
|
||||
return map;
|
||||
}, {});
|
||||
const containersById = getters.allContainersById({ containers });
|
||||
|
||||
containers.forEach(
|
||||
(container) =>
|
||||
@@ -51,11 +48,10 @@ const mutations = {
|
||||
state.settings = { ...state.settings, ...newValues };
|
||||
storage.set(DOZZLE_SETTINGS_KEY, state.settings);
|
||||
},
|
||||
UPDATE_STAT(_, { container, stat }) {
|
||||
Vue.set(container, "stat", stat);
|
||||
},
|
||||
UPDATE_STATE(_, { container, state }) {
|
||||
Vue.set(container, "state", state);
|
||||
UPDATE_CONTAINER(_, { container, data }) {
|
||||
for (const [key, value] of Object.entries(data)) {
|
||||
Vue.set(container, key, value);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
@@ -75,14 +71,14 @@ const actions = {
|
||||
UPDATE_STATS({ commit, getters: { allContainersById } }, stat) {
|
||||
const container = allContainersById[stat.id];
|
||||
if (container) {
|
||||
commit("UPDATE_STAT", { container, stat });
|
||||
commit("UPDATE_CONTAINER", { container, data: { stat } });
|
||||
}
|
||||
},
|
||||
CONTAINER_EVENT({ commit, getters: { allContainersById } }, event) {
|
||||
switch (event.status) {
|
||||
UPDATE_CONTAINER({ commit, getters: { allContainersById } }, event) {
|
||||
switch (event.name) {
|
||||
case "die":
|
||||
const container = allContainersById[event.Actor.ID.substr(0, 12)];
|
||||
commit("UPDATE_STATE", { container, state: "exited" });
|
||||
const container = allContainersById[event.actorId];
|
||||
commit("UPDATE_CONTAINER", { container, data: { state: "exited" } });
|
||||
break;
|
||||
default:
|
||||
}
|
||||
@@ -108,7 +104,7 @@ const getters = {
|
||||
const es = new EventSource(`${config.base}/api/events/stream`);
|
||||
es.addEventListener("containers-changed", (e) => store.commit("SET_CONTAINERS", JSON.parse(e.data)), false);
|
||||
es.addEventListener("container-stat", (e) => store.dispatch("UPDATE_STATS", JSON.parse(e.data)), false);
|
||||
es.addEventListener("container-event", (e) => store.dispatch("CONTAINER_EVENT", JSON.parse(e.data)), false);
|
||||
es.addEventListener("container-die", (e) => store.dispatch("UPDATE_CONTAINER", JSON.parse(e.data)), false);
|
||||
|
||||
mql.addEventListener("change", (e) => store.commit("SET_MOBILE_WIDTH", e.matches));
|
||||
|
||||
|
||||
@@ -38,7 +38,7 @@ type Client interface {
|
||||
ListContainers() ([]Container, error)
|
||||
FindContainer(string) (Container, error)
|
||||
ContainerLogs(context.Context, string, int, string) (<-chan string, <-chan error)
|
||||
Events(context.Context) (<-chan events.Message, <-chan error)
|
||||
Events(context.Context) (<-chan ContainerEvent, <-chan error)
|
||||
ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time) ([]string, error)
|
||||
ContainerStats(context.Context, string, chan<- ContainerStat) error
|
||||
}
|
||||
@@ -150,7 +150,6 @@ func logReader(reader io.ReadCloser, tty bool) func() (string, error) {
|
||||
}
|
||||
|
||||
func (d *dockerClient) ContainerStats(ctx context.Context, id string, stats chan<- ContainerStat) error {
|
||||
id = id[:12]
|
||||
response, err := d.cli.ContainerStats(ctx, id, true)
|
||||
|
||||
if err != nil {
|
||||
@@ -242,8 +241,30 @@ func (d *dockerClient) ContainerLogs(ctx context.Context, id string, tailSize in
|
||||
return messages, errChannel
|
||||
}
|
||||
|
||||
func (d *dockerClient) Events(ctx context.Context) (<-chan events.Message, <-chan error) {
|
||||
return d.cli.Events(ctx, types.EventsOptions{})
|
||||
func (d *dockerClient) Events(ctx context.Context) (<-chan ContainerEvent, <-chan error) {
|
||||
dockerMessages, errors := d.cli.Events(ctx, types.EventsOptions{})
|
||||
messages := make(chan ContainerEvent)
|
||||
|
||||
go func() {
|
||||
defer close(messages)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case message, ok := <-dockerMessages:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
messages <- ContainerEvent{
|
||||
ActorID: message.Actor.ID[:12],
|
||||
Name: message.Action,
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return messages, errors
|
||||
}
|
||||
|
||||
func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time) ([]string, error) {
|
||||
|
||||
@@ -20,3 +20,9 @@ type ContainerStat struct {
|
||||
MemoryPercent int64 `json:"memory"`
|
||||
MemoryUsage int64 `json:"memoryUsage"`
|
||||
}
|
||||
|
||||
// ContainerEvent represents events that are triggered
|
||||
type ContainerEvent struct {
|
||||
ActorID string `json:"actorId"`
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
3
jsconfig.json
Normal file
3
jsconfig.json
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"include": ["./assets/**/*"]
|
||||
}
|
||||
21
main_test.go
21
main_test.go
@@ -13,7 +13,6 @@ import (
|
||||
|
||||
"github.com/amir20/dozzle/docker"
|
||||
"github.com/beme/abide"
|
||||
"github.com/docker/docker/api/types/events"
|
||||
"github.com/gobuffalo/packr"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -56,9 +55,9 @@ func (m *MockedClient) ContainerLogs(ctx context.Context, id string, tailSize in
|
||||
return channel, err
|
||||
}
|
||||
|
||||
func (m *MockedClient) Events(ctx context.Context) (<-chan events.Message, <-chan error) {
|
||||
func (m *MockedClient) Events(ctx context.Context) (<-chan docker.ContainerEvent, <-chan error) {
|
||||
args := m.Called(ctx)
|
||||
channel, ok := args.Get(0).(chan events.Message)
|
||||
channel, ok := args.Get(0).(chan docker.ContainerEvent)
|
||||
if !ok {
|
||||
panic("channel is not of type chan events.Message")
|
||||
}
|
||||
@@ -205,17 +204,19 @@ func Test_handler_streamEvents_happy(t *testing.T) {
|
||||
req, err := http.NewRequest("GET", "/api/events/stream", nil)
|
||||
require.NoError(t, err, "NewRequest should not return an error.")
|
||||
mockedClient := new(MockedClient)
|
||||
messages := make(chan events.Message)
|
||||
messages := make(chan docker.ContainerEvent)
|
||||
errChannel := make(chan error)
|
||||
mockedClient.On("Events", mock.Anything).Return(messages, errChannel)
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{}, nil)
|
||||
|
||||
go func() {
|
||||
messages <- events.Message{
|
||||
Action: "start",
|
||||
messages <- docker.ContainerEvent{
|
||||
Name: "start",
|
||||
ActorID: "1234",
|
||||
}
|
||||
messages <- events.Message{
|
||||
Action: "something-random",
|
||||
messages <- docker.ContainerEvent{
|
||||
Name: "something-random",
|
||||
ActorID: "1234",
|
||||
}
|
||||
close(messages)
|
||||
}()
|
||||
@@ -232,7 +233,7 @@ func Test_handler_streamEvents_error(t *testing.T) {
|
||||
req, err := http.NewRequest("GET", "/api/events/stream", nil)
|
||||
require.NoError(t, err, "NewRequest should not return an error.")
|
||||
mockedClient := new(MockedClient)
|
||||
messages := make(chan events.Message)
|
||||
messages := make(chan docker.ContainerEvent)
|
||||
errChannel := make(chan error)
|
||||
mockedClient.On("Events", mock.Anything).Return(messages, errChannel)
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{}, nil)
|
||||
@@ -256,7 +257,7 @@ func Test_handler_streamEvents_error_request(t *testing.T) {
|
||||
|
||||
mockedClient := new(MockedClient)
|
||||
|
||||
messages := make(chan events.Message)
|
||||
messages := make(chan docker.ContainerEvent)
|
||||
errChannel := make(chan error)
|
||||
mockedClient.On("Events", mock.Anything).Return(messages, errChannel)
|
||||
mockedClient.On("ListContainers").Return([]docker.Container{}, nil)
|
||||
|
||||
20
routes.go
20
routes.go
@@ -154,7 +154,7 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
ctx := r.Context()
|
||||
|
||||
messages, err := h.client.Events(ctx)
|
||||
events, err := h.client.Events(ctx)
|
||||
stats := make(chan docker.ContainerStat)
|
||||
|
||||
if containers, err := h.client.ListContainers(); err == nil {
|
||||
@@ -182,16 +182,16 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
f.Flush()
|
||||
case message, ok := <-messages:
|
||||
case event, ok := <-events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
switch message.Action {
|
||||
switch event.Name {
|
||||
case "start", "die":
|
||||
log.Debugf("Triggering docker event: %v", message.Action)
|
||||
if message.Action == "start" {
|
||||
log.Debugf("Found new container with id: %v", message.Actor.ID)
|
||||
if err := h.client.ContainerStats(ctx, message.Actor.ID, stats); err != nil {
|
||||
log.Debugf("Triggering docker event: %v", event.Name)
|
||||
if event.Name == "start" {
|
||||
log.Debugf("Found new container with id: %v", event.ActorID)
|
||||
if err := h.client.ContainerStats(ctx, event.ActorID, stats); err != nil {
|
||||
log.Errorf("Error when streaming new container stats: %v", err)
|
||||
}
|
||||
if err := sendContainersJSON(h.client, w); err != nil {
|
||||
@@ -200,15 +200,15 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
bytes, _ := json.Marshal(message)
|
||||
if _, err := fmt.Fprintf(w, "event: container-event\ndata: %s\n\n", string(bytes)); err != nil {
|
||||
bytes, _ := json.Marshal(event)
|
||||
if _, err := fmt.Fprintf(w, "event: container-%s\ndata: %s\n\n", event.Name, string(bytes)); err != nil {
|
||||
log.Debugf("Error writing event to event stream: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
f.Flush()
|
||||
default:
|
||||
log.Debugf("Ignoring docker event: %v", message.Action)
|
||||
log.Debugf("Ignoring docker event: %v", event.Name)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user