一、简介
InfluxDB(时序数据库)influxdb是一个开源分布式时序、时间和指标数据库,使用 Go 语言编写,无需外部依赖。其设计目标是实现分布式和水平伸缩扩展,是 InfluxData 的核心产品。常用的一种使用场景:监控数据统计,物联网传感器数据和实
时分析等的后端存储。每毫秒记录一下电脑内存的使用情况,然后就可以根据统计的数据,利用图形化界面(InfluxDB V1一般配合Grafana)制作内存使用情况的折线图;可以理解为按时间记录一些数据(常用的监控数据、埋点统计数据等),然
后制作图表做统计。
时间序列数据:从定义上来说,就是一串按时间维度索引的数据。
时序数据库(TSDB)特点:
持续高并发写入、无更新;
数据压缩存储;
低查询延时。
常见 TSDB:influxdb、opentsdb、timeScaladb、Druid 等。
influxdb 完整的上下游产业还包括:Chronograf、Telegraf、Kapacitor,其具体作用及关系如下:
二、同常见关系型数据库(MySQL)的基础概念对比
概念
MySQL
InfluxDB
数据库(同)
database
database
表(不同)
table
measurement
列(不同)
column
tag(带索引的,非必须)、field(不带索引)、timestemp(唯一主键)
tag set:不同的每组tag key和tag value的集合;
field set:每组field key和field value的集合;
retention policy:数据存储策略(默认策略为autogen)InfluxDB没有删除数据操作,规定数据的保留时间达到清除数据的目的;
series:共同retention policy,measurement和tag set的集合;
示例数据如下: 其中census是measurement,butterflies和honeybees是field key,location和scientist是tag key
name: census
————————————
time butterflies honeybees location scientist
2015-08-18T00:00:00Z 12 23 1 langstroth
2015-08-18T00:00:00Z 1 30 1 perpetua
2015-08-18T00:06:00Z 11 28 1 langstroth
2015-08-18T00:06:00Z 11 28 2 langstroth
point 属性
含义
time
数据记录的时间,是主索引(自动生成)
tags
各种有索引的属性
fields
各种value值(没有索引的属性)
此外,influxdb还有个特有的概念:series(一般由:retention policy, measurement, tagset就共同组成),其含义如下:
所有在数据库中的数据,都需要通过图表来展示,而这个series表示这个表里面的数据,可以在图表上画成几条线:通过tags排列组合算出来。
需要注意的是,influxdb不需要像传统数据库一样创建各种表,其表的创建主要是通过第一次数据插入时自动创建,如下:
insert mytest, server=serverA count=1,name=5 //自动创建表
“mytest”,“server” 是 tags,“count”、“name” 是 fields
fields 中的 value 基本不用于索引
tag 只能为字符串类型
field 类型无限制
不支持join
支持连续查询操作(汇总统计数据):CONTINUOUS QUERY
配合Telegraf服务(Telegraf可以监控系统CPU、内存、网络等数据)
配合Grafana服务(数据展现的图像界面,将influxdb中的数据可视化)
三、常见sql
-- 查看所有的数据库
show databases;
-- 使用特定的数据库
use database_name;
-- 查看所有的measurement
show measurements;
-- 查询10条数据
select * from measurement_name limit 10;
-- 数据中的时间字段默认显示的是一个纳秒时间戳,改成可读格式
precision rfc3339; -- 之后再查询,时间就是rfc3339标准格式
-- 或可以在连接数据库的时候,直接带该参数
influx -precision rfc3339
-- 查看一个measurement中所有的tag key
show tag keys
-- 查看一个measurement中所有的field key
show field keys
-- 查看一个measurement中所有的保存策略(可以有多个,一个标识为default)
show retention policies;
四、 存储引擎
采用TSM存储引擎,TSM是在LSM的基础上优化改善的,引入了serieskey的概念,对数据实现了很好的分类组织。
TSM主要由四个部分组成: cache、wal、tsm file、compactor:
InfluxDB数据保留策略:
每个数据库刚开始会自动创建一个默认的存储策略 autogen,数据保留时间为永久,在集群中的副本个数为1,之后用户可以自己设置(查看、新建、修改、删除),例如保留最近2小时的数据。插入和查询数据时如果不指定存储策略,则使用默认存储策略,且默认存储策略可以修改。InfluxDB 会定期清除过期的数据。
每个数据库可以有多个过期策略:
show retention policies on "db_name"
Shard 在 influxdb中是一个比较重要的概念,它和 retention policy 相关联。每一个存储策略下会存在许多 shard,每一个 shard 存储一个指定时间段内的数据,并且不重复,例如 7点-8点 的数据落入 shard0 中,8点-9点的数据则落入 shard1 中。每一个 shard 都对应一个底层的 tsm 存储引擎,有独立的 cache、wal、tsm file。
这样做的目的就是为了可以通过时间来快速定位到要查询数据的相关资源,加速查询的过程,并且也让之后的批量删除数据的操作变得非常简单且高效。
建议在数据库建立的时候设置存储策略,不建议设置过多且随意切换
create database testdb2 with duration 30d
influxdb的数据存储有三个目录,分别是meta、wal、data:
五、 性能优化与go开发
控制 series 的数量;
使用批量写;
使用恰当的时间粒度;
存储的时候尽量对 Tag 进行排序;
根据数据情况,调整 shard 的 duration;
无关的数据写不同的database;
控制 Tag Key, 与 Tag Value 值的大小;
存储分离 ,将 wal 目录与 data 目录分别映射到不同的磁盘上,以减少读写操作的相互影响。
go语言开发只需要一个依赖包:github.com/influxdata/influxdb/client/v2,需要注意是v1.8版本,直接clone会失败,
可先到:github.com/influxdata/influxdb中选择版本号V1.8,然后clone下载
对influxdb的操作主要有连接、插入、查询、关闭等几个步骤,其中查询的时候需要注意时间,要设置相应的时区,不然可能显示的时间结果不同
import (
"github.com/influxdata/influxdb/client/v2"
…
)
//连接influxdb
func ConnectInflux()(client.Client, error){
conn, err := client.NewHTTPClient(client.HTTPConfig{
Addr:"http://localhost:8086",
Username:username,
Password:password,
})
if nil != err{
fmt.Println(err)
return nil, err
}
return conn, nil
}
//写入point
func WritePoints(con client.Client)error{
batchpoint ,err := client.NewBatchPoints(client.BatchPointsConfig{
Precision:"s",
Database:MyDB,
})
if nil != err{
fmt.Println(err)
return err
}
record := Record{AssertId:"assert_aaaaa", ModelId:"model0", PoinntId:"point1",
ModelPath:"model0/model1/point1", Attr:"", ModelTime:"123456789"}
tags := map[string]string{Tag1:record.AssertId, Tag2:record.ModelId}
fields := map[string]interface{}{Field1:record.PoinntId, Field2:record.ModelPath,
Field3:record.Attr, Field4:record.ModelTime}
point, err := client.NewPoint(Measurement, tags, fields, time.Now())
if nil != err{
fmt.Println(err)
return err
}
batchpoint.AddPoint(point)
if err := con.Write(batchpoint); err != nil{
fmt.Println(err)
return err
}
}
//查询时要注意时区,东八区设置为:tz('Asia/Shanghai'),命令行需要:precision rfc3339
query := fmt.Sprintf("select * from %s limit %d tz('Asia/Shanghai')", Measurement, 5)
res, err := querydb(conn, query)
手机扫一扫
移动阅读更方便
你可能感兴趣的文章