schema-ts-psql.sql
12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
--
-- Copyright © 2016-2020 The Thingsboard Authors
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
CREATE TABLE IF NOT EXISTS ts_kv
(
entity_id uuid NOT NULL,
key int NOT NULL,
ts bigint NOT NULL,
bool_v boolean,
str_v varchar(10000000),
long_v bigint,
dbl_v double precision,
json_v json,
CONSTRAINT ts_kv_pkey PRIMARY KEY (entity_id, key, ts)
) PARTITION BY RANGE (ts);
CREATE TABLE IF NOT EXISTS ts_kv_latest
(
entity_id uuid NOT NULL,
key int NOT NULL,
ts bigint NOT NULL,
bool_v boolean,
str_v varchar(10000000),
long_v bigint,
dbl_v double precision,
json_v json,
CONSTRAINT ts_kv_latest_pkey PRIMARY KEY (entity_id, key)
);
CREATE TABLE IF NOT EXISTS ts_kv_dictionary
(
key varchar(255) NOT NULL,
key_id serial UNIQUE,
CONSTRAINT ts_key_id_pkey PRIMARY KEY (key)
);
CREATE TABLE IF NOT EXISTS tb_schema_settings
(
schema_version bigint NOT NULL,
CONSTRAINT tb_schema_settings_pkey PRIMARY KEY (schema_version)
);
INSERT INTO tb_schema_settings (schema_version) VALUES (2005001) ON CONFLICT (schema_version) DO UPDATE SET schema_version = 2005001;
CREATE OR REPLACE PROCEDURE drop_partitions_by_max_ttl(IN partition_type varchar, IN system_ttl bigint, INOUT deleted bigint)
LANGUAGE plpgsql AS
$$
DECLARE
max_tenant_ttl bigint;
max_customer_ttl bigint;
max_ttl bigint;
date timestamp;
partition_by_max_ttl_date varchar;
partition_month varchar;
partition_day varchar;
partition_year varchar;
partition varchar;
partition_to_delete varchar;
BEGIN
SELECT max(attribute_kv.long_v)
FROM tenant
INNER JOIN attribute_kv ON tenant.id = attribute_kv.entity_id
WHERE attribute_kv.attribute_key = 'TTL'
into max_tenant_ttl;
SELECT max(attribute_kv.long_v)
FROM customer
INNER JOIN attribute_kv ON customer.id = attribute_kv.entity_id
WHERE attribute_kv.attribute_key = 'TTL'
into max_customer_ttl;
max_ttl := GREATEST(system_ttl, max_customer_ttl, max_tenant_ttl);
if max_ttl IS NOT NULL AND max_ttl > 0 THEN
date := to_timestamp(EXTRACT(EPOCH FROM current_timestamp) - (max_ttl / 1000));
partition_by_max_ttl_date := get_partition_by_max_ttl_date(partition_type, date);
RAISE NOTICE 'Partition by max ttl: %', partition_by_max_ttl_date;
IF partition_by_max_ttl_date IS NOT NULL THEN
CASE
WHEN partition_type = 'DAYS' THEN
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
partition_day := SPLIT_PART(partition_by_max_ttl_date, '_', 5);
WHEN partition_type = 'MONTHS' THEN
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
partition_month := SPLIT_PART(partition_by_max_ttl_date, '_', 4);
ELSE
partition_year := SPLIT_PART(partition_by_max_ttl_date, '_', 3);
END CASE;
FOR partition IN SELECT tablename
FROM pg_tables
WHERE schemaname = 'public'
AND tablename like 'ts_kv_' || '%'
AND tablename != 'ts_kv_latest'
AND tablename != 'ts_kv_dictionary'
LOOP
IF partition != partition_by_max_ttl_date THEN
IF partition_year IS NOT NULL THEN
IF SPLIT_PART(partition, '_', 3)::integer < partition_year::integer THEN
partition_to_delete := partition;
ELSE
IF partition_month IS NOT NULL THEN
IF SPLIT_PART(partition, '_', 4)::integer < partition_month::integer THEN
partition_to_delete := partition;
ELSE
IF partition_day IS NOT NULL THEN
IF SPLIT_PART(partition, '_', 5)::integer < partition_day::integer THEN
partition_to_delete := partition;
END IF;
END IF;
END IF;
END IF;
END IF;
END IF;
END IF;
IF partition_to_delete IS NOT NULL THEN
RAISE NOTICE 'Partition to delete by max ttl: %', partition_to_delete;
EXECUTE format('DROP TABLE %I', partition_to_delete);
deleted := deleted + 1;
END IF;
END LOOP;
END IF;
END IF;
END
$$;
CREATE OR REPLACE FUNCTION get_partition_by_max_ttl_date(IN partition_type varchar, IN date timestamp, OUT partition varchar) AS
$$
BEGIN
CASE
WHEN partition_type = 'DAYS' THEN
partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM') || '_' || to_char(date, 'dd');
WHEN partition_type = 'MONTHS' THEN
partition := 'ts_kv_' || to_char(date, 'yyyy') || '_' || to_char(date, 'MM');
WHEN partition_type = 'YEARS' THEN
partition := 'ts_kv_' || to_char(date, 'yyyy');
WHEN partition_type = 'INDEFINITE' THEN
partition := NULL;
ELSE
partition := NULL;
END CASE;
IF partition IS NOT NULL THEN
IF NOT EXISTS(SELECT
FROM pg_tables
WHERE schemaname = 'public'
AND tablename = partition) THEN
partition := NULL;
RAISE NOTICE 'Failed to found partition by ttl';
END IF;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION to_uuid(IN entity_id varchar, OUT uuid_id uuid) AS
$$
BEGIN
uuid_id := substring(entity_id, 8, 8) || '-' || substring(entity_id, 4, 4) || '-1' || substring(entity_id, 1, 3) ||
'-' || substring(entity_id, 16, 4) || '-' || substring(entity_id, 20, 12);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION delete_device_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
OUT deleted bigint) AS
$$
BEGIN
EXECUTE format(
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(device.id) as entity_id FROM device WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
tenant_id, customer_id, ttl) into deleted;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION delete_asset_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
OUT deleted bigint) AS
$$
BEGIN
EXECUTE format(
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(asset.id) as entity_id FROM asset WHERE tenant_id = %L and customer_id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
tenant_id, customer_id, ttl) into deleted;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION delete_customer_records_from_ts_kv(tenant_id varchar, customer_id varchar, ttl bigint,
OUT deleted bigint) AS
$$
BEGIN
EXECUTE format(
'WITH deleted AS (DELETE FROM ts_kv WHERE entity_id IN (SELECT to_uuid(customer.id) as entity_id FROM customer WHERE tenant_id = %L and id = %L) AND ts < %L::bigint RETURNING *) SELECT count(*) FROM deleted',
tenant_id, customer_id, ttl) into deleted;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE PROCEDURE cleanup_timeseries_by_ttl(IN null_uuid varchar(31),
IN system_ttl bigint, INOUT deleted bigint)
LANGUAGE plpgsql AS
$$
DECLARE
tenant_cursor CURSOR FOR select tenant.id as tenant_id
from tenant;
tenant_id_record varchar;
customer_id_record varchar;
tenant_ttl bigint;
customer_ttl bigint;
deleted_for_entities bigint;
tenant_ttl_ts bigint;
customer_ttl_ts bigint;
BEGIN
OPEN tenant_cursor;
FETCH tenant_cursor INTO tenant_id_record;
WHILE FOUND
LOOP
EXECUTE format(
'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L',
tenant_id_record, 'TTL') INTO tenant_ttl;
if tenant_ttl IS NULL THEN
tenant_ttl := system_ttl;
END IF;
IF tenant_ttl > 0 THEN
tenant_ttl_ts := (EXTRACT(EPOCH FROM current_timestamp) * 1000 - tenant_ttl::bigint * 1000)::bigint;
deleted_for_entities := delete_device_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts);
deleted := deleted + deleted_for_entities;
RAISE NOTICE '% telemetry removed for devices where tenant_id = %', deleted_for_entities, tenant_id_record;
deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record, null_uuid, tenant_ttl_ts);
deleted := deleted + deleted_for_entities;
RAISE NOTICE '% telemetry removed for assets where tenant_id = %', deleted_for_entities, tenant_id_record;
END IF;
FOR customer_id_record IN
SELECT customer.id AS customer_id FROM customer WHERE customer.tenant_id = tenant_id_record
LOOP
EXECUTE format(
'select attribute_kv.long_v from attribute_kv where attribute_kv.entity_id = %L and attribute_kv.attribute_key = %L',
customer_id_record, 'TTL') INTO customer_ttl;
IF customer_ttl IS NULL THEN
customer_ttl_ts := tenant_ttl_ts;
ELSE
IF customer_ttl > 0 THEN
customer_ttl_ts :=
(EXTRACT(EPOCH FROM current_timestamp) * 1000 -
customer_ttl::bigint * 1000)::bigint;
END IF;
END IF;
IF customer_ttl_ts IS NOT NULL AND customer_ttl_ts > 0 THEN
deleted_for_entities :=
delete_customer_records_from_ts_kv(tenant_id_record, customer_id_record,
customer_ttl_ts);
deleted := deleted + deleted_for_entities;
RAISE NOTICE '% telemetry removed for customer with id = % where tenant_id = %', deleted_for_entities, customer_id_record, tenant_id_record;
deleted_for_entities :=
delete_device_records_from_ts_kv(tenant_id_record, customer_id_record,
customer_ttl_ts);
deleted := deleted + deleted_for_entities;
RAISE NOTICE '% telemetry removed for devices where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record;
deleted_for_entities := delete_asset_records_from_ts_kv(tenant_id_record,
customer_id_record,
customer_ttl_ts);
deleted := deleted + deleted_for_entities;
RAISE NOTICE '% telemetry removed for assets where tenant_id = % and customer_id = %', deleted_for_entities, tenant_id_record, customer_id_record;
END IF;
END LOOP;
FETCH tenant_cursor INTO tenant_id_record;
END LOOP;
END
$$;