Postgres/TimescaleDB: Difference between revisions
Appearance
< Postgres
mNo edit summary |
|||
| (64 intermediate revisions by the same user not shown) | |||
| Line 1: | Line 1: | ||
* [[Linux/Postgres|Postgres]] | * [[Linux/Postgres|Postgres]] | ||
= Setup Timescale for Unified Namespace data logging = | |||
* connect | |||
<blockquote> | <blockquote> | ||
<pre> | <pre> | ||
CREATE TABLE | psql -d "postgres://user:password@localhost/postgres" | ||
</pre> | |||
</blockquote> | |||
* check | |||
<blockquote> | |||
<pre> | |||
\dx # list extensions | |||
\dt # list all tables | |||
</pre> | |||
</blockquote> | |||
* Create database | |||
<blockquote> | |||
<pre> | |||
# DROP DATABASE unifiednamespace; | |||
CREATE DATABASE unifiednamespace OWNER user; | |||
GRANT ALL PRIVILEGES ON DATABASE unifiednamespace TO user; | |||
\c unifiednamespace # switch to database | |||
</pre> | |||
</blockquote> | |||
== Option A: Simple table (with topic as text column) == | |||
<blockquote> | |||
<pre>-- Create table | |||
CREATE TABLE IF NOT EXISTS process_value_numeric ( | |||
timestamp TIMESTAMPTZ NOT NULL, | |||
topic TEXT NOT NULL, | |||
value DOUBLE PRECISION NULL | |||
) WITH ( | |||
tsdb.hypertable, | |||
tsdb.partition_column='timestamp', | |||
tsdb.segmentby = 'topic', | |||
tsdb.orderby = 'timestamp DESC' | |||
); | |||
-- Create hypertable | |||
-- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated | |||
-- add index | |||
CREATE INDEX ON process_value_numeric (topic, timestamp DESC);</pre> | |||
</blockquote> | |||
* View table information | |||
<blockquote> | |||
<pre> | |||
\d process_value_numeric | |||
</pre> | |||
</blockquote> | |||
== Option B: Linked tables (with topic as id) == | |||
* create topic table | |||
<blockquote> | |||
<pre> | |||
CREATE TABLE IF NOT EXISTS topic ( | |||
id SERIAL PRIMARY KEY, | |||
topic TEXT NOT NULL, | |||
created_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL, | |||
UNIQUE(topic) | |||
); | |||
</pre> | |||
</blockquote> | |||
* create data table | |||
<blockquote> | |||
<pre>CREATE TABLE IF NOT EXISTS process_value_numeric ( | |||
timestamp TIMESTAMPTZ NOT NULL, | |||
topic_id INTEGER NOT NULL REFERENCES topic(id), | |||
value DOUBLE PRECISION NULL, | |||
UNIQUE(timestamp, topic_id) | |||
) WITH ( | |||
tsdb.hypertable, | |||
tsdb.partition_column='timestamp', | |||
tsdb.segmentby = 'topic_id', | |||
tsdb.orderby = 'timestamp DESC' | |||
); | |||
CREATE TABLE IF NOT EXISTS process_value_text ( | |||
timestamp TIMESTAMPTZ NOT NULL, | |||
topic_id INTEGER NOT NULL REFERENCES topic(id), | |||
value TEXT NULL, | |||
UNIQUE(timestamp, topic_id) | |||
) WITH ( | |||
tsdb.hypertable, | |||
tsdb.partition_column='timestamp', | |||
tsdb.segmentby = 'topic_id', | |||
tsdb.orderby = 'timestamp DESC' | |||
); | ); | ||
-- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated | |||
-- SELECT create_hypertable('process_value_text', 'timestamp'); #deprecated</pre> | |||
</blockquote> | |||
* insert sample topics | |||
<blockquote> | |||
<pre> | |||
INSERT INTO topic (topic) VALUES | |||
('uns/v1/home/plug-fridge/sensor/energy/state'), | |||
('uns/v1/home/plug-f3e8e6-dishwasher/sensor/energy/state'), | |||
('uns/v1/home/modbus-meter/sensor/total_import_kwh/state'), | |||
('uns/v1/home/modbus-meter/sensor/total_system_power_w/state'); | |||
</pre> | |||
</blockquote> | |||
* insert sample data | |||
<blockquote> | |||
<pre> | |||
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES | |||
(now(), '1', '132'); | |||
INSERT INTO process_value_text (timestamp, topic_id, value) VALUES | |||
(now(), '1', '{some json}'); | |||
</pre> | |||
</blockquote> | |||
* insert data with topic_id lookup | |||
<blockquote> | |||
<pre> | |||
-- step 1: insert topic if not exists | |||
INSERT INTO topic (topic) | |||
SELECT 'uns/v1/home/plug-fridge/sensor/energy/state' | |||
WHERE NOT EXISTS ( | |||
SELECT 1 | |||
FROM topic | |||
WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state' | |||
); | |||
-- step 2: insert data point | |||
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES ( | |||
NOW(), | |||
(SELECT id FROM topic WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state'), | |||
2135.5 | |||
); | |||
</pre> | |||
</blockquote> | |||
* create view to show process value together with topic name | |||
<blockquote> | |||
<pre> | |||
CREATE VIEW process_value_numeric_with_topic AS | |||
SELECT pv.timestamp, t.topic, pv.value | |||
FROM "process_value_numeric" pv | |||
INNER JOIN topic t ON pv.topic_id = t.id | |||
</pre> | </pre> | ||
</blockquote> | </blockquote> | ||
* create view to show process value (numeric + text) together with topic name | |||
<blockquote> | <blockquote> | ||
<pre> | <pre> | ||
SELECT | CREATE VIEW process_value_with_topic AS | ||
SELECT pv.timestamp, t.topic, pv.value | |||
FROM ( | |||
SELECT timestamp, topic_id, value::TEXT | |||
FROM process_value_numeric | |||
UNION ALL | |||
SELECT timestamp, topic_id, value | |||
FROM process_value_text | |||
) as pv | |||
INNER JOIN topic t ON pv.topic_id = t.id | |||
</pre> | </pre> | ||
</blockquote> | </blockquote> | ||
= Insert data = | |||
* via Node-RED | |||
<blockquote> | <blockquote> | ||
<pre> | <pre> | ||
INSERT INTO | INSERT INTO process_value (timestamp, topic, value) VALUES ( | ||
now(), | now(), | ||
'{{{ msg.topic }}}', | '{{{ msg.topic }}}', | ||
| Line 29: | Line 183: | ||
</blockquote> | </blockquote> | ||
= Get table size = | |||
<blockquote> | <blockquote> | ||
<pre> | <pre> | ||
SELECT hypertable_size(' | SELECT hypertable_size('process_value'); | ||
</pre> | </pre> | ||
</blockquote> | </blockquote> | ||
= Compression = | |||
* Show status | |||
<pre> | |||
SELECT * FROM timescaledb_information.hypertables; | |||
SELECT * FROM timescaledb_information.jobs | |||
WHERE proc_name='policy_compression'; | |||
SELECT * FROM hypertable_compression_stats('process_value_numeric'); | |||
SELECT chunk_name, compression_status FROM chunk_compression_stats('process_value_numeric'); | |||
</pre> | |||
== New >2.18.0 == | |||
* Show status | |||
<pre> | |||
SELECT * FROM timescaledb_information.hypertable_columnstore_settings; | |||
SELECT * FROM hypertable_columnstore_stats('process_value_numeric'); | |||
</pre> | |||
* Enable compression | |||
<pre> | |||
ALTER TABLE "process_value_numeric" SET( | |||
timescaledb.enable_columnstore, | |||
timescaledb.orderby = 'timestamp DESC', | |||
timescaledb.segmentby = 'topic_id'); | |||
</pre> | |||
* Add policy | |||
<pre> | |||
CALL add_columnstore_policy('process_value_numeric', INTERVAL '4 weeks'); | |||
</pre> | |||
* Remove policy | |||
<pre> | |||
CALL remove_columnstore_policy('process_value_numeric'); | |||
</pre> | |||
== Old <2.18.0 == | |||
* Enable compression | |||
<pre> | |||
ALTER TABLE "process_value_numeric" SET ( | |||
timescaledb.compress, | |||
timescaledb.compress_orderby = 'timestamp desc', | |||
timescaledb.compress_segmentby = 'topic_id' | |||
); | |||
</pre> | |||
* Add policy | |||
<pre> | |||
SELECT add_compression_policy('process_value_numeric', INTERVAL '4 weeks'); | |||
</pre> | |||
* Remove policy | |||
<pre> | |||
SELECT remove_compression_policy('process_value_numeric'); | |||
</pre> | |||
* Decompress chunks | |||
<pre> | |||
SELECT decompress_chunk(c, true) FROM show_chunks('metrics') c; | |||
</pre> | |||
= | * Disable compression (necessary?) | ||
<pre> | |||
ALTER TABLE process_value_numeric SET (timescaledb.compress = false); | |||
</pre> | |||
= Query = | |||
== Grafana timefilter macro == | |||
* limits query to timeframe selected in grafana | |||
<blockquote> | |||
<pre> | |||
SELECT * | |||
FROM process_value | |||
WHERE (topic = 'home/modbus-meter/sensor/total_system_power_w/state' | |||
AND $__timeFilter("time")) | |||
</pre> | |||
</blockquote> | |||
== Time buckets == | |||
* https://docs.timescale.com/use-timescale/latest/time-buckets/about-time-buckets/ | * https://docs.timescale.com/use-timescale/latest/time-buckets/about-time-buckets/ | ||
* https://docs.timescale.com/use-timescale/latest/time-buckets/use-time-buckets/ | |||
* 15min average time_buckets | * 15min average time_buckets | ||
<blockquote> | <blockquote> | ||
<pre> | <pre> | ||
SELECT time_bucket(' | SELECT time_bucket('1 hour', timestamp) as time, | ||
avg(value) AS avg_value | avg(value) AS avg_value | ||
FROM | FROM sensor_data | ||
WHERE | WHERE | ||
GROUP by | topic = 'home/modbus-meter/sensor/total_system_power_w/state' | ||
AND | |||
$__timeFilter("timestamp") | |||
GROUP by time | |||
</pre> | </pre> | ||
</blockquote> | </blockquote> | ||
== | == Meter / increasing counter == | ||
* https://github.com/timescale/timescaledb-toolkit/blob/main/docs/counter_agg.md | * https://github.com/timescale/timescaledb-toolkit/blob/main/docs/counter_agg.md | ||
* https://stackoverflow.com/questions/76774353/get-complete-hours-time-buckets-with-timescaledb | |||
* https://docs.timescale.com/use-timescale/latest/query-data/advanced-analytic-queries/#calculate-the-increase-in-a-value | |||
<blockquote> | |||
<pre> | |||
SELECT time_bucket('1 hour', time) as bucket, | |||
first(time,time) as period_begin, | |||
last(time,time) as period_end, | |||
MAX(counter) - MIN(counter) as total_consumption | |||
FROM | |||
GROUP BY 1; | |||
</pre> | |||
</blockquote> | |||
== first() + last () == | == first() + last () == | ||
* | |||
== Grafana > Dashboards > Settings > Variables == | |||
<blockquote> | |||
<pre> | |||
topic = SELECT DISTINCT topic FROM "sensor_data" | |||
</pre> | |||
</blockquote> | |||
* | == Number of values per topic == | ||
<blockquote> | |||
<pre> | |||
SELECT topic, COUNT(*) AS num | |||
FROM process_value | |||
GROUP BY topic | |||
ORDER BY num DESC; | |||
</pre> | |||
</blockquote> | |||
== Continuous aggregates == | |||
* https://docs.timescale.com/tutorials/latest/energy-data/dataset-energy/ | |||
== Links == | == Links == | ||
Latest revision as of 08:51, 15 July 2025
Setup Timescale for Unified Namespace data logging
- connect
psql -d "postgres://user:password@localhost/postgres"
- check
\dx # list extensions \dt # list all tables
- Create database
# DROP DATABASE unifiednamespace; CREATE DATABASE unifiednamespace OWNER user; GRANT ALL PRIVILEGES ON DATABASE unifiednamespace TO user; \c unifiednamespace # switch to database
Option A: Simple table (with topic as text column)
-- Create table
CREATE TABLE IF NOT EXISTS process_value_numeric (
timestamp TIMESTAMPTZ NOT NULL,
topic TEXT NOT NULL,
value DOUBLE PRECISION NULL
) WITH (
tsdb.hypertable,
tsdb.partition_column='timestamp',
tsdb.segmentby = 'topic',
tsdb.orderby = 'timestamp DESC'
);
-- Create hypertable
-- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated
-- add index
CREATE INDEX ON process_value_numeric (topic, timestamp DESC);
- View table information
\d process_value_numeric
Option B: Linked tables (with topic as id)
- create topic table
CREATE TABLE IF NOT EXISTS topic (
id SERIAL PRIMARY KEY,
topic TEXT NOT NULL,
created_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL,
UNIQUE(topic)
);
- create data table
CREATE TABLE IF NOT EXISTS process_value_numeric (
timestamp TIMESTAMPTZ NOT NULL,
topic_id INTEGER NOT NULL REFERENCES topic(id),
value DOUBLE PRECISION NULL,
UNIQUE(timestamp, topic_id)
) WITH (
tsdb.hypertable,
tsdb.partition_column='timestamp',
tsdb.segmentby = 'topic_id',
tsdb.orderby = 'timestamp DESC'
);
CREATE TABLE IF NOT EXISTS process_value_text (
timestamp TIMESTAMPTZ NOT NULL,
topic_id INTEGER NOT NULL REFERENCES topic(id),
value TEXT NULL,
UNIQUE(timestamp, topic_id)
) WITH (
tsdb.hypertable,
tsdb.partition_column='timestamp',
tsdb.segmentby = 'topic_id',
tsdb.orderby = 'timestamp DESC'
);
-- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated
-- SELECT create_hypertable('process_value_text', 'timestamp'); #deprecated
- insert sample topics
INSERT INTO topic (topic) VALUES
('uns/v1/home/plug-fridge/sensor/energy/state'),
('uns/v1/home/plug-f3e8e6-dishwasher/sensor/energy/state'),
('uns/v1/home/modbus-meter/sensor/total_import_kwh/state'),
('uns/v1/home/modbus-meter/sensor/total_system_power_w/state');
- insert sample data
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES
(now(), '1', '132');
INSERT INTO process_value_text (timestamp, topic_id, value) VALUES
(now(), '1', '{some json}');
- insert data with topic_id lookup
-- step 1: insert topic if not exists
INSERT INTO topic (topic)
SELECT 'uns/v1/home/plug-fridge/sensor/energy/state'
WHERE NOT EXISTS (
SELECT 1
FROM topic
WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state'
);
-- step 2: insert data point
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES (
NOW(),
(SELECT id FROM topic WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state'),
2135.5
);
- create view to show process value together with topic name
CREATE VIEW process_value_numeric_with_topic AS SELECT pv.timestamp, t.topic, pv.value FROM "process_value_numeric" pv INNER JOIN topic t ON pv.topic_id = t.id
- create view to show process value (numeric + text) together with topic name
CREATE VIEW process_value_with_topic AS
SELECT pv.timestamp, t.topic, pv.value
FROM (
SELECT timestamp, topic_id, value::TEXT
FROM process_value_numeric
UNION ALL
SELECT timestamp, topic_id, value
FROM process_value_text
) as pv
INNER JOIN topic t ON pv.topic_id = t.id
Insert data
- via Node-RED
INSERT INTO process_value (timestamp, topic, value) VALUES (
now(),
'{{{ msg.topic }}}',
'{{{ msg.payload }}}'
);
Get table size
SELECT hypertable_size('process_value');
Compression
- Show status
SELECT * FROM timescaledb_information.hypertables;
SELECT * FROM timescaledb_information.jobs
WHERE proc_name='policy_compression';
SELECT * FROM hypertable_compression_stats('process_value_numeric');
SELECT chunk_name, compression_status FROM chunk_compression_stats('process_value_numeric');
New >2.18.0
- Show status
SELECT * FROM timescaledb_information.hypertable_columnstore_settings;
SELECT * FROM hypertable_columnstore_stats('process_value_numeric');
- Enable compression
ALTER TABLE "process_value_numeric" SET( timescaledb.enable_columnstore, timescaledb.orderby = 'timestamp DESC', timescaledb.segmentby = 'topic_id');
- Add policy
CALL add_columnstore_policy('process_value_numeric', INTERVAL '4 weeks');
- Remove policy
CALL remove_columnstore_policy('process_value_numeric');
Old <2.18.0
- Enable compression
ALTER TABLE "process_value_numeric" SET ( timescaledb.compress, timescaledb.compress_orderby = 'timestamp desc', timescaledb.compress_segmentby = 'topic_id' );
- Add policy
SELECT add_compression_policy('process_value_numeric', INTERVAL '4 weeks');
- Remove policy
SELECT remove_compression_policy('process_value_numeric');
- Decompress chunks
SELECT decompress_chunk(c, true) FROM show_chunks('metrics') c;
- Disable compression (necessary?)
ALTER TABLE process_value_numeric SET (timescaledb.compress = false);
Query
Grafana timefilter macro
- limits query to timeframe selected in grafana
SELECT *
FROM process_value
WHERE (topic = 'home/modbus-meter/sensor/total_system_power_w/state'
AND $__timeFilter("time"))
Time buckets
- https://docs.timescale.com/use-timescale/latest/time-buckets/about-time-buckets/
- https://docs.timescale.com/use-timescale/latest/time-buckets/use-time-buckets/
- 15min average time_buckets
SELECT time_bucket('1 hour', timestamp) as time,
avg(value) AS avg_value
FROM sensor_data
WHERE
topic = 'home/modbus-meter/sensor/total_system_power_w/state'
AND
$__timeFilter("timestamp")
GROUP by time
Meter / increasing counter
- https://github.com/timescale/timescaledb-toolkit/blob/main/docs/counter_agg.md
- https://stackoverflow.com/questions/76774353/get-complete-hours-time-buckets-with-timescaledb
- https://docs.timescale.com/use-timescale/latest/query-data/advanced-analytic-queries/#calculate-the-increase-in-a-value
SELECT time_bucket('1 hour', time) as bucket,
first(time,time) as period_begin,
last(time,time) as period_end,
MAX(counter) - MIN(counter) as total_consumption
FROM
GROUP BY 1;
first() + last ()
Grafana > Dashboards > Settings > Variables
topic = SELECT DISTINCT topic FROM "sensor_data"
Number of values per topic
SELECT topic, COUNT(*) AS num FROM process_value GROUP BY topic ORDER BY num DESC;