Postgres/TimescaleDB: Difference between revisions
Appearance
< Postgres
| (22 intermediate revisions by the same user not shown) | |||
| Line 28: | Line 28: | ||
</blockquote> | </blockquote> | ||
== Option A: Simple table (with topic as text column == | == Option A: Simple table (with topic as text column) == | ||
<blockquote> | <blockquote> | ||
<pre> | <pre>-- Create table | ||
-- Create table | |||
CREATE TABLE IF NOT EXISTS process_value_numeric ( | CREATE TABLE IF NOT EXISTS process_value_numeric ( | ||
timestamp TIMESTAMPTZ NOT NULL, | timestamp TIMESTAMPTZ NOT NULL, | ||
topic TEXT NOT NULL, | topic TEXT NOT NULL, | ||
value DOUBLE PRECISION NULL ); | value DOUBLE PRECISION NULL | ||
) WITH ( | |||
tsdb.hypertable, | |||
tsdb.partition_column='timestamp', | |||
tsdb.segmentby = 'topic', | |||
tsdb.orderby = 'timestamp DESC' | |||
); | |||
-- Create hypertable | -- Create hypertable | ||
SELECT create_hypertable('process_value_numeric', 'timestamp'); | -- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated | ||
-- add index | -- add index | ||
CREATE INDEX ON process_value_numeric (topic, timestamp DESC); | CREATE INDEX ON process_value_numeric (topic, timestamp DESC);</pre> | ||
</pre> | |||
</blockquote> | </blockquote> | ||
| Line 59: | Line 64: | ||
id SERIAL PRIMARY KEY, | id SERIAL PRIMARY KEY, | ||
topic TEXT NOT NULL, | topic TEXT NOT NULL, | ||
created_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL, | |||
UNIQUE(topic) | UNIQUE(topic) | ||
); | ); | ||
| Line 66: | Line 72: | ||
* create data table | * create data table | ||
<blockquote> | <blockquote> | ||
<pre> | <pre>CREATE TABLE IF NOT EXISTS process_value_numeric ( | ||
CREATE TABLE IF NOT EXISTS process_value_numeric ( | |||
timestamp TIMESTAMPTZ NOT NULL, | timestamp TIMESTAMPTZ NOT NULL, | ||
topic_id INTEGER NOT NULL REFERENCES topic(id), | topic_id INTEGER NOT NULL REFERENCES topic(id), | ||
value DOUBLE PRECISION NULL, | value DOUBLE PRECISION NULL, | ||
UNIQUE(timestamp, topic_id) | UNIQUE(timestamp, topic_id) | ||
) WITH ( | |||
tsdb.hypertable, | |||
tsdb.partition_column='timestamp', | |||
tsdb.segmentby = 'topic_id', | |||
tsdb.orderby = 'timestamp DESC' | |||
); | ); | ||
CREATE TABLE IF NOT EXISTS process_value_text ( | CREATE TABLE IF NOT EXISTS process_value_text ( | ||
| Line 79: | Line 90: | ||
value TEXT NULL, | value TEXT NULL, | ||
UNIQUE(timestamp, topic_id) | UNIQUE(timestamp, topic_id) | ||
) WITH ( | |||
tsdb.hypertable, | |||
tsdb.partition_column='timestamp', | |||
tsdb.segmentby = 'topic_id', | |||
tsdb.orderby = 'timestamp DESC' | |||
); | ); | ||
SELECT create_hypertable('process_value_numeric', 'timestamp'); | |||
SELECT create_hypertable('process_value_text', 'timestamp'); | -- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated | ||
</pre> | -- SELECT create_hypertable('process_value_text', 'timestamp'); #deprecated</pre> | ||
</blockquote> | </blockquote> | ||
| Line 102: | Line 118: | ||
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES | INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES | ||
(now(), '1', '132'); | (now(), '1', '132'); | ||
INSERT INTO process_value_text (timestamp, topic_id, value) VALUES | |||
(now(), '1', '{some json}'); | |||
</pre> | </pre> | ||
</blockquote> | </blockquote> | ||
| Line 109: | Line 128: | ||
<pre> | <pre> | ||
-- step 1: insert topic if not exists | -- step 1: insert topic if not exists | ||
INSERT INTO topic (topic) | INSERT INTO topic (topic) | ||
SELECT 'uns/v1/home/plug-fridge/sensor/energy/state' | |||
WHERE NOT EXISTS ( | |||
SELECT 1 | |||
FROM topic | |||
WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state' | |||
); | |||
-- step 2: insert data point | -- step 2: insert data point | ||
| Line 125: | Line 148: | ||
<blockquote> | <blockquote> | ||
<pre> | <pre> | ||
CREATE VIEW | CREATE VIEW process_value_numeric_with_topic AS | ||
SELECT pv.timestamp, t.topic, pv.value | SELECT pv.timestamp, t.topic, pv.value | ||
FROM "process_value_numeric" pv | FROM "process_value_numeric" pv | ||
INNER JOIN topic t ON pv.topic_id = t.id | |||
</pre> | |||
</blockquote> | |||
* create view to show process value (numeric + text) together with topic name | |||
<blockquote> | |||
<pre> | |||
CREATE VIEW process_value_with_topic AS | |||
SELECT pv.timestamp, t.topic, pv.value | |||
FROM ( | |||
SELECT timestamp, topic_id, value::TEXT | |||
FROM process_value_numeric | |||
UNION ALL | |||
SELECT timestamp, topic_id, value | |||
FROM process_value_text | |||
) as pv | |||
INNER JOIN topic t ON pv.topic_id = t.id | INNER JOIN topic t ON pv.topic_id = t.id | ||
</pre> | </pre> | ||
| Line 145: | Line 184: | ||
= Get table size = | |||
<blockquote> | <blockquote> | ||
<pre> | <pre> | ||
| Line 151: | Line 190: | ||
</pre> | </pre> | ||
</blockquote> | </blockquote> | ||
= Compression = | |||
* Show status | |||
<pre> | |||
SELECT * FROM timescaledb_information.hypertables; | |||
SELECT * FROM timescaledb_information.jobs | |||
WHERE proc_name='policy_compression'; | |||
SELECT * FROM hypertable_compression_stats('process_value_numeric'); | |||
SELECT chunk_name, compression_status FROM chunk_compression_stats('process_value_numeric'); | |||
</pre> | |||
== New >2.18.0 == | |||
* Show status | |||
<pre> | |||
SELECT * FROM timescaledb_information.hypertable_columnstore_settings; | |||
SELECT * FROM hypertable_columnstore_stats('process_value_numeric'); | |||
</pre> | |||
* Enable compression | |||
<pre> | |||
ALTER TABLE "process_value_numeric" SET( | |||
timescaledb.enable_columnstore, | |||
timescaledb.orderby = 'timestamp DESC', | |||
timescaledb.segmentby = 'topic_id'); | |||
</pre> | |||
* Add policy | |||
<pre> | |||
CALL add_columnstore_policy('process_value_numeric', INTERVAL '4 weeks'); | |||
</pre> | |||
* Remove policy | |||
<pre> | |||
CALL remove_columnstore_policy('process_value_numeric'); | |||
</pre> | |||
== Old <2.18.0 == | |||
* Enable compression | |||
<pre> | |||
ALTER TABLE "process_value_numeric" SET ( | |||
timescaledb.compress, | |||
timescaledb.compress_orderby = 'timestamp desc', | |||
timescaledb.compress_segmentby = 'topic_id' | |||
); | |||
</pre> | |||
* Add policy | |||
<pre> | |||
SELECT add_compression_policy('process_value_numeric', INTERVAL '4 weeks'); | |||
</pre> | |||
* Remove policy | |||
<pre> | |||
SELECT remove_compression_policy('process_value_numeric'); | |||
</pre> | |||
* Decompress chunks | |||
<pre> | |||
SELECT decompress_chunk(c, true) FROM show_chunks('metrics') c; | |||
</pre> | |||
* Disable compression (necessary?) | |||
<pre> | |||
ALTER TABLE process_value_numeric SET (timescaledb.compress = false); | |||
</pre> | |||
= Query = | = Query = | ||
| Line 200: | Line 305: | ||
== Grafana > Dashboards > Settings > Variables == | == Grafana > Dashboards > Settings > Variables == | ||
<blockquote> | |||
<pre> | |||
topic = SELECT DISTINCT topic FROM "sensor_data" | |||
</pre> | |||
</blockquote> | |||
== Number of values per topic == | |||
<blockquote> | |||
<pre> | |||
SELECT topic, COUNT(*) AS num | |||
FROM process_value | |||
GROUP BY topic | |||
ORDER BY num DESC; | |||
</pre> | |||
</blockquote> | |||
== Continuous aggregates == | == Continuous aggregates == | ||
Latest revision as of 08:51, 15 July 2025
Setup Timescale for Unified Namespace data logging
- connect
psql -d "postgres://user:password@localhost/postgres"
- check
\dx # list extensions \dt # list all tables
- Create database
# DROP DATABASE unifiednamespace; CREATE DATABASE unifiednamespace OWNER user; GRANT ALL PRIVILEGES ON DATABASE unifiednamespace TO user; \c unifiednamespace # switch to database
Option A: Simple table (with topic as text column)
-- Create table
CREATE TABLE IF NOT EXISTS process_value_numeric (
timestamp TIMESTAMPTZ NOT NULL,
topic TEXT NOT NULL,
value DOUBLE PRECISION NULL
) WITH (
tsdb.hypertable,
tsdb.partition_column='timestamp',
tsdb.segmentby = 'topic',
tsdb.orderby = 'timestamp DESC'
);
-- Create hypertable
-- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated
-- add index
CREATE INDEX ON process_value_numeric (topic, timestamp DESC);
- View table information
\d process_value_numeric
Option B: Linked tables (with topic as id)
- create topic table
CREATE TABLE IF NOT EXISTS topic (
id SERIAL PRIMARY KEY,
topic TEXT NOT NULL,
created_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL,
UNIQUE(topic)
);
- create data table
CREATE TABLE IF NOT EXISTS process_value_numeric (
timestamp TIMESTAMPTZ NOT NULL,
topic_id INTEGER NOT NULL REFERENCES topic(id),
value DOUBLE PRECISION NULL,
UNIQUE(timestamp, topic_id)
) WITH (
tsdb.hypertable,
tsdb.partition_column='timestamp',
tsdb.segmentby = 'topic_id',
tsdb.orderby = 'timestamp DESC'
);
CREATE TABLE IF NOT EXISTS process_value_text (
timestamp TIMESTAMPTZ NOT NULL,
topic_id INTEGER NOT NULL REFERENCES topic(id),
value TEXT NULL,
UNIQUE(timestamp, topic_id)
) WITH (
tsdb.hypertable,
tsdb.partition_column='timestamp',
tsdb.segmentby = 'topic_id',
tsdb.orderby = 'timestamp DESC'
);
-- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated
-- SELECT create_hypertable('process_value_text', 'timestamp'); #deprecated
- insert sample topics
INSERT INTO topic (topic) VALUES
('uns/v1/home/plug-fridge/sensor/energy/state'),
('uns/v1/home/plug-f3e8e6-dishwasher/sensor/energy/state'),
('uns/v1/home/modbus-meter/sensor/total_import_kwh/state'),
('uns/v1/home/modbus-meter/sensor/total_system_power_w/state');
- insert sample data
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES
(now(), '1', '132');
INSERT INTO process_value_text (timestamp, topic_id, value) VALUES
(now(), '1', '{some json}');
- insert data with topic_id lookup
-- step 1: insert topic if not exists
INSERT INTO topic (topic)
SELECT 'uns/v1/home/plug-fridge/sensor/energy/state'
WHERE NOT EXISTS (
SELECT 1
FROM topic
WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state'
);
-- step 2: insert data point
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES (
NOW(),
(SELECT id FROM topic WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state'),
2135.5
);
- create view to show process value together with topic name
CREATE VIEW process_value_numeric_with_topic AS SELECT pv.timestamp, t.topic, pv.value FROM "process_value_numeric" pv INNER JOIN topic t ON pv.topic_id = t.id
- create view to show process value (numeric + text) together with topic name
CREATE VIEW process_value_with_topic AS
SELECT pv.timestamp, t.topic, pv.value
FROM (
SELECT timestamp, topic_id, value::TEXT
FROM process_value_numeric
UNION ALL
SELECT timestamp, topic_id, value
FROM process_value_text
) as pv
INNER JOIN topic t ON pv.topic_id = t.id
Insert data
- via Node-RED
INSERT INTO process_value (timestamp, topic, value) VALUES (
now(),
'{{{ msg.topic }}}',
'{{{ msg.payload }}}'
);
Get table size
SELECT hypertable_size('process_value');
Compression
- Show status
SELECT * FROM timescaledb_information.hypertables;
SELECT * FROM timescaledb_information.jobs
WHERE proc_name='policy_compression';
SELECT * FROM hypertable_compression_stats('process_value_numeric');
SELECT chunk_name, compression_status FROM chunk_compression_stats('process_value_numeric');
New >2.18.0
- Show status
SELECT * FROM timescaledb_information.hypertable_columnstore_settings;
SELECT * FROM hypertable_columnstore_stats('process_value_numeric');
- Enable compression
ALTER TABLE "process_value_numeric" SET( timescaledb.enable_columnstore, timescaledb.orderby = 'timestamp DESC', timescaledb.segmentby = 'topic_id');
- Add policy
CALL add_columnstore_policy('process_value_numeric', INTERVAL '4 weeks');
- Remove policy
CALL remove_columnstore_policy('process_value_numeric');
Old <2.18.0
- Enable compression
ALTER TABLE "process_value_numeric" SET ( timescaledb.compress, timescaledb.compress_orderby = 'timestamp desc', timescaledb.compress_segmentby = 'topic_id' );
- Add policy
SELECT add_compression_policy('process_value_numeric', INTERVAL '4 weeks');
- Remove policy
SELECT remove_compression_policy('process_value_numeric');
- Decompress chunks
SELECT decompress_chunk(c, true) FROM show_chunks('metrics') c;
- Disable compression (necessary?)
ALTER TABLE process_value_numeric SET (timescaledb.compress = false);
Query
Grafana timefilter macro
- limits query to timeframe selected in grafana
SELECT *
FROM process_value
WHERE (topic = 'home/modbus-meter/sensor/total_system_power_w/state'
AND $__timeFilter("time"))
Time buckets
- https://docs.timescale.com/use-timescale/latest/time-buckets/about-time-buckets/
- https://docs.timescale.com/use-timescale/latest/time-buckets/use-time-buckets/
- 15min average time_buckets
SELECT time_bucket('1 hour', timestamp) as time,
avg(value) AS avg_value
FROM sensor_data
WHERE
topic = 'home/modbus-meter/sensor/total_system_power_w/state'
AND
$__timeFilter("timestamp")
GROUP by time
Meter / increasing counter
- https://github.com/timescale/timescaledb-toolkit/blob/main/docs/counter_agg.md
- https://stackoverflow.com/questions/76774353/get-complete-hours-time-buckets-with-timescaledb
- https://docs.timescale.com/use-timescale/latest/query-data/advanced-analytic-queries/#calculate-the-increase-in-a-value
SELECT time_bucket('1 hour', time) as bucket,
first(time,time) as period_begin,
last(time,time) as period_end,
MAX(counter) - MIN(counter) as total_consumption
FROM
GROUP BY 1;
first() + last ()
Grafana > Dashboards > Settings > Variables
topic = SELECT DISTINCT topic FROM "sensor_data"
Number of values per topic
SELECT topic, COUNT(*) AS num FROM process_value GROUP BY topic ORDER BY num DESC;