Jump to content

Postgres/TimescaleDB

From Wiki

Setup Timescale for Unified Namespace data logging

  • connect
psql -d "postgres://user:password@localhost/postgres"
  • check
\dx                               # list extensions
\dt                               # list all tables
  • Create database
# DROP DATABASE unifiednamespace;
CREATE DATABASE unifiednamespace OWNER user;
GRANT ALL PRIVILEGES ON DATABASE unifiednamespace TO user;

\c unifiednamespace               # switch to database 

Option A: Simple table (with topic as text column)

-- Create table
CREATE TABLE IF NOT EXISTS process_value_numeric (
    timestamp TIMESTAMPTZ NOT NULL,
    topic TEXT NOT NULL,
    value DOUBLE PRECISION NULL    
) WITH (
   tsdb.hypertable,
   tsdb.partition_column='timestamp',
   tsdb.segmentby = 'topic',
   tsdb.orderby = 'timestamp DESC'
);

-- Create hypertable
-- SELECT create_hypertable('process_value_numeric', 'timestamp');   #deprecated


-- add index
CREATE INDEX ON process_value_numeric (topic, timestamp DESC);
  • View table information
 \d process_value_numeric

Option B: Linked tables (with topic as id)

  • create topic table
CREATE TABLE IF NOT EXISTS topic (
    id SERIAL PRIMARY KEY,
    topic TEXT NOT NULL,
    created_timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP NOT NULL,
    UNIQUE(topic)
);
  • create data table
CREATE TABLE IF NOT EXISTS process_value_numeric (
    timestamp TIMESTAMPTZ NOT NULL,
    topic_id INTEGER NOT NULL REFERENCES topic(id),
    value DOUBLE PRECISION NULL,
    UNIQUE(timestamp, topic_id)
) WITH (
   tsdb.hypertable,
   tsdb.partition_column='timestamp',
   tsdb.segmentby = 'topic_id',
   tsdb.orderby = 'timestamp DESC'
);


CREATE TABLE IF NOT EXISTS process_value_text (
    timestamp TIMESTAMPTZ NOT NULL,
    topic_id INTEGER NOT NULL REFERENCES topic(id),
    value TEXT NULL,
    UNIQUE(timestamp, topic_id)
) WITH (
   tsdb.hypertable,
   tsdb.partition_column='timestamp',
   tsdb.segmentby = 'topic_id',
   tsdb.orderby = 'timestamp DESC'
);


-- SELECT create_hypertable('process_value_numeric', 'timestamp');   #deprecated
-- SELECT create_hypertable('process_value_text', 'timestamp');      #deprecated
  • insert sample topics
INSERT INTO topic (topic) VALUES 
('uns/v1/home/plug-fridge/sensor/energy/state'),
('uns/v1/home/plug-f3e8e6-dishwasher/sensor/energy/state'),
('uns/v1/home/modbus-meter/sensor/total_import_kwh/state'),
('uns/v1/home/modbus-meter/sensor/total_system_power_w/state');
  • insert sample data
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES 
(now(), '1', '132');

INSERT INTO process_value_text (timestamp, topic_id, value) VALUES 
(now(), '1', '{some json}');
  • insert data with topic_id lookup
-- step 1: insert topic if not exists
INSERT INTO topic (topic)
SELECT 'uns/v1/home/plug-fridge/sensor/energy/state'
WHERE NOT EXISTS (
    SELECT 1
    FROM topic
    WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state'
);

-- step 2: insert data point
INSERT INTO process_value_numeric (timestamp, topic_id, value) VALUES (
    NOW(), 
    (SELECT id FROM topic WHERE topic = 'uns/v1/home/plug-fridge/sensor/energy/state'),
	2135.5
);
  • create view to show process value together with topic name
CREATE VIEW process_value_numeric_with_topic AS
SELECT pv.timestamp, t.topic, pv.value
FROM "process_value_numeric" pv
INNER JOIN topic t ON pv.topic_id = t.id
  • create view to show process value (numeric + text) together with topic name
CREATE VIEW process_value_with_topic AS
SELECT pv.timestamp, t.topic, pv.value
FROM (
    SELECT timestamp, topic_id, value::TEXT
    FROM process_value_numeric
    UNION ALL
    SELECT timestamp, topic_id, value
    FROM process_value_text
) as pv
INNER JOIN topic t ON pv.topic_id = t.id

Insert data

  • via Node-RED
INSERT INTO process_value (timestamp, topic, value) VALUES (
    now(),
    '{{{ msg.topic }}}',
    '{{{ msg.payload }}}'
);


Get table size

SELECT hypertable_size('process_value');

Compression

  • Show status
SELECT * FROM timescaledb_information.hypertables;

SELECT * FROM timescaledb_information.jobs
  WHERE proc_name='policy_compression';

SELECT * FROM hypertable_compression_stats('process_value_numeric');
SELECT chunk_name, compression_status FROM chunk_compression_stats('process_value_numeric');

New >2.18.0

  • Show status
SELECT * FROM timescaledb_information.hypertable_columnstore_settings;
SELECT * FROM hypertable_columnstore_stats('process_value_numeric');
  • Enable compression
ALTER TABLE "process_value_numeric" SET(
   timescaledb.enable_columnstore, 
   timescaledb.orderby = 'timestamp DESC', 
   timescaledb.segmentby = 'topic_id');	
  • Add policy
CALL add_columnstore_policy('process_value_numeric', INTERVAL '4 weeks');
  • Remove policy
CALL remove_columnstore_policy('process_value_numeric');

Old <2.18.0

  • Enable compression
ALTER TABLE "process_value_numeric" SET (
  timescaledb.compress,
  timescaledb.compress_orderby = 'timestamp desc',
  timescaledb.compress_segmentby = 'topic_id'
);
  • Add policy
SELECT add_compression_policy('process_value_numeric', INTERVAL '4 weeks');
  • Remove policy
SELECT remove_compression_policy('process_value_numeric');
  • Decompress chunks
SELECT decompress_chunk(c, true) FROM show_chunks('metrics') c;
  • Disable compression (necessary?)
ALTER TABLE process_value_numeric SET (timescaledb.compress = false);

Query

Grafana timefilter macro

  • limits query to timeframe selected in grafana
SELECT * 
FROM process_value
WHERE (topic = 'home/modbus-meter/sensor/total_system_power_w/state' 
AND $__timeFilter("time")) 

Time buckets

SELECT time_bucket('1 hour', timestamp) as time,
  avg(value) AS avg_value
FROM sensor_data
WHERE
  topic = 'home/modbus-meter/sensor/total_system_power_w/state'
  AND 
  $__timeFilter("timestamp")
GROUP by time

Meter / increasing counter

SELECT time_bucket('1 hour', time) as bucket,
 first(time,time) as period_begin,
 last(time,time) as period_end,
 MAX(counter) - MIN(counter) as total_consumption
FROM 
GROUP BY 1;

first() + last ()

Grafana > Dashboards > Settings > Variables

topic = SELECT DISTINCT topic FROM "sensor_data"

Number of values per topic

SELECT topic, COUNT(*) AS num
FROM process_value
GROUP BY topic
ORDER BY num DESC;


Continuous aggregates

Links