diff --git a/server/src/database/helpers.ts b/server/src/database/helpers.ts index 47713198..745a3403 100644 --- a/server/src/database/helpers.ts +++ b/server/src/database/helpers.ts @@ -1,5 +1,3 @@ -import highland from "highland"; -import { streamEachPromise } from "../utils/streams.js"; import { clone } from "../utils/utils.js"; import { AssociationOptions, Model, ModelAttributeColumnOptions, ModelCtor, WhereOptions, DataTypes, FindOptions, Op, Sequelize, ModelStatic, InferAttributes, InferCreationAttributes, CreationAttributes } from "sequelize"; import { Line, Marker, PadId, ID, LineUpdate, MarkerUpdate, Type, Bbox } from "facilmap-types"; @@ -7,6 +5,7 @@ import Database from "./database.js"; import { isEqual } from "lodash-es"; import { calculateRouteForLine } from "../routing/routing.js"; import { PadModel } from "./pad"; +import { arrayToAsyncIterator } from "../utils/streams"; const ITEMS_PER_BATCH = 5000; @@ -149,15 +148,15 @@ export default class DatabaseHelpers { this._db = db; } - async _updateObjectStyles(objectStream: Marker | Line | Highland.Stream | Highland.Stream | Highland.Stream): Promise { - const stream = (highland.isStream(objectStream) ? highland(objectStream) : highland([ objectStream ])) as Highland.Stream; + async _updateObjectStyles(objects: Marker | Line | AsyncGenerator): Promise { + const iterator = Symbol.asyncIterator in objects ? objects : arrayToAsyncIterator([objects]); type MarkerData = { object: Marker; type: Type; update: MarkerUpdate; }; type LineData = { object: Line; type: Type; update: LineUpdate; }; const isLine = (data: MarkerData | LineData): data is LineData => (data.type.type == "line"); const types: Record = { }; - await streamEachPromise(stream, async (object: Marker | Line) => { + for await (const object of iterator) { const padId = object.padId; if(!types[object.typeId]) { @@ -234,7 +233,7 @@ export default class DatabaseHelpers { } await Promise.all(ret); - }); + } } async _padObjectExists(type: string, padId: PadId, id: ID): Promise { @@ -267,33 +266,27 @@ export default class DatabaseHelpers { return data; } - _toStream(getData: () => Promise>): Highland.Stream { - return highland(getData()).flatten() as any; - } + async* _getPadObjects(type: string, padId: PadId, condition?: FindOptions): AsyncGenerator { + const includeData = [ "Marker", "Line" ].includes(type); - _getPadObjects(type: string, padId: PadId, condition?: FindOptions): Highland.Stream { - return this._toStream(async () => { - const includeData = [ "Marker", "Line" ].includes(type); + if(includeData) { + condition = condition || { }; + condition.include = [ ...(condition.include ? (Array.isArray(condition.include) ? condition.include : [ condition.include ]) : [ ]), this._db._conn.model(type + "Data") ]; + } + + const Pad = this._db.pads.PadModel.build({ id: padId } satisfies Partial> as any); + const objs: Array = await (Pad as any)["get" + this._db._conn.model(type).getTableName()](condition); + + for (const obj of objs) { + const d: any = obj.toJSON(); if(includeData) { - condition = condition || { }; - condition.include = [ ...(condition.include ? (Array.isArray(condition.include) ? condition.include : [ condition.include ]) : [ ]), this._db._conn.model(type + "Data") ]; + d.data = this._dataFromArr((d as any)[type+"Data"]); + delete (d as any)[type+"Data"]; } - const Pad = this._db.pads.PadModel.build({ id: padId } satisfies Partial> as any); - const objs: Array = await (Pad as any)["get" + this._db._conn.model(type).getTableName()](condition); - - return objs.map((obj) => { - const d: any = obj.toJSON(); - - if(includeData) { - d.data = this._dataFromArr((d as any)[type+"Data"]); - delete (d as any)[type+"Data"]; - } - - return d; - }); - }); + yield d; + } } async _createPadObject(type: string, padId: PadId, data: any): Promise { @@ -397,10 +390,10 @@ export default class DatabaseHelpers { await model.bulkCreate(this._dataToArr(data, idObj)); } - renameObjectDataField(padId: PadId, typeId: ID, rename: Record }>, isLine: boolean): Promise { - const objectStream = (isLine ? this._db.lines.getPadLinesByType(padId, typeId) : this._db.markers.getPadMarkersByType(padId, typeId)) as Highland.Stream; + async renameObjectDataField(padId: PadId, typeId: ID, rename: Record }>, isLine: boolean): Promise { + const objectStream = (isLine ? this._db.lines.getPadLinesByType(padId, typeId) : this._db.markers.getPadMarkersByType(padId, typeId)); - return streamEachPromise(objectStream, async (object) => { + for await (const object of objectStream) { const newData = clone(object.data); const newNames: string[] = [ ]; @@ -424,7 +417,7 @@ export default class DatabaseHelpers { else await this._db.markers.updateMarker(object.padId, object.id, {data: newData}, true); // Last param true to not create history entry } - }); + } } async _bulkCreateInBatches(model: ModelCtor, data: Array>): Promise> { diff --git a/server/src/database/history.ts b/server/src/database/history.ts index 26751e8a..a694ecac 100644 --- a/server/src/database/history.ts +++ b/server/src/database/history.ts @@ -100,7 +100,7 @@ export default class DatabaseHistory { } - getHistory(padId: PadId, types?: HistoryEntryType[]): Highland.Stream { + getHistory(padId: PadId, types?: HistoryEntryType[]): AsyncGenerator { const query: FindOptions = { order: [[ "time", "DESC" ]] }; if(types) query.where = {type: types}; diff --git a/server/src/database/line.ts b/server/src/database/line.ts index e1d2fbe1..c84f2b46 100644 --- a/server/src/database/line.ts +++ b/server/src/database/line.ts @@ -2,8 +2,7 @@ import { CreationAttributes, CreationOptional, DataTypes, ForeignKey, HasManyGet import { BboxWithZoom, ID, Latitude, Line, LineCreate, ExtraInfo, LineUpdate, Longitude, PadId, Point, Route, TrackPoint } from "facilmap-types"; import Database from "./database.js"; import { BboxWithExcept, createModel, dataDefinition, DataModel, getDefaultIdType, getLatType, getLonType, getPosType, getVirtualLatType, getVirtualLonType, makeBboxCondition, makeNotNullForeignKey, validateColour } from "./helpers.js"; -import { groupBy, isEqual, mapValues, omit } from "lodash-es"; -import { wrapAsync } from "../utils/streams.js"; +import { chunk, groupBy, isEqual, mapValues, omit } from "lodash-es"; import { calculateRouteForLine } from "../routing/routing.js"; import { PadModel } from "./pad"; import { Point as GeoJsonPoint } from "geojson"; @@ -45,7 +44,7 @@ export interface LinePointModel extends Model, I zoom: number; idx: number; ele: number | null; - toJSON: () => TrackPoint; + toJSON: () => TrackPoint & { lineId: ID; pos: GeoJsonPoint }; } export default class DatabaseLines { @@ -150,21 +149,20 @@ export default class DatabaseLines { this.LineModel.hasMany(this.LineDataModel, { foreignKey: "lineId" }); } - getPadLines(padId: PadId, fields?: Array): Highland.Stream { + getPadLines(padId: PadId, fields?: Array): AsyncGenerator { const cond = fields ? { attributes: fields } : { }; return this._db.helpers._getPadObjects("Line", padId, cond); } - getPadLinesByType(padId: PadId, typeId: ID): Highland.Stream { + getPadLinesByType(padId: PadId, typeId: ID): AsyncGenerator { return this._db.helpers._getPadObjects("Line", padId, { where: { typeId: typeId } }); } - getPadLinesWithPoints(padId: PadId): Highland.Stream { - return this.getPadLines(padId) - .flatMap(wrapAsync(async (line): Promise => { - const trackPoints = await this.getAllLinePoints(line.id); - return { ...line, trackPoints }; - })); + async* getPadLinesWithPoints(padId: PadId): AsyncGenerator { + for await (const line of this.getPadLines(padId)) { + const trackPoints = await this.getAllLinePoints(line.id); + yield { ...line, trackPoints }; + } } async getLineTemplate(padId: PadId, data: { typeId: ID }): Promise { @@ -267,29 +265,30 @@ export default class DatabaseLines { return oldLine; } - getLinePointsForPad(padId: PadId, bboxWithZoom: BboxWithZoom & BboxWithExcept): Highland.Stream<{ id: ID; trackPoints: TrackPoint[] }> { - return this._db.helpers._toStream(async () => await this.LineModel.findAll({ attributes: ["id"], where: { padId } })) - .map((line) => line.id) - .batch(50000) - .flatMap(wrapAsync(async (lineIds) => { - const linePoints = await this.LinePointModel.findAll({ - where: { - [Op.and]: [ - { - zoom: { [Op.lte]: bboxWithZoom.zoom }, - lineId: { [Op.in]: lineIds } - }, - makeBboxCondition(bboxWithZoom) - ] - }, - attributes: ["pos", "lat", "lon", "ele", "zoom", "idx", "lineId"] - }); + async* getLinePointsForPad(padId: PadId, bboxWithZoom: BboxWithZoom & BboxWithExcept): AsyncGenerator<{ id: ID; trackPoints: TrackPoint[] }, void, void> { + const lines = await this.LineModel.findAll({ attributes: ["id"], where: { padId } }); + const chunks = chunk(lines.map((line) => line.id), 50000); + for (const lineIds of chunks) { + const linePoints = await this.LinePointModel.findAll({ + where: { + [Op.and]: [ + { + zoom: { [Op.lte]: bboxWithZoom.zoom }, + lineId: { [Op.in]: lineIds } + }, + makeBboxCondition(bboxWithZoom) + ] + }, + attributes: ["pos", "lat", "lon", "ele", "zoom", "idx", "lineId"] + }); - return Object.entries(groupBy(linePoints, "lineId")).map(([key, val]) => ({ + for (const [key, val] of Object.entries(groupBy(linePoints, "lineId"))) { + yield { id: Number(key), trackPoints: val.map((p) => omit(p.toJSON(), ["lineId", "pos"])) - })); - })).flatten(); + }; + } + } } async getAllLinePoints(lineId: ID): Promise { diff --git a/server/src/database/marker.ts b/server/src/database/marker.ts index 467dc100..12b7001f 100644 --- a/server/src/database/marker.ts +++ b/server/src/database/marker.ts @@ -68,11 +68,11 @@ export default class DatabaseMarkers { this.MarkerModel.hasMany(this.MarkerDataModel, { foreignKey: "markerId" }); } - getPadMarkers(padId: PadId, bbox?: BboxWithZoom & BboxWithExcept): Highland.Stream { + getPadMarkers(padId: PadId, bbox?: BboxWithZoom & BboxWithExcept): AsyncGenerator { return this._db.helpers._getPadObjects("Marker", padId, { where: makeBboxCondition(bbox) }); } - getPadMarkersByType(padId: PadId, typeId: ID): Highland.Stream { + getPadMarkersByType(padId: PadId, typeId: ID): AsyncGenerator { return this._db.helpers._getPadObjects("Marker", padId, { where: { padId: padId, typeId: typeId } }); } diff --git a/server/src/database/migrations.ts b/server/src/database/migrations.ts index 200743c2..18440244 100644 --- a/server/src/database/migrations.ts +++ b/server/src/database/migrations.ts @@ -1,15 +1,11 @@ import { clone, generateRandomId, promiseProps } from "../utils/utils.js"; -import { streamEachPromise } from "../utils/streams.js"; -import Sequelize, { CreationAttributes, DataTypes } from "sequelize"; +import { CreationAttributes, DataTypes, Op, Utils, col, fn } from "sequelize"; import { isEqual } from "lodash-es"; import Database from "./database.js"; import { PadModel } from "./pad.js"; -import { Line, Marker } from "facilmap-types"; import { LineModel, LinePointModel } from "./line.js"; import { getElevationForPoints } from "../elevation.js"; -const Op = Sequelize.Op; - export default class DatabaseMigrations { _db: Database; @@ -76,13 +72,13 @@ export default class DatabaseMigrations { // allow null on Pad.name, Marker.name, Line.name if(["Pads", "Markers", "Lines"].includes(table) && !attributes.name.allowNull) - await queryInterface.changeColumn(table, 'name', { type: Sequelize.TEXT, allowNull: true }); + await queryInterface.changeColumn(table, 'name', { type: DataTypes.TEXT, allowNull: true }); // Change routing mode field from ENUM to TEXT if(table == "Lines" && attributes.mode.type != "TEXT") - await queryInterface.changeColumn(table, "mode", { type: Sequelize.TEXT, allowNull: false, defaultValue: "" }); + await queryInterface.changeColumn(table, "mode", { type: DataTypes.TEXT, allowNull: false, defaultValue: "" }); if(table == "Types" && attributes.defaultMode.type != "TEXT") - await queryInterface.changeColumn(table, "defaultMode", { type: Sequelize.TEXT, allowNull: true }); + await queryInterface.changeColumn(table, "defaultMode", { type: DataTypes.TEXT, allowNull: true }); } } @@ -117,9 +113,9 @@ export default class DatabaseMigrations { const newFields = type.fields; // type.fields is a getter, we cannot modify the object directly const dropdowns = newFields.filter((field) => field.type == "dropdown"); if(dropdowns.length > 0) { - const objectStream = (type.type == "line" ? this._db.lines.getPadLinesByType(type.padId, type.id) : this._db.markers.getPadMarkersByType(type.padId, type.id)) as Highland.Stream; + const objectStream = (type.type == "line" ? this._db.lines.getPadLinesByType(type.padId, type.id) : this._db.markers.getPadMarkersByType(type.padId, type.id)); - await streamEachPromise(objectStream, (object) => { + for await (const object of objectStream) { const newData = clone(object.data); for(const dropdown of dropdowns) { const newVal = (dropdown.options || []).filter((option: any) => option.key == newData[dropdown.name])[0]; @@ -131,7 +127,7 @@ export default class DatabaseMigrations { if(!isEqual(newData, object.data)) return this._db.helpers._updatePadObject(type.type == "line" ? "Line" : "Marker", object.padId, object.id, {data: newData}, true); - }); + } dropdowns.forEach((dropdown) => { if(dropdown.default) { @@ -247,7 +243,7 @@ export default class DatabaseMigrations { allowNull: true }); await queryInterface.bulkUpdate(table, { - pos: Sequelize.fn("POINT", Sequelize.col("lon"), Sequelize.col("lat")) + pos: fn("POINT", col("lon"), col("lat")) }, {}); await queryInterface.changeColumn(table, 'pos', model.rawAttributes.pos); await queryInterface.removeColumn(table, 'lat'); @@ -256,7 +252,7 @@ export default class DatabaseMigrations { // We create the index here even in a non-migration case, because adding it to the model definition will cause an error if the column does not exist yet. const indexes: any = await queryInterface.showIndex(table); - if (!indexes.some((index: any) => index.name == (Sequelize.Utils as any).underscore(`${table}_pos`))) + if (!indexes.some((index: any) => index.name == (Utils as any).underscore(`${table}_pos`))) await queryInterface.addIndex(table, { fields: ["pos"], type: "SPATIAL" }); } } diff --git a/server/src/database/pad.ts b/server/src/database/pad.ts index 8c98f78d..3ccd9d14 100644 --- a/server/src/database/pad.ts +++ b/server/src/database/pad.ts @@ -1,7 +1,6 @@ import { DataTypes, InferAttributes, InferCreationAttributes, Model, Op, Sequelize } from "sequelize"; import { FindPadsQuery, FindPadsResult, PadData, PadDataCreate, PadDataUpdate, PadId, PagedResults } from "facilmap-types"; import Database from "./database.js"; -import { streamEachPromise } from "../utils/streams.js"; import { createModel } from "./helpers.js"; export interface PadModel extends Model, InferCreationAttributes> { @@ -159,21 +158,21 @@ export default class DatabasePads { await this.updatePadData(padData.id, { defaultViewId: null }); } - await streamEachPromise(this._db.markers.getPadMarkers(padData.id), async (marker) => { + for await (const marker of this._db.markers.getPadMarkers(padData.id)) { await this._db.markers.deleteMarker(padData.id, marker.id); - }); + } - await streamEachPromise(this._db.lines.getPadLines(padData.id, ['id']), async (line) => { + for await (const line of this._db.lines.getPadLines(padData.id, ['id'])) { await this._db.lines.deleteLine(padData.id, line.id); - }); + } - await streamEachPromise(this._db.types.getTypes(padData.id), async (type) => { + for await (const type of this._db.types.getTypes(padData.id)) { await this._db.types.deleteType(padData.id, type.id); - }); + } - await streamEachPromise(this._db.views.getViews(padData.id), async (view) => { + for await (const view of this._db.views.getViews(padData.id)) { await this._db.views.deleteView(padData.id, view.id); - }); + } await this._db.history.clearHistory(padData.id); diff --git a/server/src/database/route.ts b/server/src/database/route.ts index 6b7f9c51..f7557de1 100644 --- a/server/src/database/route.ts +++ b/server/src/database/route.ts @@ -1,5 +1,5 @@ import { generateRandomId } from "../utils/utils.js"; -import { DataTypes, InferAttributes, InferCreationAttributes, Model, Op, WhereOptions, WhereOptions } from "sequelize"; +import { DataTypes, InferAttributes, InferCreationAttributes, Model, Op, WhereOptions } from "sequelize"; import Database from "./database.js"; import { BboxWithZoom, ID, Latitude, Longitude, PadId, Point, Route, RouteMode, TrackPoint } from "facilmap-types"; import { BboxWithExcept, createModel, getPosType, getVirtualLatType, getVirtualLonType, makeBboxCondition } from "./helpers.js"; diff --git a/server/src/database/search.ts b/server/src/database/search.ts index 34e9dc57..f93935db 100644 --- a/server/src/database/search.ts +++ b/server/src/database/search.ts @@ -1,11 +1,9 @@ import { FindOnMapResult, PadId } from "facilmap-types"; -import Sequelize, { ModelCtor } from "sequelize"; +import { ModelStatic, Op, and, col, fn, where } from "sequelize"; import Database from "./database.js"; import { LineModel } from "./line.js"; import { MarkerModel } from "./marker.js"; -import similarity from "string-similarity"; - -const Op = Sequelize.Op; +import { compareTwoStrings } from "string-similarity"; export default class DatabaseSearch { @@ -17,11 +15,11 @@ export default class DatabaseSearch { async search(padId: PadId, searchText: string): Promise> { const objects = (await Promise.all([ "Marker", "Line" ].map(async (kind) => { - const model = this._db._conn.model(kind) as ModelCtor; + const model = this._db._conn.model(kind) as ModelStatic; const objs = await model.findAll({ - where: Sequelize.and( + where: and( { padId }, - Sequelize.where(Sequelize.fn("lower", Sequelize.col(`${kind}.name`)), {[Op.like]: `%${searchText.toLowerCase()}%`}) + where(fn("lower", col(`${kind}.name`)), {[Op.like]: `%${searchText.toLowerCase()}%`}) ), attributes: [ "id", "name", "typeId" ].concat(kind == "Marker" ? [ "pos", "lat", "lon", "symbol" ] : [ "top", "left", "bottom", "right" ]) }); @@ -29,7 +27,7 @@ export default class DatabaseSearch { return objs.map((obj) => ({ ...obj.toJSON(), kind: kind.toLowerCase() as any, - similarity: similarity.compareTwoStrings(searchText, obj.name ?? '') + similarity: compareTwoStrings(searchText, obj.name ?? '') })); }))).flat(); diff --git a/server/src/database/type.ts b/server/src/database/type.ts index 6d6c1972..a9557fe7 100644 --- a/server/src/database/type.ts +++ b/server/src/database/type.ts @@ -1,4 +1,4 @@ -import Sequelize, { CreationOptional, ForeignKey, InferAttributes, InferCreationAttributes, Model } from "sequelize"; +import { CreationOptional, DataTypes, ForeignKey, InferAttributes, InferCreationAttributes, Model } from "sequelize"; import { Field, ID, PadId, Type, TypeCreate, TypeUpdate } from "facilmap-types"; import Database from "./database.js"; import { createModel, getDefaultIdType, makeNotNullForeignKey, validateColour } from "./helpers.js"; @@ -42,24 +42,24 @@ export default class DatabaseTypes { this.TypeModel.init({ id: getDefaultIdType(), - name: { type: Sequelize.TEXT, allowNull: false }, - type: { type: Sequelize.ENUM("marker", "line"), allowNull: false }, - defaultColour: { type: Sequelize.STRING(6), allowNull: true, validate: validateColour }, - colourFixed: { type: Sequelize.BOOLEAN, allowNull: true }, - defaultSize: { type: Sequelize.INTEGER.UNSIGNED, allowNull: true, validate: { min: 15 } }, - sizeFixed: { type: Sequelize.BOOLEAN, allowNull: true }, - defaultSymbol: { type: Sequelize.TEXT, allowNull: true}, - symbolFixed: { type: Sequelize.BOOLEAN, allowNull: true}, - defaultShape: { type: Sequelize.TEXT, allowNull: true }, - shapeFixed: { type: Sequelize.BOOLEAN, allowNull: true }, - defaultWidth: { type: Sequelize.INTEGER.UNSIGNED, allowNull: true, validate: { min: 1 } }, - widthFixed: { type: Sequelize.BOOLEAN, allowNull: true }, - defaultMode: { type: Sequelize.TEXT, allowNull: true }, - modeFixed: { type: Sequelize.BOOLEAN, allowNull: true }, - showInLegend: { type: Sequelize.BOOLEAN, allowNull: true }, + name: { type: DataTypes.TEXT, allowNull: false }, + type: { type: DataTypes.ENUM("marker", "line"), allowNull: false }, + defaultColour: { type: DataTypes.STRING(6), allowNull: true, validate: validateColour }, + colourFixed: { type: DataTypes.BOOLEAN, allowNull: true }, + defaultSize: { type: DataTypes.INTEGER.UNSIGNED, allowNull: true, validate: { min: 15 } }, + sizeFixed: { type: DataTypes.BOOLEAN, allowNull: true }, + defaultSymbol: { type: DataTypes.TEXT, allowNull: true}, + symbolFixed: { type: DataTypes.BOOLEAN, allowNull: true}, + defaultShape: { type: DataTypes.TEXT, allowNull: true }, + shapeFixed: { type: DataTypes.BOOLEAN, allowNull: true }, + defaultWidth: { type: DataTypes.INTEGER.UNSIGNED, allowNull: true, validate: { min: 1 } }, + widthFixed: { type: DataTypes.BOOLEAN, allowNull: true }, + defaultMode: { type: DataTypes.TEXT, allowNull: true }, + modeFixed: { type: DataTypes.BOOLEAN, allowNull: true }, + showInLegend: { type: DataTypes.BOOLEAN, allowNull: true }, fields: { - type: Sequelize.TEXT, + type: DataTypes.TEXT, allowNull: false, get: function(this: TypeModel) { const fields = this.getDataValue("fields") as any as string; @@ -165,7 +165,7 @@ export default class DatabaseTypes { PadModel.hasMany(this.TypeModel, { foreignKey: "padId" }); } - getTypes(padId: PadId): Highland.Stream { + getTypes(padId: PadId): AsyncGenerator { return this._db.helpers._getPadObjects("Type", padId); } diff --git a/server/src/database/view.ts b/server/src/database/view.ts index a0e142db..8c4c4ace 100644 --- a/server/src/database/view.ts +++ b/server/src/database/view.ts @@ -57,7 +57,7 @@ export default class DatabaseViews { this._db.pads.PadModel.hasMany(this.ViewModel, { foreignKey: "padId" }); } - getViews(padId: PadId): Highland.Stream { + getViews(padId: PadId): AsyncGenerator { return this._db.helpers._getPadObjects("View", padId); } diff --git a/server/src/export/geojson.ts b/server/src/export/geojson.ts index 3cdca332..98caae5c 100644 --- a/server/src/export/geojson.ts +++ b/server/src/export/geojson.ts @@ -1,4 +1,4 @@ -import { jsonStream, streamToArrayPromise, toStream } from "../utils/streams.js"; +import { jsonStream, asyncIteratorToArray, toStream } from "../utils/streams.js"; import { clone } from "../utils/utils.js"; import { compileExpression } from "facilmap-utils"; import { Marker, MarkerFeature, LineFeature, PadId } from "facilmap-types"; @@ -18,7 +18,7 @@ export function exportGeoJson(database: Database, padId: PadId, filter?: string) const views = database.views.getViews(padId) .map((view) => omit(view, ["id", "padId"])); - const types = keyBy(await streamToArrayPromise(database.types.getTypes(padId)), "id"); + const types = keyBy(await asyncIteratorToArray(database.types.getTypes(padId)), "id"); const markers = database.markers.getPadMarkers(padId) .filter((marker) => filterFunc(marker, types[marker.typeId])) diff --git a/server/src/export/gpx.ts b/server/src/export/gpx.ts index 773db72b..fc7b0a48 100644 --- a/server/src/export/gpx.ts +++ b/server/src/export/gpx.ts @@ -1,11 +1,10 @@ -import { streamToArrayPromise, toStream } from "../utils/streams.js"; +import { asyncIteratorToArray, asyncIteratorToStream } from "../utils/streams.js"; import { compile } from "ejs"; import Database from "../database/database.js"; import { Field, PadId, Type } from "facilmap-types"; import { compileExpression, quoteHtml } from "facilmap-utils"; import { LineWithTrackPoints } from "../database/line.js"; import { keyBy } from "lodash-es"; -import highland from "highland"; import gpxLineEjs from "./gpx-line.ejs?raw"; const lineTemplate = compile(gpxLineEjs); @@ -21,55 +20,68 @@ function dataToText(fields: Field[], data: Record) { return text.join('\n\n'); } -export function exportGpx(database: Database, padId: PadId, useTracks: boolean, filter?: string): Highland.Stream { - return toStream(async () => { +export function exportGpx(database: Database, padId: PadId, useTracks: boolean, filter?: string): ReadableStream { + return asyncIteratorToStream((async function* () { const filterFunc = compileExpression(filter); const [padData, types] = await Promise.all([ database.pads.getPadData(padId), - streamToArrayPromise(database.types.getTypes(padId)).then((types) => keyBy(types, 'id')) + asyncIteratorToArray(database.types.getTypes(padId)).then((types) => keyBy(types, 'id')) ]); if (!padData) throw new Error(`Pad ${padId} could not be found.`); - const markers = database.markers.getPadMarkers(padId).filter((marker) => filterFunc(marker, types[marker.typeId])); - const lines = database.lines.getPadLinesWithPoints(padId).filter((line) => filterFunc(line, types[line.typeId])); - - return highland([ + yield ( `\n` + `\n` + `\t\n` + `\t\t${quoteHtml(padData.name)}\n` + `\t\t\n` + `\t\n` - ]).concat(markers.map((marker) => ( - `\t\n` + - `\t\t${quoteHtml(marker.name)}\n` + - `\t\t${quoteHtml(dataToText(types[marker.typeId].fields, marker.data))}\n` + - `\t\n` - ))).concat(lines.map((line) => ((useTracks || line.mode == "track") ? ( - `\t\n` + - `\t\t${quoteHtml(line.name)}\n` + - `\t\t${dataToText(types[line.typeId].fields, line.data)}\n` + - `\t\t\n` + - line.trackPoints.map((trackPoint) => ( - `\t\t\t\n` - )).join("") + - `\t\t\n` + - `\t\n` - ) : ( - `\t\n` + - `\t\t${quoteHtml(line.name)}\n` + - `\t\t${quoteHtml(dataToText(types[line.typeId].fields, line.data))}\n` + - line.routePoints.map((routePoint) => ( - `\t\t\n` - )).join("") + - `\t\n` - )))).concat([ - `` - ]); - }).flatten(); + ); + + for await (const marker of database.markers.getPadMarkers(padId)) { + if (filterFunc(marker, types[marker.typeId])) { + yield ( + `\t\n` + + `\t\t${quoteHtml(marker.name)}\n` + + `\t\t${quoteHtml(dataToText(types[marker.typeId].fields, marker.data))}\n` + + `\t\n` + ); + } + } + + for await (const line of database.lines.getPadLinesWithPoints(padId)) { + if (filterFunc(line, types[line.typeId])) { + if (useTracks || line.mode == "track") { + yield ( + `\t\n` + + `\t\t${quoteHtml(line.name)}\n` + + `\t\t${dataToText(types[line.typeId].fields, line.data)}\n` + + `\t\t\n` + + line.trackPoints.map((trackPoint) => ( + `\t\t\t\n` + )).join("") + + `\t\t\n` + + `\t\n` + ); + } else { + yield ( + `\t\n` + + `\t\t${quoteHtml(line.name)}\n` + + `\t\t${quoteHtml(dataToText(types[line.typeId].fields, line.data))}\n` + + line.routePoints.map((routePoint) => ( + `\t\t\n` + )).join("") + + `\t\n` + ); + } + } + } + + yield ``; + })()); } type LineForExport = Partial>; diff --git a/server/src/export/table.ts b/server/src/export/table.ts index efc99a48..a55647d8 100644 --- a/server/src/export/table.ts +++ b/server/src/export/table.ts @@ -1,6 +1,5 @@ -import { streamEachPromise } from "../utils/streams.js"; import { promiseAuto } from "../utils/utils.js"; -import ejs from "ejs"; +import { render } from "ejs"; import { ID, Line, Marker, PadId, Type } from "facilmap-types"; import { compileExpression } from "facilmap-utils"; import * as utils from "facilmap-utils"; @@ -21,28 +20,28 @@ export function createTable(database: Database, padId: PadId, filter: string | u types: async () => { const types = { } as Record; - await streamEachPromise(database.types.getTypes(padId), (type: Type) => { + for await (const type of database.types.getTypes(padId)) { types[type.id] = { ...type, markers: [], lines: [] }; - }); + } return types; }, - markers: (types) => { - return streamEachPromise(database.markers.getPadMarkers(padId), (marker: Marker) => { + markers: async (types) => { + for await (const marker of database.markers.getPadMarkers(padId)) { if(filterFunc(marker, types[marker.typeId])) types[marker.typeId].markers.push(marker); - }); + } }, - lines: (types) => { - return streamEachPromise(database.lines.getPadLines(padId), (line: Line) => { + lines: async (types) => { + for await (const line of database.lines.getPadLines(padId)) { if(filterFunc(line, types[line.typeId])) types[line.typeId].lines.push(line); - }); + } }, template: readFile(paths.tableEjs).then((t) => t.toString()) @@ -52,7 +51,7 @@ export function createTable(database: Database, padId: PadId, filter: string | u delete results.types[i]; } - return ejs.render(results.template, { + return render(results.template, { padData: results.padData, types: results.types, utils, diff --git a/server/src/search.ts b/server/src/search.ts index b3ed81c1..fe068791 100644 --- a/server/src/search.ts +++ b/server/src/search.ts @@ -464,7 +464,7 @@ function _formatAddress(result: NominatimResult) { }; } -async function _loadUrl(url: string, completeOsmObjects = false) { +async function _loadUrl(url: string, completeOsmObjects = false): Promise { let bodyBuf = await fetch( url, { @@ -472,7 +472,7 @@ async function _loadUrl(url: string, completeOsmObjects = false) { "User-Agent": config.userAgent } } - ).then((res) => res.buffer()); + ).then(async (res) => Buffer.from(await res.arrayBuffer())); if(!bodyBuf) throw new Error("Invalid response from server."); diff --git a/server/src/socket.ts b/server/src/socket.ts index ff899123..0f140aea 100644 --- a/server/src/socket.ts +++ b/server/src/socket.ts @@ -1,5 +1,5 @@ import { promiseProps, stripObject } from "./utils/utils.js"; -import { streamToArrayPromise } from "./utils/streams.js"; +import { asyncIteratorToArray } from "./utils/streams.js"; import { isInBbox } from "./utils/geo.js"; import { Server, Socket as SocketIO } from "socket.io"; import domain from "domain"; @@ -114,15 +114,15 @@ class SocketConnection { getPadObjects(padData: PadData) { const promises: MultipleEventPromises = { padData: [ padData ], - view: streamToArrayPromise(this.database.views.getViews(padData.id)), - type: streamToArrayPromise(this.database.types.getTypes(padData.id)), - line: streamToArrayPromise(this.database.lines.getPadLines(padData.id)) + view: asyncIteratorToArray(this.database.views.getViews(padData.id)), + type: asyncIteratorToArray(this.database.types.getTypes(padData.id)), + line: asyncIteratorToArray(this.database.lines.getPadLines(padData.id)) }; if(this.bbox) { // In case bbox is set while fetching pad data Object.assign(promises, { - marker: streamToArrayPromise(this.database.markers.getPadMarkers(padData.id, this.bbox)), - linePoints: streamToArrayPromise(this.database.lines.getLinePointsForPad(padData.id, this.bbox)) + marker: asyncIteratorToArray(this.database.markers.getPadMarkers(padData.id, this.bbox)), + linePoints: asyncIteratorToArray(this.database.lines.getLinePointsForPad(padData.id, this.bbox)) }); } @@ -198,8 +198,8 @@ class SocketConnection { const ret: MultipleEventPromises = {}; if(this.padId && this.padId !== true) { - ret.marker = streamToArrayPromise(this.database.markers.getPadMarkers(this.padId, bboxWithExcept)); - ret.linePoints = streamToArrayPromise(this.database.lines.getLinePointsForPad(this.padId, bboxWithExcept)); + ret.marker = asyncIteratorToArray(this.database.markers.getPadMarkers(this.padId, bboxWithExcept)); + ret.linePoints = asyncIteratorToArray(this.database.lines.getLinePointsForPad(this.padId, bboxWithExcept)); } if(this.route) ret.routePoints = this.database.routes.getRoutePoints(this.route.id, bboxWithExcept, !bboxWithExcept.except).then((points) => ([points])); @@ -805,7 +805,7 @@ class SocketConnection { }); return promiseProps({ - history: streamToArrayPromise(this.database.history.getHistory(this.padId, this.writable == Writable.ADMIN ? undefined : ["Marker", "Line"])) + history: asyncIteratorToArray(this.database.history.getHistory(this.padId, this.writable == Writable.ADMIN ? undefined : ["Marker", "Line"])) }); }, @@ -841,7 +841,7 @@ class SocketConnection { } return promiseProps({ - history: streamToArrayPromise(this.database.history.getHistory(this.padId, this.writable == Writable.ADMIN ? undefined : ["Marker", "Line"])) + history: asyncIteratorToArray(this.database.history.getHistory(this.padId, this.writable == Writable.ADMIN ? undefined : ["Marker", "Line"])) }); }, diff --git a/server/src/utils/streams.ts b/server/src/utils/streams.ts index 5c415a0e..aba6eb93 100644 --- a/server/src/utils/streams.ts +++ b/server/src/utils/streams.ts @@ -3,33 +3,33 @@ import highland from "highland"; import jsonFormat from "json-format"; -export function streamEachPromise(stream: Highland.Stream, handle: (item: T) => Promise | void): Promise { - return new Promise((resolve, reject) => { - stream - .flatMap((item) => highland(Promise.resolve(handle(item as T)))) - .stopOnError(reject) - .done(resolve); +export async function asyncIteratorToArray(iterator: AsyncGenerator): Promise> { + const result: T[] = []; + for await (const it of iterator) { + result.push(it); + } + return result; +} + +export async function* arrayToAsyncIterator(array: T[]): AsyncGenerator { + for (const it of array) { + yield it; + } +} + +export function asyncIteratorToStream(iterator: AsyncGenerator): ReadableStream { + return new ReadableStream({ + async pull(controller) { + const { value, done } = await iterator.next(); + if (done) { + controller.close(); + } else { + controller.enqueue(value); + } + }, }); } -export function streamToArrayPromise(stream: Highland.Stream): Promise> { - return new Promise((resolve, reject) => { - stream - .stopOnError(reject) - .toArray(resolve) - }) -} - -export function wrapAsync(func: (...args: A) => Promise): (...args: A) => Highland.Stream { - return (...args: A) => { - return highland(func(...args)); - }; -} - -export function toStream(func: () => Promise): Highland.Stream { - return highland(func()); -} - export function jsonStream(template: any, data: Record | any>): Highland.Stream { let lastIndent = ''; diff --git a/server/src/webserver.ts b/server/src/webserver.ts index 6190f9ec..4904ee13 100644 --- a/server/src/webserver.ts +++ b/server/src/webserver.ts @@ -11,6 +11,7 @@ import { exportGpx } from "./export/gpx.js"; import domainMiddleware from "express-domain-middleware"; import { paths, serve } from "facilmap-frontend/build.js"; import { Manifest } from "vite"; +import { Writable } from "stream"; const isDevMode = !!process.env.FM_DEV; @@ -123,7 +124,7 @@ export async function initWebserver(database: Database, port: number, host?: str res.set("Content-type", "application/gpx+xml"); res.attachment(padData.name.replace(/[\\/:*?"<>|]+/g, '_') + ".gpx"); - exportGpx(database, padData ? padData.id : req.params.padId, req.query.useTracks == "1", req.query.filter as string | undefined).pipe(res); + exportGpx(database, padData ? padData.id : req.params.padId, req.query.useTracks == "1", req.query.filter as string | undefined).pipeTo(Writable.toWeb(res)); } catch (e) { next(e); } diff --git a/server/vite.config.ts b/server/vite.config.ts index 2d6146ea..09a31b9c 100644 --- a/server/vite.config.ts +++ b/server/vite.config.ts @@ -16,9 +16,6 @@ export default defineConfig({ name: 'facilmap-server', fileName: () => 'facilmap-server.mjs', formats: ['es'] - }, - rollupOptions: { - //external: ["canvas", "pg-hstore"] } } });