离线数据处理 任务二:数据清洗

hive数据库和表的创建

给dim添加最新状态记录

任务 

        接着上一篇数据抽取的任务继续 需用到上篇ods数据抽取的数据继续练习

hive数据库和表的创建

        1、创建dwd数据库

create database dwd;

        2、创建dim_user_info 表,分区字段etl_date

CREATE TABLE `dim_user_info`  (
  `id` bigint,
  `login_name` string,
  `nick_name` string,
  `passwd` string,
  `name` string,
  `phone_num` string,
  `email` string,
  `head_img` string,
  `user_level` string,
  `birthday` timestamp,
  `gender` string,
  `create_time` timestamp,
  `operate_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
)  PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "01"
stored as textfile;

        3、创建dim_sku_info 表,分区字段 etl_dat 

CREATE TABLE `dim_sku_info`  (
  `id` bigint,
  `spu_id` bigint,
  `price` decimal(10, 0),
  `sku_name` string,
  `sku_desc`  string,
  `weight` decimal(10, 2),
  `tm_id` bigint,
  `category3_id` bigint,
  `sku_default_img` string,
  `create_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "01"
stored as textfile;

         4、创建dim_base_province 表,分区字段 etl_date

CREATE TABLE `dim_base_province`  (
  `id` bigint,
  `name` string,
  `region_id` string,
  `area_code` string,
  `iso_code` string,
  `create_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
)  PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "01"
stored as textfile;

        5、 创建dim_base_region 表,分区字段是 etl_date

CREATE TABLE `dim_base_region`  (
  `id` string,
  `region_name` string,
  `create_time` timestamp,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "01"
stored as textfile;

        6、 创建fact_order_info 表 分区字段etl_date 

CREATE TABLE `fact_order_info`  (
  `id` bigint,
  `consignee` string,
  `consignee_tel`string,
  `final_total_amount` decimal(16, 2),
  `order_status` string,
  `user_id` bigint,
  `delivery_address` string,
  `order_comment` string,
  `out_trade_no` string,
  `trade_body` string,
  `create_time` timestamp,
  `operate_time` timestamp,
  `expire_time` timestamp,
  `tracking_no` string,
  `parent_order_id` bigint,
  `img_url` string,
  `province_id` int,
  `benefit_reduce_amount` decimal(16, 2),
  `original_total_amount` decimal(16, 2),
  `feight_fee` decimal(16, 2),
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "01"
stored as textfile;

        7、创建fact_order_detail 表 分区字段是etl_date 

CREATE TABLE `fact_order_detail`  (
  `id` bigint,
  `order_id` bigint,
  `sku_id` bigint,
  `sku_name` string,
  `img_url` string,
  `order_price` decimal(10, 2),
  `sku_num` string,
  `create_time` timestamp,
  `source_type` string,
  `source_id` bigint,
  `dwd_insert_user` string,
  `dwd_insert_time` timestamp,
  `dwd_modify_user` string,
  `dwd_modify_time` timestamp
) PARTITIONED BY ( `etl_date` string)
row format delimited
fields terminated by "01"
stored as textfile;

给dim添加最新状态记录

        这里可以自己给dwd数据库中dim打头的所有表 自己给里面添加最新状态(分区时间)的数据方便实验测试结果。测试完成可以在hive的dwd数据库的操作表中使用select查看按题目要求添加后的新状态记录以及题目要求的其余内容。

        1、dim_user_info表

INSERT INTO `dim_user_info` VALUES (82, 'rz4gxf1', '阿仁+1', NULL, '康仁', '13437298274', '[email protected]', NULL, '1', '2004-04-26', 'M', '2020-04-26 18:57:55', '2020-04-26 06:02:36','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');
INSERT INTO `dim_user_info` VALUES (85, 'cte9ov7mv', '波宁+1', NULL, '华良海', '13751595688', '[email protected]', NULL, '1', '1989-04-26', 'M', '2020-04-26 18:57:55', '2020-04-26 23:54:52','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');

        2、dim_sku_info表

INSERT INTO `dim_sku_info` VALUES (1, 1, 2220, '荣耀10青春版+1 幻彩渐变 2400万AI自拍 全网通版4GB+64GB 渐变蓝 移动联通电信4G全面屏手机 双卡双待', 'new sku_desc', 0.24, 2, 61, 'http://AOvKmfRQEBRJJllwCwCuptVAOtBBcIjWeJRsmhbJ','2021-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');
INSERT INTO `dim_sku_info` VALUES (2, 2, 3321, 'TCL 55A950C 55英寸32核人工智能+1 HDR曲面超薄4K电视金属机身(枪色)', 'new sku_desc', 15.24, 4, 86, 'http://JfJSvAnPkErYPcUsbgCuokhjxKiLeqpDXakZqFeE','2021-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');

        3、dim_base_province表

INSERT INTO `dim_base_province` VALUES (1, '北京+1', '1', '110000', 'CN-11','2024-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');
INSERT INTO `dim_base_province` VALUES (2, '天津市+1', '1', '120000', 'CN-12','2024-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');

        4、dim_base_region表

INSERT INTO `dim_base_region` VALUES ('1', '华北+1','2024-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');
INSERT INTO `dim_base_region` VALUES ('2', '华东+1','2024-01-01 12:21:13','user1','2020-04-26 18:57:55','user1','2020-04-26 18:57:55','20230403');

任务 

要求:使用Scala编写spark工程代码,将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。

1、抽取ods库中user_info表中昨天的分区(任务一生成的分区)数据,并结合dim_user_info最新分区现有的数据,根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_user_info命令 

def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    //结构 插入最终查询结果字段(字段开窗过滤(dos.user_info的查询字段 union all dwd.dim_user_info)条件 开窗排序=1)
    spark.sql(
      """
        |insert overwrite table dwd.dim_user_info
        |partition (etl_date = '20230403')
        |select
        |id,
        |login_name,
        |nick_name,
        |passwd,
        |name,
        |phone_num,
        |email,
        |head_img,
        |user_level,
        |birthday,
        |gender,
        |create_time,
        |operate_time,
        |dwd_insert_user,
        |insert_time,
        |dwd_modify_user,
        |dwd_modify_time
        |from
          |(select
          |id,
          |login_name,
          |nick_name,
          |passwd,
          |name,
          |phone_num,
          |email,
          |head_img,
          |user_level,
          |birthday,
          |gender,
          |create_time,
          |operate_time,
          |dwd_insert_user,
          |dwd_insert_time,
          |dwd_modify_user,
          |dwd_modify_time,
          |min(dwd_insert_time) over(partition by id) insert_time,
          |row_number() over(partition by id order by operate_time desc) row
          |from
            |(select
            |id,
            |login_name,
            |nick_name,
            |passwd,
            |name,
            |phone_num,
            |email,
            |head_img,
            |user_level,
            |birthday,
            |gender,
            |create_time,
            |if(operate_time is null,create_time,operate_time) operate_time,
            |"user1" dwd_insert_user,
            |cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
            |"user1" dwd_modify_user,
            |cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time
            |from ods.user_info
            |where etl_date = '20230330'
            |union all
            |select
            |id,
            |login_name,
            |nick_name,
            |passwd,
            |name,
            |phone_num,
            |email,
            |head_img,
            |user_level,
            |birthday,
            |gender,
            |create_time,
            |if(operate_time is null,create_time,operate_time) operate_time,
            |dwd_insert_user,
            |dwd_insert_time,
            |dwd_modify_user,
            |dwd_modify_time
            |from dwd.dim_user_info
            |where etl_date = '20230403'
            |)dw
          |)tmp where row = 1
        |""".stripMargin)
    
    spark.sql("select * from dwd.dim_user_info").show()
    spark.sql("show partitions dwd.dim_user_info").show()
    
    spark.close()
  }

2、抽取ods库sku_info表中昨天的分区(任务一生成的分区)数据,并结合dim_sku_info最新分区现有的数据,根据id合并数据到dwd库中dim_sku_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli查询表dim_sku_info的字段id、sku_desc、dwd_insert_user、dwd_modify_time、etl_date,条件为最新分区的数据,id大于等于15且小于等于20,并且按照id升序排序

def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("""insert overwrite table dwd.dim_sku_info partition (etl_date = '20230403')
                |select
                |id,
                |spu_id,
                |price,
                |sku_name,
                |sku_desc,
                |weight,
                |tm_id,
                |category3_id,
                |sku_default_img,
                |create_time,
                |dwd_insert_user,
                |insert_time,
                |dwd_modify_user,
                |dwd_modify_time
                |from
                | (select
                | id,
                | spu_id,
                | price,
                | sku_name,
                | sku_desc,
                | weight,
                | tm_id,
                | category3_id,
                | sku_default_img,
                | create_time,
                | dwd_insert_user,
                | dwd_insert_time,
                | dwd_modify_user,
                | dwd_modify_time,
                | min(dwd_insert_time) over(partition by id)  insert_time,
                | row_number() over(partition by id order by create_time desc) rw
                |from
                |  (select
                |  id,
                |  spu_id,
                |  price,
                |  sku_name,
                |  sku_desc,
                |  weight,
                |  tm_id,
                |  category3_id,
                |  sku_default_img,
                |  cast(date_format(create_time , 'yyyy-MM-dd HH-mm-ss') as timestamp) create_time,
                |  'user1' dwd_insert_user,
                |  cast(date_format(current_timestamp() , 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
                |  'user1' dwd_modify_user,
                |  cast(date_format(current_timestamp() , 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time
                |  from ods.sku_info
                |  where etl_date = '20230330'
                |  union all
                |  select
                |  id,
                |  spu_id,
                |  price,
                |  sku_name,
                |  sku_desc,
                |  weight,
                |  tm_id,
                |  category3_id,
                |  sku_default_img,
                |  cast(create_time as timestamp) create_time,
                |  dwd_insert_user,
                |  dwd_insert_time,
                |  dwd_modify_user,
                |  dwd_modify_time
                |  from dwd.dim_sku_info
                | where etl_date = '20230403'
                | ) dw
                | )tmp where rw = 1""".stripMargin)

    spark.sql("select id,sku_desc,dwd_insert_user,dwd_modify_time from dwd.dim_sku_info where id >= 15 and id <= 20 order by id").show()
    spark.sql("show partitions dwd.dim_sku_info").show()

    spark.close()
  }

3、抽取ods库base_province表中昨天的分区(任务一生成的分区)数据,并结合dim_province最新分区现有的数据,根据id合并数据到dwd库中dim_province的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli在表dwd.dim_province最新分区中,查询该分区中数据的条数 

def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("""
                |insert overwrite table dwd.dim_base_province
                |partition (etl_date = '20230403')
                |select
                |id,
                |name,
                |region_id,
                |area_code,
                |iso_code,
                |create_time,
                |dwd_insert_user,
                |dwd_insert_time,
                |dwd_modify_user,
                |dwd_modify_time
                |from
                |	(select
                |	id,
                |	name,
                |	region_id,
                |	area_code,
                |	iso_code,
                |	create_time,
                |	dwd_insert_user,
                |	dwd_insert_time,
                |	dwd_modify_user,
                |	dwd_modify_time,
                |	min(dwd_insert_time) over(partition by id) insert_time,
                |	row_number() over(partition by id order by create_time desc) rw
                |	from
                |		(select
                |		id,
                |		name,
                |		region_id,
                |		area_code,
                |		iso_code,
                |		create_time,
                |		'user1' dwd_insert_user,
                |		cast(date_format(current_timestamp() , 'yyyy-MM-dd HH-mm-ss') as timestamp) dwd_insert_time,
                |		'user1' dwd_modify_user,
                |		cast(date_format(current_timestamp() , 'yyyy-MM-dd HH-mm-ss') as timestamp) dwd_modify_time
                |		from ods.base_province where etl_date = '20230405'
                |		union all
                |		select
                |		id,
                |		name,
                |		region_id,
                |		area_code,
                |		iso_code,
                |		create_time,
                |		dwd_insert_user,
                |		dwd_insert_time,
                |		dwd_modify_user,
                |		dwd_modify_time
                |		from dwd.dim_base_province where etl_date = '20230403'
                |		)dw
                |	)tmp where rw = 1
                | """.stripMargin)

    spark.sql("select count(*) from dwd.dim_base_province").show()
    spark.sql("show partitions dwd.dim_base_province").show()
    spark.close()
  }

4、抽取ods库base_region表中昨天的分区(任务一生成的分区)数据,并结合dim_region最新分区现有的数据,根据id合并数据到dwd库中dim_region的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli在表dwd.dim_region最新分区中,查询该分区中数据的条数 

def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql(
      """
        |insert overwrite table dwd.dim_base_region partition (etl_date = '20230403')
        |select
        |id,
        |region_name,
        |create_time,
        |dwd_insert_user,
        |insert_time,
        |dwd_modify_user,
        |dwd_modify_time
        |from
        |	(select
        |	id,
        |	region_name,
        |	create_time,
        |	dwd_insert_user,
        |	dwd_insert_time,
        |	dwd_modify_user,
        |	dwd_modify_time,
        |	min(dwd_insert_time) over(partition by id) insert_time,
        |	row_number() over(partition by id order by create_time desc) rw
        |	from
        |		(select
        |		id,
        |		region_name,
        |		create_time,
        |		'user1' dwd_insert_user,
        |		cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_insert_time,
        |		'user1' dwd_modify_user,
        |		cast(date_format(current_timestamp(), 'yyyy-MM-dd HH:mm:ss') as timestamp) dwd_modify_time
        |		from ods.base_region where etl_date = '20230401'
        |		union all
        |		select
        |		id,
        |		region_name,
        |		create_time,
        |		dwd_insert_user,
        |		dwd_insert_time,
        |		dwd_modify_user,
        |		dwd_modify_time
        |		from dwd.dim_base_region
        |		where etl_date = '20230403'
        |		)dw
        |	)tmp where rw = 1
        |""".stripMargin)

    spark.sql("select * from dwd.dim_base_region").show()
    spark.sql("show partitions dwd.dim_base_region").show()
    spark.close()
  }

5、将ods库中order_info表昨天的分区(任务一生成的分区)数据抽取到dwd库中fact_order_info的动态分区表,分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”,dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。使用hive cli执行show partitions dwd.fact_order_info命令 

使用动态分区需要在hive-site.xml配置文件种添加如下

<!--配置动态分区-->
    <property>
        <name>hive.exec.dynamic.partition.mode</name>
        <value>nonstrict</value>
    </property>
def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("""
                |insert overwrite table dwd.fact_order_info partition(etl_date)
                |select
                |	id,
                |	consignee,
                |	consignee_tel,
                |	final_total_amount,
                |	order_status,
                |	user_id,
                |	delivery_address,
                |	order_comment,
                |	out_trade_no,
                |	trade_body,
                |	cast(date_format(create_time,'yyyyMMdd') as timestamp) create_time,
                |	cast(date_format(if(operate_time is null,create_time,operate_time),'yyyyMMdd') as timestamp) operate_time,
                |	expire_time,
                |	tracking_no,
                |	parent_order_id,
                |	img_url,
                |	province_id,
                |	benefit_reduce_amount,
                |	original_total_amount,
                |	feight_fee,
                |	'user1' dwd_insert_user,
                |	cast(date_format(current_timestamp() , 'yyyy-MM-dd') as timestamp) dwd_insert_time,
                |	'user1' dwd_modify_user,
                |	cast(date_format(current_timestamp() , 'yyyy-MM-dd') as timestamp) dwd_modify_time,
                |	etl_date
                |	from ods.order_info where etl_date = '20230401'""".stripMargin)

    spark.sql("select * from dwd.fact_order_info").show()
    spark.sql("show partitions dwd.fact_order_info").show()
    spark.close()
  }

6、将ods库中order_detail表昨天的分区(任务一中生成的分区)数据抽取到dwd库中fact_order_detail的动态分区表,分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”,dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。使用hive cli执行show partitions dwd.fact_order_detail命令 

def main(args: Array[String]): Unit = {
    //添加hdfs用户名
    System.getProperty("HADOOP_user_name","root")
    System.getProperty("user.name","root")
    //创建Session对象
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("sparkOneHive")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql(
      """
        |insert overwrite table dwd.fact_order_detail partition(etl_date)
        |select
        |id,
        |order_id,
        |sku_id,
        |sku_name,
        |img_url,
        |order_price,
        |sku_num,
        |cast(date_format(create_time,'yyyyMMdd') as timestamp) create_time,
        |source_type,
        |source_id,
        |'user1' dwd_insert_user,
        |cast(date_format(current_timestamp() , 'yyyy-MM-dd HH-mm-ss') as timestamp) dwd_insert_time,
        |'user1' dwd_modify_user,
        |cast(date_format(current_timestamp() , 'yyyy-MM-dd HH-mm-ss') as timestamp) dwd_modify_time,
        |etl_date
        |from ods.order_detail where etl_date = '20230401'
        |""".stripMargin)

    spark.sql("select * from dwd.fact_order_detail").show()
    spark.sql("show partitions dwd.fact_order_detail").show()
    spark.close()
  }