@
目录
Druid servers建议将它们组织为三种服务器类型:Master主服务器、Query查询服务器和Data数据服务器。
服务进程类型细分如下:
Coordinator(协调器):服务管理集群上的数据可用性。协调器进程监视数据服务器上的历史进程,负责将Segments分配到特定的服务器,并确保Segments在各个历史数据之间得到很好的平衡。
Overlord:服务控制数据摄取工作负载的分配。Overlord进程监视Data服务器上的MiddleManager进程,并且是数据摄取到Druid的控制器。负责将摄取任务分配给middlemanager并协调Segments发布。
Broker:代理处理来自外部客户端的查询。代理进程从外部客户端接收查询,并将这些查询转发给数据服务器。当代理从这些子查询接收到结果时,合并这些结果并将它们返回给调用者。用户通常是查询broker而不直接在数据服务器上查询Historicals或MiddleManagers进程。
Router:Router服务是可选的;他们将请求路由到broker、coordinator和Overlords。路由器进程是可选进程,在Druid broker、Overlords和coordinator面前提供统一的API网关。也可以直接请求broker、coordinator和Overlords。Router还运行web控制台、数据源、分段、任务、数据流程(Historicals和MiddleManagers)的管理UI,以及协调器动态配置;还可以在控制台中运行SQL和本地Druid查询。
Historical:处理存储和查询“历史”数据(包括在系统中存在足够长时间以提交的任何流数据)的主力。历史进程从深层存储中下载Segments并响应关于这些Segments的查询,不接受写操作。
MiddleManager:服务摄取数据。负责将新数据导入集群,从外部数据源读取数据并发布新的Druid Segments。
Indexer process:可选的,是MiddleManagers和Peons的替代方案。Indexer不是为每个任务派生单独的JVM进程,而是在单个JVM进程中作为单独的线程运行任务。与MiddleManager + Peon系统相比,Indexer的设计更容易配置和部署,并且更好地支持跨任务共享资源。Indexer是一个较新的特性,由于它的内存管理系统仍在开发中,所以目前还处于试验阶段,将在Druid的未来版本中逐渐成熟。通常情况下,可以部署MiddleManagers或Indexers,但不能同时部署两者。
深度存储:Druid使用深度存储来存储任何已经摄入到系统中的数据。深度存储是每个Druid服务器都可以访问的共享文件存储。在集群部署中,这通常是一个分布式对象存储,如S3、HDFS或一个网络挂载的文件系统。在单服务器部署中是本地磁盘。
元数据存储:存储各种共享的系统元数据,如段使用信息和任务信息。在集群部署中,这通常是传统的RDBMS,如PostgreSQL或MySQL。在单服务器部署中,它通常是本地存储的Apache Derby数据库。
ZooKeeper:用于内部服务发现、协调和领导者选举。
Druid能够实现海量数据实时分析采取了如下特殊⼿段:
Apache Druid可以在摄入原始数据时使用称为“roll-up”的过程进行汇总。roll-up是针对选定列集的一级聚合操作,可减小存储数据的大小。分析查询逃不开聚合操作,Druid在数据⼊库时就提前进⾏了聚合,这就是所谓的预聚合(roll-up)。Druid把数据按照选定维度的相同的值进⾏分组聚合,可以⼤⼤降低存储⼤⼩。数据查询的时候只需要预聚合的数据基础上进⾏轻量的⼆次过滤和聚合即可快速拿到分析结果。要做预聚合,Druid要求数据能够分为三个部分:
使用网络流事件数据的一个小样本,表示在特定秒内发生的从源到目的IP地址的流量的包和字节计数,数据如下:
{"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":20,"bytes":9024}
{"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":255,"bytes":21133}
{"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":11,"bytes":5780}
{"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":38,"bytes":6289}
{"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":377,"bytes":359971}
{"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":49,"bytes":10204}
{"timestamp":"2018-01-02T21:33:14Z","srcIP":"7.7.7.7","dstIP":"8.8.8.8","packets":38,"bytes":6289}
{"timestamp":"2018-01-02T21:33:45Z","srcIP":"7.7.7.7","dstIP":"8.8.8.8","packets":123,"bytes":93999}
{"timestamp":"2018-01-02T21:35:45Z","srcIP":"7.7.7.7","dstIP":"8.8.8.8","packets":12,"bytes":2818}
timestamp是Timestamp列,srcIP和dstIP是Dimension列(维度),packets和bytes是Metric列。数据⼊库到Druid时如果打开预聚合功能(可以不打开聚合,数据量大建议打开),要求对packets和bytes进⾏累加(sum),并且要求按条计数(count *),聚合之后的数据如下,可以看出聚合是以牺牲明细数据分析查询为代价。
列式存储的概念已经非常耳熟,但凡在⼤数据领域想要解决快速存储和分析海量数据基本都会采⽤列式存储,一般来说OLTP数据库使用行式存储,OLAP数据使用列式存储。
Druid的数据在存储层⾯是按照Datasource和Segments实现多级分区存储的,并建⽴了位图索引。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xkKQdpk9-1675265402036)(http://www.itxiaoshen.com:3001/assets/1675159187424CTMTr68s.png)]
Segment跟Chunk
Druid数据存储的摄取方式、聚合方式、每列数据存储的字节起始位都有存储。
例如有一份数据如下
以tp为时间列,appkey和city为维度,以value为度量值,导⼊Druid后按天聚合,最终结果如下
聚合后数据经过聚合之后查询本身就很快了,为了进⼀步加速对聚合之后数据的查询,Druid会建立位图索引如下
上⾯的位图索引不是针对列⽽是针对列的值,记录了列的值在数据的哪⼀行出现过,第一列是具体列的值,后续列标识该列的值在某⼀⾏是否出现过,依次是第1列到第n列。例如appkey1在第⼀⾏出现过,在其他⾏没出现,那就是1000(例子中只有四个列)。
Select sum(value)
from xxx
where time='2019-11-11' and appkey in('appkey1','appkey2') and area='北京'
当我们有如上查询时,⾸先根据时间段定位到segment,然后根据appkey in (‘appkey1’,’appkey2’) and area=’北京’ 查到各⾃的bitmap:(appkey1(1000) or appkey2(0110)) and 北京(1100) = (1100) 也就是说,符合条件的列是第⼀行和第⼆行,这两⾏的metric的和为125.
在Druid中加载数据称为摄取或索引。当摄取数据到Druid时,Druid从源系统读取数据并将其存储在称为段的数据文件中;通常,每个段文件包含几百万行。
对于大多数摄取方法,Druid MiddleManager进程或Indexer进程加载源数据。唯一的例外是基于Hadoop的摄取,它在YARN上使用Hadoop MapReduce作业。
在摄入过程中,Druid创建片段并将它们存储在深层存储中。历史节点将段加载到内存中以响应查询。对于流输入,中层管理人员和索引人员可以使用到达的数据实时响应查询。
Druid包含流式和批量摄取方法,以下描述了适用于所有摄入方法的摄入概念和信息。
流摄取:有两个可用的选项;流摄取由一个持续运行的管理器控制。
批量摄取:有三种可供批量摄入的选择。批量摄取作业与在作业期间运行的控制器任务相关联。
Apache Druid支持两种查询语言:Druid SQL和本机查询;可以使用Druid SQL查询Druid数据源中的数据。Druid将SQL查询翻译成其本地查询语言。Druid SQL计划发生在Broker上。设置Broker运行时属性以配置查询计划和JDBC查询。
Apache Druid 包含的API如下:
Apache Druid的本地查询类型和本地查询组件内容如下:
使用hadoop1、hadoop2、hadoop3共3台搭建druid的集群,如果有更多服务器可以随时启动相应组件即可,集群规模不大Master Server3台和Query Server2台即可,更多的是根据处理数据规模增加Data Server节点。
主机
组件
hadoop1
Master Server(Coordinator和Overlords)
hadoop2
Data Server(Historical和MiddleManager)
hadoop3
Query Server(Broker和Router)
Java 8 or 11(使用现有)
Python2 or Python3(使用现有Python3)
MySQL(元数据存储,使用现有MySQL 8.0.28)
HDFS(深度存储,使用现有hadoop 3.3.4)
ZooKeeper(使用现有)
创建数据库
-- 创建一个druid数据库,确保使用utf8mb4作为编码
CREATE DATABASE druid DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;
-- 创建一个druid用户
CREATE USER 'druid'@'%' IDENTIFIED BY 'diurd';
-- 向用户授予刚刚创建的数据库的所有权限
GRANT ALL PRIVILEGES ON druid.* TO druid@'%' WITH GRANT OPTION;
ALTER USER 'druid'@'%' IDENTIFIED WITH mysql_native_password BY 'druid';
FLUSH PRIVILEGES;
将MySQL驱动(mysql-connector-java-8.0.28.jar)上传到druid根目录下的extensions/mysql-metadata-storage目录下
修改集群配置文件。vi conf/druid/cluster/_common/common.runtime.properties
druid.host=hadoop1
druid.extensions.loadList=["druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches", "druid-multi-stage-query","mysql-metadata-storage"]
#druid.metadata.storage.type=derby
#druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true
#druid.metadata.storage.connector.host=localhost
#druid.metadata.storage.connector.port=1527
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://mysqlserver:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
注释掉“深度存储”和“索引服务日志”下的本地存储配置。vi conf/druid/cluster/_common/common.runtime.properties
#druid.storage.type=local
#druid.storage.storageDirectory=var/druid/segments
druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments
#druid.indexer.logs.type=file
#druid.indexer.logs.directory=var/druid/indexing-logs
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/druid/indexing-logs
将Hadoop配置xml (core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml)放在Druid进程的类路径中。把它们复制到conf/druid/cluster/_common/。
连接Hadoop的配置(可选),如果需要从Hadoop集群中加载数据则需要配置,并将Hadoop配置xml (core-site.xml, hdfs-site.xml,)放在Druid进程的类路径中。把它们复制到conf/druid/cluster/_common/。vi conf/druid/cluster/data/middleManager/runtime.properties
druid.indexer.task.baseTaskDir=/var/druid/task
druid.indexer.task.hadoopWorkingPath=/var/druid/hadoop-tmp
vi conf/druid/cluster/_common/common.runtime.properties
druid.zk.service.host=zk1:2181,zk2:2181,zk3:2181
# 将apache-druid分别到另外两台服务器上,并修改druid.host
rsync -az apache-druid-25.0.0/ hadoop2:/home/commons/apache-druid-25.0.0/
rsync -az apache-druid-25.0.0/ hadoop3:/home/commons/apache-druid-25.0.0/
# hadoop1上启动Master Serve
bin/start-cluster-master-no-zk-server
# hadoop2上启动Data Server
bin/start-cluster-data-server
# hadoop3上启动Query Server
bin/start-cluster-query-server
# 如果集群规模较大需要分离进程模块,也可以单独启动
bin/coordinator.sh start
bin/overlord.sh start
bin/historical.sh start
bin/middleManager.sh start
bin/broker.sh start
bin/jconsole.sh start
# 单独关闭
bin/coordinator.sh stop
bin/overlord.sh stop
bin/historical.sh stop
bin/middleManager.sh stop
bin/broker.sh stop
bin/jconsole.sh stop
启动完毕后访问查询节点的Druid的控制台UI,http://hadoop3:8888/,点击Services栏目可以看到所有进程服务详细信息
# 先将官方提供的示例数据上传到hdfs
hdfs dfs -put wikiticker-2015-09-12-sampled.json.gz /tmp/my-druid
然后和前面单机版导入操作流程相似,只是选择输入类型为HDFS,填写Paths为上面上传的路径/tmp/my-druid/wikiticker-2015-09-12-sampled.json.gz
生成SQL如下,修改表名为wikipedia(原来为data)
REPLACE INTO "wikipedia" OVERWRITE ALL
WITH "ext" AS (SELECT *
FROM TABLE(
EXTERN(
'{"type":"hdfs","paths":"/tmp/my-druid/wikiticker-2015-09-12-sampled.json.gz"}',
'{"type":"json"}',
'[{"name":"time","type":"string"},{"name":"channel","type":"string"},{"name":"cityName","type":"string"},{"name":"comment","type":"string"},{"name":"countryIsoCode","type":"string"},{"name":"countryName","type":"string"},{"name":"isAnonymous","type":"string"},{"name":"isMinor","type":"string"},{"name":"isNew","type":"string"},{"name":"isRobot","type":"string"},{"name":"isUnpatrolled","type":"string"},{"name":"metroCode","type":"long"},{"name":"namespace","type":"string"},{"name":"page","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"regionName","type":"string"},{"name":"user","type":"string"},{"name":"delta","type":"long"},{"name":"added","type":"long"},{"name":"deleted","type":"long"}]'
)
))
SELECT
TIME_PARSE("time") AS "__time",
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
"delta",
"added",
"deleted"
FROM "ext"
PARTITIONED BY DAY
查看数据源可以看到wikipedia表信息
查看HDFS上也有相应的段数据
输入SQL,点击运行查询数据
SELECT
channel,
COUNT(*)
FROM "wikipedia"
GROUP BY channel
ORDER BY COUNT(*) DESC
可以通过http请求查询,这里以官方示例TopN查询为例
curl -X POST 'http://hadoop3:8888/druid/v2/?pretty' -H 'Content-Type:application/json' -d @wikipedia-top-pages.json
查看数据摄取的任务信息
查看段信息
手机扫一扫
移动阅读更方便
你可能感兴趣的文章