Use reconnecting-websocket and refactor debug stream

This commit is contained in:
Valtteri Karesto
2022-06-27 15:03:42 +03:00
parent dcb7e94e86
commit b2af37ab4b

View File

@@ -1,5 +1,7 @@
import { useCallback, useEffect } from "react";
import { useEffect } from "react";
import ReconnectingWebSocket, { CloseEvent } from "reconnecting-websocket";
import { proxy, ref, useSnapshot } from "valtio";
import { subscribeKey } from "valtio/utils";
import { Select } from ".";
import state, { ILog, transactionsState } from "../state";
import { extractJSON } from "../utils/json";
@@ -15,7 +17,7 @@ export interface IStreamState {
status: "idle" | "opened" | "closed";
statusChangeTimestamp?: number;
logs: ILog[];
socket?: WebSocket;
socket?: ReconnectingWebSocket;
}
export const streamState = proxy<IStreamState>({
@@ -24,12 +26,70 @@ export const streamState = proxy<IStreamState>({
logs: [] as ILog[],
});
const onOpen = (account: ISelect | null) => {
if (!account) {
return;
}
// streamState.logs = [];
streamState.status = "opened";
streamState.statusChangeTimestamp = Date.now();
pushLog(`Debug stream opened for account ${account?.value}`, {
type: "success",
});
};
const onError = () => {
pushLog("Something went wrong! Check your connection and try again.", {
type: "error",
});
};
const onClose = (e: CloseEvent) => {
// 999 = closed websocket connection by switching account
if (e.code !== 4999) {
pushLog(`Connection was closed. [code: ${e.code}]`, {
type: "error",
});
}
streamState.status = "closed";
streamState.statusChangeTimestamp = Date.now();
};
const onMessage = (event: any) => {
pushLog(event.data);
};
const addListeners = (account: ISelect | null) => {
if (account?.value && streamState.socket?.url.endsWith(account?.value)) {
return;
}
streamState.logs = [];
if (account?.value) {
if (streamState.socket) {
streamState.socket?.removeEventListener("open", () => onOpen(account));
streamState.socket?.removeEventListener("close", onClose);
streamState.socket?.removeEventListener("error", onError);
streamState.socket?.removeEventListener("message", onMessage);
}
streamState.socket = ref(
new ReconnectingWebSocket(
`wss://${process.env.NEXT_PUBLIC_DEBUG_STREAM_URL}/${account?.value}`
)
);
streamState.socket.addEventListener("open", () => onOpen(account));
streamState.socket.addEventListener("close", onClose);
streamState.socket.addEventListener("error", onError);
streamState.socket.addEventListener("message", onMessage);
}
};
subscribeKey(streamState, "selectedAccount", addListeners);
const DebugStream = () => {
const { selectedAccount, logs, socket } = useSnapshot(streamState);
const { selectedAccount, logs } = useSnapshot(streamState);
const { activeHeader: activeTxTab } = useSnapshot(transactionsState);
const { accounts } = useSnapshot(state);
const accountOptions = accounts.map(acc => ({
const accountOptions = accounts.map((acc) => ({
label: acc.name,
value: acc.address,
}));
@@ -42,117 +102,21 @@ const DebugStream = () => {
options={accountOptions}
hideSelectedOptions
value={selectedAccount}
onChange={acc => (streamState.selectedAccount = acc as any)}
onChange={(acc) => {
streamState.socket?.close(
4999,
"Old connection closed because user switched account"
);
streamState.selectedAccount = acc as any;
}}
css={{ width: "100%" }}
/>
</>
);
useEffect(() => {
const account = selectedAccount?.value;
if (account && (!socket || !socket.url.endsWith(account))) {
socket?.close();
streamState.socket = ref(
new WebSocket(
`wss://${process.env.NEXT_PUBLIC_DEBUG_STREAM_URL}/${account}`
)
);
} else if (!account && socket) {
socket.close();
streamState.socket = undefined;
}
}, [selectedAccount?.value, socket]);
const onMount = useCallback(async () => {
// deliberately using `proxy` values and not the `useSnapshot` ones to have no dep list
const acc = streamState.selectedAccount;
const status = streamState.status;
if (status === "opened" && acc) {
// fetch the missing ones
try {
const url = `https://${process.env.NEXT_PUBLIC_DEBUG_STREAM_URL}/recent/${acc?.value}`;
// TODO Remove after api sets cors properly
const res = await fetch("/api/proxy", {
method: "POST",
body: JSON.stringify({ url }),
headers: {
"Content-Type": "application/json",
},
});
if (!res.ok) return;
const body = await res.json();
if (!body?.logs) return;
const start = streamState.statusChangeTimestamp || 0;
streamState.logs = [];
pushLog(`Debug stream opened for account ${acc.value}`, {
type: "success",
});
const logs = Object.entries(body.logs).filter(([tm]) => +tm >= start);
logs.forEach(([tm, log]) => pushLog(log));
} catch (error) {
console.error(error);
}
}
}, []);
useEffect(() => {
onMount();
}, [onMount]);
useEffect(() => {
const account = selectedAccount?.value;
const socket = streamState.socket;
if (!socket) return;
const onOpen = () => {
streamState.logs = [];
streamState.status = "opened";
streamState.statusChangeTimestamp = Date.now();
pushLog(`Debug stream opened for account ${account}`, {
type: "success",
});
};
const onError = () => {
pushLog("Something went wrong! Check your connection and try again.", {
type: "error",
});
};
const onClose = (e: CloseEvent) => {
pushLog(`Connection was closed. [code: ${e.code}]`, {
type: "error",
});
streamState.selectedAccount = null;
streamState.status = "closed";
streamState.statusChangeTimestamp = Date.now();
};
const onMessage = (event: any) => {
pushLog(event.data);
};
socket.addEventListener("open", onOpen);
socket.addEventListener("close", onClose);
socket.addEventListener("error", onError);
socket.addEventListener("message", onMessage);
return () => {
socket.removeEventListener("open", onOpen);
socket.removeEventListener("close", onClose);
socket.removeEventListener("message", onMessage);
socket.removeEventListener("error", onError);
};
}, [selectedAccount?.value, socket]);
useEffect(() => {
const account = transactionsState.transactions.find(
tx => tx.header === activeTxTab
(tx) => tx.header === activeTxTab
)?.state.selectedAccount;
if (account && account.value !== streamState.selectedAccount?.value)