
Big Data with Hadoop: Hands-On Apache Hudi Data Lake Operations

2023-04-28 12:16:13

```bash
-spark3.2-bundle_2.12-0.12.0.jar,/opt/apache/hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/unused-1.0.0.jar,/opt/apache/spark-avro_2.12-3.3.0.jar \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
```

2. Import the Spark and Hudi packages

```scala
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
```

3. Define variables

```scala
val tableName = "hudi_trips_cow"
// store the table on HDFS
val basePath = "hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow"
// or store it on the local filesystem
// val basePath = "file:///tmp/hudi_trips_cow"
```

4. Generate sample Trip ride data

```scala
// Build a DataGenerator object and use it to generate 10 sample Trip records
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
```

Here, the DataGenerator is used to generate test data for the operations that follow.

5. Convert the sample data list into a DataFrame

```scala
// convert to a DataFrame
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
// inspect the schema
df.printSchema()
// inspect the data
df.show()
// select specific fields
df.select("rider","begin_lat","begin_lon","driver","end_lat","end_lon","fare","partitionpath","ts","uuid").show(10, truncate = false)
```

6. Write the data to Hudi

Save the data into a Hudi table. Since Hudi was originally built on top of the Spark framework, Spark SQL supports Hudi as a data source: specify the source via `format` and set the relevant options to save the data. Note that Hudi does not really store the data itself; it manages the data.

```scala
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
```

Key options explained:

- `getQuickstartWriteConfigs`: sets the number of shuffle partitions used when inserting/updating data into Hudi.
- `PRECOMBINE_FIELD_OPT_KEY`: the field used to decide which record wins when records with the same key are merged.
- `RECORDKEY_FIELD_OPT_KEY`: the unique id of each record; multiple fields are supported.
- `PARTITIONPATH_FIELD_OPT_KEY`: the partition field used to lay out the data.

Stored locally:

Stored on HDFS:
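As a quick sanity check, the written table can be read back in the same spark-shell session. The sketch below uses the standard Hudi Spark datasource read path; the temporary view name `hudi_trips_snapshot` is arbitrary and not part of the steps above.

```scala
// Snapshot-read the Hudi table back through the Spark datasource
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("hudi_trips_snapshot")

// Query a few fields with Spark SQL
spark.sql("select uuid, rider, driver, fare, ts from hudi_trips_snapshot where fare > 20.0").show()
```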

IV. Integrating Flink with Hudi

The official documentation describes this as follows:

1) Start the Flink cluster

Download address:

### 1. Download the Flink package

```bash
wget _2.12.tgz
tar -xf flink-1.14.6-bin-scala_2.12.tgz
export FLINK_HOME=/opt/apache/flink-1.14.6
```

### 2. Set HADOOP_CLASSPATH

```bash
# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export HADOOP_CONF_DIR='/opt/apache/hadoop/etc/hadoop'
```

### 3. Start a standalone Flink cluster

```bash
# Start the Flink standalone cluster. Adjust the slot count first (default is 1, changed to 4 here):
# taskmanager.numberOfTaskSlots: 4
cd $FLINK_HOME
./bin/start-cluster.sh
# run a test example to verify the cluster works
./bin/flink run examples/batch/WordCount.jar
```

2) Start the Flink SQL client

```bash
# [Option 1] specify the Hudi bundle jar explicitly
./bin/sql-client.sh embedded -j ../hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle-0.12.0.jar shell
# [Option 2] alternatively, place the jar under $FLINK_HOME/lib
./bin/sql-client.sh embedded shell
```

3) Insert data

```sql
-- sets up the result mode to tableau to show the results directly in the CLI
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);

INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');

-- insert data using values
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
```

View on HDFS:
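For example, one quick way to confirm the write landed is to list the table path used in the CREATE TABLE statement above from an HDFS client (assuming the `hdfs` command is available on the node):

```bash
# List the Hudi table directory written by the Flink job
hdfs dfs -ls /tmp/flink-hudi-t1
# The partition directories (par1 ... par4) and the .hoodie metadata directory should be visible
hdfs dfs -ls /tmp/flink-hudi-t1/.hoodie
```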

4) Query data (batch query)

```sql
select * from t1;
```

5) Update data

```sql
-- this would update the record with key 'id1'
insert into t1 values ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
```

6) Streaming Query

First create a table t2 and set the relevant options so that it is read in streaming mode; it maps to the same physical table as t1.

- `read.streaming.enabled` is set to `true`, indicating the table is read in streaming mode;
- `read.streaming.check-interval` specifies the interval at which the source checks for new commits, here 4s;
- `table.type` sets the table type to MERGE_ON_READ.

```sql
CREATE TABLE t2(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',      -- this option enable the streaming read
  'read.start-commit' = '20210316134557', -- specifies the start commit instant time
  'read.streaming.check-interval' = '4'   -- specifies the check interval for finding new source commits, default 60s.
);

-- Then query the table in stream mode
select * from t2;
```
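To watch the streaming read pick up new data, one option (not shown in the original article) is to insert another batch into t1 from a second SQL client session while `select * from t2` is running; the rows below are made-up examples, and they should appear in the streaming result within the configured check interval:

```sql
-- hypothetical extra rows, inserted while the streaming query on t2 is running
INSERT INTO t1 VALUES
  ('id9','Ada',30,TIMESTAMP '1970-01-01 00:00:09','par1'),
  ('id10','Grace',41,TIMESTAMP '1970-01-01 00:00:10','par2');
```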

Note: the query may fail with the following error:

```
[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
```

[Fix] Add hadoop-mapreduce-client-core-xxx.jar and hive-exec-xxx.jar to the Flink lib directory.

```bash
cp /opt/apache/hadoop-3.3.2/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.2.jar $FLINK_HOME/lib
cp ./hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/hive-exec-2.3.1-core.jar $FLINK_HOME/lib
```

For Hive and Hudi integration, see the official documentation first: _metastore/#flink-setup
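As a rough preview of that integration, a Flink-written Hudi table is usually synced to Hive through the `hive_sync.*` table options. The snippet below is only a sketch: the path, metastore URI, database, and table names are placeholders and not values from this article.

```sql
CREATE TABLE t1_hive_sync(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1-hive',   -- placeholder path
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',                                 -- sync table metadata to Hive
  'hive_sync.mode' = 'hms',                                    -- sync through the Hive metastore
  'hive_sync.metastore.uris' = 'thrift://hive-metastore:9083', -- placeholder metastore URI
  'hive_sync.db' = 'default',
  'hive_sync.table' = 't1_hive_sync'
);
```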

That is it for Spark + Hudi and Flink + Hudi integration for now. Integrations between Hudi and many other big data components will be covered in later articles, so please be patient; feel free to leave a comment with any questions. Articles on big data and cloud native topics will keep coming~
