Replace materialized view of water by tables with diff update (#853)

Replacing materialized view by a tables with update from trigger on change only.

Start with the most simple cases.

Just replicate the change on:
* `osm_water_polygon` to `osm_water_lakeline`,
* `osm_water_polygon` to `osm_water_point`.

Use a view to factorize the `osm_water_lakeline` and `osm_water_point_view` definition and reuse it in the trigger.

The update of `osm_important_waterway_linestring` is more complex, as it is a merge of `osm_waterway_linestring`. It not done in the same way. At the end of the transaction we remove impacted and recompute them.

The goal is to update more quickly the content of derivated table by just updating the changing content. It replaces the update of materialized view because their need a full recompute (with lock issue).

Note, an advanced version of differential update over materialized view as already implemented in the building cluster PR #725.

It addresses #814 and a part of #809.
pull/882/head^2
Frédéric Rodrigo 2020-05-20 19:14:22 +02:00 zatwierdzone przez GitHub
rodzic be75e7662f
commit 97216c5c19
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
3 zmienionych plików z 223 dodań i 77 usunięć

Wyświetl plik

@ -1,11 +1,8 @@
DROP TRIGGER IF EXISTS trigger_flag_line ON osm_water_polygon;
DROP TRIGGER IF EXISTS trigger_refresh ON water_lakeline.updates;
DROP TRIGGER IF EXISTS trigger_delete_line ON osm_water_polygon;
DROP TRIGGER IF EXISTS trigger_update_line ON osm_water_polygon;
DROP TRIGGER IF EXISTS trigger_insert_line ON osm_water_polygon;
-- etldoc: osm_water_polygon -> osm_water_lakeline
-- etldoc: lake_centerline -> osm_water_lakeline
DROP MATERIALIZED VIEW IF EXISTS osm_water_lakeline CASCADE;
CREATE MATERIALIZED VIEW osm_water_lakeline AS (
CREATE OR REPLACE VIEW osm_water_lakeline_view AS
SELECT wp.osm_id,
ll.wkb_geometry AS geometry,
name, name_en, name_de,
@ -15,39 +12,67 @@ CREATE MATERIALIZED VIEW osm_water_lakeline AS (
FROM osm_water_polygon AS wp
INNER JOIN lake_centerline ll ON wp.osm_id = ll.osm_id
WHERE wp.name <> '' AND ST_IsValid(wp.geometry)
) /* DELAY_MATERIALIZED_VIEW_CREATION */;
;
-- etldoc: osm_water_polygon -> osm_water_lakeline
-- etldoc: lake_centerline -> osm_water_lakeline
CREATE TABLE IF NOT EXISTS osm_water_lakeline AS
SELECT * FROM osm_water_lakeline_view;
DO $$
BEGIN
ALTER TABLE osm_water_lakeline ADD CONSTRAINT osm_water_lakeline_pk PRIMARY KEY (osm_id);
EXCEPTION WHEN others then
RAISE NOTICE 'primary key osm_water_lakeline_pk already exists in osm_water_lakeline.';
END;
$$;
CREATE INDEX IF NOT EXISTS osm_water_lakeline_geometry_idx ON osm_water_lakeline USING gist(geometry);
-- Handle updates
CREATE SCHEMA IF NOT EXISTS water_lakeline;
CREATE TABLE IF NOT EXISTS water_lakeline.updates(id serial primary key, t text, unique (t));
CREATE OR REPLACE FUNCTION water_lakeline.flag() RETURNS trigger AS $$
CREATE OR REPLACE FUNCTION water_lakeline.delete() RETURNS trigger AS $BODY$
BEGIN
INSERT INTO water_lakeline.updates(t) VALUES ('y') ON CONFLICT(t) DO NOTHING;
DELETE FROM osm_water_lakeline
WHERE osm_water_lakeline.osm_id = OLD.osm_id ;
RETURN null;
END;
$$ language plpgsql;
$BODY$ language plpgsql;
CREATE OR REPLACE FUNCTION water_lakeline.update() RETURNS trigger AS $BODY$
BEGIN
UPDATE osm_water_lakeline
SET (osm_id, geometry, name, name_en, name_de, tags, area, is_intermittent) =
(SELECT * FROM osm_water_lakeline_view WHERE osm_water_lakeline_view.osm_id = NEW.osm_id)
WHERE osm_water_lakeline.osm_id = NEW.osm_id;
CREATE OR REPLACE FUNCTION water_lakeline.refresh() RETURNS trigger AS
$BODY$
BEGIN
RAISE LOG 'Refresh water_lakeline';
REFRESH MATERIALIZED VIEW osm_water_lakeline;
DELETE FROM water_lakeline.updates;
RETURN null;
END;
$BODY$
language plpgsql;
END;
$BODY$ language plpgsql;
CREATE TRIGGER trigger_flag_line
AFTER INSERT OR UPDATE OR DELETE ON osm_water_polygon
FOR EACH STATEMENT
EXECUTE PROCEDURE water_lakeline.flag();
CREATE OR REPLACE FUNCTION water_lakeline.insert() RETURNS trigger AS $BODY$
BEGIN
INSERT INTO osm_water_lakeline
SELECT *
FROM osm_water_lakeline_view
WHERE osm_water_lakeline_view.osm_id = NEW.osm_id;
CREATE CONSTRAINT TRIGGER trigger_refresh
AFTER INSERT ON water_lakeline.updates
INITIALLY DEFERRED
RETURN null;
END;
$BODY$ language plpgsql;
CREATE TRIGGER trigger_delete_line
AFTER DELETE ON osm_water_polygon
FOR EACH ROW
EXECUTE PROCEDURE water_lakeline.refresh();
EXECUTE PROCEDURE water_lakeline.delete();
CREATE TRIGGER trigger_update_line
AFTER UPDATE ON osm_water_polygon
FOR EACH ROW
EXECUTE PROCEDURE water_lakeline.update();
CREATE TRIGGER trigger_insert_line
AFTER INSERT ON osm_water_polygon
FOR EACH ROW
EXECUTE PROCEDURE water_lakeline.insert();

Wyświetl plik

@ -1,11 +1,8 @@
DROP TRIGGER IF EXISTS trigger_flag_point ON osm_water_polygon;
DROP TRIGGER IF EXISTS trigger_refresh ON water_point.updates;
DROP TRIGGER IF EXISTS trigger_delete_point ON osm_water_polygon;
DROP TRIGGER IF EXISTS trigger_update_point ON osm_water_polygon;
DROP TRIGGER IF EXISTS trigger_insert_point ON osm_water_polygon;
-- etldoc: osm_water_polygon -> osm_water_point
-- etldoc: lake_centerline -> osm_water_point
DROP MATERIALIZED VIEW IF EXISTS osm_water_point CASCADE;
CREATE MATERIALIZED VIEW osm_water_point AS (
CREATE OR REPLACE VIEW osm_water_point_view AS
SELECT
wp.osm_id, ST_PointOnSurface(wp.geometry) AS geometry,
wp.name, wp.name_en, wp.name_de,
@ -15,39 +12,67 @@ CREATE MATERIALIZED VIEW osm_water_point AS (
FROM osm_water_polygon AS wp
LEFT JOIN lake_centerline ll ON wp.osm_id = ll.osm_id
WHERE ll.osm_id IS NULL AND wp.name <> ''
) /* DELAY_MATERIALIZED_VIEW_CREATION */;
;
-- etldoc: osm_water_polygon -> osm_water_point
-- etldoc: lake_centerline -> osm_water_point
CREATE TABLE IF NOT EXISTS osm_water_point AS
SELECT * FROM osm_water_point_view;
DO $$
BEGIN
ALTER TABLE osm_water_point ADD CONSTRAINT osm_water_point_pk PRIMARY KEY (osm_id);
EXCEPTION WHEN others then
RAISE NOTICE 'primary key osm_water_point_pk already exists in osm_water_point.';
END;
$$;
CREATE INDEX IF NOT EXISTS osm_water_point_geometry_idx ON osm_water_point USING gist (geometry);
-- Handle updates
CREATE SCHEMA IF NOT EXISTS water_point;
CREATE TABLE IF NOT EXISTS water_point.updates(id serial primary key, t text, unique (t));
CREATE OR REPLACE FUNCTION water_point.flag() RETURNS trigger AS $$
CREATE OR REPLACE FUNCTION water_point.delete() RETURNS trigger AS $BODY$
BEGIN
INSERT INTO water_point.updates(t) VALUES ('y') ON CONFLICT(t) DO NOTHING;
DELETE FROM osm_water_point
WHERE osm_water_point.osm_id = OLD.osm_id ;
RETURN null;
END;
$$ language plpgsql;
$BODY$ language plpgsql;
CREATE OR REPLACE FUNCTION water_point.update() RETURNS trigger AS $BODY$
BEGIN
UPDATE osm_water_point
SET (osm_id, geometry, name, name_en, name_de, tags, area, is_intermittent) =
(SELECT * FROM osm_water_point_view WHERE osm_water_point_view.osm_id = NEW.osm_id)
WHERE osm_water_point.osm_id = NEW.osm_id;
CREATE OR REPLACE FUNCTION water_point.refresh() RETURNS trigger AS
$BODY$
BEGIN
RAISE LOG 'Refresh water_point';
REFRESH MATERIALIZED VIEW osm_water_point;
DELETE FROM water_point.updates;
RETURN null;
END;
$BODY$
language plpgsql;
END;
$BODY$ language plpgsql;
CREATE TRIGGER trigger_flag_point
AFTER INSERT OR UPDATE OR DELETE ON osm_water_polygon
FOR EACH STATEMENT
EXECUTE PROCEDURE water_point.flag();
CREATE OR REPLACE FUNCTION water_point.insert() RETURNS trigger AS $BODY$
BEGIN
INSERT INTO osm_water_point
SELECT *
FROM osm_water_point_view
WHERE osm_water_point_view.osm_id = NEW.osm_id;
CREATE CONSTRAINT TRIGGER trigger_refresh
AFTER INSERT ON water_point.updates
INITIALLY DEFERRED
RETURN null;
END;
$BODY$ language plpgsql;
CREATE TRIGGER trigger_delete_point
AFTER DELETE ON osm_water_polygon
FOR EACH ROW
EXECUTE PROCEDURE water_point.refresh();
EXECUTE PROCEDURE water_point.delete();
CREATE TRIGGER trigger_update_point
AFTER UPDATE ON osm_water_polygon
FOR EACH ROW
EXECUTE PROCEDURE water_point.update();
CREATE TRIGGER trigger_insert_point
AFTER INSERT ON osm_water_polygon
FOR EACH ROW
EXECUTE PROCEDURE water_point.insert();

Wyświetl plik

@ -1,15 +1,11 @@
DROP TRIGGER IF EXISTS trigger_store ON osm_waterway_linestring;
DROP TRIGGER IF EXISTS trigger_flag ON osm_waterway_linestring;
DROP TRIGGER IF EXISTS trigger_refresh ON waterway_important.updates;
-- We merge the waterways by name like the highways
-- This helps to drop not important rivers (since they do not have a name)
-- and also makes it possible to filter out too short rivers
-- etldoc: osm_waterway_linestring -> osm_important_waterway_linestring
DROP MATERIALIZED VIEW IF EXISTS osm_important_waterway_linestring CASCADE;
DROP MATERIALIZED VIEW IF EXISTS osm_important_waterway_linestring_gen1 CASCADE;
DROP MATERIALIZED VIEW IF EXISTS osm_important_waterway_linestring_gen2 CASCADE;
DROP MATERIALIZED VIEW IF EXISTS osm_important_waterway_linestring_gen3 CASCADE;
CREATE INDEX IF NOT EXISTS osm_waterway_linestring_waterway_partial_idx
ON osm_waterway_linestring(waterway)
WHERE waterway = 'river';
@ -18,7 +14,8 @@ CREATE INDEX IF NOT EXISTS osm_waterway_linestring_name_partial_idx
ON osm_waterway_linestring(name)
WHERE name <> '';
CREATE MATERIALIZED VIEW osm_important_waterway_linestring AS (
-- etldoc: osm_waterway_linestring -> osm_important_waterway_linestring
CREATE TABLE IF NOT EXISTS osm_important_waterway_linestring AS
SELECT
(ST_Dump(geometry)).geom AS geometry,
name, name_en, name_de, tags
@ -29,38 +26,72 @@ CREATE MATERIALIZED VIEW osm_important_waterway_linestring AS (
FROM osm_waterway_linestring
WHERE name <> '' AND waterway = 'river' AND ST_IsValid(geometry)
GROUP BY name, name_en, name_de, slice_language_tags(tags)
) AS waterway_union
) /* DELAY_MATERIALIZED_VIEW_CREATION */;
) AS waterway_union;
CREATE INDEX IF NOT EXISTS osm_important_waterway_linestring_names ON osm_important_waterway_linestring(name);
CREATE INDEX IF NOT EXISTS osm_important_waterway_linestring_geometry_idx ON osm_important_waterway_linestring USING gist(geometry);
-- etldoc: osm_important_waterway_linestring -> osm_important_waterway_linestring_gen1
CREATE MATERIALIZED VIEW osm_important_waterway_linestring_gen1 AS (
CREATE OR REPLACE VIEW osm_important_waterway_linestring_gen1_view AS
SELECT ST_Simplify(geometry, 60) AS geometry, name, name_en, name_de, tags
FROM osm_important_waterway_linestring
WHERE ST_Length(geometry) > 1000
) /* DELAY_MATERIALIZED_VIEW_CREATION */;
;
CREATE TABLE IF NOT EXISTS osm_important_waterway_linestring_gen1 AS
SELECT * FROM osm_important_waterway_linestring_gen1_view;
CREATE INDEX IF NOT EXISTS osm_important_waterway_linestring_gen1_name_idx ON osm_important_waterway_linestring_gen1(name);
CREATE INDEX IF NOT EXISTS osm_important_waterway_linestring_gen1_geometry_idx ON osm_important_waterway_linestring_gen1 USING gist(geometry);
-- etldoc: osm_important_waterway_linestring_gen1 -> osm_important_waterway_linestring_gen2
CREATE MATERIALIZED VIEW osm_important_waterway_linestring_gen2 AS (
CREATE OR REPLACE VIEW osm_important_waterway_linestring_gen2_view AS
SELECT ST_Simplify(geometry, 100) AS geometry, name, name_en, name_de, tags
FROM osm_important_waterway_linestring_gen1
WHERE ST_Length(geometry) > 4000
) /* DELAY_MATERIALIZED_VIEW_CREATION */;
;
CREATE TABLE IF NOT EXISTS osm_important_waterway_linestring_gen2 AS
SELECT * FROM osm_important_waterway_linestring_gen2_view;
CREATE INDEX IF NOT EXISTS osm_important_waterway_linestring_gen2_name_idx ON osm_important_waterway_linestring_gen2(name);
CREATE INDEX IF NOT EXISTS osm_important_waterway_linestring_gen2_geometry_idx ON osm_important_waterway_linestring_gen2 USING gist(geometry);
-- etldoc: osm_important_waterway_linestring_gen2 -> osm_important_waterway_linestring_gen3
CREATE MATERIALIZED VIEW osm_important_waterway_linestring_gen3 AS (
CREATE OR REPLACE VIEW osm_important_waterway_linestring_gen3_view AS
SELECT ST_Simplify(geometry, 200) AS geometry, name, name_en, name_de, tags
FROM osm_important_waterway_linestring_gen2
WHERE ST_Length(geometry) > 8000
) /* DELAY_MATERIALIZED_VIEW_CREATION */;
;
CREATE TABLE IF NOT EXISTS osm_important_waterway_linestring_gen3 AS
SELECT * FROM osm_important_waterway_linestring_gen3_view;
CREATE INDEX IF NOT EXISTS osm_important_waterway_linestring_gen3_name_idx ON osm_important_waterway_linestring_gen3(name);
CREATE INDEX IF NOT EXISTS osm_important_waterway_linestring_gen3_geometry_idx ON osm_important_waterway_linestring_gen3 USING gist(geometry);
-- Handle updates
CREATE SCHEMA IF NOT EXISTS waterway_important;
CREATE TABLE IF NOT EXISTS waterway_important.changes(
id serial primary key,
is_old boolean,
name character varying,
name_en character varying,
name_de character varying,
tags hstore,
unique (is_old, name, name_en, name_de, tags)
);
CREATE OR REPLACE FUNCTION waterway_important.store() RETURNS trigger AS $$
BEGIN
IF (TG_OP IN ('DELETE', 'UPDATE')) AND OLD.name <> '' AND OLD.waterway = 'river' THEN
INSERT INTO waterway_important.changes(is_old, name, name_en, name_de, tags)
VALUES (true, OLD.name, OLD.name_en, OLD.name_de, slice_language_tags(OLD.tags))
ON CONFLICT(is_old, name, name_en, name_de, tags) DO NOTHING;
END IF;
IF (TG_OP IN ('UPDATE', 'INSERT')) AND NEW.name <> '' AND NEW.waterway = 'river' THEN
INSERT INTO waterway_important.changes(is_old, name, name_en, name_de, tags)
VALUES (false, NEW.name, NEW.name_en, NEW.name_de, slice_language_tags(NEW.tags))
ON CONFLICT(is_old, name, name_en, name_de, tags) DO NOTHING;
END IF;
RETURN NULL;
END;
$$ language plpgsql;
CREATE TABLE IF NOT EXISTS waterway_important.updates(id serial primary key, t text, unique (t));
CREATE OR REPLACE FUNCTION waterway_important.flag() RETURNS trigger AS $$
BEGIN
@ -73,16 +104,81 @@ CREATE OR REPLACE FUNCTION waterway_important.refresh() RETURNS trigger AS
$BODY$
BEGIN
RAISE LOG 'Refresh waterway';
REFRESH MATERIALIZED VIEW osm_important_waterway_linestring;
REFRESH MATERIALIZED VIEW osm_important_waterway_linestring_gen1;
REFRESH MATERIALIZED VIEW osm_important_waterway_linestring_gen2;
REFRESH MATERIALIZED VIEW osm_important_waterway_linestring_gen3;
-- REFRESH osm_important_waterway_linestring
DELETE FROM osm_important_waterway_linestring AS w
USING waterway_important.changes AS c
WHERE
c.is_old AND
w.name = c.name AND w.name_en IS NOT DISTINCT FROM c.name_en AND w.name_de IS NOT DISTINCT FROM c.name_de AND w.tags IS NOT DISTINCT FROM c.tags;
INSERT INTO osm_important_waterway_linestring
SELECT
(ST_Dump(geometry)).geom AS geometry,
name, name_en, name_de, tags
FROM (
SELECT
ST_LineMerge(ST_Union(geometry)) AS geometry,
w.name, w.name_en, w.name_de, slice_language_tags(w.tags) AS tags
FROM osm_waterway_linestring AS w
JOIN waterway_important.changes AS c ON
w.name = c.name AND w.name_en IS NOT DISTINCT FROM c.name_en AND w.name_de IS NOT DISTINCT FROM c.name_de AND slice_language_tags(w.tags) IS NOT DISTINCT FROM c.tags
WHERE w.name <> '' AND w.waterway = 'river' AND ST_IsValid(geometry) AND
NOT c.is_old
GROUP BY w.name, w.name_en, w.name_de, slice_language_tags(w.tags)
) AS waterway_union;
-- REFRESH sm_important_waterway_linestring_gen1
DELETE FROM osm_important_waterway_linestring_gen1 AS w
USING waterway_important.changes AS c
WHERE
c.is_old AND
w.name = c.name AND w.name_en IS NOT DISTINCT FROM c.name_en AND w.name_de IS NOT DISTINCT FROM c.name_de AND w.tags IS NOT DISTINCT FROM c.tags;
INSERT INTO osm_important_waterway_linestring_gen1
SELECT w.*
FROM osm_important_waterway_linestring_gen1_view AS w
NATURAL JOIN waterway_important.changes AS c
WHERE NOT c.is_old;
-- REFRESH osm_important_waterway_linestring_gen2
DELETE FROM osm_important_waterway_linestring_gen2 AS w
USING waterway_important.changes AS c
WHERE
c.is_old AND
w.name = c.name AND w.name_en IS NOT DISTINCT FROM c.name_en AND w.name_de IS NOT DISTINCT FROM c.name_de AND w.tags IS NOT DISTINCT FROM c.tags;
INSERT INTO osm_important_waterway_linestring_gen2
SELECT w.*
FROM osm_important_waterway_linestring_gen2_view AS w
NATURAL JOIN waterway_important.changes AS c
WHERE NOT c.is_old;
-- REFRESH osm_important_waterway_linestring_gen3
DELETE FROM osm_important_waterway_linestring_gen3 AS w
USING waterway_important.changes AS c
WHERE
c.is_old AND
w.name = c.name AND w.name_en IS NOT DISTINCT FROM c.name_en AND w.name_de IS NOT DISTINCT FROM c.name_de AND w.tags IS NOT DISTINCT FROM c.tags;
INSERT INTO osm_important_waterway_linestring_gen3
SELECT w.*
FROM osm_important_waterway_linestring_gen3_view AS w
NATURAL JOIN waterway_important.changes AS c
WHERE NOT c.is_old;
DELETE FROM waterway_important.changes;
DELETE FROM waterway_important.updates;
RETURN null;
END;
$BODY$
language plpgsql;
CREATE TRIGGER trigger_store
AFTER INSERT OR UPDATE OR DELETE ON osm_waterway_linestring
FOR EACH ROW
EXECUTE PROCEDURE waterway_important.store();
CREATE TRIGGER trigger_flag
AFTER INSERT OR UPDATE OR DELETE ON osm_waterway_linestring
FOR EACH STATEMENT