feat(db): sqlite in worker (not working right now)

This commit is contained in:
Samuel 2025-01-20 17:00:57 +01:00
parent 0e1eed664d
commit c9aaf2ecef
12 changed files with 467 additions and 56 deletions

View file

@ -0,0 +1,183 @@
import type { DatabaseConnection, Driver, QueryResult } from "kysely";
import { CompiledQuery, SelectQueryNode } from "kysely";
import type { Emitter } from "zen-mitt";
import { mitt } from "zen-mitt";
import type { EventWithError, MainToWorkerMsg, OfficialWasmWorkerDialectConfig, WorkerToMainMsg } from "./type";
import workerUrl from "./worker?url";
export class OfficialWasmWorkerDriver implements Driver {
private worker?: Worker;
private connection?: DatabaseConnection;
private connectionMutex = new ConnectionMutex();
private mitt?: Emitter<EventWithError>;
constructor(private config: OfficialWasmWorkerDialectConfig) {}
async init(): Promise<void> {
// try to persist storage, https://web.dev/articles/persistent-storage#request_persistent_storage
try {
if (navigator.storage?.persist && !(await navigator.storage.persisted())) {
await navigator.storage.persist();
}
// biome-ignore lint/suspicious/noEmptyBlockStatements: <explanation>
} catch {}
this.mitt = mitt<EventWithError>();
this.worker =
this.config.worker ??
new Worker(workerUrl, {
type: "module",
});
this.worker.onmessage = ({ data: [type, ...msg] }: MessageEvent<WorkerToMainMsg>) => {
this.mitt?.emit(type, ...msg);
};
this.worker.postMessage([0, this.config.fileName, this.config.preferOPFS ?? false] satisfies MainToWorkerMsg);
await new Promise<void>((resolve, reject) => {
this.mitt?.once(0, (_, err) => (err ? reject(err) : resolve()));
});
this.connection = new OfficialWasmWorkerConnection(this.worker, this.mitt);
await this.config.onCreateConnection?.(this.connection);
}
async acquireConnection(): Promise<DatabaseConnection> {
// SQLite only has one single connection. We use a mutex here to wait
// until the single connection has been released.
await this.connectionMutex.lock();
// biome-ignore lint/style/noNonNullAssertion: <explanation>
return this.connection!;
}
async beginTransaction(connection: DatabaseConnection): Promise<void> {
await connection.executeQuery(CompiledQuery.raw("begin"));
}
async commitTransaction(connection: DatabaseConnection): Promise<void> {
await connection.executeQuery(CompiledQuery.raw("commit"));
}
async rollbackTransaction(connection: DatabaseConnection): Promise<void> {
await connection.executeQuery(CompiledQuery.raw("rollback"));
}
releaseConnection(): Promise<void> {
return new Promise((resolve) => {
this.connectionMutex.unlock();
resolve();
});
}
async destroy(): Promise<void> {
if (!this.worker) {
return;
}
this.worker.postMessage([2] satisfies MainToWorkerMsg);
return new Promise<void>((resolve, reject) => {
this.mitt?.once(2, (_, err) => {
if (err) {
reject(err);
} else {
this.worker?.terminate();
this.mitt?.off();
this.mitt = undefined;
resolve();
}
});
});
}
}
class ConnectionMutex {
private promise?: Promise<void>;
private resolve?: () => void;
async lock(): Promise<void> {
while (this.promise) {
await this.promise;
}
this.promise = new Promise((resolve) => {
this.resolve = resolve;
});
}
unlock(): void {
const resolve = this.resolve;
this.promise = undefined;
this.resolve = undefined;
resolve?.();
}
}
class OfficialWasmWorkerConnection implements DatabaseConnection {
readonly worker: Worker;
readonly mitt?: Emitter<EventWithError>;
constructor(worker: Worker, mitt?: Emitter<EventWithError>) {
this.worker = worker;
this.mitt = mitt;
}
async *streamQuery<R>(compiledQuery: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
const { parameters, sql, query } = compiledQuery;
if (!SelectQueryNode.is(query)) {
throw new Error("official wasm worker dialect only supports SELECT queries for streaming");
}
this.worker.postMessage([3, sql, parameters] satisfies MainToWorkerMsg);
let done = false;
let resolveFn: (value: IteratorResult<QueryResult<R>>) => void;
let rejectFn: (reason?: unknown) => void;
this.mitt?.on(3 /* data */, (data, err): void => {
if (err) {
rejectFn(err);
} else {
resolveFn({ value: { rows: data as R[] }, done: false });
}
});
this.mitt?.on(4 /* end */, (_, err): void => {
if (err) {
rejectFn(err);
} else {
resolveFn({ value: undefined, done: true });
}
});
while (!done) {
const result = await new Promise<IteratorResult<QueryResult<R>>>((res, rej) => {
resolveFn = res;
rejectFn = rej;
});
if (result.done) {
done = true;
this.mitt?.off(3 /* data */);
this.mitt?.off(4 /* end */);
} else {
yield result.value;
}
}
}
async executeQuery<R>(compiledQuery: CompiledQuery<unknown>): Promise<QueryResult<R>> {
const { sql, parameters, query } = compiledQuery;
const isSelect = SelectQueryNode.is(query);
this.worker.postMessage([1, isSelect, sql, parameters] satisfies MainToWorkerMsg);
return new Promise((resolve, reject) => {
if (!this.mitt) {
reject(new Error("kysely instance has been destroyed"));
}
this.mitt?.once(1, (data, err) => (!err && data ? resolve(data) : reject(err)));
});
}
}

View file

@ -0,0 +1,38 @@
import type {
DatabaseIntrospector,
Dialect,
DialectAdapter,
Driver,
Kysely,
QueryCompiler,
} from "kysely";
import { SqliteAdapter, SqliteIntrospector, SqliteQueryCompiler } from "kysely";
import { OfficialWasmWorkerDriver } from "./driver";
import type { OfficialWasmWorkerDialectConfig } from "./type";
export type {
Promisable,
OfficialWasmWorkerDialectConfig as WaSqliteWorkerDialectConfig,
} from "./type";
export { createOnMessageCallback } from "./worker/utils";
export class OfficialWasmWorkerDialect implements Dialect {
constructor(private config: OfficialWasmWorkerDialectConfig) {}
createDriver(): Driver {
return new OfficialWasmWorkerDriver(this.config);
}
createQueryCompiler(): QueryCompiler {
return new SqliteQueryCompiler();
}
createAdapter(): DialectAdapter {
return new SqliteAdapter();
}
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
createIntrospector(db: Kysely<any>): DatabaseIntrospector {
return new SqliteIntrospector(db);
}
}

View file

@ -0,0 +1,54 @@
import type { SqlValue } from "@sqlite.org/sqlite-wasm";
import type { DatabaseConnection, QueryResult } from "kysely";
export type Promisable<T> = T | Promise<T>;
export interface OfficialWasmWorkerDialectConfig {
/**
* db file name
*/
fileName: string;
/**
* prefer to store data in OPFS
* @default true
*/
preferOPFS?: boolean;
/**
* official wasm worker
*/
worker?: Worker;
onCreateConnection?: (connection: DatabaseConnection) => Promisable<void>;
}
type InitMsg = [type: 0, fileName: string, useOPFS: boolean];
type RunMsg = [type: 1, isSelect: boolean, sql: string, parameters?: readonly unknown[]];
type CloseMsg = [2];
type StreamMsg = [type: 3, sql: string, parameters?: readonly unknown[]];
type LoadDbMsg = [type: 4, filename: string, useOPFS: boolean, statements: string[]];
export type MainToWorkerMsg = InitMsg | RunMsg | CloseMsg | StreamMsg | LoadDbMsg;
type Events = {
0: null;
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
1: QueryResult<any> | null;
2: null;
3: {
[columnName: string]: SqlValue;
}[];
4: null;
5: number;
6: null;
};
export type WorkerToMainMsg = {
[K in keyof Events]: [type: K, data: Events[K], err: unknown];
}[keyof Events];
export type EventWithError = {
[K in keyof Events]: [data: Events[K], err: unknown];
};

View file

@ -0,0 +1,3 @@
import { createOnMessageCallback } from "./utils";
self.onmessage = createOnMessageCallback();

View file

@ -0,0 +1,126 @@
import sqlite3InitModule, {
type BindingSpec,
type Database,
type OpfsDatabase,
type Sqlite3Static,
type SqlValue,
} from "@sqlite.org/sqlite-wasm";
import type { QueryResult } from "kysely";
import type { MainToWorkerMsg, WorkerToMainMsg } from "../type";
let sqlite3: Sqlite3Static;
let db: Database | OpfsDatabase;
async function init(
fileName: string,
preferOpfs: boolean,
afterInit?: (sqliteDB: Database | OpfsDatabase) => Promise<void>,
): Promise<void> {
sqlite3 = await sqlite3InitModule();
db = preferOpfs && "opfs" in sqlite3.oo1 ? new sqlite3.oo1.OpfsDb(fileName) : new sqlite3.oo1.DB(fileName);
await afterInit?.(db);
}
function exec(isSelect: boolean, sql: string, parameters?: readonly unknown[]): QueryResult<SqlValue[]> {
const rows = db.exec(sql, {
bind: parameters as BindingSpec,
returnValue: "resultRows",
});
return isSelect || rows.length
? { rows }
: {
rows,
insertId: BigInt(sqlite3.capi.sqlite3_last_insert_rowid(db)),
numAffectedRows: BigInt(db.changes()),
};
}
function stream(
onData: (data: { [columnName: string]: SqlValue }) => void,
sql: string,
parameters?: readonly unknown[],
): void {
const stmt = db.prepare(sql);
if (parameters) {
stmt.bind(parameters as BindingSpec);
}
while (stmt.step()) {
onData(stmt.get({}));
}
stmt.finalize();
}
async function loadDb(onData: (percentage: number) => void, fileName: string, useOPFS: boolean, statements: string[]) {
if (!db) {
await init(fileName, useOPFS);
}
const length = statements.length;
let percentage = 0;
for (let i = 0; i < length; i++) {
const newPercentage = Math.round((i / length) * 100);
if (newPercentage !== percentage) {
onData(newPercentage);
percentage = newPercentage;
}
console.log("executing statement");
db.exec(statements[i]);
}
}
/**
* Handle worker message, support custom callback on initialization
* @example
* // worker.ts
* import { createOnMessageCallback, customFunction } from 'kysely-wasqlite-worker'
*
* onmessage = createOnMessageCallback(
* async (sqliteDB: SQLiteDB) => {
* customFunction(sqliteDB.sqlite, sqliteDB.db, 'customFunction', (a, b) => a + b)
* }
* )
*/
export function createOnMessageCallback(
afterInit?: (sqliteDB: Database | OpfsDatabase) => Promise<void>,
): (event: MessageEvent<MainToWorkerMsg>) => Promise<void> {
return async ({ data: [msg, data1, data2, data3] }: MessageEvent<MainToWorkerMsg>) => {
const ret: WorkerToMainMsg = [msg, null, null];
try {
switch (msg) {
case 0:
await init(data1, data2, afterInit);
break;
case 1:
ret[1] = exec(data1, data2, data3);
break;
case 2:
db.close();
break;
case 3:
stream((val) => postMessage([3, [val], null] satisfies WorkerToMainMsg), data1, data2);
ret[0] = 4;
break;
case 4:
loadDb((percentage) => postMessage([5, percentage, null] satisfies WorkerToMainMsg), data1, data2, data3);
ret[0] = 6;
break;
}
} catch (error) {
console.error(error);
ret[2] = error;
}
postMessage(ret);
};
}