kopia lustrzana https://github.com/kartoza/docker-postgis
179 wiersze
5.7 KiB
Python
179 wiersze
5.7 KiB
Python
from time import sleep
|
|
|
|
import psycopg2
|
|
import unittest
|
|
from utils.utils import DBConnection
|
|
|
|
|
|
class TestReplicationPublisher(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.db = DBConnection()
|
|
|
|
def test_create_new_data(self):
|
|
# create new table
|
|
self.db.conn.autocommit = True
|
|
with self.db.cursor() as c:
|
|
c.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS block (
|
|
id serial NOT NULL primary key,
|
|
geom geometry(Point,4326),
|
|
tile_name character varying,
|
|
location character varying
|
|
);
|
|
"""
|
|
)
|
|
|
|
# Add table to publication if it doesn't included already
|
|
# Using PL/PGSQL
|
|
c.execute(
|
|
"""
|
|
do
|
|
$$
|
|
begin
|
|
if not exists(
|
|
select * from pg_publication_tables
|
|
where tablename = 'block'
|
|
and pubname = 'logical_replication') then
|
|
alter publication logical_replication add table block;
|
|
end if;
|
|
end;
|
|
$$
|
|
"""
|
|
)
|
|
|
|
c.execute(
|
|
"""
|
|
INSERT INTO block (id, geom, tile_name, location)
|
|
VALUES
|
|
(
|
|
1,
|
|
st_setsrid(st_makepoint(107.6097, 6.9120),4326),
|
|
'2956BC',
|
|
'Oceanic'
|
|
) ON CONFLICT (id) DO NOTHING;
|
|
"""
|
|
)
|
|
|
|
|
|
class TestReplicationSubscriber(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.db = DBConnection()
|
|
|
|
@classmethod
|
|
def assert_in_loop(
|
|
cls, func_action, func_assert,
|
|
back_off_limit=5, base_seconds=2, const_seconds=5):
|
|
retry = 0
|
|
last_error = None
|
|
while retry < back_off_limit:
|
|
try:
|
|
output = func_action()
|
|
func_assert(output)
|
|
print('Assertion success')
|
|
return
|
|
except Exception as e:
|
|
last_error = e
|
|
print(e)
|
|
retry += 1
|
|
print('Retry [{}]. Attempting to try again later.'.format(
|
|
retry))
|
|
sleep(const_seconds + base_seconds ** retry)
|
|
raise last_error
|
|
|
|
def test_read_data(self):
|
|
# create new table
|
|
self.db.conn.autocommit = True
|
|
with self.db.cursor() as c:
|
|
c.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS public.block (
|
|
id serial NOT NULL primary key ,
|
|
geom public.geometry(Point,4326),
|
|
fid bigint,
|
|
tile_name character varying,
|
|
location character varying
|
|
);
|
|
"""
|
|
)
|
|
|
|
# Hardcoded because the replication is setup using manual query
|
|
publisher_conn_string = 'host=pg-publisher port=5432 ' \
|
|
'password=docker user=docker dbname=gis'
|
|
|
|
# Subscribe to the table that is published if it doesn't
|
|
# subscribed already
|
|
try:
|
|
# Apparently create subscription cannot run inside
|
|
# a transaction block.
|
|
# So we run it without transaction block
|
|
c.execute(
|
|
f"""
|
|
create subscription logical_subscription
|
|
connection '{publisher_conn_string}'
|
|
publication logical_replication
|
|
"""
|
|
)
|
|
# Make sure that new changes are replicated immediately.
|
|
c.execute(
|
|
"""
|
|
alter subscription logical_subscription
|
|
refresh publication
|
|
"""
|
|
)
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
# We don't know when the changes are sync'd so we loop the test
|
|
# Testing insertion sync
|
|
print('Insertion sync test')
|
|
self.assert_in_loop(
|
|
lambda : c.execute(
|
|
"""
|
|
SELECT * FROM block
|
|
"""
|
|
) or c.fetchall(),
|
|
lambda _rows: self.assertEqual(len(_rows), 1)
|
|
)
|
|
|
|
# Testing update sync
|
|
print('Update sync test')
|
|
publisher_conn = psycopg2.connect(publisher_conn_string)
|
|
publisher_conn.autocommit = True
|
|
with publisher_conn.cursor() as publisher_c:
|
|
publisher_c.execute(
|
|
"""
|
|
UPDATE block set location = 'Oceanic territory'
|
|
WHERE id = 1
|
|
"""
|
|
)
|
|
self.assert_in_loop(
|
|
lambda : c.execute(
|
|
"""
|
|
SELECT location FROM block WHERE id = 1;
|
|
"""
|
|
) or c.fetchone(),
|
|
lambda _rows: self.assertEqual(
|
|
_rows[0], 'Oceanic territory')
|
|
)
|
|
|
|
# Testing delete sync
|
|
print('Delete sync test')
|
|
with publisher_conn.cursor() as publisher_c:
|
|
publisher_c.execute(
|
|
"""
|
|
DELETE FROM block WHERE id = 1
|
|
"""
|
|
)
|
|
|
|
self.assert_in_loop(
|
|
lambda: c.execute(
|
|
"""
|
|
SELECT * FROM block
|
|
"""
|
|
) or c.fetchall(),
|
|
lambda _rows: self.assertEqual(len(_rows), 0)
|
|
)
|