Iceberg Data Lake: Exploration and Practice

Concepts: The Data Lake

  • A data lake is a way of storing and organizing data of heterogeneous structure. It is essentially an enterprise data architecture pattern; physically it is built on a data storage platform (e.g. Hadoop, OSS, S3), centrally storing an enterprise's massive, multi-source, multi-format data and supporting fast processing and analytics on that data.
  • The core idea of a data lake is to store all of an enterprise's data in one place, from raw data through to the derived data used for reporting, visualization, analytics, machine learning, and other tasks.
  • The data in a data lake includes structured data (relational database data), semi-structured data (CSV, XML, JSON), unstructured data (emails, documents, PDFs), and binary data (images, audio, video), forming a single centralized store for data in every form.

Iceberg Overview

Iceberg is a high-performance table format that defines how data and metadata are organized. It can be used from compute engines such as Spark, Trino, Flink, Hive, and Impala.

Iceberg Features

  1. True unified batch and streaming: data written upstream is immediately queryable downstream, which suits real-time scenarios. Iceberg exposes both streaming and batch read/write interfaces, so the same pipeline can process streaming and batch data against one storage model, simplifying ETL design.
  2. Engine- and storage-agnostic: on the storage side it supports HDFS and common object stores (it is not tied to the underlying storage); on the compute side it supports Flink, Spark, Presto, Hive, and other common engines.
  3. Schema evolution: columns can be added (ADD), dropped (DROP), updated (UPDATE), reordered (REORDER), and renamed (RENAME) without side effects and at very low cost (metadata-only operations; no data is read or rewritten). Iceberg tracks columns by unique IDs and a new column always gets a fresh ID, so columns never get misaligned. (A minimal API sketch follows this list.)
  4. Partition evolution: when the partition spec of an existing table changes, existing data is untouched and keeps the old spec, while new data uses the new spec; in Iceberg metadata the two specs coexist independently. For example, to move a day-partitioned table to hourly partitions, the Hive warehouse approach would require rebuilding the table, whereas an Iceberg table can change its partition layout in place.
  5. Hidden partitioning: partition information needs no manual maintenance and can be hidden from users. Unlike Hive, where partition columns must be specified explicitly, an Iceberg partition spec can be derived from a column via a transform; after table creation or a spec change, new data is automatically assigned to its partition, and at query time Iceberg automatically prunes data that need not be scanned. This avoids the performance trap of a user forgetting a partition filter in SQL and lets users focus on business logic rather than partition filtering. (Iceberg keeps partition metadata separate from the data directory layout, so the partition spec can be changed without data migration; partition information is not stored in the HMS, reducing HMS load.)
  6. Together, partition evolution and hidden partitioning make it easy for the business to adjust its partitioning strategy.
  7. Time travel: query the table as of a historical snapshot, and roll back to a historical snapshot. (See the sketch after this list.)
  8. ACID transactions: Iceberg supports reading while writing; written data becomes visible on commit, and transactions guarantee that downstream consumers only see committed data, never in-flight writes. Adds, deletes, and updates are supported.
  9. Optimistic concurrency for writers: multiple programs can write concurrently with linearizable results. (Each writer optimistically creates a new metadata file; the commit completes via an atomic swap of the current metadata.)
  10. File-level data pruning: Iceberg uses its metadata to filter queries efficiently. The metadata carries per-data-file statistics such as min/max values and row counts, so query predicates can be pushed down past the usual partition and column filters all the way to the file level, greatly speeding up queries.
  11. Multiple underlying file formats: Parquet, Avro, ORC, and others.
  12. Upsert support with immediate visibility of updates; but upserts should not be too frequent, since heavy upserting requires frequent data compaction.
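
Feature 3 (schema evolution) and feature 7 (time travel) can be exercised directly through Iceberg's core Java API. A minimal sketch follows; the HadoopCatalog warehouse path, database, table, and column names are illustrative assumptions, not taken from the text above:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;

public class SchemaEvolutionAndTimeTravel {
    public static void main(String[] args) throws Exception {
        // Hypothetical warehouse and table; adjust to your environment.
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(),
                "hdfs://nameservice/user/iceberg/warehouse");
        Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "demo_table"));

        // Schema evolution: metadata-only commits, no data files are rewritten.
        table.updateSchema()
                .addColumn("city", Types.StringType.get())
                .renameColumn("name", "user_name")
                .commit();

        // Time travel: read the table as of its oldest recorded snapshot.
        long oldSnapshotId = table.history().get(0).snapshotId();
        try (CloseableIterable<Record> rows =
                IcebergGenerics.read(table).useSnapshot(oldSnapshotId).build()) {
            rows.forEach(System.out::println);
        }
        catalog.close();
    }
}

(Requires iceberg-core and iceberg-data on the classpath.)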

Iceberg Internals

Iceberg Metadata

1. Data files: the actual data files, tracked by one or more manifest files.

2. Metadata file: a "*.metadata.json" file recording the state of the table at a point in time: the table schema, partition spec, table properties, the snapshot history, and all manifest lists involved at that point.

3. Manifest list: a "snap-*.avro" file storing the list of all manifest files that make up one snapshot, one row per manifest file. Each row carries the manifest path, its partition value ranges, and added/deleted file counts, giving queries an early filtering layer for better performance. One snapshot corresponds to exactly one manifest list.

4. Manifest file: an Avro file not prefixed with "snap-", containing the list of data files. Each row fully describes one data file: status, path, partition values, column-level statistics (min/max values, null counts), file size, row count, and so on; these likewise provide query-time filtering for better performance.
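
The hierarchy above can also be walked programmatically: metadata file -> current snapshot -> manifest list -> manifest files -> data files. A minimal sketch, reusing the same illustrative HadoopCatalog and table names as elsewhere in this post:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class WalkMetadataTree {
    public static void main(String[] args) throws IOException {
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(),
                "hdfs://nameservice/user/iceberg/warehouse");
        Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "demo_table"));

        // Metadata file -> current snapshot; each snapshot has exactly one manifest list.
        Snapshot snapshot = table.currentSnapshot();
        System.out.println("manifest list: " + snapshot.manifestListLocation());

        // Manifest list -> manifest files -> data files with per-file statistics.
        for (ManifestFile manifest : snapshot.allManifests(table.io())) {
            System.out.println("manifest: " + manifest.path());
            try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
                for (DataFile dataFile : reader) {
                    System.out.println("  data file: " + dataFile.path()
                            + ", rows=" + dataFile.recordCount());
                }
            }
        }
        catalog.close();
    }
}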

Directory layout of HadoopCatalog vs. HiveCatalog tables
Differences:
1. Metadata file naming differs: a HadoopCatalog table names its metadata files vN.metadata.json, while a HiveCatalog table uses NNNNN-<uuid>.metadata.json (see the examples below).
2. A HadoopCatalog table records the latest metadata version in a version-hint.text file, while a HiveCatalog table records the latest metadata_location in the Hive Metastore.
3. The metadata of the two catalog types is not interoperable; tables cannot be converted from one to the other.
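
Difference 2 is visible in how each catalog type resolves a table. A minimal sketch of loading the same table through both catalog APIs (the HMS URI, warehouse paths, and table name are illustrative):

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hive.HiveCatalog;

public class LoadFromBothCatalogs {
    public static void main(String[] args) {
        TableIdentifier id = TableIdentifier.of("iceberg_db", "demo_table");

        // HadoopCatalog: resolves <warehouse>/<db>/<table> on the file system and
        // reads version-hint.text to find the current metadata file.
        HadoopCatalog hadoopCatalog = new HadoopCatalog(new Configuration(),
                "hdfs://nameservice/user/iceberg/warehouse");
        Table t1 = hadoopCatalog.loadTable(id);

        // HiveCatalog: resolves the table via the metadata_location property in the HMS.
        HiveCatalog hiveCatalog = new HiveCatalog();
        hiveCatalog.setConf(new Configuration());
        hiveCatalog.initialize("hive", Map.of(
                CatalogProperties.URI, "thrift://cdh101:9083",
                CatalogProperties.WAREHOUSE_LOCATION, "hdfs://nameservice/user/hive/warehouse"));
        Table t2 = hiveCatalog.loadTable(id);

        System.out.println(t1.location() + " / " + t2.location());
    }
}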

Parsing HadoopCatalog table metadata

# Dump an Avro file as JSON
wget https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.11.1/avro-tools-1.11.1.jar
java -jar avro-tools-1.11.1.jar tojson xxx.avro

Metadata file: v3.metadata.json

{
  "format-version" : 1,
  "table-uuid" : "eeffbc08-9156-4a6f-8380-6138c6b67889",
  "location" : "hdfs://shmily:8020/user/iceberg/warehouse/iceberg_db/hadoop_iceberg_partitioned_table",
  "last-updated-ms" : 1667357677633,
  "last-column-id" : 4,
  "schema" : {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "age",
      "required" : false,
      "type" : "int"
    }, {
      "id" : 4,
      "name" : "dt",
      "required" : false,
      "type" : "string"
    } ]
  },
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "age",
      "required" : false,
      "type" : "int"
    }, {
      "id" : 4,
      "name" : "dt",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "partition-spec" : [ {
    "name" : "dt",
    "transform" : "identity",
    "source-id" : 4,
    "field-id" : 1000
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "dt",
      "transform" : "identity",
      "source-id" : 4,
      "field-id" : 1000
    } ]
  } ],
  "last-partition-id" : 1000,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "EXTERNAL" : "TRUE",
    "write.metadata.previous-versions-max" : "5",
    "bucketing_version" : "2",
    "write.metadata.delete-after-commit.enabled" : "true",
    "write.distribution-mode" : "hash",
    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"
  },
  "current-snapshot-id" : 1244418053907939374,
  "refs" : {
    "main" : {
      "snapshot-id" : 1244418053907939374,
      "type" : "branch"
    }
  },
  "snapshots" : [ {
    "snapshot-id" : 7688152750730458585,
    "timestamp-ms" : 1667357628763,
    "summary" : {
      "operation" : "append",
      "added-data-files" : "1",
      "added-records" : "2",
      "added-files-size" : "1272",
      "changed-partition-count" : "1",
      "total-records" : "2",
      "total-files-size" : "1272",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://shmily:8020/user/iceberg/warehouse/iceberg_db/hadoop_iceberg_partitioned_table/metadata/snap-7688152750730458585-1-f0e6c6ca-51a7-42e6-b412-4036e27c7d98.avro",
    "schema-id" : 0
  }, {
    "snapshot-id" : 1244418053907939374,
    "parent-snapshot-id" : 7688152750730458585,
    "timestamp-ms" : 1667357677633,
    "summary" : {
      "operation" : "append",
      "added-data-files" : "1",
      "added-records" : "1",
      "added-files-size" : "1166",
      "changed-partition-count" : "1",
      "total-records" : "3",
      "total-files-size" : "2438",
      "total-data-files" : "2",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://shmily:8020/user/iceberg/warehouse/iceberg_db/hadoop_iceberg_partitioned_table/metadata/snap-1244418053907939374-1-44671db7-02ca-47c1-a229-c7f62d8aa12f.avro",
    "schema-id" : 0
  } ],
  "snapshot-log" : [ {
    "timestamp-ms" : 1667357628763,
    "snapshot-id" : 7688152750730458585
  }, {
    "timestamp-ms" : 1667357677633,
    "snapshot-id" : 1244418053907939374
  } ],
  "metadata-log" : [ {
    "timestamp-ms" : 1667357528190,
    "metadata-file" : "hdfs://shmily:8020/user/iceberg/warehouse/iceberg_db/hadoop_iceberg_partitioned_table/metadata/v1.metadata.json"
  }, {
    "timestamp-ms" : 1667357628763,
    "metadata-file" : "hdfs://shmily:8020/user/iceberg/warehouse/iceberg_db/hadoop_iceberg_partitioned_table/metadata/v2.metadata.json"
  } ]
}

Manifest list: snap-7688152750730458585-1-f0e6c6ca-51a7-42e6-b412-4036e27c7d98.avro

{"manifest_path":"hdfs://shmily:8020/user/iceberg/warehouse/iceberg_db/hadoop_iceberg_partitioned_table/metadata/f0e6c6ca-51a7-42e6-b412-4036e27c7d98-m0.avro","manifest_length":6189,"partition_spec_id":0,"added_snapshot_id":{"long":7688152750730458585},"added_data_files_count":{"int":1},"existing_data_files_count":{"int":0},"deleted_data_files_count":{"int":0},"partitions":{"array":[{"contains_null":false,"contains_nan":{"boolean":false},"lower_bound":{"bytes":"20221011"},"upper_bound":{"bytes":"20221011"}}]},"added_rows_count":{"long":2},"existing_rows_count":{"long":0},"deleted_rows_count":{"long":0}}

Manifest file: f0e6c6ca-51a7-42e6-b412-4036e27c7d98-m0.avro

{"status":1,"snapshot_id":{"long":7688152750730458585},"data_file":{"file_path":"hdfs://shmily:8020/user/iceberg/warehouse/iceberg_db/hadoop_iceberg_partitioned_table/data/dt=20221011/00000-0-hive_20221102105315_5bb17fc0-3092-4bed-8839-253f19117b6d-job_1667357081446_0001-00001.parquet","file_format":"PARQUET","partition":{"dt":{"string":"20221011"}},"record_count":2,"file_size_in_bytes":1272,"block_size_in_bytes":67108864,"column_sizes":{"array":[{"key":1,"value":55},{"key":2,"value":59},{"key":3,"value":93},{"key":4,"value":101}]},"value_counts":{"array":[{"key":1,"value":2},{"key":2,"value":2},{"key":3,"value":2},{"key":4,"value":2}]},"null_value_counts":{"array":[{"key":1,"value":0},{"key":2,"value":0},{"key":3,"value":0},{"key":4,"value":0}]},"nan_value_counts":{"array":[]},"lower_bounds":{"array":[{"key":1,"value":"\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"},{"key":2,"value":"abc"},{"key":3,"value":"\u0018\u0000\u0000\u0000"},{"key":4,"value":"20221011"}]},"upper_bounds":{"array":[{"key":1,"value":"\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000"},{"key":2,"value":"qjj"},{"key":3,"value":"\u0018\u0000\u0000\u0000"},{"key":4,"value":"20221011"}]},"key_metadata":null,"split_offsets":{"array":[4]},"sort_order_id":{"int":0}}}

Parsing HiveCatalog table metadata
Metadata file: 00001-66c5832f-9d6d-4674-9a52-2aa6b8e29991.metadata.json

{
  "format-version" : 1,
  "table-uuid" : "5397c8ee-2b24-4eea-83ae-55e024ccd2c0",
  "location" : "hdfs://shmily:8020/user/hive/warehouse/iceberg_db.db/hive_iceberg_partitioned_table",
  "last-updated-ms" : 1665470339331,
  "last-column-id" : 4,
  "schema" : {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "age",
      "required" : false,
      "type" : "int"
    }, {
      "id" : 4,
      "name" : "dt",
      "required" : false,
      "type" : "string"
    } ]
  },
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "age",
      "required" : false,
      "type" : "int"
    }, {
      "id" : 4,
      "name" : "dt",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "partition-spec" : [ {
    "name" : "dt",
    "transform" : "identity",
    "source-id" : 4,
    "field-id" : 1000
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "dt",
      "transform" : "identity",
      "source-id" : 4,
      "field-id" : 1000
    } ]
  } ],
  "last-partition-id" : 1000,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "engine.hive.enabled" : "true",
    "write.metadata.previous-versions-max" : "5",
    "bucketing_version" : "2",
    "write.metadata.delete-after-commit.enabled" : "true",
    "write.distribution-mode" : "hash",
    "storage_handler" : "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler"
  },
  "current-snapshot-id" : 6283861985931247372,
  "refs" : {
    "main" : {
      "snapshot-id" : 6283861985931247372,
      "type" : "branch"
    }
  },
  "snapshots" : [ {
    "snapshot-id" : 6283861985931247372,
    "timestamp-ms" : 1665470339331,
    "summary" : {
      "operation" : "append",
      "added-data-files" : "2",
      "added-records" : "3",
      "added-files-size" : "2438",
      "changed-partition-count" : "2",
      "total-records" : "3",
      "total-files-size" : "2438",
      "total-data-files" : "2",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://shmily:8020/user/hive/warehouse/iceberg_db.db/hive_iceberg_partitioned_table/metadata/snap-6283861985931247372-1-825f6beb-3be7-485c-b338-8dec6068be94.avro",
    "schema-id" : 0
  } ],
  "snapshot-log" : [ {
    "timestamp-ms" : 1665470339331,
    "snapshot-id" : 6283861985931247372
  } ],
  "metadata-log" : [ {
    "timestamp-ms" : 1665470256700,
    "metadata-file" : "hdfs://shmily:8020/user/hive/warehouse/iceberg_db.db/hive_iceberg_partitioned_table/metadata/00000-d4b0dc94-f59f-4968-950c-31d22c2aab0d.metadata.json"
  } ]
}

Manifest list: snap-6283861985931247372-1-825f6beb-3be7-485c-b338-8dec6068be94.avro

{"manifest_path":"hdfs://shmily:8020/user/hive/warehouse/iceberg_db.db/hive_iceberg_partitioned_table/metadata/825f6beb-3be7-485c-b338-8dec6068be94-m0.avro","manifest_length":6234,"partition_spec_id":0,"added_snapshot_id":{"long":6283861985931247372},"added_data_files_count":{"int":2},"existing_data_files_count":{"int":0},"deleted_data_files_count":{"int":0},"partitions":{"array":[{"contains_null":false,"contains_nan":{"boolean":false},"lower_bound":{"bytes":"20221010"},"upper_bound":{"bytes":"20221011"}}]},"added_rows_count":{"long":3},"existing_rows_count":{"long":0},"deleted_rows_count":{"long":0}}

Manifest file: 825f6beb-3be7-485c-b338-8dec6068be94-m0.avro

{"status":1,"snapshot_id":{"long":6283861985931247372},"data_file":{"file_path":"hdfs://shmily:8020/user/hive/warehouse/iceberg_db.db/hive_iceberg_partitioned_table/data/dt=20221011/00000-0-shmily_20221011143858_89f7e99f-7227-4b19-9a44-b6807cf3b718-job_local1764035342_0002-00001.parquet","file_format":"PARQUET","partition":{"dt":{"string":"20221011"}},"record_count":2,"file_size_in_bytes":1272,"block_size_in_bytes":67108864,"column_sizes":{"array":[{"key":1,"value":55},{"key":2,"value":59},{"key":3,"value":93},{"key":4,"value":101}]},"value_counts":{"array":[{"key":1,"value":2},{"key":2,"value":2},{"key":3,"value":2},{"key":4,"value":2}]},"null_value_counts":{"array":[{"key":1,"value":0},{"key":2,"value":0},{"key":3,"value":0},{"key":4,"value":0}]},"nan_value_counts":{"array":[]},"lower_bounds":{"array":[{"key":1,"value":"\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"},{"key":2,"value":"abc"},{"key":3,"value":"\u0018\u0000\u0000\u0000"},{"key":4,"value":"20221011"}]},"upper_bounds":{"array":[{"key":1,"value":"\u0002\u0000\u0000\u0000\u0000\u0000\u0000\u0000"},{"key":2,"value":"qjj"},{"key":3,"value":"\u0018\u0000\u0000\u0000"},{"key":4,"value":"20221011"}]},"key_metadata":null,"split_offsets":{"array":[4]},"sort_order_id":{"int":0}}}
{"status":1,"snapshot_id":{"long":6283861985931247372},"data_file":{"file_path":"hdfs://shmily:8020/user/hive/warehouse/iceberg_db.db/hive_iceberg_partitioned_table/data/dt=20221010/00000-0-shmily_20221011143858_89f7e99f-7227-4b19-9a44-b6807cf3b718-job_local1764035342_0002-00002.parquet","file_format":"PARQUET","partition":{"dt":{"string":"20221010"}},"record_count":1,"file_size_in_bytes":1166,"block_size_in_bytes":67108864,"column_sizes":{"array":[{"key":1,"value":51},{"key":2,"value":53},{"key":3,"value":51},{"key":4,"value":59}]},"value_counts":{"array":[{"key":1,"value":1},{"key":2,"value":1},{"key":3,"value":1},{"key":4,"value":1}]},"null_value_counts":{"array":[{"key":1,"value":0},{"key":2,"value":0},{"key":3,"value":0},{"key":4,"value":0}]},"nan_value_counts":{"array":[]},"lower_bounds":{"array":[{"key":1,"value":"\u0003\u0000\u0000\u0000\u0000\u0000\u0000\u0000"},{"key":2,"value":"abc"},{"key":3,"value":"\u0018\u0000\u0000\u0000"},{"key":4,"value":"20221010"}]},"upper_bounds":{"array":[{"key":1,"value":"\u0003\u0000\u0000\u0000\u0000\u0000\u0000\u0000"},{"key":2,"value":"abc"},{"key":3,"value":"\u0018\u0000\u0000\u0000"},{"key":4,"value":"20221010"}]},"key_metadata":null,"split_offsets":{"array":[4]},"sort_order_id":{"int":0}}}

Iceberg Table Format Versions

When Iceberg adds a new feature that breaks compatibility with earlier versions, the table format version is incremented, so that tables on the older version remain compatible. Iceberg currently has two format versions, V1 and V2, selected at table creation via the format-version table property.

Version 1: Analytic Data Tables — large analytic tables managed over immutable file formats.
A V1 table can delete data by partition (e.g. in Trino: delete from iceberg_table where ds='2022120114';). The delete does not physically remove the data; it commits new metadata and a new snapshot, so as long as the old snapshot has not expired the table can still be rolled back to its pre-delete state. Once the snapshot expires, the data files are removed and cannot be recovered. V1 tables do not support row-level deletes (they fail with "Iceberg table updates require at least format version 2").

Version 2: Row-level Deletes — adds row-level update/delete capability on top of V1 by introducing delete files, which encode rows removed from existing data files. V2 can delete or replace individual rows in immutable data files without rewriting those files.

Iceberg Data Types

Type | Description | Notes
int | 32-bit signed integer | can be promoted to long
long | 64-bit signed integer |
float | single-precision floating point | can be promoted to double
double | double-precision floating point |
decimal(P,S) | fixed-point decimal | precision P sets the total number of digits and must be <= 38; scale S sets the number of fractional digits
date | calendar date without time or timezone |
time | time of day without date or timezone | stored in microseconds
timestamp | timestamp without timezone | stored in microseconds
timestamptz | timestamp with timezone | stored in microseconds
string | string of arbitrary length | encoded with UTF-8
fixed(L) | fixed-length byte array of length L |
binary | byte array of arbitrary length |
struct<...> | a record composed of fields of any types |
list<E> | a list of elements of any type |
map<K,V> | key-value pairs of arbitrary types | row-oriented; scans may read comparatively large amounts of data
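
These types correspond to classes in org.apache.iceberg.types.Types, which is also how schemas are declared in code. A minimal sketch (the field names are illustrative); note that every field carries an explicit ID, which is what makes the schema evolution described earlier safe:

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class SchemaTypesSketch {
    public static void main(String[] args) {
        // Iceberg tracks columns by these IDs, not by name or position.
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "name", Types.StringType.get()),
                Types.NestedField.optional(3, "price", Types.DecimalType.of(10, 2)),
                Types.NestedField.optional(4, "ts", Types.TimestampType.withZone()),
                Types.NestedField.optional(5, "tags",
                        Types.ListType.ofOptional(6, Types.StringType.get())),
                Types.NestedField.optional(7, "attrs",
                        Types.MapType.ofOptional(8, 9,
                                Types.StringType.get(), Types.StringType.get())));
        System.out.println(schema);
    }
}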

Integrations

Integrating Iceberg with Hive

Copy iceberg-hive-runtime-0.14.1.jar and libfb303-0.9.3.jar into $HIVE_HOME/auxlib.
Add iceberg.engine.hive.enabled=true to hive-site.xml.

Creating Iceberg tables from Hive
(Hive supports several catalog types for Iceberg tables: Hadoop, Hive (the default), location_based_table, and Custom; the first three work out of the box.)

1. HiveCatalog (table metadata is managed by the Hive Metastore, so it depends on Hive):

-- When no catalog type is set, Iceberg tables default to the HiveCatalog type
-- Example 1: unpartitioned table
CREATE TABLE iceberg_db.hive_iceberg_table (
  id BIGINT,
  name STRING
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION '/user/hive/warehouse/iceberg_db.db/hive_iceberg_table'
TBLPROPERTIES (
 'write.distribution-mode'='hash',  -- write distribution: hash by key, so each partition is written by at most one task, reducing small files
 'write.metadata.delete-after-commit.enabled'='true',   -- delete old metadata files after each commit; this only cleans metadata.json files, not the snapshot/manifest Avro files
 'write.metadata.previous-versions-max'='5'  -- number of old metadata.json files to retain
);
-- Example 2: partitioned table
CREATE TABLE iceberg_db.hive_iceberg_partitioned_table (
  id BIGINT,
  name STRING,
  age int
) partitioned by (dt string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES (
 'write.distribution-mode'='hash',
 'write.metadata.delete-after-commit.enabled'='true',
 'write.metadata.previous-versions-max'='5',
 'format-version'='2',
 'engine.hive.enabled'='true', 
 'write.target-file-size-bytes'='268435456',
 'write.format.default'='parquet',
 'write.parquet.compression-codec'='zstd',
 'write.parquet.compression-level'='10',
 'write.avro.compression-codec'='zstd',
 'write.avro.compression-level'='10'
);
-- Example 3: name a catalog explicitly, set its type to HiveCatalog, and create the table:
set iceberg.catalog.<catalog_name>.type=hive;  -- set the catalog type
CREATE TABLE iceberg_db.hive_iceberg_partitioned_table (
  id BIGINT,
  name STRING,
  age int
) partitioned by (dt string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES (
'iceberg.catalog'='<catalog_name>',
 'write.distribution-mode'='hash',
 'write.metadata.delete-after-commit.enabled'='true',
 'write.metadata.previous-versions-max'='5'
);

A HiveCatalog table stores many Table Parameters in the HMS, such as current-schema, current-snapshot-*, default-partition-spec, metadata_location, previous_metadata_location, and snapshot-count.

Known issue with HiveCatalog tables under Hive: in a Kerberos-secured HMS environment, the Hive CLI can create and query tables but cannot insert data; use beeline against HiveServer2 for inserts into Iceberg tables.
Best fit: HiveCatalog has a natural compatibility advantage, since almost every common compute engine supports it, whereas other catalog types may not be supported everywhere. In particular, a custom Iceberg catalog requires development work in every engine that uses Iceberg.

2. HadoopCatalog (metadata is managed directly on the underlying external storage)

set iceberg.catalog.<catalog_name>.type=hadoop;  -- the catalog type must be set in every session
set iceberg.catalog.<catalog_name>.warehouse=hdfs://nameservice/user/iceberg/warehouse;  -- the warehouse path must be set in every session
create external table iceberg_db.hadoop_iceberg_partitioned_table (
  id BIGINT,
  name STRING,
  age int
) partitioned by (dt string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://nameservice/user/iceberg/warehouse/iceberg_db/hadoop_iceberg_partitioned_table'  -- the path must be ${iceberg.catalog.<catalog_name>.warehouse}/${db_name}/${table_name}
tblproperties (
    'iceberg.catalog'='<catalog_name>',
    'write.distribution-mode'='hash',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.metadata.previous-versions-max'='5'
);

3. location_based_table (maps an existing HadoopCatalog Iceberg table on external storage to a Hive table)
When Iceberg-format table data already exists on HDFS, specify tblproperties('iceberg.catalog'='location_based_table') together with LOCATION, and Hive will load the Iceberg table data from that path. The prerequisite is that Iceberg table data already exists under the LOCATION.
Do not use PARTITIONED BY when creating the table; declare all columns, including the partition column, as regular fields.

create external table iceberg_db.location_iceberg_partitioned_table (
  id BIGINT,
  name STRING,
  age INT,
  dt STRING
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://nameservice/user/iceberg/warehouse/iceberg_db/location_iceberg_partitioned_table'
tblproperties ('iceberg.catalog'='location_based_table');
or
create table iceberg_db.location_iceberg_partitioned_table (
  id BIGINT,
  name STRING,
  age INT,
  dt STRING
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://nameservice/user/iceberg/warehouse/iceberg_db/location_iceberg_partitioned_table'
tblproperties ('iceberg.catalog'='location_based_table');

Recommended when: all external compute engines support HadoopCatalog Iceberg tables; for example, data written by Flink or Spark can be exposed to Hive through a table created this way.
Not recommended when: the table needs to be analyzed with Trino (Trino currently does not support HadoopCatalog Iceberg tables).
Note: the Iceberg table on external storage must be a HadoopCatalog table, otherwise its data cannot be read. With any other catalog type, table creation reports "File does not exist: /table_path…/metadata/version-hint.text"; the table is still created, but queries return empty results.

4. CustomCatalog: a catalog built on Iceberg's API, letting Iceberg work flexibly with arbitrary metadata management systems.
Best fit: scenarios that need to be decoupled from Hive/Hadoop, or that need flexible metadata management. Expect some development work for metadata management and for compatibility with each compute engine.

Integrating Iceberg with Flink

For Flink 1.14, download iceberg-flink-runtime-1.14-0.14.1.jar into $FLINK_HOME/lib.

  1. Flink DataStream API integration
    Example jobs (a minimal write-path sketch follows below):
    Writing Kafka data into Iceberg with the Flink DataStream API:
    KafkaSinkHadoopCatalogIcebergTable
    KafkaSinkHiveCatalogIcebergTable
    Reading Iceberg with the Flink DataStream API:
    HadoopCatalogIcebergTableSource
    HiveCatalogIcebergTableSource
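
The write path of those jobs can be reduced to a minimal sketch, assuming Flink 1.14 with the iceberg-flink-runtime 0.14 jar on the classpath; the Kafka topic, bootstrap servers, table path, and CSV-to-RowData mapping are illustrative simplifications:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class KafkaToIcebergSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L); // Iceberg commits happen on checkpoints

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("cdh101:9092")
                .setTopics("flink_topic1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Parse CSV lines "id,name,age,dt" into RowData (simplified; no error handling).
        DataStream<RowData> rows = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka")
                .map(line -> {
                    String[] f = line.split(",");
                    return (RowData) GenericRowData.of(
                            Long.parseLong(f[0]), StringData.fromString(f[1]),
                            Integer.parseInt(f[2]), StringData.fromString(f[3]));
                })
                .returns(TypeInformation.of(RowData.class));

        // Load a HadoopCatalog table by its path and append the stream to it.
        TableLoader loader = TableLoader.fromHadoopTable(
                "hdfs://nameservice/user/iceberg/warehouse/iceberg_db/hadoop_iceberg_table");
        FlinkSink.forRowData(rows).tableLoader(loader).append();

        env.execute("kafka-to-iceberg");
    }
}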

  2. Flink SQL integration

End-to-end pipeline: Kafka -> Flink SQL -> HadoopCatalog Iceberg table -> Hive

-- Start the Flink cluster: cd $FLINK_HOME ; bin/start-cluster.sh
-- Start the Flink SQL console: bin/sql-client.sh embedded shell
set execution.checkpointing.interval=10sec; -- checkpointing is mandatory: data is committed to Iceberg on each checkpoint
SET execution.runtime-mode = streaming;  -- streaming write
CREATE TABLE t_kafka_source (
    id BIGINT,
    name STRING,
    age INT,
    dt STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink_topic1',  
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'cdh101:9092,cdh102:9092,cdh103:9092,cdh104:9092',
    'properties.group.id' = 'test',
    'format' = 'csv'
);
-- 1. Write into an Iceberg table (HadoopCatalog type)
CREATE CATALOG hadoop_iceberg_catalog WITH (
  'type'='iceberg',  -- a Flink SQL catalog for HadoopCatalog-type Iceberg tables
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nameservice/user/iceberg/warehouse',
  'property-version'='1'
);
CREATE TABLE if not exists `hadoop_iceberg_catalog`.`iceberg_db`.`hadoop_iceberg_table_flink_sql` (
   id BIGINT,
   name STRING,
   age INT,
   dt STRING
) PARTITIONED BY (dt)
WITH('type'='ICEBERG',
'engine.hive.enabled'='true',  -- enable Hive queries (in practice it also works without this)
'read.split.target-size'='1073741824', -- fewer splits for faster scans
'write.target-file-size-bytes'='134217728',
'write.format.default'='parquet',
'write.metadata.delete-after-commit.enabled'='true',
'write.metadata.previous-versions-max'='9',  
'write.distribution-mode'='hash'); 
insert into hadoop_iceberg_catalog.iceberg_db.hadoop_iceberg_table_flink_sql select id,name,age,dt from t_kafka_source;
-- 2. Flink SQL batch query
SET execution.runtime-mode = batch;
select id,name,age,dt from `hadoop_iceberg_catalog`.`iceberg_db`.`hadoop_iceberg_table_flink_sql`;
-- 3. Flink SQL streaming query
---- Note: without start-snapshot-id, the query gradually backfills the entire table
---- With start-snapshot-id set, consumption starts from that snapshot
---- If the Flink job restarts without the last checkpoint/savepoint, it re-consumes from the start-snapshot-id snapshot every time, duplicating historical data
---- If the job restarts from the last checkpoint/savepoint, it resumes from the last consumed position
select id,name,age,dt from `hadoop_iceberg_catalog`.`iceberg_db`.`hadoop_iceberg_table_flink_sql` /*+ OPTIONS('streaming'='true', 'monitor-interval'='5s', 'start-snapshot-id'='3821550127947089987')*/ ;
-- 4. Create the Hive mapping table for the Iceberg table (HadoopCatalog-type tables only)
create external table iceberg_db.hadoop_iceberg_table_flink_sql (
  id BIGINT,
  name STRING,
  age INT,
  dt STRING
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://nameservice/user/iceberg/warehouse/iceberg_db/hadoop_iceberg_table_flink_sql'
tblproperties ('iceberg.catalog'='location_based_table');
-- 5. Query from Hive (sees the latest data in real time)
select * from iceberg_db.hadoop_iceberg_table_flink_sql; 
-- 6. Flink SQL upsert over a cumulative-window aggregation (too many upserts badly degrade query performance; frequent upserts require frequent compaction, and it can help to retain only 1-3 snapshots)
CREATE TABLE if not exists `hive_iceberg_catalog`.`iceberg_db`.`summary_iceberg_table` (
  actionid STRING,
  userid STRING,
  `success_cnt` bigint,
  `failed_cnt` bigint,
  window_start TIMESTAMP(3) NOT NULL,
  window_end TIMESTAMP(3) NOT NULL,
  ds STRING,
  PRIMARY KEY(`actionid`,`userid`,`ds`) NOT ENFORCED  -- a primary key is mandatory; upserts are keyed on it
) PARTITIONED BY (ds)
WITH('type'='ICEBERG',
'format-version'='2',   -- must be a V2 table
'write.upsert.enabled'='true',  -- makes the table upsertable
'engine.hive.enabled'='true',  
'read.split.target-size'='536870912',
'write.target-file-size-bytes'='268435456',
'write.format.default'='parquet',
'write.parquet.compression-codec'='zstd',
'write.parquet.compression-level'='10',
'write.avro.compression-codec'='zstd',
'write.avro.compression-level'='10',
'write.metadata.delete-after-commit.enabled'='true',
'write.metadata.previous-versions-max'='5',  
'write.distribution-mode'='hash'); 
INSERT INTO `hive_iceberg_catalog`.`iceberg_db`.`summary_iceberg_table` /*+ OPTIONS('upsert-enabled'='true') */   -- the 'upsert-enabled'='true' hint is required
SELECT 
actionid,
userid,
sum(cast(if(`error` = 'ok', 1, 0) as BIGINT)) AS success_cnt,
sum(cast(if(`error` <> 'ok', 1, 0) as BIGINT)) AS failed_cnt,
window_start, 
window_end,
DATE_FORMAT(window_start, 'yyyyMMdd') AS ds
FROM 
    TABLE(
    CUMULATE(
      TABLE kafka_table, 
      DESCRIPTOR(event_time), 
      INTERVAL '1' MINUTES, 
      INTERVAL '1' DAY
      )
    )
  GROUP BY 
    window_start, 
    window_end,
    actionid,
    userid;
-- 7. Flink SQL upsert computing a Top-N over a cumulative window (same caveat: too many upserts badly degrade query performance; frequent upserts require frequent compaction, and it can help to retain only 1-3 snapshots)
CREATE TABLE if not exists `hive_iceberg_catalog`.`iceberg_db`.`top_iceberg_table` (
  actionid STRING,
  success_cnt bigint,
  failed_cnt bigint,
  ranking_num bigint,
  window_start TIMESTAMP(3) NOT NULL,
  window_end TIMESTAMP(3) NOT NULL,
  ds STRING,
  PRIMARY KEY(`actionid`,`ds`) NOT ENFORCED  -- a primary key is mandatory; upserts are keyed on it
) PARTITIONED BY (ds)
WITH('type'='ICEBERG',
'format-version'='2',    -- must be a V2 table
'write.upsert.enabled'='true',  -- makes the table upsertable
'engine.hive.enabled'='true',  
'read.split.target-size'='536870912',
'write.target-file-size-bytes'='268435456',
'write.format.default'='parquet',
'write.parquet.compression-codec'='zstd',
'write.parquet.compression-level'='10',
'write.avro.compression-codec'='zstd',
'write.avro.compression-level'='10',
'write.metadata.delete-after-commit.enabled'='true',
'write.metadata.previous-versions-max'='5',  
'write.distribution-mode'='hash');
INSERT INTO `hive_iceberg_catalog`.`iceberg_db`.`top_iceberg_table` /*+ OPTIONS('upsert-enabled'='true') */  -- the 'upsert-enabled'='true' hint is required; the target is the table created above
SELECT * from (
    SELECT 
      actionid, 
      success_cnt, 
      failed_cnt, 
      ROW_NUMBER() OVER (
        PARTITION BY window_start, 
        window_end 
        ORDER BY 
          failed_cnt asc
      ) AS rn, 
      window_start, 
      window_end, 
      ds 
    from (
        select 
          actionid, 
          sum(cast(if(`error` = 'ok', 1, 0) as BIGINT)) AS success_cnt, 
          sum(cast(if(`error` <> 'ok', 1, 0) as BIGINT)) AS failed_cnt, 
          window_start, 
          window_end, 
          DATE_FORMAT(window_end, 'yyyyMMdd') AS ds 
        FROM 
          TABLE(
            CUMULATE(
              TABLE kafka_shoulei_odl_odl_xlpan_server_log, 
              DESCRIPTOR(event_time), 
              INTERVAL '1' MINUTES, 
              INTERVAL '1' DAY
            )
          ) 
        GROUP BY 
          window_start, 
          window_end, 
          actionid
      ) t_inner
  ) t_outer 
where 
  rn <= 100;

End-to-end pipeline: Kafka -> Flink SQL -> HiveCatalog Iceberg table -> Hive/Trino

-- Write into an Iceberg table (HiveCatalog type)
-- Start the Flink cluster: cd $FLINK_HOME ; bin/start-cluster.sh
-- Start the Flink SQL console: bin/sql-client.sh embedded -j iceberg-flink-runtime-1.13-0.14.0.jar -j  /opt/cloudera/parcels/CDH/jars/hive-metastore-2.1.1-cdh6.3.1.jar -j /opt/cloudera/parcels/CDH/jars/libthrift-0.9.3.jar -j /opt/cloudera/parcels/CDH/jars/hive-common-2.1.1-cdh6.3.1.jar -j /opt/cloudera/parcels/CDH/jars/hive-serde-2.1.1-cdh6.3.1.jar -j /opt/cloudera/parcels/CDH/jars/libfb303-0.9.3.jar -j /opt/cloudera/parcels/CDH/jars/hive-shims-common-2.1.1-cdh6.3.1.jar shell
set execution.checkpointing.interval=10sec; -- checkpointing is mandatory: data is committed to Iceberg on each checkpoint
SET execution.runtime-mode = streaming;  -- streaming write
CREATE TABLE t_kafka_source (
    id BIGINT,
    name STRING,
    age INT,
    dt STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink_topic1',  
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'cdh101:9092,cdh102:9092,cdh103:9092,cdh104:9092',
    'properties.group.id' = 'test',
    'format' = 'csv'
);
CREATE CATALOG hive_iceberg_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://cdh101:9083,thrift://cdh103:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nameservice/user/iceberg/warehouse',
  'hive-conf-dir'='/etc/ecm/hive-conf'   -- required when the Hive cluster uses Kerberos; can be omitted on non-Kerberos clusters
);
CREATE TABLE if not exists `hive_iceberg_catalog`.`iceberg_db`.`hive_iceberg_table_flink_sql` (
   id BIGINT,
   name STRING,
   age INT,
   dt STRING
) PARTITIONED BY (dt)
WITH('type'='ICEBERG',
'engine.hive.enabled'='true',
'read.split.target-size'='1073741824',
'write.target-file-size-bytes'='536870912',
'write.format.default'='parquet',
'write.parquet.compression-codec'='zstd',
'write.parquet.compression-level'='10',
'write.avro.compression-codec'='zstd',
'write.avro.compression-level'='10',
'write.metadata.delete-after-commit.enabled'='true',
'write.metadata.previous-versions-max'='5',
'write.distribution-mode'='none');
insert into hive_iceberg_catalog.iceberg_db.hive_iceberg_table_flink_sql select id,name,age,dt from t_kafka_source;

After writing into the HiveCatalog Iceberg table, iceberg_db.hive_iceberg_table_flink_sql is directly visible and queryable in Hive.
Creating the table in Hive first and then writing from Flink also works.
The table is likewise directly visible and queryable in Trino.
3. StreamPark integration (HiveCatalog-based)
StreamPark is a stream-processing platform built around Flink SQL; it makes it convenient to develop Flink jobs that operate on Iceberg in real time.
Environment: Hadoop 3.2.1 + Hive 3.1.2 + Iceberg 0.14.1 + Flink 1.14.5 + StreamPark 1.2.4 + OSS
The Flink SQL:

CREATE CATALOG hive_iceberg_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://thrift-host:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='oss://bucket_name/data/iceberg/warehouse',
  'hive-conf-dir'='/etc/ecm/hive-conf'
);
-- Kafka source table
CREATE TABLE t_kafka_source (
    id BIGINT,
    name STRING,
    age INT,
    dt STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 't_qjj_flink_test',  
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'broker1:9092,broker2:9092,broker3:9092',
    'properties.group.id' = 'test',
    'format' = 'csv'
);
-- Iceberg target table
CREATE TABLE IF NOT EXISTS `hive_iceberg_catalog`.`iceberg_db`.`hive_krb_iceberg_table_flink_sql` (
   id BIGINT,
   name STRING,
   age INT,
   dt STRING
) PARTITIONED BY (dt)
WITH('type'='ICEBERG',
'engine.hive.enabled'='true',
'read.split.target-size'='1073741824',
'write.target-file-size-bytes'='536870912',
'write.format.default'='parquet',
'write.parquet.compression-codec'='zstd',
'write.parquet.compression-level'='10',
'write.avro.compression-codec'='zstd',
'write.avro.compression-level'='10',
'write.metadata.delete-after-commit.enabled'='true',
'write.metadata.previous-versions-max'='10',
'write.distribution-mode'='none');
-- Insert data
insert into hive_iceberg_catalog.iceberg_db.hive_krb_iceberg_table_flink_sql select id,name,age,dt from t_kafka_source;

Required jars:

  <dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-runtime-1.14</artifactId>
    <version>0.14.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>3.1.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.9.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libfb303</artifactId>
    <version>0.9.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-common</artifactId>
    <version>3.1.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-serde</artifactId>
    <version>3.1.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hive.shims</groupId>
    <artifactId>hive-shims-common</artifactId>
    <version>3.1.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.5</version>
  </dependency>
  <dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.3.1</version>
  </dependency>

Possible failure:

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder;
        at org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.<clinit>(CommandLineOptions.java:27)

Cause: when StreamX (StreamPark) downloads the Hive dependencies it also pulls in their transitive dependencies, and the commons-cli version used by Hive conflicts with the one used by StreamX, causing a jar conflict.
Fix: after every build, delete the conflicting jar manually: hdfs dfs -rm -f hdfs://ns/streamx/workspace/<project-id>/lib/commons-cli-1.2.jar

Integrating Iceberg with Trino

To integrate Iceberg with Trino, configure $TRINO_HOME/etc/catalog/iceberg.properties as follows:

connector.name=iceberg
iceberg.file-format=PARQUET
hive.metastore.service.principal=hive/metastore-server-ip@realm-name 
hive.metastore.authentication.type=KERBEROS
hive.metastore.uri=thrift://metastore-server-ip:9083,metastore-server-ip-bk:9083
hive.metastore.client.principal=principal-in-hive-keytab
hive.metastore.client.keytab=/path/to/hive.keytab
hive.config.resources=/etc/ecm/hadoop-conf/core-site.xml, /etc/ecm/hadoop-conf/hdfs-site.xml
iceberg.compression-codec=SNAPPY

To support external storage such as OSS, copy jindo-core-4.3.0.jar and jindo-sdk-4.3.0.jar into both $TRINO_HOME/plugin/iceberg/ and $TRINO_HOME/plugin/hive/.

Trino currently supports only HiveCatalog-type Iceberg tables, not HadoopCatalog tables. Querying a HadoopCatalog, location_based_table, or Custom Iceberg table fails with: Table is missing [metadata_location] property: iceberg_db.iceberg_table

Common Trino operations on Iceberg tables:

1. List partitions
select * from "iceberg_table$partitions";
2. List snapshots
select * from "iceberg_table$snapshots";
SELECT snapshot_id,committed_at FROM "iceberg_table$snapshots" ORDER BY committed_at;
3. Table optimization: expire snapshots
ALTER TABLE iceberg_table EXECUTE expire_snapshots(retention_threshold => '7d')
4. Table optimization: compact data files
ALTER TABLE iceberg_table EXECUTE optimize [by default merges data files smaller than file_size_threshold, which defaults to 100MB]
ALTER TABLE iceberg_table EXECUTE optimize(file_size_threshold => '256MB')
ALTER TABLE iceberg_table EXECUTE optimize WHERE partition_key = 1 [optimize a single partition]
5. Table optimization: remove orphaned files
ALTER TABLE iceberg_table EXECUTE remove_orphan_files(retention_threshold => '7d')
6. Upgrade the table format version, e.g. V1 to V2
ALTER TABLE iceberg_table SET PROPERTIES format_version = 2;
7. Row-level delete with predicates on a V2 table (V1 tables support only partition-predicate deletes)
delete from iceberg_table where ds='2022120102' and eventid = 'event_1';
8. Partition evolution: add a partition column
ALTER TABLE iceberg_table SET PROPERTIES partitioning = ARRAY[<existing partition columns>, 'my_new_partition_column'];
9. Update table and column comments (changes made in Trino also take effect in Hive)
COMMENT ON TABLE iceberg_table IS 'Table comment';
COMMENT ON COLUMN iceberg_table.name IS 'Column comment';
10. Time travel: query the data of a historical snapshot
SELECT * FROM iceberg.iceberg_db.iceberg_table FOR VERSION AS OF 8954597067493422955;
SELECT * FROM iceberg.iceberg_db.iceberg_table FOR TIMESTAMP AS OF TIMESTAMP '2022-12-02 09:59:29.803 Europe/Vienna';
11. Roll the table back to a historical snapshot
CALL iceberg.system.rollback_to_snapshot('iceberg_db', 'iceberg_table', 8954597067493422955);
12. Show each row's source file and its modification time
select "$path", "$file_modified_time" from iceberg_table;
13. Query data
select * from iceberg_table limit 10;
select * from "iceberg_table$data" limit 10; [equivalent to the query above]
14. Show table properties
select * from "iceberg_table$properties";
15. Show the table's metadata change history
select * from "iceberg_table$history";
16. List the manifest files the table references
select * from "iceberg_table$manifests";
17. List all data files referenced by the current snapshot (the current table state)
select * from "iceberg_table$files";
18. Create a Trino materialized view (queryable only from Trino)
CREATE OR REPLACE MATERIALIZED VIEW iceberg_view COMMENT 'materializedView' WITH ( format = 'ORC', partitioning = ARRAY['ds'] ) as select appid,ds from iceberg_table;
CREATE MATERIALIZED VIEW IF NOT EXISTS iceberg_view COMMENT 'materializedView' WITH ( format = 'ORC', partitioning = ARRAY['ds'] ) as select appid,ds from iceberg_table;
REFRESH MATERIALIZED VIEW iceberg_view; [refresh the view when base-table changes have made it stale]
19. For complex queries that join large datasets, running ANALYZE on the tables collects statistics that can improve query performance
SET SESSION iceberg.experimental_extended_statistics_enabled = true;
ANALYZE iceberg_table;
ANALYZE iceberg_table WITH (columns = ARRAY['col_1', 'col_2']);
ALTER TABLE iceberg_table EXECUTE drop_extended_stats;  [clear previously collected statistics before re-analyzing a table]
20. Create a table (this too creates a HiveCatalog table; creating Iceberg tables from Trino is not recommended, because the Hive engine cannot read them)
CREATE TABLE iceberg_oss_table (
    c1 integer,
    c2 date,
    c3 double
)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['c1', 'c2'],
    location = 'oss://bucket-name/user/iceberg/warehouse/iceberg_oss_table'
);

Integrating Iceberg with Spark

Download iceberg-spark-runtime-3.3_2.12-1.0.0.jar into $SPARK_HOME/jars.
Add the following to $SPARK_HOME/conf/spark-defaults.conf:

spark.sql.catalog.spark_catalog  org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type  hive

Exposing a Spark-created Iceberg table to Hive

-- Create an Iceberg table from Spark
CREATE TABLE spark_catalog.default.qjj_iceberg_test (
    id bigint COMMENT 'unique id',
    data string
) USING iceberg
LOCATION 'oss://bucket-name/user/hive/warehouse/qjj_iceberg_test';
-- Changes required in Hive so it can read the Spark-created Iceberg table
ALTER TABLE qjj_iceberg_test SET FILEFORMAT INPUTFORMAT "org.apache.iceberg.mr.hive.HiveIcebergInputFormat" OUTPUTFORMAT "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat" SERDE "org.apache.iceberg.mr.hive.HiveIcebergSerDe";
alter table qjj_iceberg_test set TBLPROPERTIES ('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler');

Note: Iceberg tables created from Hive can be read by Spark directly.

Table Maintenance

Real-time writes to an Iceberg table commit frequently, producing large numbers of metadata and data files; the resulting file explosion and small-file problem degrade performance and can even destabilize the underlying storage. Iceberg does not yet handle small files automatically the way Hudi does, so some manual maintenance is required.
Below is the directory tree of an Iceberg table after 31 commits.

hadoop_iceberg_partitioned_table
├── data
│   ├── dt=20221010
│   │   └── 00000-0-hive_20221102105407_0605b24c-e823-4244-a994-83887ea7e430-job_1667357081446_0002-00001.parquet
│   ├── dt=20221011
│   │   └── 00000-0-hive_20221102105315_5bb17fc0-3092-4bed-8839-253f19117b6d-job_1667357081446_0001-00001.parquet
│   └── dt=20221104
│       ├── 00000-0-shmily_20221104104344_a66cc954-b33c-48df-812c-cc59d609ec59-job_1667529535736_0001-00001.parquet
│       ├── 00000-0-shmily_20221104104714_7bc45bcf-b8dc-4730-b9b5-d0c92ae46d5d-job_1667529535736_0002-00001.parquet
│       ├── 00000-0-shmily_20221104104805_4a490286-3499-43fb-affb-4317e07128d1-job_1667529535736_0003-00001.parquet
│       ├── 00000-0-shmily_20221104104851_71127db1-1170-4a38-a3d4-83e296fd3330-job_1667529535736_0004-00001.parquet
│       ├── 00000-0-shmily_20221104105901_1d66566e-af6c-4311-b123-86cfd18e0102-job_1667529535736_0005-00001.parquet
│       ├── 00000-0-shmily_20221104110033_e6e579d7-b08f-48b1-9277-b51b318dec7c-job_1667529535736_0006-00001.parquet
│       ├── 00000-0-shmily_20221104110100_345a4caa-bb75-4ea4-be5c-51a2420c428d-job_1667529535736_0007-00001.parquet
│       ├── 00000-0-shmily_20221104110124_bbb8626e-3d06-48a4-be6a-3ef061be4122-job_1667529535736_0008-00001.parquet
│       ├── 00000-0-shmily_20221104110152_13d0e1a5-6630-4dc2-ae24-ee104cbca3b5-job_1667529535736_0009-00001.parquet
│       ├── 00000-0-shmily_20221104110158_4283a83a-2662-4a89-8311-e8f89fe64603-job_1667529535736_0010-00001.parquet
│       ├── 00000-0-shmily_20221104110218_66253aad-58ca-4121-b454-8383e1ae7aae-job_1667529535736_0011-00001.parquet
│       ├── 00000-0-shmily_20221104110245_c041e352-dacd-4f5e-9fb4-ca321b4b3468-job_1667529535736_0012-00001.parquet
│       ├── 00000-0-shmily_20221104110311_29653ebe-6426-4163-8ace-7b1d305c9253-job_1667529535736_0013-00001.parquet
│       ├── 00000-0-shmily_20221104110336_35708806-e3a6-4746-851a-e8d306812810-job_1667529535736_0014-00001.parquet
│       ├── 00000-0-shmily_20221104110400_d1949e60-6123-49ff-8a85-0281651cf0b2-job_1667529535736_0015-00001.parquet
│       ├── 00000-0-shmily_20221104110425_e0cdc030-7d88-4b5f-8956-c95ffd71e698-job_1667529535736_0016-00001.parquet
│       ├── 00000-0-shmily_20221104110448_e144f00c-60b7-4935-a749-d3d88eba828a-job_1667529535736_0017-00001.parquet
│       ├── 00000-0-shmily_20221104110513_01b3285c-a3c8-490c-bc3c-5dbd696824e8-job_1667529535736_0018-00001.parquet
│       ├── 00000-0-shmily_20221104110538_bbe889dd-8615-4019-b6cc-636d1503dc8c-job_1667529535736_0019-00001.parquet
│       ├── 00000-0-shmily_20221104110602_09da4a19-c2ac-46af-af6c-5d97a3fbdcd9-job_1667529535736_0020-00001.parquet
│       ├── 00000-0-shmily_20221104110627_b1544cff-a0c2-4310-a06a-cd6103e40e9d-job_1667529535736_0021-00001.parquet
│       ├── 00000-0-shmily_20221104110651_a7e21f79-764d-4e62-8174-109dc4f1e7e2-job_1667529535736_0022-00001.parquet
│       ├── 00000-0-shmily_20221104110716_a657cb8b-45be-4484-9a21-bc2814e0c6b1-job_1667529535736_0023-00001.parquet
│       ├── 00000-0-shmily_20221104110742_f0e29e2e-4225-4e42-9699-524a19dacf44-job_1667529535736_0024-00001.parquet
│       ├── 00000-0-shmily_20221104110805_1827ea66-b863-4ecb-adc2-285470309490-job_1667529535736_0025-00001.parquet
│       ├── 00000-0-shmily_20221104110831_1019e38e-4845-4792-83fb-39822e497983-job_1667529535736_0026-00001.parquet
│       ├── 00000-0-shmily_20221104110919_4f1e3e96-1d37-40f3-aa2e-2a0139170381-job_1667529535736_0027-00001.parquet
│       ├── 00000-0-shmily_20221104110943_9ff35be2-cec0-4389-843e-395c7c6ec428-job_1667529535736_0028-00001.parquet
│       └── 00000-0-shmily_20221104111008_7a8ca8e3-d4c6-4aa1-81b6-c79d6ba7dd4f-job_1667529535736_0029-00001.parquet
├── metadata
│   ├── 03710058-d552-4dc1-b9cb-9340729e8f5e-m0.avro
│   ├── 09fd33b8-c9ce-4f7c-b871-ca31f096e3b1-m0.avro
│   ├── 12169e7b-13cb-4393-8158-1c9effe14e8f-m0.avro
│   ├── 2e834cae-756b-4498-a82a-2418db4b1092-m0.avro
│   ├── 3486a62e-1d74-49bd-bad3-c61187fac97f-m0.avro
│   ├── 3a9351b4-388b-44d9-8243-4a11189d81b2-m0.avro
│   ├── 3d0a560f-06f4-4402-a388-0b3cc7e25598-m0.avro
│   ├── 40862af2-6d95-40b8-a979-2360ea3b7175-m0.avro
│   ├── 44671db7-02ca-47c1-a229-c7f62d8aa12f-m0.avro
│   ├── 551c586b-9c4d-4ca4-a0ef-d46c30fb01f8-m0.avro
│   ├── 5baf0fec-3247-48f3-84f6-2f6402e866c7-m0.avro
│   ├── 639416fc-47c0-452e-a1d9-f17864cf008f-m0.avro
│   ├── 63ab2797-6a07-4886-9c27-43765bc31851-m0.avro
│   ├── 74ca6f5e-4eab-45d9-b0b1-04ba48e53971-m0.avro
│   ├── 8a4ce917-5986-4a95-9573-62103a116559-m0.avro
│   ├── 90bcc77d-5516-4c3e-96c8-242713920b1b-m0.avro
│   ├── 9f87073d-4cbc-46e9-b4dd-42fc28c86726-m0.avro
│   ├── a4c74672-5ace-4a79-aca9-677926532794-m0.avro
│   ├── b0e01ba5-bbe9-4ce4-ad9e-f07ab774a041-m0.avro
│   ├── b1a2cd1f-e30a-4c45-9119-9ba0e185cc58-m0.avro
│   ├── b209efb3-aab3-4bc2-a821-0557a0cda8d3-m0.avro
│   ├── b7c5b752-49d4-4840-8990-fb4a84e0f71d-m0.avro
│   ├── b9ba125d-bd76-483d-94f7-f6a9b664f633-m0.avro
│   ├── bcc5bf7b-7f01-4969-9f4c-cc9c1c920029-m0.avro
│   ├── d5b51efd-32b4-4948-9cc8-f2422919f1d7-m0.avro
│   ├── d6492cb2-7012-4668-9af9-c25cbe4df95a-m0.avro
│   ├── e511d02d-ecd8-4a3f-b8b7-45ad864026dc-m0.avro
│   ├── e652600f-4167-4f59-92cc-45faf15b03b1-m0.avro
│   ├── f0e6c6ca-51a7-42e6-b412-4036e27c7d98-m0.avro
│   ├── f3646fc2-7e64-4395-8adc-cd6a75413d37-m0.avro
│   ├── f91de7e0-2bf3-4804-bb28-64b61ebc588f-m0.avro
│   ├── snap-1244418053907939374-1-44671db7-02ca-47c1-a229-c7f62d8aa12f.avro
│   ├── snap-1477308230043616149-1-f91de7e0-2bf3-4804-bb28-64b61ebc588f.avro
│   ├── snap-1490572932134542813-1-5baf0fec-3247-48f3-84f6-2f6402e866c7.avro
│   ├── snap-1778869542790618047-1-12169e7b-13cb-4393-8158-1c9effe14e8f.avro
│   ├── snap-2054318792294634903-1-f3646fc2-7e64-4395-8adc-cd6a75413d37.avro
│   ├── snap-2520326235035414997-1-b7c5b752-49d4-4840-8990-fb4a84e0f71d.avro
│   ├── snap-3185789235788477057-1-e652600f-4167-4f59-92cc-45faf15b03b1.avro
│   ├── snap-3406584701390941146-1-b209efb3-aab3-4bc2-a821-0557a0cda8d3.avro
│   ├── snap-3684994728472824032-1-9f87073d-4cbc-46e9-b4dd-42fc28c86726.avro
│   ├── snap-3706799770416474623-1-a4c74672-5ace-4a79-aca9-677926532794.avro
│   ├── snap-3951591399252751391-1-3a9351b4-388b-44d9-8243-4a11189d81b2.avro
│   ├── snap-4081427338556096982-1-d5b51efd-32b4-4948-9cc8-f2422919f1d7.avro
│   ├── snap-4367759472594176887-1-b0e01ba5-bbe9-4ce4-ad9e-f07ab774a041.avro
│   ├── snap-4477640749996566080-1-63ab2797-6a07-4886-9c27-43765bc31851.avro
│   ├── snap-4792262885242972970-1-551c586b-9c4d-4ca4-a0ef-d46c30fb01f8.avro
│   ├── snap-501818490576080743-1-3486a62e-1d74-49bd-bad3-c61187fac97f.avro
│   ├── snap-558299450529529123-1-bcc5bf7b-7f01-4969-9f4c-cc9c1c920029.avro
│   ├── snap-6000755959745218957-1-09fd33b8-c9ce-4f7c-b871-ca31f096e3b1.avro
│   ├── snap-6590633258547705279-1-639416fc-47c0-452e-a1d9-f17864cf008f.avro
│   ├── snap-70006429373167712-1-d6492cb2-7012-4668-9af9-c25cbe4df95a.avro
│   ├── snap-7258286604987289050-1-03710058-d552-4dc1-b9cb-9340729e8f5e.avro
│   ├── snap-7353150060042609479-1-e511d02d-ecd8-4a3f-b8b7-45ad864026dc.avro
│   ├── snap-7512257803790292671-1-b9ba125d-bd76-483d-94f7-f6a9b664f633.avro
│   ├── snap-7520911403174383355-1-90bcc77d-5516-4c3e-96c8-242713920b1b.avro
│   ├── snap-7612339408675772086-1-40862af2-6d95-40b8-a979-2360ea3b7175.avro
│   ├── snap-7688152750730458585-1-f0e6c6ca-51a7-42e6-b412-4036e27c7d98.avro
│   ├── snap-8654338094020315416-1-8a4ce917-5986-4a95-9573-62103a116559.avro
│   ├── snap-8685114841540976719-1-b1a2cd1f-e30a-4c45-9119-9ba0e185cc58.avro
│   ├── snap-8693851636236625016-1-2e834cae-756b-4498-a82a-2418db4b1092.avro
│   ├── snap-8855760427151465849-1-3d0a560f-06f4-4402-a388-0b3cc7e25598.avro
│   ├── snap-9102081850556452524-1-74ca6f5e-4eab-45d9-b0b1-04ba48e53971.avro
│   ├── v1.metadata.json
│   ├── v2.metadata.json
│   ├── v3.metadata.json
│   ├── v4.metadata.json
│   ├── v5.metadata.json
│   ├── v6.metadata.json
│   ├── v7.metadata.json
│   ├── v8.metadata.json
│   ├── v9.metadata.json
│   ├── v10.metadata.json
│   ├── v11.metadata.json
│   ├── v12.metadata.json
│   ├── v13.metadata.json
│   ├── v14.metadata.json
│   ├── v15.metadata.json
│   ├── v16.metadata.json
│   ├── v17.metadata.json
│   ├── v18.metadata.json
│   ├── v19.metadata.json
│   ├── v20.metadata.json
│   ├── v21.metadata.json
│   ├── v22.metadata.json
│   ├── v23.metadata.json
│   ├── v24.metadata.json
│   ├── v25.metadata.json
│   ├── v26.metadata.json
│   ├── v27.metadata.json
│   ├── v28.metadata.json
│   ├── v29.metadata.json
│   ├── v30.metadata.json
│   ├── v31.metadata.json
│   ├── v32.metadata.json
│   └── version-hint.text
└── temp

The tree holds 32 metadata files (metadata.json), 31 manifest lists (snap-*.avro), 31 manifest files (*-m0.avro), and 31 data files (*.parquet).
In other words, n commits leave 3n+1 metadata-side files on disk (n manifest lists, n manifest files, and n+1 metadata.json files), in addition to the data files.
After cleanup (compact data files -> expire snapshots -> rewrite manifests -> remove orphan files), the small-file problem improves markedly:

hadoop_iceberg_partitioned_table_after
├── data
│   ├── dt=20221010
│   │   ├── 00000-0-hive_20221102105407_0605b24c-e823-4244-a994-83887ea7e430-job_1667357081446_0002-00001.parquet
│   │   ├── 00000-1-5ea18300-180b-465d-8310-bbbf422e15b8-00001.parquet
│   │   ├── 00000-1-78fcc067-f967-465f-be58-f5beff8561dd-00001.parquet
│   │   ├── 00000-2-be5c1e4a-254f-4503-9ff5-f8817a9e92f7-00001.parquet
│   │   └── 00000-613-8bdfbded-0300-425a-8201-031920536100-00001.parquet
│   ├── dt=20221011
│   │   ├── 00000-0-d75b9b2f-4794-42e2-94fe-f6ae22ccd7d9-00001.parquet
│   │   ├── 00000-0-hive_20221102105315_5bb17fc0-3092-4bed-8839-253f19117b6d-job_1667357081446_0001-00001.parquet
│   │   ├── 00000-2-1402730f-cc62-4daf-a47e-a8aecaa545c8-00001.parquet
│   │   ├── 00000-2-73f4cb74-1ab1-494d-ba4d-242b070bb82d-00001.parquet
│   │   └── 00000-611-6edbc04e-0919-4ae4-be30-89a8b91478e6-00001.parquet
│   └── dt=20221104
│       ├── 00000-0-11b9e51d-1ebd-496c-ae07-9e480c92c35e-00001.parquet
│       ├── 00000-0-274072c6-cefa-4395-ab31-016eacd19f08-00001.parquet
│       ├── 00000-0-ad26a63a-9b36-4c7d-9cb9-109aafae96fc-00001.parquet
│       ├── 00000-1-bd95039f-65c4-4b19-938e-185d615e3e0d-00001.parquet
│       └── 00000-612-73dd262d-7377-42f7-87e8-024447dc6fd6-00001.parquet
├── metadata
│   ├── 6d24ddd9-be10-42f3-a5d6-4551ee5a8bf0-m0.avro
│   ├── 79077b45-29e8-4d19-89a0-aef243b6a4ca-m0.avro
│   ├── 79077b45-29e8-4d19-89a0-aef243b6a4ca-m1.avro
│   ├── 7f9cc85d-0de4-4c4c-bde7-54d4f4f78447-m0.avro
│   ├── ae99e2ab-fcc5-44b0-bd94-f2d73eab22f3-m0.avro
│   ├── ae99e2ab-fcc5-44b0-bd94-f2d73eab22f3-m1.avro
│   ├── d2eeb7b2-9607-4c5b-bdc5-6cfe2c81ed94-m0.avro
│   ├── e52f34b0-ff45-4207-87a0-95b1897da11a-m0.avro
│   ├── e52f34b0-ff45-4207-87a0-95b1897da11a-m1.avro
│   ├── e715d26c-152c-4ff3-9533-7860d920503d-m0.avro
│   ├── e715d26c-152c-4ff3-9533-7860d920503d-m1.avro
│   ├── snap-2296367325872747730-1-e715d26c-152c-4ff3-9533-7860d920503d.avro
│   ├── snap-3114464783889165727-1-6d24ddd9-be10-42f3-a5d6-4551ee5a8bf0.avro
│   ├── snap-4397206702551297792-1-ae99e2ab-fcc5-44b0-bd94-f2d73eab22f3.avro
│   ├── snap-5015314203544980905-1-7f9cc85d-0de4-4c4c-bde7-54d4f4f78447.avro
│   ├── snap-761586103173871579-1-79077b45-29e8-4d19-89a0-aef243b6a4ca.avro
│   ├── snap-8390029841836840556-1-d2eeb7b2-9607-4c5b-bdc5-6cfe2c81ed94.avro
│   ├── snap-8895954507409072148-1-e52f34b0-ff45-4207-87a0-95b1897da11a.avro
│   ├── v38.metadata.json
│   ├── v39.metadata.json
│   ├── v40.metadata.json
│   ├── v41.metadata.json
│   ├── v42.metadata.json
│   ├── v43.metadata.json
│   └── version-hint.text
└── temp

Controlling the number of metadata files

Every committed transaction writes a new metadata.json. To keep metadata files from growing without bound, set these properties at table creation:

'write.metadata.delete-after-commit.enabled'='true' # delete older metadata files after each commit
'write.metadata.previous-versions-max'='9' # maximum number of historical metadata files to retain; older files beyond this count are deleted

This automatically caps the retained historical metadata files at 9 (plus the current one).
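
The same properties can also be applied to an existing table through the Table API. A minimal sketch, reusing the illustrative catalog and table names from earlier:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class SetMetadataRetention {
    public static void main(String[] args) {
        Table table = new HadoopCatalog(new Configuration(),
                "hdfs://nameservice/user/iceberg/warehouse")
                .loadTable(TableIdentifier.of("iceberg_db", "demo_table"));
        // A metadata-only commit; the retention policy applies from the next commit on.
        table.updateProperties()
                .set("write.metadata.delete-after-commit.enabled", "true")
                .set("write.metadata.previous-versions-max", "9")
                .commit();
    }
}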

Expiring old snapshots

Demos for expiring an Iceberg table's old snapshots:
Flink implementation:
ClearExpiredSnapshots
Spark implementation:
SparkIcebergTableMaintenance$expireSnapshots
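
For reference, a minimal sketch of the same idea against the core Table API (the 7-day cutoff and retainLast(3) are illustrative choices):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class ExpireSnapshotsSketch {
    public static void main(String[] args) {
        Table table = new HadoopCatalog(new Configuration(),
                "hdfs://nameservice/user/iceberg/warehouse")
                .loadTable(TableIdentifier.of("iceberg_db", "demo_table"));
        long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);
        // Drop snapshots older than 7 days but always keep the 3 most recent ones;
        // files no longer referenced by any retained snapshot are deleted.
        table.expireSnapshots()
                .expireOlderThan(cutoff)
                .retainLast(3)
                .commit();
    }
}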

Rewriting data files

Streaming writes can produce many small data files. Iceberg provides a rewriteDataFiles (compaction) action that merges small files periodically to improve query performance.
SparkIcebergTableMaintenance$compactDataFiles
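
A minimal sketch of what such a compaction job looks like with Iceberg's Spark actions (the 128 MB target size is an illustrative choice; assumes a SparkSession and the iceberg-spark-runtime jar on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class CompactDataFilesSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("iceberg-compaction").getOrCreate();
        Table table = new HadoopCatalog(new Configuration(),
                "hdfs://nameservice/user/iceberg/warehouse")
                .loadTable(TableIdentifier.of("iceberg_db", "demo_table"));
        // Bin-pack small files into ~128 MB outputs and commit a replace snapshot.
        SparkActions.get(spark)
                .rewriteDataFiles(table)
                .option("target-file-size-bytes", String.valueOf(128L * 1024 * 1024))
                .execute();
        spark.stop();
    }
}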

Rewriting manifest files

Every commit adds metadata, so a long-running streaming writer accumulates more and more manifest files; merging them lowers the file count and speeds up query planning.
SparkIcebergTableMaintenance$rewriteManifests
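
A minimal Spark-actions sketch (the 10 MB size predicate is an illustrative choice):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class RewriteManifestsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("iceberg-rewrite-manifests").getOrCreate();
        Table table = new HadoopCatalog(new Configuration(),
                "hdfs://nameservice/user/iceberg/warehouse")
                .loadTable(TableIdentifier.of("iceberg_db", "demo_table"));
        // Merge manifests smaller than 10 MB into fewer, larger manifests.
        SparkActions.get(spark)
                .rewriteManifests(table)
                .rewriteIf(manifest -> manifest.length() < 10L * 1024 * 1024)
                .execute();
        spark.stop();
    }
}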

Removing orphan files

SparkIcebergTableMaintenance$removeOrphanFiles
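
A minimal Spark-actions sketch; keep the retention window generous (here an illustrative 3 days) so that files being written by in-flight jobs are not mistaken for orphans:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class RemoveOrphanFilesSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("iceberg-remove-orphans").getOrCreate();
        Table table = new HadoopCatalog(new Configuration(),
                "hdfs://nameservice/user/iceberg/warehouse")
                .loadTable(TableIdentifier.of("iceberg_db", "demo_table"));
        // Delete files under the table location that no snapshot references,
        // restricted to files older than the cutoff to avoid racing active writers.
        SparkActions.get(spark)
                .deleteOrphanFiles(table)
                .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
                .execute();
        spark.stop();
    }
}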

Troubleshooting

Missing manifest file

2023-01-30 09:36:57,558 WARN  org.apache.flink.runtime.taskmanager.Task [] - IcebergFilesCommitter -> Sink: IcebergSink (1
/1)#0 (2125b52f518a53194e79e9f5d86dbb78) switched from RUNNING to FAILED with failure cause: org.apache.iceberg.exceptions.NotFoundException:
 Failed to open input stream for file: oss://xxxxx/user/hive/warehouse/iceberg_db/xxxxx/metadata/f86794c3-750d-4def-ad2d-b726c4c210ad-m0.avro
        at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183)
        at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
        at org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:65)
        at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:115)
        at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:91)
        at org.apache.iceberg.SnapshotProducer.newManifestReader(SnapshotProducer.java:448)
        at org.apache.iceberg.MergingSnapshotProducer$DataFileMergeManager.newManifestReader(MergingSnapshotProducer.java:1005)
        at org.apache.iceberg.ManifestMergeManager.createManifest(ManifestMergeManager.java:175)
        at org.apache.iceberg.ManifestMergeManager.lambda$mergeGroup$1(ManifestMergeManager.java:156)
        at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)
        at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:68)
        at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:308)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: ErrorCode : 25002 , ErrorMsg: File not found.  [RequestId]: ...

Cause: possibly a failed compaction job.
Fix:

// Remove the lost manifest file from the table metadata so the table becomes usable again.
// getTable(dbName, tabName) is a helper that loads the Iceberg Table from the catalog.
        String lostManifestFilePath = "xxxxx";
        Table table = getTable(dbName, tabName);
        Snapshot snapshot = table.currentSnapshot();
        List<ManifestFile> manifestFiles = snapshot.allManifests(table.io());
        List<ManifestFile> manifestFileDeletes = new ArrayList<>();
        for (ManifestFile manifestFile : manifestFiles) {
            if (manifestFile.path().equals(lostManifestFilePath)) {
                manifestFileDeletes.add(manifestFile);
                break;
            }
        }
        if (manifestFileDeletes.isEmpty()) {
            throw new Exception(String.format("Manifest file %s not in metadata", lostManifestFilePath));
        }
        // Commit a rewrite that drops the missing manifest from the current snapshot.
        RewriteManifests rewriteManifests = table.rewriteManifests();
        for (ManifestFile manifestFile : manifestFileDeletes) {
            rewriteManifests.deleteManifest(manifestFile);
        }
        rewriteManifests.commit();
// The commit may throw org.apache.iceberg.exceptions.ValidationException:
//   Replaced and created manifests must have the same number of active files: 0 (new), 5567 (old)
// Workaround: in iceberg-core, core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java,
// comment out the following two lines in the activeFilesCount method:
//      activeFilesCount += manifest.addedFilesCount();
//      activeFilesCount += manifest.existingFilesCount();

Flink cannot find an Avro file when writing to Iceberg, so the job fails and cannot write

org.apache.iceberg.exceptions.NotFoundException: Failed to open input stream for file: oss://bucket_name/user/hive/warehouse/iceberg_db/user_experience_report/metadata/32759abff25a1366837ed3d146e27d51-55f7b63bf1c8c02b88d8659b98477e64-00000-2-71-00037.avro
    at org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183)
    at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
    at org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:65)
    at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:115)
    at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:91)
    at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:72)
    at org.apache.iceberg.flink.sink.FlinkManifestUtil.readDataFiles(FlinkManifestUtil.java:58)
    at org.apache.iceberg.flink.sink.FlinkManifestUtil.readCompletedFiles(FlinkManifestUtil.java:113)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:244)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.initializeState(IcebergFilesCommitter.java:184)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:119)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
  ......

Likely cause: this file is neither a manifest list nor a manifest file, but an intermediate file Flink writes while committing to Iceberg; the usual trigger is a checkpoint that timed out or took too long. This failure is unrelated to compaction or cleanup jobs.
Fix:

hdfs dfs -ls -r -t oss://bucket_name/user/hive/warehouse/iceberg_db/user_experience_report/metadata/ | grep avro | grep -v snap | grep -v m0 | grep -v m1 | grep -v m2 | grep -v m3
Find the newest such Avro file, copy it to the missing file's name, and stabilize checkpointing.

Commit failures under HiveCatalog caused by table locks

23/05/14 03:32:34 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.iceberg.exceptions.CommitFailedException: Timed out after 183898 ms waiting for lock on iceberg_db.user_experience_report_user_details

Fix: in the Hive metastore database, inspect the locks with select * from metastore.hive_locks; then run DELETE FROM metastore.hive_locks WHERE HL_DB='iceberg_db' AND HL_TABLE='user_experience_report_user_details'; and rerun the Iceberg job.

Comparison with Hudi and Delta Lake

Dimension | Iceberg | Hudi | DeltaLake
Data management | via metadata files | via metadata files | via metadata files
Primary use case | unified batch/streaming; high-performance analytics and reliable data management | unified batch/streaming; upsert-heavy workloads | unified batch/streaming; deep Spark-ecosystem integration
ACID | supported | supported | supported
ACID isolation level | write serialization (writes are serialized) | snapshot isolation (non-overlapping writes run concurrently, otherwise serialized) | serialization (reads and writes both serialized) / write serialization / snapshot isolation
Schema evolution | supported | supported | supported
Data operations | update/delete | upsert/delete | update/delete/merge
Streaming read | supported | supported | supported
Streaming write | supported | supported | supported
Concurrency control | optimistic | optimistic | optimistic
File cleanup | manual | automatic | manual
Compaction | manual | automatic | manual
External dependencies | fully decoupled | depends on Spark | depends on Spark
Copy-on-write | supported | supported | supported
Merge-on-read | supported by V2 tables, not V1 | supported | not supported
Column encryption | planned for V3 | not supported | not supported



The limit you imagine for yourself is merely someone else's starting point.