
在第1篇文章中,我们介绍了 catalog 接口,以及如何加载 catalog 并使用它创建具有已定义结构的表。在 Java API 系列的第2篇文章中,我们将介绍查询引擎通常用于执行表扫描的核心 Java API,它也可以用于开发需要与 Iceberg 核心内部组件交互的程序。如果你是计算框架或查询引擎的开发人员,那么这是你需要熟悉 Iceberg 客户端的一个领域。但是,如果你是一名通过其他工具使用 Iceberg 的数据专业人员,通常不会直接使用 Java 客户端执行表扫描,但是你可能会发现这篇文章是一个隐藏于引擎下的有用工具。
就像第1部分一样,我们将使用我们较早的文章之一,Docker、Spark 和 Iceberg:体验 Iceberg 的最快方式。如果你已经在本地缓存了 tabulario/spark-iceberg 镜像,请确保通过运行以下操作来获取最新版本。
docker-compose pull
docker-compose up -d
docker exec -it spark-iceberg notebook
你可以找到一个完全可运行的notebook,名为Iceberg - An Introduction to the Iceberg Java API.ipynb。如果你向下翻到它的第2部分,你可以找到本文中的所有代码片段。现在开始吧。
在第1篇文章中,我们已加载了在docker环境中配置的JDBC目录。创建一个demo.webapp.logs表。
import org.apache.iceberg.catalog.Catalog;import org.apache.hadoop.conf.Configuration;import org.apache.iceberg.CatalogProperties;import org.apache.iceberg.jdbc.JdbcCatalog;import org.apache.iceberg.hadoop.HadoopFileIO;import org.apache.iceberg.catalog.Namespace;import org.apache.iceberg.catalog.TableIdentifier;import org.apache.iceberg.Schema;import org.apache.iceberg.types.Types;import org.apache.iceberg.PartitionSpec;import org.apache.iceberg.catalog.TableIdentifier;Map<String, String> properties = new HashMap<>();properties.put(CatalogProperties.CATALOG_IMPL, JdbcCatalog.class.getName());properties.put(CatalogProperties.URI, "jdbc:postgresql://postgres:5432/demo_catalog");properties.put(JdbcCatalog.PROPERTY_PREFIX + "user", "admin");properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");properties.put(CatalogProperties.WAREHOUSE_LOCATION, "/home/iceberg/warehouse");properties.put(CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName());JdbcCatalog catalog = new JdbcCatalog();Configuration conf = new Configuration();catalog.setConf(conf);catalog.initialize("demo", properties);Namespace webapp = Namespace.of("webapp");Schema schema = new Schema(Types.NestedField.required(1, "level", Types.StringType.get()),Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()),Types.NestedField.required(3, "message", Types.StringType.get()),Types.NestedField.optional(4, "call_stack", Types.ListType.ofRequired(5, Types.StringType.get())));PartitionSpec spec = PartitionSpec.builderFor(schema).hour("event_time").identity("level").build();TableIdentifier name = TableIdentifier.of(webapp, "logs");catalog.createTable(name, schema, spec)
在开始执行表扫描之前,我们需要在表中添加一些数据。让我们用Java启动一个Spark会话并运行一些INSERT查询。我们将使用尝试Iceberg的最快方法文章中描述docker环境的所有配置参数,要了解更多关于这些属性的功能,请查看文章中描述最小运行时的部分。
import org.apache.spark.sql.SparkSession;SparkSession spark = SparkSession.builder().master("local[*]").appName("Java API Demo").config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions").config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog").config("spark.sql.catalog.demo.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog").config("spark.sql.catalog.demo.uri", "jdbc:postgresql://postgres:5432/demo_catalog").config("spark.sql.catalog.demo.jdbc.user", "admin").config("spark.sql.catalog.demo.jdbc.password", "password").config("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO").config("spark.sql.catalog.demo.warehouse", "/home/iceberg/warehouse").config("spark.sql.defaultCatalog", "demo").config("spark.eventLog.enabled", "true").config("spark.eventLog.dir", "/home/iceberg/spark-events").config("spark.history.fs.logDirectory", "/home/iceberg/spark-events").getOrCreate();spark.sparkContext().setLogLevel("ERROR"); // Set the log level to ERROR otherwise we'll get a ton of INFO messages
现在,我们可以使用spark提交INSERT,向上述创建的demo.webapp.logs表添加数据。
String query = "INSERT INTO demo.webapp.logs "+ "VALUES "+ "('info', timestamp 'today', 'Just letting you know!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')), "+ "('warning', timestamp 'today', 'You probably should not do this!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3')), "+ "('error', timestamp 'today', 'This was a fatal application error!', array('stack trace line 1', 'stack trace line 2', 'stack trace line 3'))";spark.sql(query).show()
现在我们有了一些数据,我们需要从catalog中加载我们的表。这首先涉及到加载catalog,就像我们在第1篇文章中所做的那样。
import org.apache.iceberg.catalog.Catalog;import org.apache.hadoop.conf.Configuration;import org.apache.iceberg.CatalogProperties;import org.apache.iceberg.jdbc.JdbcCatalog;import org.apache.iceberg.hadoop.HadoopFileIO;Map<String, String> properties = new HashMap<>();properties.put(CatalogProperties.CATALOG_IMPL, JdbcCatalog.class.getName());properties.put(CatalogProperties.URI, "jdbc:postgresql://postgres:5432/demo_catalog");properties.put(JdbcCatalog.PROPERTY_PREFIX + "user", "admin");properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");properties.put(CatalogProperties.WAREHOUSE_LOCATION, "/home/iceberg/warehouse");properties.put(CatalogProperties.FILE_IO_IMPL, HadoopFileIO.class.getName());JdbcCatalog catalog = new JdbcCatalog();Configuration conf = new Configuration();catalog.setConf(conf);catalog.initialize("demo", properties);
接下来,我们需要为webapp数据库创建一个Namespace对象,为logs表创建一个 TableIdentifier。然后可以将此 TableIdentifier 传递给 JdbcCatalog 实例上的 loadTable 方法。
import org.apache.iceberg.Table;import org.apache.iceberg.TableScan;import org.apache.iceberg.catalog.Namespace;import org.apache.iceberg.catalog.TableIdentifier;Namespace webapp = Namespace.of("webapp");TableIdentifier name = TableIdentifier.of(webapp, "logs");Table table = catalog.loadTable(name);
正如在介绍中提到的,使用Java客户端可以对Iceberg表执行不同类型的扫描。如果你只是想从表中检索数据,并可选地传入一个筛选条件表达式,那么你可以使用IcebergGenerics。它可以让我们在不使用任何过滤条件的情况下执行扫描。
import org.apache.iceberg.io.CloseableIterable;import org.apache.iceberg.data.Record;import org.apache.iceberg.data.IcebergGenerics;CloseableIterable<Record> result = IcebergGenerics.read(table).build();
result 是一个拥有Record实例的iterable,该iterable包含来自demo.webapp.logs表的数据。尝试遍历结果并打印出每个Record。
for (Record r: result) {System.out.println(r);}
输出为:
Record(info, 2022-04-28T00:00Z, Just letting you know!, [This, Then this, And then finally this!])Record(warning, 2022-04-28T00:00Z, You probably should not do this!, [This, Then this, And then finally this!])Record(error, 2022-04-28T00:00Z, This was a fatal application error!, [This, Then this, And then finally this!])
通常,并不推荐进行全表扫描,特别是对于包含大量数据的表。通过使用 Iceberg 表达式(Expression),你可以将where子句添加到扫描构建器,从而向表扫描添加过滤器。让我们添加一个表达式来过滤扫描level字段值为error的数据。
import org.apache.iceberg.expressions.Expressions;CloseableIterable<Record> result = IcebergGenerics.read(table).where(Expressions.equal("level", "error")).build();
输出为:
Record(error, 2022-04-28T00:00Z, This was a fatal application error!, [This, Then this, And then finally this!])
对于从Iceberg表中提取一些数据到内存中,使用IcebergGenerics非常有效。但是,当将Iceberg集成到计算框架或查询引擎中时,更有用的方法是生成与特定表达式匹配的文件列表。为此,可以构造一个TableScan对象来生成一组任务。
import org.apache.iceberg.CombinedScanTask;import org.apache.iceberg.TableScan;TableScan scan = table.newScan();
就像使用IcebergGenerics一样,你可以将表达式应用于扫描。让我们添加一个表达式来过滤扫描,只包含level字段值为error的数据记录。
import org.apache.iceberg.expressions.Expressions;TableScan filteredScan = scan.filter(Expressions.equal("level", "info")).select("message")
现在我们可以从筛选后的扫描中检索任务列表。
Iterable<CombinedScanTask> result = filteredScan.planTasks();
一个CombinedScanTask包括可以包含不同类型文件的FileScanTask实例,例如数据文件、位置删除文件或相等删除文件。让我们取出第一个文件并检查它。
import org.apache.iceberg.DataFile;CombinedScanTask task = result.iterator().next();DataFile dataFile = task.files().iterator().next().file();System.out.println(dataFile);
输出为:
GenericDataFile{content=data, file_path=/home/iceberg/warehouse/webapp/logs/data/event_time_hour=2022-04-28-00/level=info/00000-7-35232059-9181-4335-94ce-5cec73f38fa6-00001.parquet, file_format=PARQUET, spec_id=0, partition=PartitionData{event_time_hour=458640, level=info}, record_count=1, file_size_in_bytes=1523, column_sizes=null, value_counts=null, null_value_counts=null, nan_value_counts=null, lower_bounds=null, upper_bounds=null, key_metadata=null, split_offsets=[4], equality_ids=null, sort_order_id=null}
数据文件实例包含有关文件的大量信息,例如:- 路径 - 格式 - 大小 - 分区 - 它包含的记录数
此外,如果文件是使用包含列级统计数据的写入者提交的,则数据文件还将包括诸如列的下限/上限以及null计数之类的信息!
表扫描是 Iceberg 功能中非常重要的一部分。如果你需要构建大规模的ETL或运行高性能查询,那么你几乎总是通过计算引擎(译者注:如Spark\Flink)使用Iceberg。计算引擎可以为你构建 Iceberg 表扫描,并将其与特定工具捆绑在一起。因此,在这些计算引擎上工作的工程师非常熟悉扫描API,并且知道如何使用扫描任务来显著缩小必须由引擎框架进行处理的文件。另一方面,引擎的自动调优与声明式语法,让使用这些框架的工程师不需要了解幕后发生了什么,就可以写出高性能的程序。
到目前为止,我们已经讨论了catalogs和tables,以及如何执行表扫描,以便从Iceberg 表中提取数据或生成一组待进一步处理的任务。请继续关注本系列中揭示 Iceberg Java 客户端的更多文章,如果你想成为 Iceberg 社区的一员,请随时查看我们的社区页面,了解在哪里可以找到我们
原作者:Sam Redai
出处:https://tabular.io/blog/java-api-part-2/

