如何快速地把HDFS中的数据导入ClickHouse_Ricky_Huo的博客-CSDN博客_hdfs 导入clickhouse


本站和网页 https://blog.csdn.net/huochen1994/article/details/83827587 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

如何快速地把HDFS中的数据导入ClickHouse_Ricky_Huo的博客-CSDN博客_hdfs 导入clickhouse
如何快速地把HDFS中的数据导入ClickHouse
置顶
Ricky_Huo
于 2018-11-07 17:12:07 发布
18756
收藏
25
分类专栏:
Clickhouse
Spark
文章标签:
ClickHouse
Spark
Waterdrop
大数据
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/huochen1994/article/details/83827587
版权
Clickhouse
同时被 2 个专栏收录
5 篇文章
2 订阅
订阅专栏
Spark
13 篇文章
0 订阅
订阅专栏
如何快速地把HDFS中的数据导入ClickHouse
ClickHouse是面向OLAP的分布式列式DBMS。我们部门目前已经把所有数据分析相关的日志数据存储至ClickHouse这个优秀的数据仓库之中,当前日数据量达到了300亿。
之前介绍的有关数据处理入库的经验都是基于实时数据流,数据存储在Kafka中,我们使用Java或者Golang将数据从Kafka中读取、解析、清洗之后写入ClickHouse中,这样可以实现数据的快速接入。然而在很多同学的使用场景中,数据都不是实时的,可能需要将HDFS或者是Hive中的数据导入ClickHouse。有的同学通过编写Spark程序来实现数据的导入,那么是否有更简单、高效的方法呢。
目前开源社区上有一款工具Waterdrop,项目地址https://github.com/InterestingLab/waterdrop,可以快速地将HDFS中的数据导入ClickHouse。
HDFS to ClickHouse
假设我们的日志存储在HDFS中,我们需要将日志进行解析并筛选出我们关心的字段,将对应的字段写入ClickHouse的表中。
Log Sample
我们在HDFS中存储的日志格式如下, 是很常见的Nginx日志
10.41.1.28 github.com 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:03:09:32 +0800] "GET /InterestingLab/waterdrop HTTP/1.1" 200 0 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)" "196" "-" "mainpage" "443" "-" "172.16.181.129"
ClickHouse Schema
我们的ClickHouse建表语句如下,我们的表按日进行分区
CREATE TABLE cms.cms_msg
date Date,
datetime DateTime,
url String,
request_time Float32,
status String,
hostname String,
domain String,
remote_addr String,
data_size Int32,
pool String
) ENGINE = MergeTree PARTITION BY date ORDER BY date SETTINGS index_granularity = 16384
Waterdrop with ClickHouse
接下来会给大家详细介绍,我们如何通过Waterdrop满足上述需求,将HDFS中的数据写入ClickHouse中。
Waterdrop
Waterdrop是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Waterdrop拥有着非常丰富的插件,支持从Kafka、HDFS、Kudu中读取数据,进行各种各样的数据处理,并将结果写入ClickHouse、Elasticsearch或者Kafka中。
Prerequisites
首先我们需要安装Waterdrop,安装十分简单,无需配置系统环境变量
准备Spark环境安装Waterdrop配置Waterdrop
以下是简易步骤,具体安装可以参照Quick Start
cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip
unzip waterdrop-1.1.1.zip
cd waterdrop-1.1.1
vim config/waterdrop-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}
Waterdrop Pipeline
我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入。
配置文件包括四个部分,分别是Spark、Input、filter和Output。
Spark
这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
Input
这一部分定义数据源,如下是从HDFS文件中读取text格式数据的配置案例。
input {
hdfs {
path = "hdfs://nomanode:8020/rowlog/accesslog"
table_name = "access_log"
format = "text"
Filter
在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为ClickHouse支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行字段筛减等
filter {
# 使用正则解析原始日志
grok {
source_field = "raw_message"
pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'
# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
# "yyyy/MM/dd HH:mm:ss"格式的数据
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy/MM/dd HH:mm:ss"
# 使用SQL筛选关注的字段,并对字段进行处理
# 甚至可以通过过滤条件过滤掉不关心的数据
sql {
table_name = "access"
sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"
Output
最后我们将处理好的结构化数据写入ClickHouse
output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "waterdrop"
table = "access_log"
fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
Running Waterdrop
我们将上述四部分配置组合成为我们的配置文件config/batch.conf。
vim config/batch.conf
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
input {
hdfs {
path = "hdfs://nomanode:8020/rowlog/accesslog"
table_name = "access_log"
format = "text"
filter {
# 使用正则解析原始日志
grok {
source_field = "raw_message"
pattern = '%{IP:ha_ip}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}\"\\s%{DATA:uid}\\s%{DATA:session_id}\\s\"%{DATA:pool}\"\\s\"%{DATA:tag2}\"\\s%{DATA:tag3}\\s%{DATA:tag4}'
# 将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
# "yyyy/MM/dd HH:mm:ss"格式的数据
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy/MM/dd HH:mm:ss"
# 使用SQL筛选关注的字段,并对字段进行处理
# 甚至可以通过过滤条件过滤掉不关心的数据
sql {
table_name = "access"
sql = "select substring(date, 1, 10) as date, datetime, hostname, url, http_code, float(request_time), int(data_size), domain from access"
output {
clickhouse {
host = "your.clickhouse.host:8123"
database = "waterdrop"
table = "access_log"
fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
username = "username"
password = "password"
执行命令,指定配置文件,运行Waterdrop,即可将数据写入ClickHouse。这里我们以本地模式为例。
./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'
Conclusion
在这篇文章中,我们介绍了如何使用Waterdrop将HDFS中的Nginx日志文件导入ClickHouse中。仅通过一个配置文件便可快速完成数据的导入,无需编写任何代码。除了支持HDFS数据源之外,Waterdrop同样支持将数据从Kafka中实时读取处理写入ClickHouse中。我们的下一篇文章将会介绍,如何将Hive中的数据快速导入ClickHouse中。
当然,Waterdrop不仅仅是ClickHouse数据写入的工具,在Elasticsearch以及Kafka等数据源的写入上同样可以扮演相当重要的角色。
希望了解Waterdrop和ClickHouse、Elasticsearch、Kafka结合使用的更多功能和案例,可以直接进入项目主页https://github.com/InterestingLab/waterdrop
– Power by InterestingLab
Ricky_Huo
关注
关注
点赞
25
收藏
打赏
评论
如何快速地把HDFS中的数据导入ClickHouse
如何快速地把HDFS中的数据导入ClickHouseClickHouse是面向OLAP的分布式列式DBMS。我们部门目前已经把所有数据分析相关的日志数据存储至ClickHouse这个优秀的数据仓库之中,当前日数据量达到了300亿。之前介绍的有关数据处理入库的经验都是基于实时数据流,数据存储在Kafka中,我们使用Java或者Golang将数据从Kafka中读取、解析、清洗之后写入ClickH...
复制链接
扫一扫
专栏目录
HDFS+Clickhouse+Spark:从0到1实现一款轻量级大数据分析系统
QcloudCommunity的博客
07-13
764
导语 | 在产品精细化运营时代,经常会遇到产品增长问题:比如指标涨跌原因分析、版本迭代效果分析、运营活动效果分析等。这一类分析问题高频且具有较高时效性要求,然而在人力资源紧张情况,传统的...
利用DataX将HDFS中数据同步到ClickHouse中
weixin_45514285的博客
10-22
341
利用DataX将HDFS中数据同步到ClickHouse中
评论 3
您还未登录,请先
登录
后发表或查看评论
clickhouse批量写入数据
最新发布
robinhunan的博客
10-24
736
clickhouse 批量插入数据的示例
datax将hive中的hdfs数据导入到clickhouse测试调错全程
weixin_43465754的博客
04-26
1528
datax将hive中的数据导入到clickhouse
利用waterdrop将hdfs里的数据快速迁移到clickhouse中(单机版)
大鱼的博客
08-15
2884
启动waterdrop:
./bin/start-waterdrop.sh --master local[4] --deploy-mode client --config ./config/streaming.conf
注:这里面的local[4]中的是代表本机线程个数,这个是自己确定,这里为4个线程;后面的配置文件也是自己进行选择,上面是为了做流式计算,故而streaming.conf
...
seatunnel(海量数据处理工具)实现HDFS导入Clickhouse
Machine4869的博客
01-12
2867
文章目录介绍快速开始案例1:HDFS导入Clickhouse
ref: https://interestinglab.github.io/seatunnel-docs/#/zh-cn/v1/
介绍
seatunnel 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。
为什么需要seatunnel ?
让Spark的使用更简单,更高效。简化开发
特性
简单易用,灵活配置,无需开发
模块化和插件化,易于扩展
支持利用
如何从HDFS导入数据到ClickHouse
qq_43193797的博客
10-23
703
从ClickHouse 18.16.0版本开始支持从HDFS读文件,在 19.1.6 版本对HDFS访问功能进行了增强,支持读和写,在 19.4 版本以后开始支持Parquet格式。本文介绍了如何从HDFS中读数据到ClickHouse中,测试版本为:19.4
在访问HDFS之前需要定义一个访问HDFS的表,指定表引擎为HDFS。表创建完成后,就可以对这张表进行查询。
一、查询CSV文件
例如,在HDFS上有一个数据文件:books.csv,内容如下:
hadoop fs -cat /user/hive/c
从Hadoop到ClickHouse,现代BI系统有哪些问题?如何解决?
大数据
06-23
3717
导读:一次机缘巧合,在研究BI产品技术选型的时候,我接触到了ClickHouse,瞬间就被其惊人的性能所折服。这款非Hadoop生态、简单、自成一体的技术组件引起了我极大的好奇。那么Cl...
使用flink将数据流写入到Clickhouse的工具类
weixin_42796403的博客
03-23
901
使用flink将数据流写入到Clickhouse
package com.gmall.realtime.util;
import com.gmall.realtime.bean.VisitorStats;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.Jd
篇五|ClickHouse数据导入(Flink、Spark、Kafka、MySQL、Hive)
jmx_bigdata的博客
11-18
1921
本文分享主要是ClickHouse的数据导入方式,本文主要介绍如何使用Flink、Spark、Kafka、MySQL、Hive将数据导入ClickHouse,具体内容包括:
使用Flink导入数据
使用Spark导入数据
从Kafka中导入数据
从MySQL中导入数据
从Hive中导入数据
使用Flink导入数据
本文介绍使用 flink-jdbc将数据导入ClickHouse,Maven依赖为:
<dependency>
<groupId>org.apache.fli
ClickHouse-FROM从MySQL ,HDFS中直接读取数据
bwf317的博客
12-07
711
从HDFS中直接读取数据加载到表中
创建表:
CREATE TABLE tb_ch_from
ENGINE = TinyLog AS
SELECT *
FROM hdfs(‘hdfs://linux01:8020/data/user.csv’,‘CSV’,‘uid Int8,name String,gender String,age UInt8’)
注意:
此处建表没有直接指定字段写在()中
SELECT * FROM hdfs 时hdfs是小写,在直接用HDFS 引擎时是用的大写(engine
DefaultTableModel概述
cumao2792的博客
09-14
1250
The
DefaultTableModel
class is a subclass of the
类是的子类
AbstractTableModel
. As the name suggests it is the table model that is used by a
。 顾名思义,它是由表模型使用的。
when no table model is spe...
将hdfs数据快速导入clickhouse,并通过linux客户端模式导入array 类型数据
王亮的博客
08-19
716
1.背景:开始研究使用clickhouse,在检索clickhouse官网(https://clickhouse.tech/docs/en/)的时候看到jdbc 通过8123端口进行数据的导入和连接。然后写了一个工具往clickhou导入文件,发现是真的慢,而且容易崩。于是研究别的导入方式,发现其实通过客户端命令走9000端口进行导入其实真的又快又稳定。于是彻底抛弃了jdbc的导入方式,直接通过shell脚本控制将hdfs文件导入到clickhouse中,官方对客户端导入的格式是有限制的 详情参考:(htt
Clickhouse集群应用、分片、复制
热门推荐
tianjinsong的专栏
10-11
4万+
https://www.jianshu.com/p/20639fdfdc99
简介
通常生产环境我们会用集群代替单机,主要是解决两个问题:
效率
稳定
如何提升效率?一个大大大任务,让一个人干需要一年,拆解一下让12个人同时干,可能只需要1个月。对于数据库来说,就是数据分片。
如何提升稳定性?所谓稳定就是要保证服务时刻都能用,也常说高可用。这就像团队里必须有二把手,老大有事不在,老二要能...
ClickHouse与HDFS交互
寒暄的博客
07-02
3650
ClickHouse与Hadoop的兼容性不好,数据交互还是依靠将数据导出为固定格式的文件,然后将文件导入到ClickHouse中。
从HDFS读取数据
从HDFS上读取数据类似于将HDFS作为外部存储,然后去拉取HDFS上的数据。
需要用到一个新的引擎HDFS:
CREATE TABLE hdfs_student_csv(
id Int8,
name String
Engine=HDFS('hdfs://hadoop01:9000/student.csv','csv');
但是数据实际上还是在
clickhouse hive/hdfs引擎由于HDFS-HA报错问题解决
qq_35128600的博客
03-02
1803
clickhouse hive/hdfs表引擎HA报错问题
ClickHouse入门:表引擎-HDFS
高矮
10-21
1830
Code: 210. DB::Exception: Received from localhost:9000. DB::Exception: Unable to connect to HDFS: InvalidParameter: Cannot parse URI: hdfs://mycluster, missing port or invalid HA configurationCaused by: HdfsConfigNotFound: Config key: dfs.ha.namenodes.myc
Clickhouse 从S3/Hive导入数据
我是坏人哦
06-23
794
我们的埋点数据上传到S3,大概是每天10亿条的数据量级别。最近花了一些时间思考和学习如何将每天如此大量的数据从S3导入到Clickhouse,为后续的实时查询做准备。
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:技术黑板
设计师:CSDN官方博客
返回首页
Ricky_Huo
CSDN认证博客专家
CSDN认证企业博客
码龄7年
暂无认证
48
原创
19万+
周排名
129万+
总排名
52万+
访问
等级
2760
积分
73
粉丝
71
获赞
71
评论
203
收藏
私信
关注
热门文章
关于发邮件报错535 Error:authentication failed解决方法
225295
Python中List遍历的若干种方法
26152
ClickHouse JDBC插入性能测试(基于Hangout)
22631
使用Python往Elasticsearch插入数据
22174
Waterdrop帮你快速玩转Spark数据处理
21674
分类专栏
Python
13篇
Django
4篇
Git
3篇
Spark
13篇
其他
3篇
Linux
4篇
Elastic
9篇
Java
3篇
Scala
3篇
Prometheus
2篇
Hangout
3篇
Clickhouse
5篇
Zookeeper
1篇
Kafka
Flume
1篇
Golang
1篇
Waterdrop
6篇
最新评论
ElasticSearch pinyin分词支持多音字
qq_40432718:
analyzer使用multiple_pinyin吗?
关于发邮件报错535 Error:authentication failed解决方法
奔跑中的小猿:
授权码重置没用,安装认证再加密也没用,,,试了N种方法,最后发现我虽然可以ping 通smtp.163.com,但是当真正解析到邮件服务器真实IP时,465端口挡住了去路,再打听,发现公司网络防火墙屏蔽了465端口,我裂开了~~~之后一系列的申请流程搞下来,终于解决了
使用hangout将Kafka数据实时清洗写入ClickHouse
程序猿如何淡定的装逼:
从kafka消费的数据直接存入clickhouse可以,但是没必要,有些数据不需要,有些需要做计算,我们使用clickhouse的目的不是为了存储数据,hangout则承担了清洗数据,这样存入clickhouse的数据使用起来才不费力,不然还得去花时间查询出来再清洗,这样就在业务层增加了负担
Zookeeper一次故障处理
夏橙、:
你这还是没解决问题啊,怎么就抛弃Clickhhouse的分布式表了
Waterdrop推动Spark Structured Streaming走向生产环境
在路上2021:
就是把output打印到控制台了,这里变了
[code=java]
#接下来我们选择将结果实时输出到Kafka
output {
# 打印到控制台
stdout {}
[/code]
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
如何用Spark实现一个通用大数据引擎
从Flink上谈当今实时流处理
Golang Benchmark Test
2020年4篇
2019年5篇
2018年15篇
2017年9篇
2016年17篇
目录
目录
分类专栏
Python
13篇
Django
4篇
Git
3篇
Spark
13篇
其他
3篇
Linux
4篇
Elastic
9篇
Java
3篇
Scala
3篇
Prometheus
2篇
Hangout
3篇
Clickhouse
5篇
Zookeeper
1篇
Kafka
Flume
1篇
Golang
1篇
Waterdrop
6篇
目录
评论 3
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
Ricky_Huo
你的鼓励将是我创作的最大动力
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值