Postgres/TimescaleDB: Difference between revisions
Appearance
< Postgres
| (5 intermediate revisions by the same user not shown) | |||
| Line 30: | Line 30: | ||
== Option A: Simple table (with topic as text column) == | == Option A: Simple table (with topic as text column) == | ||
<blockquote> | <blockquote> | ||
<pre> | <pre>-- Create table | ||
-- Create table | |||
CREATE TABLE IF NOT EXISTS process_value_numeric ( | CREATE TABLE IF NOT EXISTS process_value_numeric ( | ||
timestamp TIMESTAMPTZ NOT NULL, | timestamp TIMESTAMPTZ NOT NULL, | ||
| Line 44: | Line 43: | ||
-- Create hypertable | -- Create hypertable | ||
-- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated | |||
-- add index | -- add index | ||
CREATE INDEX ON process_value_numeric (topic, timestamp DESC); | CREATE INDEX ON process_value_numeric (topic, timestamp DESC);</pre> | ||
</pre> | |||
</blockquote> | </blockquote> | ||
| Line 74: | Line 72: | ||
* create data table | * create data table | ||
<blockquote> | <blockquote> | ||
<pre> | <pre>CREATE TABLE IF NOT EXISTS process_value_numeric ( | ||
CREATE TABLE IF NOT EXISTS process_value_numeric ( | |||
timestamp TIMESTAMPTZ NOT NULL, | timestamp TIMESTAMPTZ NOT NULL, | ||
topic_id INTEGER NOT NULL REFERENCES topic(id), | topic_id INTEGER NOT NULL REFERENCES topic(id), | ||
value DOUBLE PRECISION NULL, | value DOUBLE PRECISION NULL, | ||
UNIQUE(timestamp, topic_id) | UNIQUE(timestamp, topic_id) | ||
) WITH ( | |||
tsdb.hypertable, | |||
tsdb.partition_column='timestamp', | |||
tsdb.segmentby = 'topic_id', | |||
tsdb.orderby = 'timestamp DESC' | |||
); | ); | ||
CREATE TABLE IF NOT EXISTS process_value_text ( | CREATE TABLE IF NOT EXISTS process_value_text ( | ||
| Line 87: | Line 90: | ||
value TEXT NULL, | value TEXT NULL, | ||
UNIQUE(timestamp, topic_id) | UNIQUE(timestamp, topic_id) | ||
) WITH ( | |||
tsdb.hypertable, | |||
tsdb.partition_column='timestamp', | |||
tsdb.segmentby = 'topic_id', | |||
tsdb.orderby = 'timestamp DESC' | |||
); | ); | ||
SELECT create_hypertable('process_value_numeric', 'timestamp'); | |||
SELECT create_hypertable('process_value_text', 'timestamp'); | -- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated | ||
</pre> | -- SELECT create_hypertable('process_value_text', 'timestamp'); #deprecated</pre> | ||
</blockquote> | </blockquote> | ||
| Line 183: | Line 191: | ||
</blockquote> | </blockquote> | ||
= Compression = | = Compression = | ||
* Show status | |||
<pre> | |||
SELECT * FROM timescaledb_information.hypertables; | |||
SELECT * FROM timescaledb_information.jobs | |||
WHERE proc_name='policy_compression'; | |||
SELECT * FROM hypertable_compression_stats('process_value_numeric'); | |||
SELECT chunk_name, compression_status FROM chunk_compression_stats('process_value_numeric'); | |||
</pre> | |||
== New >2.18.0 == | == New >2.18.0 == | ||
* Show status | |||
<pre> | |||
SELECT * FROM timescaledb_information.hypertable_columnstore_settings; | |||
SELECT * FROM hypertable_columnstore_stats('process_value_numeric'); | |||
</pre> | |||
* Enable compression | * Enable compression | ||
<pre> | <pre> | ||
ALTER TABLE | ALTER TABLE "process_value_numeric" SET( | ||
timescaledb.enable_columnstore, | timescaledb.enable_columnstore, | ||
timescaledb.orderby = 'timestamp DESC', | timescaledb.orderby = 'timestamp DESC', | ||
| Line 192: | Line 217: | ||
</pre> | </pre> | ||
* Add policy | |||
<pre> | |||
CALL add_columnstore_policy('process_value_numeric', INTERVAL '4 weeks'); | |||
</pre> | |||
* Remove policy | |||
* | |||
<pre> | <pre> | ||
CALL remove_columnstore_policy('process_value_numeric'); | |||
</pre> | </pre> | ||
== Old <2.18.0 == | |||
* Enable compression | * Enable compression | ||
<pre> | <pre> | ||
| Line 215: | Line 237: | ||
</pre> | </pre> | ||
* Add | * Add policy | ||
<pre> | <pre> | ||
SELECT add_compression_policy('process_value_numeric', INTERVAL '4 weeks'); | SELECT add_compression_policy('process_value_numeric', INTERVAL '4 weeks'); | ||
</pre> | </pre> | ||
* Remove | * Remove policy | ||
<pre> | <pre> | ||
SELECT remove_compression_policy('process_value_numeric'); | SELECT remove_compression_policy('process_value_numeric'); | ||
Latest revision as of 08:51, 15 July 2025
Setup Timescale for Unified Namespace data logging
- connect
psql -d "postgres://user:password@localhost/postgres"
- check
\dx # list extensions \dt # list all tables
- Create database
# DROP DATABASE unifiednamespace; CREATE DATABASE unifiednamespace OWNER user; GRANT ALL PRIVILEGES ON DATABASE unifiednamespace TO user; \c unifiednamespace # switch to database
Option A: Simple table (with topic as text column)
-- Create table
CREATE TABLE IF NOT EXISTS process_value_numeric (
timestamp TIMESTAMPTZ NOT NULL,
topic TEXT NOT NULL,
value DOUBLE PRECISION NULL
) WITH (
tsdb.hypertable,
tsdb.partition_column='timestamp',
tsdb.segmentby = 'topic',
tsdb.orderby = 'timestamp DESC'
);
-- Create hypertable
-- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated
-- add index
CREATE INDEX ON process_value_numeric (topic, timestamp DESC);
- View table information
\d process_value_numeric
Option B: Linked tables (with topic as id)
- create topic table
CREATE TABLE IF NOT EXISTS topic (
id SERIAL PRIMARY KEY,
topic TEXT NOT NULL,
created_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL,
UNIQUE(topic)
);
- create data table
CREATE TABLE IF NOT EXISTS process_value_numeric (
timestamp TIMESTAMPTZ NOT NULL,
topic_id INTEGER NOT NULL REFERENCES topic(id),
value DOUBLE PRECISION NULL,
UNIQUE(timestamp, topic_id)
) WITH (
tsdb.hypertable,
tsdb.partition_column='timestamp',
tsdb.segmentby = 'topic_id',
tsdb.orderby = 'timestamp DESC'
);
CREATE TABLE IF NOT EXISTS process_value_text (
timestamp TIMESTAMPTZ NOT NULL,
topic_id INTEGER NOT NULL REFERENCES topic(id),
value TEXT NULL,
UNIQUE(timestamp, topic_id)
) WITH (
tsdb.hypertable,
tsdb.partition_column='timestamp',
tsdb.segmentby = 'topic_id',
tsdb.orderby = 'timestamp DESC'
);
-- SELECT create_hypertable('process_value_numeric', 'timestamp'); #deprecated
-- SELECT create_hypertable('process_value_text', 'timestamp'); #deprecated
- insert sample topics
INSERT INTO topic (topic) VALUES
('uns/v1/home/plug-fridge/sensor/energy/state'),
('uns/v1/home/plug-f3e8e6-dishwasher/sensor/energy/state'),
('uns/v1/home/modbus-meter/sensor/total_import_kwh/state'),
('uns/v1/home/modbus-meter/sensor/total_system_power_w/state');
- insert sample data
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES
(now(), '1', '132');
INSERT INTO process_value_text (timestamp, topic_id, value) VALUES
(now(), '1', '{some json}');
- insert data with topic_id lookup
-- step 1: insert topic if not exists
INSERT INTO topic (topic)
SELECT 'uns/v1/home/plug-fridge/sensor/energy/state'
WHERE NOT EXISTS (
SELECT 1
FROM topic
WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state'
);
-- step 2: insert data point
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES (
NOW(),
(SELECT id FROM topic WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state'),
2135.5
);
- create view to show process value together with topic name
CREATE VIEW process_value_numeric_with_topic AS SELECT pv.timestamp, t.topic, pv.value FROM "process_value_numeric" pv INNER JOIN topic t ON pv.topic_id = t.id
- create view to show process value (numeric + text) together with topic name
CREATE VIEW process_value_with_topic AS
SELECT pv.timestamp, t.topic, pv.value
FROM (
SELECT timestamp, topic_id, value::TEXT
FROM process_value_numeric
UNION ALL
SELECT timestamp, topic_id, value
FROM process_value_text
) as pv
INNER JOIN topic t ON pv.topic_id = t.id
Insert data
- via Node-RED
INSERT INTO process_value (timestamp, topic, value) VALUES (
now(),
'{{{ msg.topic }}}',
'{{{ msg.payload }}}'
);
Get table size
SELECT hypertable_size('process_value');
Compression
- Show status
SELECT * FROM timescaledb_information.hypertables;
SELECT * FROM timescaledb_information.jobs
WHERE proc_name='policy_compression';
SELECT * FROM hypertable_compression_stats('process_value_numeric');
SELECT chunk_name, compression_status FROM chunk_compression_stats('process_value_numeric');
New >2.18.0
- Show status
SELECT * FROM timescaledb_information.hypertable_columnstore_settings;
SELECT * FROM hypertable_columnstore_stats('process_value_numeric');
- Enable compression
ALTER TABLE "process_value_numeric" SET( timescaledb.enable_columnstore, timescaledb.orderby = 'timestamp DESC', timescaledb.segmentby = 'topic_id');
- Add policy
CALL add_columnstore_policy('process_value_numeric', INTERVAL '4 weeks');
- Remove policy
CALL remove_columnstore_policy('process_value_numeric');
Old <2.18.0
- Enable compression
ALTER TABLE "process_value_numeric" SET ( timescaledb.compress, timescaledb.compress_orderby = 'timestamp desc', timescaledb.compress_segmentby = 'topic_id' );
- Add policy
SELECT add_compression_policy('process_value_numeric', INTERVAL '4 weeks');
- Remove policy
SELECT remove_compression_policy('process_value_numeric');
- Decompress chunks
SELECT decompress_chunk(c, true) FROM show_chunks('metrics') c;
- Disable compression (necessary?)
ALTER TABLE process_value_numeric SET (timescaledb.compress = false);
Query
Grafana timefilter macro
- limits query to timeframe selected in grafana
SELECT *
FROM process_value
WHERE (topic = 'home/modbus-meter/sensor/total_system_power_w/state'
AND $__timeFilter("time"))
Time buckets
- https://docs.timescale.com/use-timescale/latest/time-buckets/about-time-buckets/
- https://docs.timescale.com/use-timescale/latest/time-buckets/use-time-buckets/
- 15min average time_buckets
SELECT time_bucket('1 hour', timestamp) as time,
avg(value) AS avg_value
FROM sensor_data
WHERE
topic = 'home/modbus-meter/sensor/total_system_power_w/state'
AND
$__timeFilter("timestamp")
GROUP by time
Meter / increasing counter
- https://github.com/timescale/timescaledb-toolkit/blob/main/docs/counter_agg.md
- https://stackoverflow.com/questions/76774353/get-complete-hours-time-buckets-with-timescaledb
- https://docs.timescale.com/use-timescale/latest/query-data/advanced-analytic-queries/#calculate-the-increase-in-a-value
SELECT time_bucket('1 hour', time) as bucket,
first(time,time) as period_begin,
last(time,time) as period_end,
MAX(counter) - MIN(counter) as total_consumption
FROM
GROUP BY 1;
first() + last ()
Grafana > Dashboards > Settings > Variables
topic = SELECT DISTINCT topic FROM "sensor_data"
Number of values per topic
SELECT topic, COUNT(*) AS num FROM process_value GROUP BY topic ORDER BY num DESC;