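- Now let's separate out the insert-only records in the incremental data (rows whose key does not yet exist in the target) by doing a left anti join against the target table.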
```scala
import org.apache.spark.sql.functions._

// Read the incremental (delta) file and the current target Hudi table
val updFileDf = spark.read.option("header", true).csv("gs://target_bucket/hudi_product_catalog/hudi_product_update.csv")
val tgtHudiDf = spark.sql("select * from hudi_product_catalog")
tgtHudiDf.createOrReplaceTempView("hudiTable")

// Cast as needed
val stgDf = updFileDf
  .withColumn("eff_start_ts", to_timestamp(col("eff_start_ts")))
  .withColumn("seller_id", col("seller_id").cast("int"))

// Prepare an insert DF from the incremental temp DF:
// keep only staging rows whose (seller_id, prod_category) key has no match in the target
val instmpDf = stgDf.as("stg")
  .join(tgtHudiDf.as("tgt"),
    col("stg.seller_id") === col("tgt.seller_id") && col("stg.prod_category") === col("tgt.prod_category"),
    "left_anti")
  .select("stg.*")

// New records are open-ended and active
val insDf = instmpDf
  .withColumn("eff_end_ts", to_timestamp(lit("9999-12-31 23:59:59")))
  .withColumn("actv_ind", lit(1))

insDf.show(false)
```

```
+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
|     3345|   Stationary|Sticky Notes|              4|                 12|2022-07-09 21:30:45|9999-12-31 23:59:59|       1|
+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
```
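To make the `left_anti` semantics concrete, here is a plain-Scala sketch that needs no Spark: it keeps every staging row whose composite key `(seller_id, prod_category)` has no match in the target. The `ProductRow` case class, the `leftAnti` helper and the sample rows are hypothetical, modeled on the output above; Spark executes the real join as a distributed join, not like this.

```scala
// Plain-Scala sketch (no Spark) of a left anti join on a composite key.
// ProductRow, leftAnti and the sample rows are hypothetical illustrations.
object LeftAntiSketch {
  final case class ProductRow(sellerId: Int, prodCategory: String, productName: String)

  // Keep the staging rows whose (sellerId, prodCategory) key is absent from the target.
  def leftAnti(stg: Seq[ProductRow], tgt: Seq[ProductRow]): Seq[ProductRow] = {
    // A Set gives average constant-time key lookups
    val tgtKeys = tgt.map(r => (r.sellerId, r.prodCategory)).toSet
    stg.filterNot(r => tgtKeys.contains((r.sellerId, r.prodCategory)))
  }

  def main(args: Array[String]): Unit = {
    val tgt = Seq(
      ProductRow(1234, "Detergent", "Tide 2L"),
      ProductRow(4565, "Gourmet", "Dairy Milk Silk")
    )
    val stg = Seq(
      ProductRow(1234, "Detergent", "Tide 5L"),
      ProductRow(4565, "Gourmet", "Dairy Milk Almond"),
      ProductRow(3345, "Stationary", "Sticky Notes")
    )
    leftAnti(stg, tgt).foreach(println) // prints ProductRow(3345,Stationary,Sticky Notes)
  }
}
```

Only the `3345 / Stationary` row survives, which matches `insDf` above before the `eff_end_ts` and `actv_ind` columns are added.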
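- We now have a DataFrame that contains only the insert records. Next, let's build a DataFrame that carries the attributes of both the delta table and the target table: an inner join against the target fetches the records that need to be updated, and the `where` clause keeps only incoming rows whose `eff_start_ts` is later than that of the current target version.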
```scala
// Prepare an update DF from the incremental temp DF, selecting columns from both tables
val updDf = stgDf.as("stg")
  .join(tgtHudiDf.as("tgt"),
    col("stg.seller_id") === col("tgt.seller_id") && col("stg.prod_category") === col("tgt.prod_category"),
    "inner")
  // Keep only incoming rows that are newer than the current target version
  .where(col("stg.eff_start_ts") > col("tgt.eff_start_ts"))
  // Prefix every column with stg_ / tgt_ so both versions sit side by side
  .select((stgDf.columns.map(c => stgDf(c).as(s"stg_$c")) ++
    tgtHudiDf.columns.map(c => tgtHudiDf(c).as(s"tgt_$c"))): _*)

updDf.show(false)
```

```
+-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
|stg_seller_id|stg_prod_category| stg_product_name|stg_product_package|stg_discount_percentage|   stg_eff_start_ts|tgt__hoodie_commit_time|tgt__hoodie_commit_seqno|tgt__hoodie_record_key|tgt__hoodie_partition_path|tgt__hoodie_file_name|tgt_seller_id|tgt_prod_category|tgt_product_name|tgt_product_package|tgt_discount_percentage|   tgt_eff_start_ts|     tgt_eff_end_ts|tgt_actv_ind|
+-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
|         1234|        Detergent|          Tide 5L|                  6|                     25|2022-01-31 10:00:30|      20220710113622931|    20220710113622931...|  seller_id:1234,pr...|                actv_ind=1| 2dd6109f-2173-429...|         1234|        Detergent|         Tide 2L|                  6|                     15|2021-12-15 15:20:30|9999-12-31 23:59:59|           1|
|         4565|          Gourmet|Dairy Milk Almond|                 12|                     45|2022-06-12 20:30:40|      20220710113622931|    20220710113622931...|  seller_id:4565,pr...|                actv_ind=1| 2dd6109f-2173-429...|         4565|          Gourmet| Dairy Milk Silk|                  6|                     30|2021-06-12 20:30:40|9999-12-31 23:59:59|           1|
+-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
```
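The update step can be sketched the same way in plain Scala: an inner join on `(seller_id, prod_category)` followed by the `eff_start_ts` filter, yielding `(staging, target)` pairs that play the role of the `stg_*` and `tgt_*` column groups in `updDf`. The `Rec` case class, the `updatePairs` helper and the sample rows are hypothetical, and `java.time.LocalDateTime` stands in for Spark's timestamp type.

```scala
import java.time.LocalDateTime

// Plain-Scala sketch (no Spark) of an inner join on a composite key plus a
// "newer than the current version" filter. Rec, updatePairs and the rows are hypothetical.
object UpdateJoinSketch {
  final case class Rec(sellerId: Int, prodCategory: String, productName: String, effStartTs: LocalDateTime)

  // Pair each staging row with every target row sharing its key,
  // keeping only pairs where the staging row starts later.
  def updatePairs(stg: Seq[Rec], tgt: Seq[Rec]): Seq[(Rec, Rec)] = {
    val tgtByKey = tgt.groupBy(r => (r.sellerId, r.prodCategory))
    for {
      s <- stg
      t <- tgtByKey.getOrElse((s.sellerId, s.prodCategory), Seq.empty[Rec])
      if s.effStartTs.isAfter(t.effStartTs)
    } yield (s, t)
  }

  def main(args: Array[String]): Unit = {
    val ts = (str: String) => LocalDateTime.parse(str)
    val tgt = Seq(
      Rec(1234, "Detergent", "Tide 2L", ts("2021-12-15T15:20:30")),
      Rec(4565, "Gourmet", "Dairy Milk Silk", ts("2021-06-12T20:30:40"))
    )
    val stg = Seq(
      Rec(1234, "Detergent", "Tide 5L", ts("2022-01-31T10:00:30")),
      Rec(4565, "Gourmet", "Dairy Milk Almond", ts("2022-06-12T20:30:40")),
      Rec(3345, "Stationary", "Sticky Notes", ts("2022-07-09T21:30:45"))
    )
    // prints "1234 Detergent: Tide 2L -> Tide 5L"
    // then   "4565 Gourmet: Dairy Milk Silk -> Dairy Milk Almond"
    updatePairs(stg, tgt).foreach { case (s, t) =>
      println(s"${s.sellerId} ${s.prodCategory}: ${t.productName} -> ${s.productName}")
    }
  }
}
```

The `3345 / Stationary` row has no target match and is dropped by the inner join, just as it is absent from `updDf`; it was already captured by the left anti join in the previous step.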