mirror of
https://github.com/amir20/dozzle.git
synced 2026-01-02 19:17:37 +01:00
chore: removes pointers to logevents
This commit is contained in:
@@ -92,7 +92,7 @@ func rpcErrToErr(err error) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since time.Time, until time.Time, std container.StdType) (<-chan *container.LogEvent, error) {
|
||||
func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since time.Time, until time.Time, std container.StdType) (<-chan container.LogEvent, error) {
|
||||
stream, err := c.client.LogsBetweenDates(ctx, &pb.LogsBetweenDatesRequest{
|
||||
ContainerId: containerID,
|
||||
Since: timestamppb.New(since),
|
||||
@@ -104,7 +104,7 @@ func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since
|
||||
return nil, err
|
||||
}
|
||||
|
||||
events := make(chan *container.LogEvent)
|
||||
events := make(chan container.LogEvent)
|
||||
|
||||
go func() {
|
||||
sendLogs(stream, events)
|
||||
@@ -114,7 +114,7 @@ func (c *Client) LogsBetweenDates(ctx context.Context, containerID string, since
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (c *Client) StreamContainerLogs(ctx context.Context, containerID string, since time.Time, std container.StdType, events chan<- *container.LogEvent) error {
|
||||
func (c *Client) StreamContainerLogs(ctx context.Context, containerID string, since time.Time, std container.StdType, events chan<- container.LogEvent) error {
|
||||
stream, err := c.client.StreamLogs(ctx, &pb.StreamLogsRequest{
|
||||
ContainerId: containerID,
|
||||
Since: timestamppb.New(since),
|
||||
@@ -128,7 +128,7 @@ func (c *Client) StreamContainerLogs(ctx context.Context, containerID string, si
|
||||
return sendLogs(stream, events)
|
||||
}
|
||||
|
||||
func sendLogs(stream pb.AgentService_StreamLogsClient, events chan<- *container.LogEvent) error {
|
||||
func sendLogs(stream pb.AgentService_StreamLogsClient, events chan<- container.LogEvent) error {
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
@@ -154,7 +154,7 @@ func sendLogs(stream pb.AgentService_StreamLogsClient, events chan<- *container.
|
||||
continue
|
||||
}
|
||||
|
||||
events <- &container.LogEvent{
|
||||
events <- container.LogEvent{
|
||||
Id: resp.Event.Id,
|
||||
ContainerID: resp.Event.ContainerId,
|
||||
Message: message,
|
||||
|
||||
@@ -65,7 +65,7 @@ func (s *server) StreamLogs(in *pb.StreamLogsRequest, out pb.AgentService_Stream
|
||||
|
||||
for event := range g.Events {
|
||||
out.Send(&pb.StreamLogsResponse{
|
||||
Event: logEventToPb(event),
|
||||
Event: logEventToPb(&event),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -95,7 +95,7 @@ func (s *server) LogsBetweenDates(in *pb.LogsBetweenDatesRequest, out pb.AgentSe
|
||||
select {
|
||||
case event := <-g.Events:
|
||||
out.Send(&pb.StreamLogsResponse{
|
||||
Event: logEventToPb(event),
|
||||
Event: logEventToPb(&event),
|
||||
})
|
||||
case e := <-g.Errors:
|
||||
return e
|
||||
|
||||
@@ -17,11 +17,11 @@ import (
|
||||
)
|
||||
|
||||
type EventGenerator struct {
|
||||
Events chan *LogEvent
|
||||
Events chan LogEvent
|
||||
Errors chan error
|
||||
reader LogReader
|
||||
next *LogEvent
|
||||
buffer chan *LogEvent
|
||||
buffer chan LogEvent
|
||||
wg sync.WaitGroup
|
||||
containerID string
|
||||
ctx context.Context
|
||||
@@ -36,9 +36,9 @@ type LogReader interface {
|
||||
func NewEventGenerator(ctx context.Context, reader LogReader, container Container) *EventGenerator {
|
||||
generator := &EventGenerator{
|
||||
reader: reader,
|
||||
buffer: make(chan *LogEvent, 100),
|
||||
buffer: make(chan LogEvent, 100),
|
||||
Errors: make(chan error, 1),
|
||||
Events: make(chan *LogEvent),
|
||||
Events: make(chan LogEvent),
|
||||
containerID: container.ID,
|
||||
ctx: ctx,
|
||||
}
|
||||
@@ -49,24 +49,29 @@ func NewEventGenerator(ctx context.Context, reader LogReader, container Containe
|
||||
}
|
||||
|
||||
func (g *EventGenerator) processBuffer() {
|
||||
var current, next *LogEvent
|
||||
var current, next LogEvent
|
||||
var hasNext bool
|
||||
|
||||
loop:
|
||||
for {
|
||||
if g.next != nil {
|
||||
current = g.next
|
||||
current = *g.next
|
||||
g.next = nil
|
||||
next = g.peek()
|
||||
next, hasNext = g.peek()
|
||||
} else {
|
||||
event, ok := <-g.buffer
|
||||
if !ok {
|
||||
break loop
|
||||
}
|
||||
current = event
|
||||
next = g.peek()
|
||||
next, hasNext = g.peek()
|
||||
}
|
||||
|
||||
checkPosition(current, next)
|
||||
if hasNext {
|
||||
checkPosition(¤t, &next)
|
||||
} else {
|
||||
checkPosition(¤t, nil)
|
||||
}
|
||||
|
||||
select {
|
||||
case g.Events <- current:
|
||||
@@ -101,23 +106,23 @@ func (g *EventGenerator) consumeReader() {
|
||||
g.wg.Done()
|
||||
}
|
||||
|
||||
func (g *EventGenerator) peek() *LogEvent {
|
||||
func (g *EventGenerator) peek() (LogEvent, bool) {
|
||||
if g.next != nil {
|
||||
return g.next
|
||||
return *g.next, true
|
||||
}
|
||||
select {
|
||||
case event := <-g.buffer:
|
||||
g.next = event
|
||||
return g.next
|
||||
g.next = &event
|
||||
return event, true
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
return nil
|
||||
return LogEvent{}, false
|
||||
}
|
||||
}
|
||||
|
||||
func createEvent(message string, streamType StdType) *LogEvent {
|
||||
func createEvent(message string, streamType StdType) LogEvent {
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(message))
|
||||
logEvent := &LogEvent{Id: h.Sum32(), Message: message, Stream: streamType.String()}
|
||||
logEvent := LogEvent{Id: h.Sum32(), Message: message, Stream: streamType.String()}
|
||||
if index := strings.IndexAny(message, " "); index != -1 {
|
||||
logId := message[:index]
|
||||
if timestamp, err := time.Parse(time.RFC3339Nano, logId); err == nil {
|
||||
@@ -157,20 +162,20 @@ func createEvent(message string, streamType StdType) *LogEvent {
|
||||
}
|
||||
|
||||
func checkPosition(currentEvent *LogEvent, nextEvent *LogEvent) {
|
||||
currentLevel := guessLogLevel(currentEvent)
|
||||
currentLevel := guessLogLevel(*currentEvent)
|
||||
if nextEvent != nil {
|
||||
if currentEvent.IsCloseToTime(nextEvent) && currentLevel != "unknown" && !nextEvent.HasLevel() {
|
||||
if currentEvent.IsCloseToTime(*nextEvent) && currentLevel != "unknown" && !nextEvent.HasLevel() {
|
||||
currentEvent.Position = Beginning
|
||||
nextEvent.Position = Middle
|
||||
}
|
||||
|
||||
// If next item is not close to current item or has level, set current item position to end
|
||||
if currentEvent.Position == Middle && (nextEvent.HasLevel() || !currentEvent.IsCloseToTime(nextEvent)) {
|
||||
if currentEvent.Position == Middle && (nextEvent.HasLevel() || !currentEvent.IsCloseToTime(*nextEvent)) {
|
||||
currentEvent.Position = End
|
||||
}
|
||||
|
||||
// If next item is close to current item and has no level, set next item position to middle
|
||||
if currentEvent.Position == Middle && !nextEvent.HasLevel() && currentEvent.IsCloseToTime(nextEvent) {
|
||||
if currentEvent.Position == Middle && !nextEvent.HasLevel() && currentEvent.IsCloseToTime(*nextEvent) {
|
||||
nextEvent.Position = Middle
|
||||
}
|
||||
// Set next item level to current item level
|
||||
|
||||
@@ -96,14 +96,14 @@ func Test_createEvent(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want *LogEvent
|
||||
want LogEvent
|
||||
}{
|
||||
{
|
||||
name: "empty message",
|
||||
args: args{
|
||||
message: "",
|
||||
},
|
||||
want: &LogEvent{
|
||||
want: LogEvent{
|
||||
Message: "",
|
||||
},
|
||||
}, {
|
||||
@@ -111,7 +111,7 @@ func Test_createEvent(t *testing.T) {
|
||||
args: args{
|
||||
message: "2020-05-13T18:55:37.772853839Z {\"xyz\": \"value\", \"abc\": \"value2\"}",
|
||||
},
|
||||
want: &LogEvent{
|
||||
want: LogEvent{
|
||||
Message: data,
|
||||
},
|
||||
},
|
||||
@@ -120,7 +120,7 @@ func Test_createEvent(t *testing.T) {
|
||||
args: args{
|
||||
message: "2020-05-13T18:55:37.772853839Z {\"key\"}",
|
||||
},
|
||||
want: &LogEvent{
|
||||
want: LogEvent{
|
||||
Message: "{\"key\"}",
|
||||
},
|
||||
},
|
||||
@@ -129,7 +129,7 @@ func Test_createEvent(t *testing.T) {
|
||||
args: args{
|
||||
message: "2020-05-13T18:55:37.772853839Z 123",
|
||||
},
|
||||
want: &LogEvent{
|
||||
want: LogEvent{
|
||||
Message: "123",
|
||||
},
|
||||
},
|
||||
@@ -138,7 +138,7 @@ func Test_createEvent(t *testing.T) {
|
||||
args: args{
|
||||
message: "2020-05-13T18:55:37.772853839Z sample text with=equal sign",
|
||||
},
|
||||
want: &LogEvent{
|
||||
want: LogEvent{
|
||||
Message: "sample text with=equal sign",
|
||||
},
|
||||
},
|
||||
@@ -147,7 +147,7 @@ func Test_createEvent(t *testing.T) {
|
||||
args: args{
|
||||
message: "2020-05-13T18:55:37.772853839Z null",
|
||||
},
|
||||
want: &LogEvent{
|
||||
want: LogEvent{
|
||||
Message: "",
|
||||
},
|
||||
},
|
||||
|
||||
@@ -41,7 +41,7 @@ func init() {
|
||||
SupportedLogLevels["unknown"] = struct{}{}
|
||||
}
|
||||
|
||||
func guessLogLevel(logEvent *LogEvent) string {
|
||||
func guessLogLevel(logEvent LogEvent) string {
|
||||
switch value := logEvent.Message.(type) {
|
||||
case string:
|
||||
value = stripANSI(value)
|
||||
|
||||
@@ -72,7 +72,7 @@ func TestGuessLogLevel(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
name, _ := json.Marshal(test.input)
|
||||
t.Run(string(name), func(t *testing.T) {
|
||||
actual := guessLogLevel(&LogEvent{Message: test.input})
|
||||
actual := guessLogLevel(LogEvent{Message: test.input})
|
||||
if actual != test.expected {
|
||||
t.Errorf("Expected %s, got %s", test.expected, actual)
|
||||
}
|
||||
|
||||
@@ -176,14 +176,14 @@ type LogEvent struct {
|
||||
ContainerID string `json:"c,omitempty"`
|
||||
}
|
||||
|
||||
func (l *LogEvent) HasLevel() bool {
|
||||
func (l LogEvent) HasLevel() bool {
|
||||
return l.Level != "unknown"
|
||||
}
|
||||
|
||||
func (l *LogEvent) IsCloseToTime(other *LogEvent) bool {
|
||||
func (l LogEvent) IsCloseToTime(other LogEvent) bool {
|
||||
return math.Abs(float64(l.Timestamp-other.Timestamp)) < 10
|
||||
}
|
||||
|
||||
func (l *LogEvent) MessageId() int64 {
|
||||
func (l LogEvent) MessageId() int64 {
|
||||
return l.Timestamp
|
||||
}
|
||||
|
||||
@@ -32,11 +32,11 @@ func (a *agentService) RawLogs(ctx context.Context, container container.Containe
|
||||
return a.client.StreamRawBytes(ctx, container.ID, from, to, stdTypes)
|
||||
}
|
||||
|
||||
func (a *agentService) LogsBetweenDates(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) {
|
||||
func (a *agentService) LogsBetweenDates(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan container.LogEvent, error) {
|
||||
return a.client.LogsBetweenDates(ctx, container.ID, from, to, stdTypes)
|
||||
}
|
||||
|
||||
func (a *agentService) StreamLogs(ctx context.Context, container container.Container, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error {
|
||||
func (a *agentService) StreamLogs(ctx context.Context, container container.Container, from time.Time, stdTypes container.StdType, events chan<- container.LogEvent) error {
|
||||
return a.client.StreamContainerLogs(ctx, container.ID, from, stdTypes, events)
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ type ClientService interface {
|
||||
ListContainers(ctx context.Context, filter container.ContainerLabels) ([]container.Container, error)
|
||||
Host(ctx context.Context) (container.Host, error)
|
||||
ContainerAction(ctx context.Context, container container.Container, action container.ContainerAction) error
|
||||
LogsBetweenDates(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error)
|
||||
LogsBetweenDates(ctx context.Context, container container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan container.LogEvent, error)
|
||||
RawLogs(context.Context, container.Container, time.Time, time.Time, container.StdType) (io.ReadCloser, error)
|
||||
|
||||
// Subscriptions
|
||||
@@ -24,7 +24,7 @@ type ClientService interface {
|
||||
SubscribeContainersStarted(context.Context, chan<- container.Container)
|
||||
|
||||
// Blocking streaming functions that should be used in a goroutine
|
||||
StreamLogs(context.Context, container.Container, time.Time, container.StdType, chan<- *container.LogEvent) error
|
||||
StreamLogs(context.Context, container.Container, time.Time, container.StdType, chan<- container.LogEvent) error
|
||||
|
||||
// Terminal
|
||||
Attach(context.Context, container.Container, io.Reader, io.Writer) error
|
||||
|
||||
@@ -24,11 +24,11 @@ func (c *ContainerService) RawLogs(ctx context.Context, from time.Time, to time.
|
||||
return c.clientService.RawLogs(ctx, c.Container, from, to, stdTypes)
|
||||
}
|
||||
|
||||
func (c *ContainerService) LogsBetweenDates(ctx context.Context, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) {
|
||||
func (c *ContainerService) LogsBetweenDates(ctx context.Context, from time.Time, to time.Time, stdTypes container.StdType) (<-chan container.LogEvent, error) {
|
||||
return c.clientService.LogsBetweenDates(ctx, c.Container, from, to, stdTypes)
|
||||
}
|
||||
|
||||
func (c *ContainerService) StreamLogs(ctx context.Context, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error {
|
||||
func (c *ContainerService) StreamLogs(ctx context.Context, from time.Time, stdTypes container.StdType, events chan<- container.LogEvent) error {
|
||||
return c.clientService.StreamLogs(ctx, c.Container, from, stdTypes, events)
|
||||
}
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ func (d *DockerClientService) RawLogs(ctx context.Context, container container.C
|
||||
|
||||
}
|
||||
|
||||
func (d *DockerClientService) LogsBetweenDates(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) {
|
||||
func (d *DockerClientService) LogsBetweenDates(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan container.LogEvent, error) {
|
||||
reader, err := d.client.ContainerLogsBetweenDates(ctx, c.ID, from, to, stdTypes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -63,7 +63,7 @@ func (d *DockerClientService) LogsBetweenDates(ctx context.Context, c container.
|
||||
return g.Events, nil
|
||||
}
|
||||
|
||||
func (d *DockerClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error {
|
||||
func (d *DockerClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- container.LogEvent) error {
|
||||
reader, err := d.client.ContainerLogs(ctx, c.ID, from, stdTypes)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -45,7 +45,7 @@ func (k *K8sClientService) ContainerAction(ctx context.Context, container contai
|
||||
return k.client.ContainerActions(ctx, action, container.ID)
|
||||
}
|
||||
|
||||
func (k *K8sClientService) LogsBetweenDates(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan *container.LogEvent, error) {
|
||||
func (k *K8sClientService) LogsBetweenDates(ctx context.Context, c container.Container, from time.Time, to time.Time, stdTypes container.StdType) (<-chan container.LogEvent, error) {
|
||||
reader, err := k.client.ContainerLogsBetweenDates(ctx, c.ID, from, to, stdTypes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -60,7 +60,7 @@ func (k *K8sClientService) RawLogs(ctx context.Context, container container.Cont
|
||||
return k.client.ContainerLogsBetweenDates(ctx, container.ID, from, to, stdTypes)
|
||||
}
|
||||
|
||||
func (k *K8sClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- *container.LogEvent) error {
|
||||
func (k *K8sClientService) StreamLogs(ctx context.Context, c container.Container, from time.Time, stdTypes container.StdType, events chan<- container.LogEvent) error {
|
||||
reader, err := k.client.ContainerLogs(ctx, c.ID, from, stdTypes)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -16,7 +16,7 @@ func ParseRegex(search string) (*regexp.Regexp, error) {
|
||||
return CreateRegex(search, search == strings.ToLower(search))
|
||||
}
|
||||
|
||||
func Search(re *regexp.Regexp, logEvent *container.LogEvent) bool {
|
||||
func Search(re *regexp.Regexp, logEvent container.LogEvent) bool {
|
||||
matcher := NewPatternMatcher(re, MarkerStart, MarkerEnd)
|
||||
return matcher.MarkInLogEvent(logEvent)
|
||||
return matcher.MarkInLogEvent(&logEvent)
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
|
||||
minimum := 0
|
||||
buffer := utils.NewRingBuffer[*container.LogEvent](500)
|
||||
buffer := utils.NewRingBuffer[container.LogEvent](500)
|
||||
if r.URL.Query().Has("min") {
|
||||
minimum, err = strconv.Atoi(r.URL.Query().Get("min"))
|
||||
if err != nil {
|
||||
@@ -94,7 +94,7 @@ func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request)
|
||||
http.Error(w, errors.New("minimum must be between 0 and buffer size").Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
buffer = utils.NewRingBuffer[*container.LogEvent](minimum)
|
||||
buffer = utils.NewRingBuffer[container.LogEvent](minimum)
|
||||
}
|
||||
|
||||
maxStart := math.MaxInt
|
||||
@@ -177,7 +177,7 @@ func (h *handler) fetchLogsBetweenDates(w http.ResponseWriter, r *http.Request)
|
||||
break
|
||||
}
|
||||
|
||||
support_web.EscapeHTMLValues(event)
|
||||
support_web.EscapeHTMLValues(&event)
|
||||
buffer.Push(event)
|
||||
}
|
||||
}
|
||||
@@ -294,9 +294,9 @@ func (h *handler) streamLogsForContainers(w http.ResponseWriter, r *http.Request
|
||||
|
||||
absoluteTime := time.Time{}
|
||||
var regex *regexp.Regexp
|
||||
liveLogs := make(chan *container.LogEvent)
|
||||
liveLogs := make(chan container.LogEvent)
|
||||
events := make(chan *container.ContainerEvent, 1)
|
||||
backfill := make(chan []*container.LogEvent)
|
||||
backfill := make(chan []container.LogEvent)
|
||||
|
||||
levels := make(map[string]struct{})
|
||||
for _, level := range r.URL.Query()["levels"] {
|
||||
@@ -318,7 +318,7 @@ func (h *handler) streamLogsForContainers(w http.ResponseWriter, r *http.Request
|
||||
delta := -10 * time.Second
|
||||
to := absoluteTime
|
||||
for minimum > 0 {
|
||||
events := make([]*container.LogEvent, 0)
|
||||
events := make([]container.LogEvent, 0)
|
||||
stillRunning := false
|
||||
for _, container := range existingContainers {
|
||||
containerService, err := h.hostService.FindContainer(container.Host, container.ID, userLabels)
|
||||
@@ -418,7 +418,7 @@ loop:
|
||||
continue
|
||||
}
|
||||
|
||||
support_web.EscapeHTMLValues(logEvent)
|
||||
support_web.EscapeHTMLValues(&logEvent)
|
||||
sseWriter.Message(logEvent)
|
||||
case c := <-newContainers:
|
||||
if _, err := h.hostService.FindContainer(c.Host, c.ID, userLabels); err == nil {
|
||||
@@ -433,8 +433,8 @@ loop:
|
||||
}
|
||||
|
||||
case backfillEvents := <-backfill:
|
||||
for _, event := range backfillEvents {
|
||||
support_web.EscapeHTMLValues(event)
|
||||
for i := range backfillEvents {
|
||||
support_web.EscapeHTMLValues(&backfillEvents[i])
|
||||
}
|
||||
if err := sseWriter.Event("logs-backfill", backfillEvents); err != nil {
|
||||
log.Error().Err(err).Msg("error encoding container event")
|
||||
|
||||
Reference in New Issue
Block a user