continuous view 是 pipelinedb的核心,类似一个view,但是数据是合并了stream以及table的数据输入数据,并且是
实时根据输入数据进行更新的语法
CREATE CONTINUOUS VIEW name AS queryquery是一个pg 的select 格式的语法,格式如下:SELECT [ DISTINCT [ ON ( expression [, ...] ) ] ] expression [ [ AS ] output_name ] [, ...] [ FROM from_item [, ...] ] [ WHERE condition ] [ GROUP BY expression [, ...] ] [ WINDOW window_name AS ( window_definition ) [, ...] ]where from_item can be one of: stream_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ] table_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ] from_item [ NATURAL ] join_type from_item [ ON join_condition ]
环境准备
项目使用docker运行同时结合hasura graphql 引擎
- docker-compose
version: '3.6'services: postgres: image: pipelinedb/pipelinedb ports: - "5432:5432" graphql-engine: image: hasura/graphql-engine:v1.0.0-alpha06 ports: - "8080:8080" depends_on: - "postgres" command: > /bin/sh -c " graphql-engine --database-url postgres://pipeline:pipeline@postgres:5432/pipeline serve --enable-console; "
数据源从基本数据tabley以及stream 获取(比较综合的例子)
- 创建基本表
CREATE TABLE userlogin ( id SERIAL PRIMARY KEY, username text NOT NULL, userid integer NOT NULL, usertype text NOT NULL, logintype text NOT NULL); ```* 创建stream:```codeCREATE STREAM loginlogs (logintype text, userid integer);
- 创建continuous view
CREATE CONTINUOUS VIEW userloginview AS select a.logintype,b.username,b.userid, b.logintype as logintype_ from loginlogs a join userlogin bon a.userid=b.userid
- 插入数据&&查询
insert into loginlogs(logintype,userid) values ('mobile',333),('pc',333),('web',333)
select * from userloginview
- graphql 集成
- graphql 查询
- 说明 实际使用中我们的view一般都是一个聚合函数的操作,比如统计状态,异常信息排查,同时view 支持ttl 可以支持有效期控制
官方提供的一个比较有意思的demo
- 延迟百分比 90 95 99 延迟占比
CREATE CONTINUOUS VIEW latency ASSELECT percentile_cont(array[90, 95, 99]) WITHIN GROUP (ORDER BY latency)FROM latency_stream;
- 最新5分钟广告的曝光
CREATE CONTINUOUS VIEW imps ASSELECT COUNT(*) FROM imps_streamWHERE (arrival_timestamp > clock_timestamp() - interval '5 minutes');