From c9aaf2eceff3a6e103329bacf0a9e66cb3c16995 Mon Sep 17 00:00:00 2001 From: Samuel Date: Mon, 20 Jan 2025 17:00:57 +0100 Subject: [PATCH] feat(db): sqlite in worker (not working right now) --- package.json | 3 +- pnpm-lock.yaml | 8 + src/db/db-queries.ts | 46 +++-- src/db/db.ts | 19 +- src/db/index.ts | 2 +- src/index.tsx | 27 ++- src/lib/kysely-official-wasm-worker/driver.ts | 183 ++++++++++++++++++ src/lib/kysely-official-wasm-worker/index.ts | 38 ++++ src/lib/kysely-official-wasm-worker/type.ts | 54 ++++++ .../worker/index.ts | 3 + .../worker/utils.ts | 126 ++++++++++++ src/pages/home.tsx | 14 +- 12 files changed, 467 insertions(+), 56 deletions(-) create mode 100644 src/lib/kysely-official-wasm-worker/driver.ts create mode 100644 src/lib/kysely-official-wasm-worker/index.ts create mode 100644 src/lib/kysely-official-wasm-worker/type.ts create mode 100644 src/lib/kysely-official-wasm-worker/worker/index.ts create mode 100644 src/lib/kysely-official-wasm-worker/worker/utils.ts diff --git a/package.json b/package.json index 8fc0e9d..8c736c8 100644 --- a/package.json +++ b/package.json @@ -54,7 +54,8 @@ "seroval": "^1.2.0", "solid-js": "^1.9.4", "tailwind-merge": "^2.6.0", - "tailwindcss-animate": "^1.0.7" + "tailwindcss-animate": "^1.0.7", + "zen-mitt": "^3.0.0" }, "lint-staged": { "*.{ts,tsx}": [ diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c157142..0c568bc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -83,6 +83,9 @@ importers: tailwindcss-animate: specifier: ^1.0.7 version: 1.0.7(tailwindcss@3.4.17) + zen-mitt: + specifier: ^3.0.0 + version: 3.0.0 devDependencies: '@biomejs/biome': specifier: 1.9.4 @@ -2118,6 +2121,9 @@ packages: resolution: {integrity: sha512-b4JR1PFR10y1mKjhHY9LaGo6tmrgjit7hxVIeAmyMw3jegXR4dhYqLaQF5zMXZxY7tLpMyJeLjr1C4rLmkVe8g==} engines: {node: '>=12.20'} + zen-mitt@3.0.0: + resolution: {integrity: sha512-r17ulzFIV/Iq6x35lWKcwbX/EJiDNoFI3hxQ4OY2p2pJrfH6i4IrWwVd/nEIHqMuMR0XHBeYQN9PIx6LfJGoMw==} + snapshots: '@alloc/quick-lru@5.2.0': {} @@ -3969,3 +3975,5 @@ snapshots: yargs-parser: 21.1.1 yocto-queue@1.1.1: {} + + zen-mitt@3.0.0: {} diff --git a/src/db/db-queries.ts b/src/db/db-queries.ts index 19af65b..eb10036 100644 --- a/src/db/db-queries.ts +++ b/src/db/db-queries.ts @@ -1,29 +1,33 @@ import { sql, type NotNull } from "kysely"; -import { db, kyselyDb, SELF_ID } from "./db"; +import { worker, kyselyDb, SELF_ID, DB_FILENAME } from "./db"; import { cached } from "../lib/db-cache"; +import type { MainToWorkerMsg, WorkerToMainMsg } from "~/lib/kysely-official-wasm-worker/type"; -export const loadDb = (statements: string[], progressCallback?: (percentage: number) => void) => { - const length = statements.length; - let percentage = 0; - - for (let i = 0; i < length; i++) { - const statement = statements[i]; - const newPercentage = Math.round((i / length) * 100); - - try { - if (progressCallback && newPercentage !== percentage) { - progressCallback(newPercentage); - - percentage = newPercentage; +export const loadDb = (statements: string[], progressCallback?: (percentage: number) => void): Promise => { + return new Promise((resolve, reject) => { + const progressListener = ({ data }: MessageEvent) => { + if (data[0] === 5) { + progressCallback?.(data[1]); } + }; - db.exec(statement); - } catch (e) { - throw new Error(`statement failed: ${statement}`, { - cause: e, - }); - } - } + const endListener = ({ data }: MessageEvent) => { + if (data[0] === 6) { + if (data[2]) { + reject(new Error(`statement failed`, { cause: data[2] })); + } + + worker.removeEventListener("message", progressListener); + worker.removeEventListener("message", endListener); + resolve(); + } + }; + + worker.addEventListener("message", endListener); + worker.addEventListener("message", progressListener); + + worker.postMessage([4, DB_FILENAME, true, statements] satisfies MainToWorkerMsg); + }); }; const allThreadsOverviewQueryRaw = () => diff --git a/src/db/db.ts b/src/db/db.ts index 4ace36b..76f0bd4 100644 --- a/src/db/db.ts +++ b/src/db/db.ts @@ -1,21 +1,22 @@ import { makePersisted } from "@solid-primitives/storage"; -import sqlite3InitModule from "@sqlite.org/sqlite-wasm"; import { Kysely } from "kysely"; import type { DB } from "./db-schema"; -import { OfficialWasmDialect } from "kysely-wasm"; import { createSignal } from "solid-js"; +import { OfficialWasmWorkerDialect } from "~/lib/kysely-official-wasm-worker"; +import wasmWorkerUrl from "~/lib/kysely-official-wasm-worker/worker?url"; export const SELF_ID = 2; -const sqlite3 = await sqlite3InitModule({ - print: console.log, - printErr: console.error, +export const DB_FILENAME = "signal.sqlite"; + +export const worker = new Worker(wasmWorkerUrl, { + type: "module", }); -export const db = new sqlite3.oo1.DB("signal"); - -const dialect = new OfficialWasmDialect({ - database: db, +const dialect = new OfficialWasmWorkerDialect({ + fileName: DB_FILENAME, + preferOPFS: true, + worker, }); export const kyselyDb = new Kysely({ diff --git a/src/db/index.ts b/src/db/index.ts index 32b878c..5a4f08d 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -1,2 +1,2 @@ -export * from "./db"; +export { kyselyDb, SELF_ID } from "./db"; export * from "./db-queries"; diff --git a/src/index.tsx b/src/index.tsx index 461f0db..6bddfe1 100644 --- a/src/index.tsx +++ b/src/index.tsx @@ -1,13 +1,8 @@ /* @refresh reload */ import { MetaProvider } from "@solidjs/meta"; -import { Router, useNavigate } from "@solidjs/router"; +import { Router } from "@solidjs/router"; import { render } from "solid-js/web"; -import { createEffect, enableScheduling } from "solid-js"; - import App from "./App"; -import { db } from "./db/db"; - -enableScheduling(); const root = document.getElementById("root"); @@ -23,18 +18,18 @@ if (root) {
{ - const navigate = useNavigate(); - const { pathname } = props.location; + // root={(props) => { + // const navigate = useNavigate(); + // const { pathname } = props.location; - createEffect(() => { - if (!db && pathname !== "/") { - navigate("/"); - } - }); + // createEffect(() => { + // if (!db && pathname !== "/") { + // navigate("/"); + // } + // }); - return props.children; - }} + // return props.children; + // }} > diff --git a/src/lib/kysely-official-wasm-worker/driver.ts b/src/lib/kysely-official-wasm-worker/driver.ts new file mode 100644 index 0000000..5253f7b --- /dev/null +++ b/src/lib/kysely-official-wasm-worker/driver.ts @@ -0,0 +1,183 @@ +import type { DatabaseConnection, Driver, QueryResult } from "kysely"; +import { CompiledQuery, SelectQueryNode } from "kysely"; +import type { Emitter } from "zen-mitt"; +import { mitt } from "zen-mitt"; +import type { EventWithError, MainToWorkerMsg, OfficialWasmWorkerDialectConfig, WorkerToMainMsg } from "./type"; +import workerUrl from "./worker?url"; + +export class OfficialWasmWorkerDriver implements Driver { + private worker?: Worker; + private connection?: DatabaseConnection; + private connectionMutex = new ConnectionMutex(); + private mitt?: Emitter; + constructor(private config: OfficialWasmWorkerDialectConfig) {} + + async init(): Promise { + // try to persist storage, https://web.dev/articles/persistent-storage#request_persistent_storage + try { + if (navigator.storage?.persist && !(await navigator.storage.persisted())) { + await navigator.storage.persist(); + } + // biome-ignore lint/suspicious/noEmptyBlockStatements: + } catch {} + + this.mitt = mitt(); + + this.worker = + this.config.worker ?? + new Worker(workerUrl, { + type: "module", + }); + + this.worker.onmessage = ({ data: [type, ...msg] }: MessageEvent) => { + this.mitt?.emit(type, ...msg); + }; + + this.worker.postMessage([0, this.config.fileName, this.config.preferOPFS ?? false] satisfies MainToWorkerMsg); + + await new Promise((resolve, reject) => { + this.mitt?.once(0, (_, err) => (err ? reject(err) : resolve())); + }); + + this.connection = new OfficialWasmWorkerConnection(this.worker, this.mitt); + await this.config.onCreateConnection?.(this.connection); + } + + async acquireConnection(): Promise { + // SQLite only has one single connection. We use a mutex here to wait + // until the single connection has been released. + await this.connectionMutex.lock(); + + // biome-ignore lint/style/noNonNullAssertion: + return this.connection!; + } + + async beginTransaction(connection: DatabaseConnection): Promise { + await connection.executeQuery(CompiledQuery.raw("begin")); + } + + async commitTransaction(connection: DatabaseConnection): Promise { + await connection.executeQuery(CompiledQuery.raw("commit")); + } + + async rollbackTransaction(connection: DatabaseConnection): Promise { + await connection.executeQuery(CompiledQuery.raw("rollback")); + } + + releaseConnection(): Promise { + return new Promise((resolve) => { + this.connectionMutex.unlock(); + + resolve(); + }); + } + + async destroy(): Promise { + if (!this.worker) { + return; + } + this.worker.postMessage([2] satisfies MainToWorkerMsg); + return new Promise((resolve, reject) => { + this.mitt?.once(2, (_, err) => { + if (err) { + reject(err); + } else { + this.worker?.terminate(); + this.mitt?.off(); + this.mitt = undefined; + resolve(); + } + }); + }); + } +} + +class ConnectionMutex { + private promise?: Promise; + private resolve?: () => void; + + async lock(): Promise { + while (this.promise) { + await this.promise; + } + + this.promise = new Promise((resolve) => { + this.resolve = resolve; + }); + } + + unlock(): void { + const resolve = this.resolve; + + this.promise = undefined; + this.resolve = undefined; + + resolve?.(); + } +} + +class OfficialWasmWorkerConnection implements DatabaseConnection { + readonly worker: Worker; + readonly mitt?: Emitter; + constructor(worker: Worker, mitt?: Emitter) { + this.worker = worker; + this.mitt = mitt; + } + + async *streamQuery(compiledQuery: CompiledQuery): AsyncIterableIterator> { + const { parameters, sql, query } = compiledQuery; + if (!SelectQueryNode.is(query)) { + throw new Error("official wasm worker dialect only supports SELECT queries for streaming"); + } + this.worker.postMessage([3, sql, parameters] satisfies MainToWorkerMsg); + let done = false; + let resolveFn: (value: IteratorResult>) => void; + let rejectFn: (reason?: unknown) => void; + + this.mitt?.on(3 /* data */, (data, err): void => { + if (err) { + rejectFn(err); + } else { + resolveFn({ value: { rows: data as R[] }, done: false }); + } + }); + + this.mitt?.on(4 /* end */, (_, err): void => { + if (err) { + rejectFn(err); + } else { + resolveFn({ value: undefined, done: true }); + } + }); + + while (!done) { + const result = await new Promise>>((res, rej) => { + resolveFn = res; + rejectFn = rej; + }); + + if (result.done) { + done = true; + this.mitt?.off(3 /* data */); + this.mitt?.off(4 /* end */); + } else { + yield result.value; + } + } + } + + async executeQuery(compiledQuery: CompiledQuery): Promise> { + const { sql, parameters, query } = compiledQuery; + + const isSelect = SelectQueryNode.is(query); + + this.worker.postMessage([1, isSelect, sql, parameters] satisfies MainToWorkerMsg); + return new Promise((resolve, reject) => { + if (!this.mitt) { + reject(new Error("kysely instance has been destroyed")); + } + + this.mitt?.once(1, (data, err) => (!err && data ? resolve(data) : reject(err))); + }); + } +} diff --git a/src/lib/kysely-official-wasm-worker/index.ts b/src/lib/kysely-official-wasm-worker/index.ts new file mode 100644 index 0000000..1886c71 --- /dev/null +++ b/src/lib/kysely-official-wasm-worker/index.ts @@ -0,0 +1,38 @@ +import type { + DatabaseIntrospector, + Dialect, + DialectAdapter, + Driver, + Kysely, + QueryCompiler, +} from "kysely"; +import { SqliteAdapter, SqliteIntrospector, SqliteQueryCompiler } from "kysely"; +import { OfficialWasmWorkerDriver } from "./driver"; +import type { OfficialWasmWorkerDialectConfig } from "./type"; + +export type { + Promisable, + OfficialWasmWorkerDialectConfig as WaSqliteWorkerDialectConfig, +} from "./type"; +export { createOnMessageCallback } from "./worker/utils"; + +export class OfficialWasmWorkerDialect implements Dialect { + constructor(private config: OfficialWasmWorkerDialectConfig) {} + + createDriver(): Driver { + return new OfficialWasmWorkerDriver(this.config); + } + + createQueryCompiler(): QueryCompiler { + return new SqliteQueryCompiler(); + } + + createAdapter(): DialectAdapter { + return new SqliteAdapter(); + } + + // biome-ignore lint/suspicious/noExplicitAny: + createIntrospector(db: Kysely): DatabaseIntrospector { + return new SqliteIntrospector(db); + } +} diff --git a/src/lib/kysely-official-wasm-worker/type.ts b/src/lib/kysely-official-wasm-worker/type.ts new file mode 100644 index 0000000..80511ce --- /dev/null +++ b/src/lib/kysely-official-wasm-worker/type.ts @@ -0,0 +1,54 @@ +import type { SqlValue } from "@sqlite.org/sqlite-wasm"; +import type { DatabaseConnection, QueryResult } from "kysely"; + +export type Promisable = T | Promise; + +export interface OfficialWasmWorkerDialectConfig { + /** + * db file name + */ + fileName: string; + /** + * prefer to store data in OPFS + * @default true + */ + preferOPFS?: boolean; + /** + * official wasm worker + */ + worker?: Worker; + onCreateConnection?: (connection: DatabaseConnection) => Promisable; +} + +type InitMsg = [type: 0, fileName: string, useOPFS: boolean]; + +type RunMsg = [type: 1, isSelect: boolean, sql: string, parameters?: readonly unknown[]]; + +type CloseMsg = [2]; + +type StreamMsg = [type: 3, sql: string, parameters?: readonly unknown[]]; + +type LoadDbMsg = [type: 4, filename: string, useOPFS: boolean, statements: string[]]; + +export type MainToWorkerMsg = InitMsg | RunMsg | CloseMsg | StreamMsg | LoadDbMsg; + +type Events = { + 0: null; + // biome-ignore lint/suspicious/noExplicitAny: + 1: QueryResult | null; + 2: null; + 3: { + [columnName: string]: SqlValue; + }[]; + 4: null; + 5: number; + 6: null; +}; + +export type WorkerToMainMsg = { + [K in keyof Events]: [type: K, data: Events[K], err: unknown]; +}[keyof Events]; + +export type EventWithError = { + [K in keyof Events]: [data: Events[K], err: unknown]; +}; diff --git a/src/lib/kysely-official-wasm-worker/worker/index.ts b/src/lib/kysely-official-wasm-worker/worker/index.ts new file mode 100644 index 0000000..93d008a --- /dev/null +++ b/src/lib/kysely-official-wasm-worker/worker/index.ts @@ -0,0 +1,3 @@ +import { createOnMessageCallback } from "./utils"; + +self.onmessage = createOnMessageCallback(); diff --git a/src/lib/kysely-official-wasm-worker/worker/utils.ts b/src/lib/kysely-official-wasm-worker/worker/utils.ts new file mode 100644 index 0000000..45e92f3 --- /dev/null +++ b/src/lib/kysely-official-wasm-worker/worker/utils.ts @@ -0,0 +1,126 @@ +import sqlite3InitModule, { + type BindingSpec, + type Database, + type OpfsDatabase, + type Sqlite3Static, + type SqlValue, +} from "@sqlite.org/sqlite-wasm"; +import type { QueryResult } from "kysely"; +import type { MainToWorkerMsg, WorkerToMainMsg } from "../type"; + +let sqlite3: Sqlite3Static; +let db: Database | OpfsDatabase; + +async function init( + fileName: string, + preferOpfs: boolean, + afterInit?: (sqliteDB: Database | OpfsDatabase) => Promise, +): Promise { + sqlite3 = await sqlite3InitModule(); + + db = preferOpfs && "opfs" in sqlite3.oo1 ? new sqlite3.oo1.OpfsDb(fileName) : new sqlite3.oo1.DB(fileName); + + await afterInit?.(db); +} + +function exec(isSelect: boolean, sql: string, parameters?: readonly unknown[]): QueryResult { + const rows = db.exec(sql, { + bind: parameters as BindingSpec, + returnValue: "resultRows", + }); + + return isSelect || rows.length + ? { rows } + : { + rows, + insertId: BigInt(sqlite3.capi.sqlite3_last_insert_rowid(db)), + numAffectedRows: BigInt(db.changes()), + }; +} + +function stream( + onData: (data: { [columnName: string]: SqlValue }) => void, + sql: string, + parameters?: readonly unknown[], +): void { + const stmt = db.prepare(sql); + + if (parameters) { + stmt.bind(parameters as BindingSpec); + } + + while (stmt.step()) { + onData(stmt.get({})); + } + + stmt.finalize(); +} + +async function loadDb(onData: (percentage: number) => void, fileName: string, useOPFS: boolean, statements: string[]) { + if (!db) { + await init(fileName, useOPFS); + } + + const length = statements.length; + let percentage = 0; + + for (let i = 0; i < length; i++) { + const newPercentage = Math.round((i / length) * 100); + + if (newPercentage !== percentage) { + onData(newPercentage); + + percentage = newPercentage; + } + + console.log("executing statement"); + + db.exec(statements[i]); + } +} + +/** + * Handle worker message, support custom callback on initialization + * @example + * // worker.ts + * import { createOnMessageCallback, customFunction } from 'kysely-wasqlite-worker' + * + * onmessage = createOnMessageCallback( + * async (sqliteDB: SQLiteDB) => { + * customFunction(sqliteDB.sqlite, sqliteDB.db, 'customFunction', (a, b) => a + b) + * } + * ) + */ +export function createOnMessageCallback( + afterInit?: (sqliteDB: Database | OpfsDatabase) => Promise, +): (event: MessageEvent) => Promise { + return async ({ data: [msg, data1, data2, data3] }: MessageEvent) => { + const ret: WorkerToMainMsg = [msg, null, null]; + + try { + switch (msg) { + case 0: + await init(data1, data2, afterInit); + break; + case 1: + ret[1] = exec(data1, data2, data3); + break; + case 2: + db.close(); + break; + case 3: + stream((val) => postMessage([3, [val], null] satisfies WorkerToMainMsg), data1, data2); + ret[0] = 4; + break; + case 4: + loadDb((percentage) => postMessage([5, percentage, null] satisfies WorkerToMainMsg), data1, data2, data3); + ret[0] = 6; + break; + } + } catch (error) { + console.error(error); + ret[2] = error; + } + postMessage(ret); + }; +} diff --git a/src/pages/home.tsx b/src/pages/home.tsx index 3701498..d2e2405 100644 --- a/src/pages/home.tsx +++ b/src/pages/home.tsx @@ -45,19 +45,17 @@ export const Home: Component = () => { if (currentBackupFile && currentPassphrase) { decryptBackup(currentBackupFile, currentPassphrase, setDecryptionProgress) - .then((result) => { + .then(async (result) => { setDecryptionProgress(undefined); // setIsLoadingDatabase(true); setLoadingProgress(0); - setTimeout(() => { - loadDb(result.database_statements, (newValue) => (console.log(newValue), setLoadingProgress(newValue))); + await loadDb(result.database_statements, (newValue) => (console.log(newValue), setLoadingProgress(newValue))); - // setIsLoadingDatabase(false); - setLoadingProgress(undefined); + // setIsLoadingDatabase(false); + setLoadingProgress(undefined); - navigate("/overview"); - }, 0); + navigate("/overview"); }) .catch((error) => { console.error("Decryption failed:", error); @@ -79,7 +77,7 @@ export const Home: Component = () => { }} > -

Decrypting database

+

Decrypting backup