Creating the Hive database and tables
Adding latest-state records to the dim tables
Tasks
This post continues the data-extraction task from the previous one; the exercises below use the ODS data extracted there.
Creating the Hive database and tables
1. Create the dwd database
create database dwd;
2. Create the dim_user_info table, partitioned by etl_date
CREATE TABLE `dim_user_info` (
  `id` bigint,
  `login_name` string,
  `nick_name` string,
  `passwd` string,
  `name` string,
  `phone_num` string,
  `email` string,
  `head_img` string,
  `user_level` string,
  `birthday` timestamp,
  `gender` string,
  `create_time` timestamp,
  `operate_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY (`etl_date` string)
row format delimited fields terminated by '\001'
stored as textfile;
3. Create the dim_sku_info table, partitioned by etl_date
CREATE TABLE `dim_sku_info` (
  `id` bigint,
  `spu_id` bigint,
  `price` decimal(10, 0),
  `sku_name` string,
  `sku_desc` string,
  `weight` decimal(10, 2),
  `tm_id` bigint,
  `category3_id` bigint,
  `sku_default_img` string,
  `create_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY (`etl_date` string)
row format delimited fields terminated by '\001'
stored as textfile;
4. Create the dim_base_province table, partitioned by etl_date
CREATE TABLE `dim_base_province` (
  `id` bigint,
  `name` string,
  `region_id` string,
  `area_code` string,
  `iso_code` string,
  `create_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY (`etl_date` string)
row format delimited fields terminated by '\001'
stored as textfile;
5. Create the dim_base_region table, partitioned by etl_date
CREATE TABLE `dim_base_region` (
  `id` string,
  `region_name` string,
  `create_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY (`etl_date` string)
row format delimited fields terminated by '\001'
stored as textfile;
6. Create the fact_order_info table, partitioned by etl_date
CREATE TABLE `fact_order_info` (
  `id` bigint,
  `consignee` string,
  `consignee_tel` string,
  `final_total_amount` decimal(16, 2),
  `order_status` string,
  `user_id` bigint,
  `delivery_address` string,
  `order_comment` string,
  `out_trade_no` string,
  `trade_body` string,
  `create_time` timestamp,
  `operate_time` timestamp,
  `expire_time` timestamp,
  `tracking_no` string,
  `parent_order_id` bigint,
  `img_url` string,
  `province_id` int,
  `benefit_reduce_amount` decimal(16, 2),
  `original_total_amount` decimal(16, 2),
  `feight_fee` decimal(16, 2),
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY (`etl_date` string)
row format delimited fields terminated by '\001'
stored as textfile;
7. Create the fact_order_detail table, partitioned by etl_date
CREATE TABLE `fact_order_detail` (
  `id` bigint,
  `order_id` bigint,
  `sku_id` bigint,
  `sku_name` string,
  `img_url` string,
  `order_price` decimal(10, 2),
  `sku_num` string,
  `create_time` timestamp,
  `source_type` string,
  `source_id` bigint,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY (`etl_date` string)
row format delimited fields terminated by '\001'
stored as textfile;
Adding latest-state records to the dim tables
Here you can insert latest-state records (that is, records carrying the latest partition date) into every dim-prefixed table in the dwd database yourself, which makes it easier to verify the experiment's results. Once testing is done, you can run select statements against the dwd tables in Hive to view the newly added latest-state records as well as the other output required by the tasks; a sample query follows the inserts below.
1. dim_user_info
INSERT INTO `dim_user_info` VALUES (82, 'rz4gxf1', '阿仁+1', NULL, '康仁', '13437298274', '[email protected]', NULL, '1', '2004-04-26', 'M', '2020-04-26 18:57:55', '2020-04-26 06:02:36', 'user1', '2020-04-26 18:57:55', 'user1', '2020-04-26 18:57:55', '20230403');
INSERT INTO `dim_user_info` VALUES (85, 'cte9ov7mv', '波宁+1', NULL, '华良海', '13751595688', '[email protected]', NULL, '1', '1989-04-26', 'M', '2020-04-26 18:57:55', '2020-04-26 23:54:52', 'user1', '2020-04-26 18:57:55', 'user1', '2020-04-26 18:57:55', '20230403');
2. dim_sku_info
INSERT INTO `dim_sku_info` VALUES (1, 1, 2220, '荣耀10青春版+1 幻彩渐变 2400万AI自拍 全网通版4GB+64GB 渐变蓝 移动联通电信4G全面屏手机 双卡双待', 'new sku_desc', 0.24, 2, 61, 'http://AOvKmfRQEBRJJllwCwCuptVAOtBBcIjWeJRsmhbJ', '2021-01-01 12:21:13', 'user1', '2020-04-26 18:57:55', 'user1', '2020-04-26 18:57:55', '20230403');
INSERT INTO `dim_sku_info` VALUES (2, 2, 3321, 'TCL 55A950C 55英寸32核人工智能+1 HDR曲面超薄4K电视金属机身(枪色)', 'new sku_desc', 15.24, 4, 86, 'http://JfJSvAnPkErYPcUsbgCuokhjxKiLeqpDXakZqFeE', '2021-01-01 12:21:13', 'user1', '2020-04-26 18:57:55', 'user1', '2020-04-26 18:57:55', '20230403');
3. dim_base_province
INSERT INTO `dim_base_province` VALUES (1, '北京+1', '1', '110000', 'CN-11', '2024-01-01 12:21:13', 'user1', '2020-04-26 18:57:55', 'user1', '2020-04-26 18:57:55', '20230403');
INSERT INTO `dim_base_province` VALUES (2, '天津市+1', '1', '120000', 'CN-12', '2024-01-01 12:21:13', 'user1', '2020-04-26 18:57:55', 'user1', '2020-04-26 18:57:55', '20230403');
4. dim_base_region
INSERT INTO `dim_base_region` VALUES ('1', '华北+1', '2024-01-01 12:21:13', 'user1', '2020-04-26 18:57:55', 'user1', '2020-04-26 18:57:55', '20230403');
INSERT INTO `dim_base_region` VALUES ('2', '华东+1', '2024-01-01 12:21:13', 'user1', '2020-04-26 18:57:55', 'user1', '2020-04-26 18:57:55', '20230403');
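With the test rows in place, a quick check from Hive shows the added latest-state records (a minimal sketch; '20230403' is the partition value used in the inserts above):

select * from dwd.dim_user_info where etl_date = '20230403';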
Tasks
Requirement: write Spark job code in Scala that extracts the data of the corresponding tables in the ods database in full into the corresponding tables in Hive's dwd database. All timestamp-typed columns must follow the format yyyy-MM-dd HH:mm:ss, without milliseconds; if the source value contains only a date, pad the time portion with 00:00:00 so that the result conforms to yyyy-MM-dd HH:mm:ss. A short sketch of this padding rule follows.
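A minimal sketch of the padding rule, assuming the date_format built-in of Hive/Spark SQL: a date-only value is parsed at midnight, so formatting it with the full pattern fills in the required 00:00:00.

-- a date-only value parses to midnight, so the full pattern pads 00:00:00
select cast(date_format('2004-04-26', 'yyyy-MM-dd HH:mm:ss') as timestamp);
-- -> 2004-04-26 00:00:00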
1. Extract yesterday's partition (the partition generated in Task 1) from the user_info table in the ods database and, combining it with the existing data in the latest partition of dim_user_info, merge the data by id into the dim_user_info partitioned table in the dwd database. Merging means inserting into or updating the dwd layer: rows to be updated are matched on id, and the latest row is kept by sorting on operate_time. The partition field is etl_date and its value equals that of the corresponding ods table. If operate_time is null, fill it with create_time. Add the four columns dwd_insert_user, dwd_insert_time, dwd_modify_user and dwd_modify_time, with dwd_insert_user and dwd_modify_user both set to "user1". If a record enters the dwd layer for the first time, set both dwd_insert_time and dwd_modify_time to the current operation time, with the appropriate type conversion. If a record is merged and updated on entering the dwd layer, keep dwd_insert_time unchanged, set dwd_modify_time to the current operation time, and store the latest values in the remaining columns. Run the command show partitions dwd.dim_user_info in the hive cli.
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  // set the HDFS user name
  System.setProperty("HADOOP_USER_NAME", "root")
  System.setProperty("user.name", "root")
  // create the SparkSession
  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("sparkOneHive")
    .enableHiveSupport()
    .getOrCreate()
  // structure: insert the final result columns, taken from a windowed filter over
  // (ods.user_info union all dwd.dim_user_info), keeping only the row with row_number = 1 per id
  spark.sql(
    """
      |insert overwrite table dwd.dim_user_info
      |partition (etl_date = '20230403')
      |select
      |id,
      |login_name,
      |nick_name,
      |passwd,
      |name,
      |phone_num,
      |email,
      |head_img,
      |user_level,
      |birthday,
      |gender,
      |create_time,
      |operate_time,
      |dwd_insert_user,
      |insert_time,
      |dwd_modify_user,
      |dwd_modify_time
      |from
      |(select
      |id,
      |login_name,
      |nick_name,
      |passwd,
      |name,
      |phone_num,
      |email,
      |head_img,
      |user_level,
      |birthday,
      |gender,
      |create_time,
      |operate_time,
      |dwd_insert_user,
      |dwd_insert_time,
      |dwd_modify_user,
      |dwd_modify_time,
      |min(dwd_insert_time) over(partition by id) insert_time,
      |row_number() over(partition by id order by operate_time desc) row
      |from
      |(select
      |id,
      |login_name,
      |nick_name,
      |passwd,
      |name,
      |phone_num,
      |email,
      |head_img,
      |user_level,
      |birthday,
      |gender,
      |create_time,
      |if(operate_time is null, create_time, operate_time) operate_time,
      |"user1" dwd_insert_user,
      |cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
      |"user1" dwd_modify_user,
      |cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time
      |from ods.user_info
      |where etl_date = '20230330'
      |union all
      |select
      |id,
      |login_name,
      |nick_name,
      |passwd,
      |name,
      |phone_num,
      |email,
      |head_img,
      |user_level,
      |birthday,
      |gender,
      |create_time,
      |if(operate_time is null, create_time, operate_time) operate_time,
      |dwd_insert_user,
      |dwd_insert_time,
      |dwd_modify_user,
      |dwd_modify_time
      |from dwd.dim_user_info
      |where etl_date = '20230403'
      |)dw
      |)tmp where row = 1
      |""".stripMargin)
  spark.sql("select * from dwd.dim_user_info").show()
  spark.sql("show partitions dwd.dim_user_info").show()
  spark.close()
}
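The task asks for this check from the hive cli rather than through Spark; assuming the hive client is on the PATH, for example:

hive -e "show partitions dwd.dim_user_info;"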
2. Extract yesterday's partition (the partition generated in Task 1) from the sku_info table in the ods database and, combining it with the existing data in the latest partition of dim_sku_info, merge the data by id into the dim_sku_info partitioned table in the dwd database (merging means inserting into or updating the dwd layer: rows to be updated are matched on id, and the latest row is kept by sorting on create_time). The partition field is etl_date and its value equals that of the corresponding ods table. Add the four columns dwd_insert_user, dwd_insert_time, dwd_modify_user and dwd_modify_time, with dwd_insert_user and dwd_modify_user both set to "user1". If a record enters the dwd layer for the first time, set both dwd_insert_time and dwd_modify_time to the current operation time, with the appropriate type conversion. If a record is merged and updated on entering the dwd layer, keep dwd_insert_time unchanged, set dwd_modify_time to the current operation time, and store the latest values in the remaining columns. In the hive cli, query the columns id, sku_desc, dwd_insert_user, dwd_modify_time and etl_date of table dim_sku_info for the latest partition, where id is greater than or equal to 15 and less than or equal to 20, ordered by id ascending.
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  // set the HDFS user name
  System.setProperty("HADOOP_USER_NAME", "root")
  System.setProperty("user.name", "root")
  // create the SparkSession
  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("sparkOneHive")
    .enableHiveSupport()
    .getOrCreate()
  spark.sql(
    """
      |insert overwrite table dwd.dim_sku_info partition (etl_date = '20230403')
      |select
      |id,
      |spu_id,
      |price,
      |sku_name,
      |sku_desc,
      |weight,
      |tm_id,
      |category3_id,
      |sku_default_img,
      |create_time,
      |dwd_insert_user,
      |insert_time,
      |dwd_modify_user,
      |dwd_modify_time
      |from
      |  (select
      |  id,
      |  spu_id,
      |  price,
      |  sku_name,
      |  sku_desc,
      |  weight,
      |  tm_id,
      |  category3_id,
      |  sku_default_img,
      |  create_time,
      |  dwd_insert_user,
      |  dwd_insert_time,
      |  dwd_modify_user,
      |  dwd_modify_time,
      |  min(dwd_insert_time) over(partition by id) insert_time,
      |  row_number() over(partition by id order by create_time desc) rw
      |  from
      |    (select
      |    id,
      |    spu_id,
      |    price,
      |    sku_name,
      |    sku_desc,
      |    weight,
      |    tm_id,
      |    category3_id,
      |    sku_default_img,
      |    cast(date_format(create_time, 'yyyy-MM-dd HH:mm:ss') as timestamp) create_time,
      |    'user1' dwd_insert_user,
      |    cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
      |    'user1' dwd_modify_user,
      |    cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time
      |    from ods.sku_info
      |    where etl_date = '20230330'
      |    union all
      |    select
      |    id,
      |    spu_id,
      |    price,
      |    sku_name,
      |    sku_desc,
      |    weight,
      |    tm_id,
      |    category3_id,
      |    sku_default_img,
      |    cast(create_time as timestamp) create_time,
      |    dwd_insert_user,
      |    dwd_insert_time,
      |    dwd_modify_user,
      |    dwd_modify_time
      |    from dwd.dim_sku_info
      |    where etl_date = '20230403'
      |    ) dw
      |  ) tmp where rw = 1
      |""".stripMargin)
  spark.sql("select id, sku_desc, dwd_insert_user, dwd_modify_time, etl_date from dwd.dim_sku_info where etl_date = '20230403' and id >= 15 and id <= 20 order by id").show()
  spark.sql("show partitions dwd.dim_sku_info").show()
  spark.close()
}
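The hive cli query the task asks for might look like this (a sketch, assuming '20230403' is the latest partition):

hive -e "select id, sku_desc, dwd_insert_user, dwd_modify_time, etl_date from dwd.dim_sku_info where etl_date = '20230403' and id >= 15 and id <= 20 order by id;"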
3. Extract yesterday's partition (the partition generated in Task 1) from the base_province table in the ods database and, combining it with the existing data in the latest partition of dim_province, merge the data by id into the dim_province partitioned table in the dwd database (merging means inserting into or updating the dwd layer: rows to be updated are matched on id, and the latest row is kept by sorting on create_time). The partition field is etl_date and its value equals that of the corresponding ods table. Add the four columns dwd_insert_user, dwd_insert_time, dwd_modify_user and dwd_modify_time, with dwd_insert_user and dwd_modify_user both set to "user1". If a record enters the dwd layer for the first time, set both dwd_insert_time and dwd_modify_time to the current operation time, with the appropriate type conversion. If a record is merged and updated on entering the dwd layer, keep dwd_insert_time unchanged, set dwd_modify_time to the current operation time, and store the latest values in the remaining columns. In the hive cli, count the rows in the latest partition of table dwd.dim_province.
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  // set the HDFS user name
  System.setProperty("HADOOP_USER_NAME", "root")
  System.setProperty("user.name", "root")
  // create the SparkSession
  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("sparkOneHive")
    .enableHiveSupport()
    .getOrCreate()
  spark.sql(
    """
      |insert overwrite table dwd.dim_base_province
      |partition (etl_date = '20230403')
      |select
      |id,
      |name,
      |region_id,
      |area_code,
      |iso_code,
      |create_time,
      |dwd_insert_user,
      |insert_time,
      |dwd_modify_user,
      |dwd_modify_time
      |from
      |  (select
      |  id,
      |  name,
      |  region_id,
      |  area_code,
      |  iso_code,
      |  create_time,
      |  dwd_insert_user,
      |  dwd_insert_time,
      |  dwd_modify_user,
      |  dwd_modify_time,
      |  min(dwd_insert_time) over(partition by id) insert_time,
      |  row_number() over(partition by id order by create_time desc) rw
      |  from
      |    (select
      |    id,
      |    name,
      |    region_id,
      |    area_code,
      |    iso_code,
      |    create_time,
      |    'user1' dwd_insert_user,
      |    cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
      |    'user1' dwd_modify_user,
      |    cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time
      |    from ods.base_province where etl_date = '20230405'
      |    union all
      |    select
      |    id,
      |    name,
      |    region_id,
      |    area_code,
      |    iso_code,
      |    create_time,
      |    dwd_insert_user,
      |    dwd_insert_time,
      |    dwd_modify_user,
      |    dwd_modify_time
      |    from dwd.dim_base_province where etl_date = '20230403'
      |    ) dw
      |  ) tmp where rw = 1
      |""".stripMargin)
  spark.sql("select count(*) from dwd.dim_base_province where etl_date = '20230403'").show()
  spark.sql("show partitions dwd.dim_base_province").show()
  spark.close()
}
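The corresponding hive cli count (a sketch, assuming '20230403' is the latest partition):

hive -e "select count(*) from dwd.dim_base_province where etl_date = '20230403';"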
4. Extract yesterday's partition (the partition generated in Task 1) from the base_region table in the ods database and, combining it with the existing data in the latest partition of dim_region, merge the data by id into the dim_region partitioned table in the dwd database (merging means inserting into or updating the dwd layer: rows to be updated are matched on id, and the latest row is kept by sorting on create_time). The partition field is etl_date and its value equals that of the corresponding ods table. Add the four columns dwd_insert_user, dwd_insert_time, dwd_modify_user and dwd_modify_time, with dwd_insert_user and dwd_modify_user both set to "user1". If a record enters the dwd layer for the first time, set both dwd_insert_time and dwd_modify_time to the current operation time, with the appropriate type conversion. If a record is merged and updated on entering the dwd layer, keep dwd_insert_time unchanged, set dwd_modify_time to the current operation time, and store the latest values in the remaining columns. In the hive cli, count the rows in the latest partition of table dwd.dim_region.
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  // set the HDFS user name
  System.setProperty("HADOOP_USER_NAME", "root")
  System.setProperty("user.name", "root")
  // create the SparkSession
  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("sparkOneHive")
    .enableHiveSupport()
    .getOrCreate()
  spark.sql(
    """
      |insert overwrite table dwd.dim_base_region partition (etl_date = '20230403')
      |select
      |id,
      |region_name,
      |create_time,
      |dwd_insert_user,
      |insert_time,
      |dwd_modify_user,
      |dwd_modify_time
      |from
      |  (select
      |  id,
      |  region_name,
      |  create_time,
      |  dwd_insert_user,
      |  dwd_insert_time,
      |  dwd_modify_user,
      |  dwd_modify_time,
      |  min(dwd_insert_time) over(partition by id) insert_time,
      |  row_number() over(partition by id order by create_time desc) rw
      |  from
      |    (select
      |    id,
      |    region_name,
      |    create_time,
      |    'user1' dwd_insert_user,
      |    cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
      |    'user1' dwd_modify_user,
      |    cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time
      |    from ods.base_region where etl_date = '20230401'
      |    union all
      |    select
      |    id,
      |    region_name,
      |    create_time,
      |    dwd_insert_user,
      |    dwd_insert_time,
      |    dwd_modify_user,
      |    dwd_modify_time
      |    from dwd.dim_base_region
      |    where etl_date = '20230403'
      |    ) dw
      |  ) tmp where rw = 1
      |""".stripMargin)
  spark.sql("select * from dwd.dim_base_region").show()
  spark.sql("show partitions dwd.dim_base_region").show()
  spark.close()
}
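Likewise, the hive cli count for the region table (assuming '20230403' is the latest partition):

hive -e "select count(*) from dwd.dim_base_region where etl_date = '20230403';"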
5. Extract yesterday's partition (the partition generated in Task 1) of the order_info table in the ods database into the dynamically partitioned fact_order_info table in the dwd database. The partition field is etl_date of type String; its value is taken from create_time converted to the format yyyyMMdd. If operate_time is null, fill it with create_time. Add the four columns dwd_insert_user, dwd_insert_time, dwd_modify_user and dwd_modify_time, with dwd_insert_user and dwd_modify_user both set to "user1" and dwd_insert_time and dwd_modify_time both set to the current operation time, with the appropriate type conversion. Run the command show partitions dwd.fact_order_info in the hive cli.
To use dynamic partitioning, add the following to the hive-site.xml configuration file:
<!-- enable dynamic partitioning -->
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>
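Alternatively, the same behaviour can be enabled per session instead of in hive-site.xml; these are standard Hive parameters and can also be issued through spark.sql before running the insert:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;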
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  // set the HDFS user name
  System.setProperty("HADOOP_USER_NAME", "root")
  System.setProperty("user.name", "root")
  // create the SparkSession
  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("sparkOneHive")
    .enableHiveSupport()
    .getOrCreate()
  // the dynamic partition value etl_date is derived from create_time (yyyyMMdd), as the task requires
  spark.sql(
    """
      |insert overwrite table dwd.fact_order_info partition(etl_date)
      |select
      |  id,
      |  consignee,
      |  consignee_tel,
      |  final_total_amount,
      |  order_status,
      |  user_id,
      |  delivery_address,
      |  order_comment,
      |  out_trade_no,
      |  trade_body,
      |  cast(date_format(create_time, 'yyyy-MM-dd HH:mm:ss') as timestamp) create_time,
      |  cast(date_format(if(operate_time is null, create_time, operate_time), 'yyyy-MM-dd HH:mm:ss') as timestamp) operate_time,
      |  expire_time,
      |  tracking_no,
      |  parent_order_id,
      |  img_url,
      |  province_id,
      |  benefit_reduce_amount,
      |  original_total_amount,
      |  feight_fee,
      |  'user1' dwd_insert_user,
      |  cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
      |  'user1' dwd_modify_user,
      |  cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time,
      |  date_format(create_time, 'yyyyMMdd') etl_date
      |from ods.order_info where etl_date = '20230401'
      |""".stripMargin)
  spark.sql("select * from dwd.fact_order_info").show()
  spark.sql("show partitions dwd.fact_order_info").show()
  spark.close()
}
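To verify from the hive cli as the task requires:

hive -e "show partitions dwd.fact_order_info;"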
6. Extract yesterday's partition (the partition generated in Task 1) of the order_detail table in the ods database into the dynamically partitioned fact_order_detail table in the dwd database. The partition field is etl_date of type String; its value is taken from create_time converted to the format yyyyMMdd. Add the four columns dwd_insert_user, dwd_insert_time, dwd_modify_user and dwd_modify_time, with dwd_insert_user and dwd_modify_user both set to "user1" and dwd_insert_time and dwd_modify_time both set to the current operation time, with the appropriate type conversion. Run the command show partitions dwd.fact_order_detail in the hive cli.
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  // set the HDFS user name
  System.setProperty("HADOOP_USER_NAME", "root")
  System.setProperty("user.name", "root")
  // create the SparkSession
  val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("sparkOneHive")
    .enableHiveSupport()
    .getOrCreate()
  // as above, the dynamic partition value etl_date is derived from create_time (yyyyMMdd)
  spark.sql(
    """
      |insert overwrite table dwd.fact_order_detail partition(etl_date)
      |select
      |id,
      |order_id,
      |sku_id,
      |sku_name,
      |img_url,
      |order_price,
      |sku_num,
      |cast(date_format(create_time, 'yyyy-MM-dd HH:mm:ss') as timestamp) create_time,
      |source_type,
      |source_id,
      |'user1' dwd_insert_user,
      |cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
      |'user1' dwd_modify_user,
      |cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time,
      |date_format(create_time, 'yyyyMMdd') etl_date
      |from ods.order_detail where etl_date = '20230401'
      |""".stripMargin)
  spark.sql("select * from dwd.fact_order_detail").show()
  spark.sql("show partitions dwd.fact_order_detail").show()
  spark.close()
}
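And the matching hive cli check for the detail table:

hive -e "show partitions dwd.fact_order_detail;"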