wgtmac commented on code in PR #417:
URL: https://github.com/apache/iceberg-cpp/pull/417#discussion_r2631686551


##########
src/iceberg/catalog.h:
##########
@@ -27,6 +27,7 @@
 #include <vector>
 
 #include "iceberg/result.h"
+#include "iceberg/sort_order.h"

Review Comment:
   ```suggestion
   ```
   
   Forward declaration is enough for const reference.



##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -241,11 +244,50 @@ Result<std::vector<TableIdentifier>> 
RestCatalog::ListTables(
 }
 
 Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
-    [[maybe_unused]] const TableIdentifier& identifier,
-    [[maybe_unused]] const Schema& schema, [[maybe_unused]] const 
PartitionSpec& spec,
-    [[maybe_unused]] const std::string& location,
-    [[maybe_unused]] const std::unordered_map<std::string, std::string>& 
properties) {
-  return NotImplemented("Not implemented");
+    const TableIdentifier& identifier, const Schema& schema, const 
PartitionSpec& spec,
+    const SortOrder& order, const std::string& location,
+    const std::unordered_map<std::string, std::string>& properties) {
+  ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, 
Endpoint::CreateTable()));
+  ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
+
+  // Schema, partition spec, and sort order are not copyable due to Lazy<> 
members,

Review Comment:
   This is a good justification to change the function signature to use `const 
std::shared_ptr<X>&` for them.



##########
src/iceberg/catalog/rest/json_internal.cc:
##########
@@ -336,6 +339,57 @@ Result<ListTablesResponse> 
ListTablesResponseFromJson(const nlohmann::json& json
   return response;
 }
 
+nlohmann::json ToJson(const CreateTableRequest& request) {
+  nlohmann::json json;
+  json[kName] = request.name;
+  SetOptionalStringField(json, kLocation, request.location);
+  if (request.schema) {
+    json[kSchema] = ToJson(*request.schema);
+  }
+  if (request.partition_spec) {
+    json[kPartitionSpec] = ToJson(*request.partition_spec);
+  }
+  if (request.write_order) {
+    json[kWriteOrder] = ToJson(*request.write_order);
+  }
+  if (request.stage_create) {
+    json[kStageCreate] = request.stage_create;
+  }
+  SetContainerField(json, kProperties, request.properties);
+  return json;
+}
+
+Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& 
json) {
+  CreateTableRequest request;
+  ICEBERG_ASSIGN_OR_RAISE(request.name, GetJsonValue<std::string>(json, 
kName));
+  ICEBERG_ASSIGN_OR_RAISE(request.location,
+                          GetJsonValueOrDefault<std::string>(json, kLocation));
+  ICEBERG_ASSIGN_OR_RAISE(auto schema_json, GetJsonValue<nlohmann::json>(json, 
kSchema));
+  ICEBERG_ASSIGN_OR_RAISE(request.schema, SchemaFromJson(schema_json));
+
+  if (auto spec_json_result = GetJsonValue<nlohmann::json>(json, 
kPartitionSpec);
+      spec_json_result.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        request.partition_spec,
+        PartitionSpecFromJson(request.schema, spec_json_result.value(),
+                              PartitionSpec::kInitialSpecId));
+  }
+
+  if (auto order_json_result = GetJsonValue<nlohmann::json>(json, kWriteOrder);

Review Comment:
   Same as above.



##########
src/iceberg/catalog/rest/json_internal.cc:
##########
@@ -336,6 +339,57 @@ Result<ListTablesResponse> 
ListTablesResponseFromJson(const nlohmann::json& json
   return response;
 }
 
+nlohmann::json ToJson(const CreateTableRequest& request) {
+  nlohmann::json json;
+  json[kName] = request.name;
+  SetOptionalStringField(json, kLocation, request.location);
+  if (request.schema) {
+    json[kSchema] = ToJson(*request.schema);
+  }
+  if (request.partition_spec) {
+    json[kPartitionSpec] = ToJson(*request.partition_spec);
+  }
+  if (request.write_order) {
+    json[kWriteOrder] = ToJson(*request.write_order);
+  }
+  if (request.stage_create) {
+    json[kStageCreate] = request.stage_create;
+  }
+  SetContainerField(json, kProperties, request.properties);
+  return json;
+}
+
+Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& 
json) {
+  CreateTableRequest request;
+  ICEBERG_ASSIGN_OR_RAISE(request.name, GetJsonValue<std::string>(json, 
kName));
+  ICEBERG_ASSIGN_OR_RAISE(request.location,
+                          GetJsonValueOrDefault<std::string>(json, kLocation));
+  ICEBERG_ASSIGN_OR_RAISE(auto schema_json, GetJsonValue<nlohmann::json>(json, 
kSchema));
+  ICEBERG_ASSIGN_OR_RAISE(request.schema, SchemaFromJson(schema_json));
+
+  if (auto spec_json_result = GetJsonValue<nlohmann::json>(json, 
kPartitionSpec);
+      spec_json_result.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        request.partition_spec,
+        PartitionSpecFromJson(request.schema, spec_json_result.value(),
+                              PartitionSpec::kInitialSpecId));
+  }

Review Comment:
   We cannot silently ignore any parsing error like this.



##########
src/iceberg/test/rest_catalog_test.cc:
##########
@@ -337,4 +410,78 @@ TEST_F(RestCatalogIntegrationTest, DropNamespace) {
   EXPECT_FALSE(*exists_result);
 }
 
+TEST_F(RestCatalogIntegrationTest, CreateTable) {
+  auto catalog_result = CreateCatalog();
+  ASSERT_THAT(catalog_result, IsOk());
+  auto& catalog = catalog_result.value();
+
+  // Create nested namespace with properties
+  Namespace ns{.levels = {"test_create_table", "apple", "ios"}};
+  std::unordered_map<std::string, std::string> ns_properties{{"owner", "ray"},
+                                                             {"community", 
"apache"}};
+
+  // Create parent namespaces first
+  auto status = catalog->CreateNamespace(Namespace{.levels = 
{"test_create_table"}}, {});
+  EXPECT_THAT(status, IsOk());
+  status =
+      catalog->CreateNamespace(Namespace{.levels = {"test_create_table", 
"apple"}}, {});
+  EXPECT_THAT(status, IsOk());
+  status = catalog->CreateNamespace(ns, ns_properties);
+  EXPECT_THAT(status, IsOk());
+
+  // Create schema
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(1, "foo", string()),
+                               SchemaField::MakeRequired(2, "bar", int32()),
+                               SchemaField::MakeOptional(3, "baz", boolean())},
+      /*schema_id=*/1);
+
+  // Create partition spec and sort order (unpartitioned and unsorted)
+  auto partition_spec_result = 
PartitionSpec::Make(PartitionSpec::kInitialSpecId, {}, 0);
+  ASSERT_THAT(partition_spec_result, IsOk());
+
+  auto sort_order_result =
+      SortOrder::Make(SortOrder::kUnsortedOrderId, std::vector<SortField>{});
+  ASSERT_THAT(sort_order_result, IsOk());
+
+  // Create table
+  TableIdentifier table_id{.ns = ns, .name = "t1"};
+  std::unordered_map<std::string, std::string> table_properties;
+  auto table_result = catalog->CreateTable(table_id, *schema, 
**partition_spec_result,
+                                           **sort_order_result, "", 
table_properties);
+  ASSERT_THAT(table_result, IsOk());
+  auto& table = table_result.value();
+
+  // Verify table
+  EXPECT_EQ(table->name().ns.levels,
+            (std::vector<std::string>{"test_create_table", "apple", "ios"}));
+  EXPECT_EQ(table->name().name, "t1");
+
+  auto table_schema_result = table->schema();

Review Comment:
   Can we simplify the verification below? It looks too lengthy. Perhaps we can 
just verify the table name is good.



##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -103,18 +103,21 @@ Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(
   ICEBERG_ASSIGN_OR_RAISE(auto final_uri, final_config->Uri());
   
ICEBERG_RETURN_UNEXPECTED(paths->SetBaseUri(std::string(TrimTrailingSlash(final_uri))));
 
-  return std::unique_ptr<RestCatalog>(
-      new RestCatalog(std::move(final_config), std::move(paths), 
std::move(endpoints)));
+  return std::shared_ptr<RestCatalog>(
+      new RestCatalog(std::move(final_config), std::move(paths), 
std::move(endpoints),
+                      std::move(file_io)));
 }
 
 RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
                          std::unique_ptr<ResourcePaths> paths,
-                         std::unordered_set<Endpoint> endpoints)
+                         std::unordered_set<Endpoint> endpoints,
+                         std::shared_ptr<FileIO> file_io)
     : config_(std::move(config)),
       client_(std::make_unique<HttpClient>(config_->ExtractHeaders())),
       paths_(std::move(paths)),
       name_(config_->Get(RestCatalogProperties::kName)),
-      supported_endpoints_(std::move(endpoints)) {}
+      supported_endpoints_(std::move(endpoints)),
+      file_io_(std::move(file_io)) {}

Review Comment:
   ditto, placing `file_io_` between `config_` and `client_`.



##########
src/iceberg/test/rest_catalog_test.cc:
##########
@@ -337,4 +410,78 @@ TEST_F(RestCatalogIntegrationTest, DropNamespace) {
   EXPECT_FALSE(*exists_result);
 }
 
+TEST_F(RestCatalogIntegrationTest, CreateTable) {
+  auto catalog_result = CreateCatalog();
+  ASSERT_THAT(catalog_result, IsOk());
+  auto& catalog = catalog_result.value();
+
+  // Create nested namespace with properties
+  Namespace ns{.levels = {"test_create_table", "apple", "ios"}};
+  std::unordered_map<std::string, std::string> ns_properties{{"owner", "ray"},
+                                                             {"community", 
"apache"}};
+
+  // Create parent namespaces first
+  auto status = catalog->CreateNamespace(Namespace{.levels = 
{"test_create_table"}}, {});
+  EXPECT_THAT(status, IsOk());
+  status =
+      catalog->CreateNamespace(Namespace{.levels = {"test_create_table", 
"apple"}}, {});
+  EXPECT_THAT(status, IsOk());
+  status = catalog->CreateNamespace(ns, ns_properties);
+  EXPECT_THAT(status, IsOk());
+
+  // Create schema
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(1, "foo", string()),
+                               SchemaField::MakeRequired(2, "bar", int32()),
+                               SchemaField::MakeOptional(3, "baz", boolean())},
+      /*schema_id=*/1);
+
+  // Create partition spec and sort order (unpartitioned and unsorted)
+  auto partition_spec_result = 
PartitionSpec::Make(PartitionSpec::kInitialSpecId, {}, 0);
+  ASSERT_THAT(partition_spec_result, IsOk());
+
+  auto sort_order_result =
+      SortOrder::Make(SortOrder::kUnsortedOrderId, std::vector<SortField>{});
+  ASSERT_THAT(sort_order_result, IsOk());
+
+  // Create table
+  TableIdentifier table_id{.ns = ns, .name = "t1"};
+  std::unordered_map<std::string, std::string> table_properties;
+  auto table_result = catalog->CreateTable(table_id, *schema, 
**partition_spec_result,

Review Comment:
   In the end of this test, we can call this again to verify it fails to create 
the same table.



##########
src/iceberg/catalog/rest/json_internal.cc:
##########
@@ -336,6 +339,57 @@ Result<ListTablesResponse> 
ListTablesResponseFromJson(const nlohmann::json& json
   return response;
 }
 
+nlohmann::json ToJson(const CreateTableRequest& request) {
+  nlohmann::json json;
+  json[kName] = request.name;
+  SetOptionalStringField(json, kLocation, request.location);
+  if (request.schema) {
+    json[kSchema] = ToJson(*request.schema);
+  }
+  if (request.partition_spec) {
+    json[kPartitionSpec] = ToJson(*request.partition_spec);
+  }
+  if (request.write_order) {
+    json[kWriteOrder] = ToJson(*request.write_order);
+  }
+  if (request.stage_create) {
+    json[kStageCreate] = request.stage_create;
+  }
+  SetContainerField(json, kProperties, request.properties);
+  return json;
+}
+
+Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& 
json) {
+  CreateTableRequest request;
+  ICEBERG_ASSIGN_OR_RAISE(request.name, GetJsonValue<std::string>(json, 
kName));
+  ICEBERG_ASSIGN_OR_RAISE(request.location,
+                          GetJsonValueOrDefault<std::string>(json, kLocation));
+  ICEBERG_ASSIGN_OR_RAISE(auto schema_json, GetJsonValue<nlohmann::json>(json, 
kSchema));
+  ICEBERG_ASSIGN_OR_RAISE(request.schema, SchemaFromJson(schema_json));
+
+  if (auto spec_json_result = GetJsonValue<nlohmann::json>(json, 
kPartitionSpec);
+      spec_json_result.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        request.partition_spec,
+        PartitionSpecFromJson(request.schema, spec_json_result.value(),
+                              PartitionSpec::kInitialSpecId));
+  }

Review Comment:
   ```suggestion
     if (json.contains(kPartitionSpec)) {
       ICEBERG_ASSIGN_OR_RAISE(auto partition_spec,
           GetJsonValue<nlohmann::json>(json, kPartitionSpec));
       ICEBERG_ASSIGN_OR_RAISE(
           request.partition_spec,
           PartitionSpecFromJson(request.schema, partition_spec,
                                 PartitionSpec::kInitialSpecId));
     }
   ```



##########
src/iceberg/test/rest_catalog_test.cc:
##########
@@ -49,6 +57,70 @@ namespace iceberg::rest {
 
 namespace {
 
+// Simple local filesystem FileIO implementation for testing
+class SimpleLocalFileIO : public FileIO {

Review Comment:
   I'd recommend adding `iceberg/test/std_io.h` and renaming it to `StdFileIo` 
to share this across tests.



##########
src/iceberg/catalog/rest/rest_catalog.h:
##########
@@ -101,13 +104,14 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
  private:
   RestCatalog(std::unique_ptr<RestCatalogProperties> config,
               std::unique_ptr<ResourcePaths> paths,
-              std::unordered_set<Endpoint> endpoints);
+              std::unordered_set<Endpoint> endpoints, std::shared_ptr<FileIO> 
file_io);
 
   std::unique_ptr<RestCatalogProperties> config_;
   std::unique_ptr<HttpClient> client_;
   std::unique_ptr<ResourcePaths> paths_;
   std::string name_;
   std::unordered_set<Endpoint> supported_endpoints_;
+  std::shared_ptr<FileIO> file_io_;

Review Comment:
   Move it to be the 2nd member variable.



##########
src/iceberg/catalog/rest/rest_catalog.cc:
##########
@@ -103,18 +103,21 @@ Result<std::unique_ptr<RestCatalog>> RestCatalog::Make(
   ICEBERG_ASSIGN_OR_RAISE(auto final_uri, final_config->Uri());
   
ICEBERG_RETURN_UNEXPECTED(paths->SetBaseUri(std::string(TrimTrailingSlash(final_uri))));
 
-  return std::unique_ptr<RestCatalog>(
-      new RestCatalog(std::move(final_config), std::move(paths), 
std::move(endpoints)));
+  return std::shared_ptr<RestCatalog>(
+      new RestCatalog(std::move(final_config), std::move(paths), 
std::move(endpoints),
+                      std::move(file_io)));
 }
 
 RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
                          std::unique_ptr<ResourcePaths> paths,
-                         std::unordered_set<Endpoint> endpoints)
+                         std::unordered_set<Endpoint> endpoints,
+                         std::shared_ptr<FileIO> file_io)

Review Comment:
   I prefer placing `file_io` as the 2nd argument. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to