1
0
mirror of https://github.com/amir20/dozzle.git synced 2025-12-27 07:31:46 +01:00
Files
dozzle/assets/composables/eventsource.ts
Amir Raminfar f1dad96b86 fix: clears logs when an exception is thrown in logs events source (#2406)
* feat: adds more logging for eventsource

* feat: removes error from event streams

* removes unused code

* adds errors back

* moves logs to debug

* removes lastEventId

* fixes tests
2023-10-03 16:58:46 +00:00

177 lines
4.8 KiB
TypeScript

import { type Ref } from "vue";
import { encodeXML } from "entities";
import debounce from "lodash.debounce";
import {
type LogEvent,
type JSONObject,
LogEntry,
asLogEntry,
DockerEventLogEntry,
SkippedLogsEntry,
} from "@/models/LogEntry";
import { Container } from "@/models/Container";
/**
 * Deserializes one raw log event payload into a `LogEntry`.
 *
 * Every string value encountered while parsing the JSON is passed through
 * `encodeXML` before the resulting event is handed to `asLogEntry`.
 *
 * @param data - JSON text of a single log event.
 * @returns The log entry produced by `asLogEntry` for the parsed event.
 * @throws SyntaxError when `data` is not valid JSON (raised by `JSON.parse`).
 */
function parseMessage(data: string): LogEntry<string | JSONObject> {
  // Reviver: escape string leaves, pass every other value through untouched.
  const escapeStrings = (_key: string, value: unknown) => (typeof value === "string" ? encodeXML(value) : value);
  const event = JSON.parse(data, escapeStrings) as LogEvent;
  return asLogEntry(event);
}
/** Selects which container output streams are requested from the log endpoints. */
type LogStreamConfig = {
// When true, `stdout=1` is added to the query string of log requests.
stdout: boolean;
// When true, `stderr=1` is added to the query string of log requests.
stderr: boolean;
};
/**
 * Composable that streams the logs of `container` through an `EventSource`
 * and exposes them as a reactive `messages` list.
 *
 * Incoming events are collected in a buffer and moved into `messages` by a
 * debounced flush (250 ms, at most 1000 ms between flushes). The stream is
 * (re)opened whenever the container id or `streamConfig` changes, and when the
 * container state transitions to `"running"`. The connection is closed when
 * the owning component unmounts.
 *
 * @param container - Ref to the container whose logs are streamed.
 * @param streamConfig - Which output streams (stdout / stderr) to request.
 * @returns `messages` (as a ref) and `loadOlderLogs` for fetching history.
 */
export function useLogStream(container: Ref<Container>, streamConfig: LogStreamConfig) {
  let messages: LogEntry<string | JSONObject>[] = $ref([]);
  let buffer: LogEntry<string | JSONObject>[] = $ref([]);
  const scrollingPaused = $ref(inject("scrollingPaused") as Ref<boolean>);
  let containerId = container.value.id;

  /** Builds the stdout/stderr query parameters shared by the stream and history requests. */
  function streamParams(): { stdout?: string; stderr?: string } {
    const params: { stdout?: string; stderr?: string } = {};
    if (streamConfig.stdout) {
      params.stdout = "1";
    }
    if (streamConfig.stderr) {
      params.stderr = "1";
    }
    return params;
  }

  /**
   * Moves everything in `buffer` into `messages`.
   *
   * Once `messages` exceeds `config.maxLogs`: while scrolling is paused the
   * buffered entries are summarised by a single `SkippedLogsEntry` (or merged
   * into the trailing one); otherwise they are appended and `messages` is
   * trimmed to the newest `config.maxLogs` entries.
   */
  function flushNow() {
    if (messages.length > config.maxLogs) {
      if (scrollingPaused) {
        // Nothing buffered means nothing to summarise; avoids a SkippedLogsEntry
        // with a zero count and undefined first/last entries.
        if (buffer.length === 0) return;
        console.log("Skipping ", buffer.length, " log items");
        const lastItem = buffer.at(-1) as LogEntry<string | JSONObject>;
        const lastMessage = messages.at(-1);
        if (lastMessage instanceof SkippedLogsEntry) {
          lastMessage.addSkippedEntries(buffer.length, lastItem);
        } else {
          const firstItem = buffer.at(0) as LogEntry<string | JSONObject>;
          messages.push(new SkippedLogsEntry(new Date(), buffer.length, firstItem, lastItem));
        }
        buffer = [];
      } else {
        messages.push(...buffer);
        buffer = [];
        messages = messages.slice(-config.maxLogs);
      }
    } else {
      messages.push(...buffer);
      buffer = [];
    }
  }

  const flushBuffer = debounce(flushNow, 250, { maxWait: 1000 });
  let es: EventSource | null = null;

  /** Closes the current EventSource, if any. */
  function close() {
    if (es) {
      es.close();
      console.debug(`EventSource closed for ${containerId}`);
      es = null;
    }
  }

  /** Drops all displayed and buffered entries and cancels any pending flush. */
  function clearMessages() {
    flushBuffer.cancel();
    messages = [];
    buffer = [];
    console.debug(`Clearing messages for ${containerId}`);
  }

  /**
   * Opens a fresh EventSource for the current container, closing any previous one.
   *
   * @param clear - When true (the default) existing messages are discarded first.
   */
  function connect({ clear } = { clear: true }) {
    close();
    if (clear) {
      clearMessages();
    }
    const params = streamParams();
    containerId = container.value.id;
    console.debug(`Connecting to ${containerId} with params`, params);
    es = new EventSource(
      `${config.base}/api/logs/stream/${container.value.host}/${containerId}?${new URLSearchParams(params).toString()}`,
    );
    es.addEventListener("container-stopped", () => {
      close();
      buffer.push(new DockerEventLogEntry("Container stopped", new Date(), "container-stopped"));
      // Schedule and immediately force the flush so the marker shows up right away.
      flushBuffer();
      flushBuffer.flush();
    });
    es.onerror = () => {
      // No explicit reconnect is issued here; NOTE(review): this relies on
      // EventSource retrying on its own, so only the stale logs are cleared.
      console.error(`Unexpected error for eventsource container-id:${containerId}. Clearing logs and reconnecting.`);
      clearMessages();
    };
    es.onmessage = (e) => {
      if (e.data) {
        buffer.push(parseMessage(e.data));
        flushBuffer();
      }
    };
  }

  /**
   * Fetches logs older than the first displayed entry and prepends them to `messages`.
   *
   * The requested time window ends at the first message's date and is as long
   * as the span covered by the first 300 messages. Does nothing (and invokes
   * neither callback) when fewer than 300 messages are loaded.
   *
   * @param beforeLoading - Called right before the request starts.
   * @param afterLoading - Called once loading finished, including when the
   *   request or parsing failed (the error still propagates to the caller).
   */
  async function loadOlderLogs({
    beforeLoading = () => {},
    afterLoading = () => {},
  }: { beforeLoading?: () => void; afterLoading?: () => void } = {}) {
    const olderLogsWindow = 300;
    if (messages.length < olderLogsWindow) return;
    beforeLoading();
    try {
      const to = messages[0].date;
      const last = messages[olderLogsWindow - 1].date;
      // `delta` is negative (first entry minus a later one), so `from` lies before `to`.
      const delta = to.getTime() - last.getTime();
      const from = new Date(to.getTime() + delta);
      const params: { from: string; to: string; stdout?: string; stderr?: string } = {
        from: from.toISOString(),
        to: to.toISOString(),
        ...streamParams(),
      };
      const response = await fetch(
        `${config.base}/api/logs/${container.value.host}/${containerId}?${new URLSearchParams(params).toString()}`,
      );
      const logs = await response.text();
      if (logs) {
        const newMessages = logs
          .split("\n")
          // Skip blank lines (e.g. whitespace-only body) instead of failing in JSON.parse.
          .filter((line) => line.trim() !== "")
          .map((line) => parseMessage(line));
        messages.unshift(...newMessages);
      }
    } finally {
      // Always release the caller's loading state, even when the request failed.
      afterLoading();
    }
  }

  // Reconnect (keeping existing logs) when the container comes back up.
  watch(
    () => container.value.state,
    (newValue, oldValue) => {
      console.log("LogEventSource: container changed", newValue, oldValue);
      if (newValue === "running" && newValue !== oldValue) {
        buffer.push(new DockerEventLogEntry("Container started", new Date(), "container-started"));
        connect({ clear: false });
      }
    },
  );

  onUnmounted(() => {
    close();
    // Do not leave a debounced flush timer running after the component is gone.
    flushBuffer.cancel();
  });

  // Connect immediately and again whenever a different container is shown.
  watch(
    () => container.value.id,
    () => connect(),
    { immediate: true },
  );

  // Changing the stdout/stderr selection restarts the stream from scratch.
  watch(streamConfig, () => connect());

  return { ...$$({ messages }), loadOlderLogs };
}