- 现在我们有了一个在同一条记录中同时包含新旧数据的 DataFrame,接下来把更新记录的活动实例和非活动实例分别提取到各自独立的 DataFrame 中。
在进行上述操作时,我们会把非活动(旧)记录的 eff_end_ts 更改为活动(新)记录的 eff_start_ts - 1(即提前 1 秒),并将其 actv_ind 更新为 0,从而废弃该非活动记录。
// Prepare the active updates: the incoming (stg_*) columns are renamed to the
// target column names, given a far-future eff_end_ts and flagged actv_ind = 1.
// (In spark-shell, enter multi-line statements such as this one via :paste.)
val updActiveDf = updDf.select(
  col("stg_seller_id").as("seller_id"),
  col("stg_prod_category").as("prod_category"),
  col("stg_product_name").as("product_name"),
  col("stg_product_package").as("product_package"),
  col("stg_discount_percentage").as("discount_percentage"),
  col("stg_eff_start_ts").as("eff_start_ts"),
  to_timestamp(lit("9999-12-31 23:59:59")).as("eff_end_ts"),
  lit(1).as("actv_ind")
)
updActiveDf.show(false)
// +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
// |seller_id|prod_category|product_name     |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
// +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
// |1234     |Detergent    |Tide 5L          |6              |25                 |2022-01-31 10:00:30|9999-12-31 23:59:59|1       |
// |4565     |Gourmet      |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|9999-12-31 23:59:59|1       |
// +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+

// Prepare the inactive updates, which will become the obsolete records: the
// existing (tgt_*) values are kept, eff_end_ts is set to one second before the
// incoming record's eff_start_ts, and actv_ind is set to 0.
val updInactiveDf = updDf.select(
  col("tgt_seller_id").as("seller_id"),
  col("tgt_prod_category").as("prod_category"),
  col("tgt_product_name").as("product_name"),
  col("tgt_product_package").as("product_package"),
  col("tgt_discount_percentage").as("discount_percentage"),
  col("tgt_eff_start_ts").as("eff_start_ts"),
  (col("stg_eff_start_ts") - expr("interval 1 seconds")).as("eff_end_ts"),
  lit(0).as("actv_ind")
)
updInactiveDf.show
// +---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
// |seller_id|prod_category|   product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
// +---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
// |     1234|    Detergent|        Tide 2L|              6|                 15|2021-12-15 15:20:30|2022-01-31 10:00:29|       0|
// |     4565|      Gourmet|Dairy Milk Silk|              6|                 30|2021-06-12 20:30:40|2022-06-12 20:30:39|       0|
// +---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
- 现在我们使用 union 运算符把插入记录、活动更新和非活动更新合并到同一个 DataFrame 中,并将该 DataFrame 作为最终 Hudi 写入逻辑的增量源。
// Combine the inserts, the active updates and the inactive updates into a
// single DataFrame that serves as the delta source for the Hudi write.
// NOTE: union resolves columns by position, so all three inputs must share
// the same column order.
val upsertDf = insDf.union(updActiveDf).union(updInactiveDf)
upsertDf.show
// +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
// |seller_id|prod_category|     product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
// +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
// |     3345|   Stationary|     Sticky Notes|              4|                 12|2022-07-09 21:30:45|9999-12-31 23:59:59|       1|
// |     4565|      Gourmet|Dairy Milk Almond|             12|                 45|2022-06-12 20:30:40|9999-12-31 23:59:59|       1|
// |     1234|    Detergent|          Tide 5L|              6|                 25|2022-01-31 10:00:30|9999-12-31 23:59:59|       1|
// |     4565|      Gourmet|  Dairy Milk Silk|              6|                 30|2021-06-12 20:30:40|2022-06-12 20:30:39|       0|
// |     1234|    Detergent|          Tide 2L|              6|                 15|2021-12-15 15:20:30|2022-01-31 10:00:29|       0|
// +---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+

val path = "gs://target_bucket/hudi_product_catalog"

// Upsert the delta into the Hudi table at `path`.
// - The record key is (seller_id, prod_category, eff_end_ts); because eff_end_ts
//   is part of the key, the closed-out version of a row gets a different key
//   from the current version (see _hoodie_record_key in the output below).
// - The table is partitioned by actv_ind using Hive-style partition paths
//   (actv_ind=0 / actv_ind=1 in the output below).
// - The *_OPT_KEY constants and `Append` are presumably brought into scope by
//   imports earlier in the article (e.g. DataSourceWriteOptions._ and
//   SaveMode._) -- confirm before running this snippet on its own.
// (In spark-shell, enter this multi-line statement via :paste.)
upsertDf.write
  .format("org.apache.hudi")
  .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
  .option(RECORDKEY_FIELD_OPT_KEY, "seller_id,prod_category,eff_end_ts")
  .option(PRECOMBINE_FIELD_OPT_KEY, "eff_start_ts")
  .option("hoodie.table.name", "hudi_product_catalog")
  .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "target_schema")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_product_catalog")
  .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "actv_ind")
  .mode(Append)
  .save(path)

// Read the table back to verify the result.
// NOTE(review): the write above sets the Hive database option to "target_schema",
// while the queries below use "stg_wmt_ww_fin_rtn_mb_dl_secure" -- confirm which
// database the table is actually registered in.
spark.sql("refresh table stg_wmt_ww_fin_rtn_mb_dl_secure.hudi_product_catalog")
spark.sql("select * from stg_wmt_ww_fin_rtn_mb_dl_secure.hudi_product_catalog").show(false)
// +-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
// |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                                                       |_hoodie_partition_path|_hoodie_file_name                                                         |seller_id|prod_category |product_name     |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
// +-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
// |20220722113258101  |20220722113258101_0_0|seller_id:3412,prod_category:Healthcare,eff_end_ts:253402300799000000    |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|3412     |Healthcare    |Dolo 650         |10             |10                 |2022-04-01 16:30:45|9999-12-31 23:59:59|1       |
// |20220722113258101  |20220722113258101_0_1|seller_id:1234,prod_category:Home Essential,eff_end_ts:253402300799000000|actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|1234     |Home Essential|Hand Towel       |12             |20                 |2021-10-20 06:55:22|9999-12-31 23:59:59|1       |
// |20220722114049500  |20220722114049500_0_2|seller_id:4565,prod_category:Gourmet,eff_end_ts:253402300799000000       |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|4565     |Gourmet       |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|9999-12-31 23:59:59|1       |
// |20220722114049500  |20220722114049500_0_3|seller_id:1234,prod_category:Detergent,eff_end_ts:253402300799000000     |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|1234     |Detergent     |Tide 5L          |6              |25                 |2022-01-31 10:00:30|9999-12-31 23:59:59|1       |
// |20220722114049500  |20220722114049500_0_4|seller_id:3345,prod_category:Stationary,eff_end_ts:253402300799000000    |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|3345     |Stationary    |Sticky Notes     |4              |12                 |2022-07-09 21:30:45|9999-12-31 23:59:59|1       |
// |20220722114049500  |20220722114049500_1_0|seller_id:4565,prod_category:Gourmet,eff_end_ts:1655065839000000         |actv_ind=0            |789e0317-d499-4d74-a5d9-ad6e6517d6b8-0_1-72-2452_20220722114049500.parquet|4565     |Gourmet       |Dairy Milk Silk  |6              |30                 |2021-06-12 20:30:40|2022-06-12 20:30:39|0       |
// |20220722114049500  |20220722114049500_1_1|seller_id:1234,prod_category:Detergent,eff_end_ts:1643623229000000       |actv_ind=0            |789e0317-d499-4d74-a5d9-ad6e6517d6b8-0_1-72-2452_20220722114049500.parquet|1234     |Detergent     |Tide 2L          |6              |15                 |2021-12-15 15:20:30|2022-01-31 10:00:29|0       |
// +-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
推荐阅读
-
都江堰市2023年小升初入学指南 都江堰市小升初招生网
-
-
-
-
-
-
-
国漫天官赐福让原著粉丝飘了,你认为是尬捧还是真的好看?
-
宁夏理工学院2022年分省分专业招生计划(本科+专科)
-
-
-
-
-
北京天马旅行社有限公司_工商信用信息_经营范围期限状态_法人_地址_注册资本_怎么样
-
-
-
-
王者荣耀背景修改,王者荣耀实名制弄错了怎么修改?,
-
冷冻的面包吃的时候应该怎么处理 冷冻的面包吃的时候应该怎么处理呢
-