diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index f07893c85594..19d47df4a88d 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -345,6 +345,7 @@ IMPLEMENT_SETTING_ENUM( {"glue", DatabaseDataLakeCatalogType::GLUE}, {"hive", DatabaseDataLakeCatalogType::ICEBERG_HIVE}, {"onelake", DatabaseDataLakeCatalogType::ICEBERG_ONELAKE}, + {"biglake", DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE}, {"paimon_rest", DatabaseDataLakeCatalogType::PAIMON_REST}}) IMPLEMENT_SETTING_ENUM( diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 91b7eb080590..05c3b26340f4 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -437,6 +437,7 @@ enum class DatabaseDataLakeCatalogType : uint8_t GLUE, ICEBERG_HIVE, ICEBERG_ONELAKE, + ICEBERG_BIGLAKE, PAIMON_REST, }; diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 855e2cb7d948..deb7179381ec 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -10,6 +10,11 @@ #include #include #include +#include +#include +#include +#include +#include #if USE_AVRO && USE_PARQUET @@ -65,6 +70,14 @@ namespace DatabaseDataLakeSetting extern const DatabaseDataLakeSettingsString onelake_client_secret; extern const DatabaseDataLakeSettingsString dlf_access_key_id; extern const DatabaseDataLakeSettingsString dlf_access_key_secret; + extern const DatabaseDataLakeSettingsString google_project_id; + extern const DatabaseDataLakeSettingsString google_service_account; + extern const DatabaseDataLakeSettingsString google_metadata_service; + extern const DatabaseDataLakeSettingsString google_adc_client_id; + extern const DatabaseDataLakeSettingsString google_adc_client_secret; + extern const DatabaseDataLakeSettingsString google_adc_refresh_token; + extern const DatabaseDataLakeSettingsString google_adc_quota_project_id; + extern const DatabaseDataLakeSettingsString 
google_adc_credentials_file; } namespace Setting @@ -165,7 +178,7 @@ std::shared_ptr DatabaseDataLake::getCatalog() const } case DB::DatabaseDataLakeCatalogType::ICEBERG_ONELAKE: { - catalog_impl = std::make_shared( + catalog_impl = std::make_shared( settings[DatabaseDataLakeSetting::warehouse].value, url, settings[DatabaseDataLakeSetting::onelake_tenant_id].value, @@ -177,6 +190,75 @@ std::shared_ptr DatabaseDataLake::getCatalog() const Context::getGlobalContextInstance()); break; } + case DB::DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE: + { + std::string google_project_id = settings[DatabaseDataLakeSetting::google_project_id].value; + std::string google_service_account = settings[DatabaseDataLakeSetting::google_service_account].value; + std::string google_metadata_service = settings[DatabaseDataLakeSetting::google_metadata_service].value; + std::string google_adc_client_id = settings[DatabaseDataLakeSetting::google_adc_client_id].value; + std::string google_adc_client_secret = settings[DatabaseDataLakeSetting::google_adc_client_secret].value; + std::string google_adc_refresh_token = settings[DatabaseDataLakeSetting::google_adc_refresh_token].value; + std::string google_adc_quota_project_id = settings[DatabaseDataLakeSetting::google_adc_quota_project_id].value; + + if (settings[DatabaseDataLakeSetting::google_adc_credentials_file].changed) + { + try + { + const std::string & credentials_file_path = settings[DatabaseDataLakeSetting::google_adc_credentials_file].value; + DB::ReadBufferFromFile file_buf(credentials_file_path); + std::string json_str; + DB::readStringUntilEOF(json_str, file_buf); + + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = json.extract(); + + if (object->has("type")) + { + String type = object->get("type").extract(); + if (type != "authorized_user") + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Unsupported credentials type '{}' in Google ADC credentials 
file. Expected 'authorized_user'", + type); + } + } + + if (google_adc_client_id.empty() && object->has("client_id")) + google_adc_client_id = object->get("client_id").extract(); + if (google_adc_client_secret.empty() && object->has("client_secret")) + google_adc_client_secret = object->get("client_secret").extract(); + if (google_adc_refresh_token.empty() && object->has("refresh_token")) + google_adc_refresh_token = object->get("refresh_token").extract(); + if (google_adc_quota_project_id.empty() && object->has("quota_project_id")) + google_adc_quota_project_id = object->get("quota_project_id").extract(); + if (google_project_id.empty() && object->has("project_id")) + google_project_id = object->get("project_id").extract(); + } + catch (const DB::Exception & e) + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Failed to load Google ADC credentials from file '{}': {}", + settings[DatabaseDataLakeSetting::google_adc_credentials_file].value, + e.message()); + } + } + + catalog_impl = std::make_shared( + settings[DatabaseDataLakeSetting::warehouse].value, + url, + google_project_id, + google_service_account, + google_metadata_service, + google_adc_client_id, + google_adc_client_secret, + google_adc_refresh_token, + google_adc_quota_project_id, + Context::getGlobalContextInstance()); + break; + } case DB::DatabaseDataLakeCatalogType::UNITY: { catalog_impl = std::make_shared( @@ -272,6 +354,7 @@ std::shared_ptr DatabaseDataLake::getConfigur } case DatabaseDataLakeCatalogType::ICEBERG_HIVE: case DatabaseDataLakeCatalogType::ICEBERG_REST: + case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE: { switch (type) { @@ -448,7 +531,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con auto table_metadata = DataLake::TableMetadata().withSchema().withLocation().withDataLakeSpecificProperties(); const bool with_vended_credentials = settings[DatabaseDataLakeSetting::vended_credentials].value; - if (!lightweight && with_vended_credentials) + 
if (with_vended_credentials) table_metadata = table_metadata.withStorageCredentials(); auto [namespace_name, table_name] = DataLake::parseTableName(name); @@ -487,7 +570,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con /// so we have a separate setting to know whether we should even try to fetch them. if (args.size() == 1) { - std::array vended_credentials_catalogs = {DatabaseDataLakeCatalogType::ICEBERG_ONELAKE, DatabaseDataLakeCatalogType::PAIMON_REST}; + std::array vended_credentials_catalogs = {DatabaseDataLakeCatalogType::ICEBERG_ONELAKE, DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE, DatabaseDataLakeCatalogType::PAIMON_REST}; if (table_metadata.hasStorageCredentials()) { LOG_DEBUG(log, "Getting credentials"); @@ -558,7 +641,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con auto azure_configuration = std::static_pointer_cast(configuration); if (!azure_configuration) throw Exception(ErrorCodes::LOGICAL_ERROR, "Configuration is not azure type for one lake catalog"); - auto rest_catalog = std::static_pointer_cast(catalog); + auto rest_catalog = std::static_pointer_cast(catalog); if (!rest_catalog) throw Exception(ErrorCodes::LOGICAL_ERROR, "Catalog is not equals to one lake"); azure_configuration->setInitializationAsOneLake( @@ -946,6 +1029,7 @@ void registerDatabaseDataLake(DatabaseFactory & factory) { case DatabaseDataLakeCatalogType::ICEBERG_ONELAKE: case DatabaseDataLakeCatalogType::ICEBERG_REST: + case DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE: { if (!args.create_query.attach && !args.context->getSettingsRef()[Setting::allow_experimental_database_iceberg]) diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp index e35c8b3403a8..95651621ee88 100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp @@ -34,6 +34,14 @@ namespace ErrorCodes DECLARE(String, 
onelake_tenant_id, "", "Tenant id from azure", 0) \ DECLARE(String, onelake_client_id, "", "Client id from azure", 0) \ DECLARE(String, onelake_client_secret, "", "Client secret from azure", 0) \ + DECLARE(String, google_project_id, "", "Google Cloud project ID for BigLake. Required for BigLake catalog. Used in x-goog-user-project header. If not set and google_adc_quota_project_id is provided, it latter will be used", 0) \ + DECLARE(String, google_service_account, "", "Google Cloud service account email for metadata service authentication. Default: 'default'. Only used when ADC credentials are not provided", 0) \ + DECLARE(String, google_metadata_service, "", "Google Cloud metadata service endpoint for token retrieval. Default: 'metadata.google.internal'. Only used when ADC credentials are not provided", 0) \ + DECLARE(String, google_adc_client_id, "", "Google Application Default Credentials client_id for BigLake. Required if using ADC authentication instead of metadata service", 0) \ + DECLARE(String, google_adc_client_secret, "", "Google Application Default Credentials client_secret for BigLake. Required if using ADC authentication instead of metadata service", 0) \ + DECLARE(String, google_adc_refresh_token, "", "Google Application Default Credentials refresh_token for BigLake. Required if using ADC authentication instead of metadata service", 0) \ + DECLARE(String, google_adc_quota_project_id, "", "Google Application Default Credentials quota_project_id for BigLake. Optional, used if google_project_id is not set", 0) \ + DECLARE(String, google_adc_credentials_file, "", "Path to JSON file containing Google Application Default Credentials. If set, credentials will be loaded from this file. 
File should contain: type, client_id, client_secret, refresh_token, and optionally quota_project_id", 0) \ DECLARE(String, dlf_access_key_id, "", "Access id of DLF token for Paimon REST Catalog", 0) \ DECLARE(String, dlf_access_key_secret, "", "Access secret of DLF token for Paimon REST Catalog", 0) \ diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index 6229e84eca84..5f9fb264a7af 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -1,7 +1,13 @@ #include +#include #include +#include +#include #include #include +#include +#include +#include #include "config.h" #if USE_AVRO @@ -18,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -144,17 +151,15 @@ RestCatalog::RestCatalog( update_token_if_expired = true; } else if (!auth_header_.empty()) + { auth_header = parseAuthHeader(auth_header_); - + } config = loadConfig(); } RestCatalog::RestCatalog( const std::string & warehouse_, const std::string & base_url_, - const std::string & onelake_tenant_id, - const std::string & onelake_client_id, - const std::string & onelake_client_secret, const std::string & auth_scope_, const std::string & oauth_server_uri_, bool oauth_server_use_request_body_, @@ -163,15 +168,10 @@ RestCatalog::RestCatalog( , DB::WithContext(context_) , base_url(correctAPIURI(base_url_)) , log(getLogger("RestCatalog(" + warehouse_ + ")")) - , tenant_id(onelake_tenant_id) - , client_id(onelake_client_id) - , client_secret(onelake_client_secret) , auth_scope(auth_scope_) , oauth_server_uri(oauth_server_uri_) , oauth_server_use_request_body(oauth_server_use_request_body_) { - update_token_if_expired = true; - config = loadConfig(); } @@ -222,25 +222,148 @@ DB::HTTPHeaderEntries RestCatalog::getAuthHeaders(bool update_token) const return DB::HTTPHeaderEntries{auth_header.value()}; } - /// Option 2: user provided grant_type, client_id and client_secret. 
+ /// Option 2: user provided catalog_credential (client_id:client_secret). + /// We make OAuthClientCredentialsRequest + /// https://github.com/apache/iceberg/blob/3badfe0c1fcf0c0adfc7aa4a10f0b50365c48cf9/open-api/rest-catalog-open-api.yaml#L3498C5-L3498C34 + if (!client_id.empty()) + { + if (!access_token.has_value() || update_token || access_token->isExpired()) + { + access_token = retrieveAccessTokenOAuth(); + } + + DB::HTTPHeaderEntries headers; + headers.emplace_back("Authorization", "Bearer " + access_token->token); + return headers; + } + + return {}; +} + +AccessToken RestCatalog::retrieveAccessTokenOAuth() const +{ + static constexpr auto oauth_tokens_endpoint = "oauth/tokens"; + + Poco::URI url; + DB::ReadWriteBufferFromHTTP::OutStreamCallback out_stream_callback; + size_t body_size = 0; + String body; + + if (oauth_server_uri.empty() && !oauth_server_use_request_body) + { + url = Poco::URI(base_url / oauth_tokens_endpoint); + + Poco::URI::QueryParameters params = { + {"grant_type", "client_credentials"}, + {"scope", auth_scope}, + {"client_id", client_id}, + {"client_secret", client_secret}, + }; + url.setQueryParameters(params); + } + else + { + String encoded_auth_scope; + String encoded_client_id; + String encoded_client_secret; + Poco::URI::encode(auth_scope, auth_scope, encoded_auth_scope); + Poco::URI::encode(client_id, client_id, encoded_client_id); + Poco::URI::encode(client_secret, client_secret, encoded_client_secret); + + body = fmt::format( + "grant_type=client_credentials&scope={}&client_id={}&client_secret={}", + encoded_auth_scope, encoded_client_id, encoded_client_secret); + body_size = body.size(); + out_stream_callback = [&](std::ostream & os) + { + os << body; + }; + + if (oauth_server_uri.empty()) + url = Poco::URI(base_url / oauth_tokens_endpoint); + else + url = Poco::URI(oauth_server_uri); + } + + const auto & context = getContext(); + auto timeouts = DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), 
context->getServerSettings()); + auto session = makeHTTPSession(DB::HTTPConnectionGroupType::HTTP, url, timeouts, {}); + + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, url.getPathAndQuery(), + Poco::Net::HTTPMessage::HTTP_1_1); + request.setContentType("application/x-www-form-urlencoded"); + request.setContentLength(body_size); + request.set("Accept", "application/json"); + + std::ostream & os = session->sendRequest(request); + out_stream_callback(os); + + Poco::Net::HTTPResponse response; + std::istream & rs = session->receiveResponse(response); + + std::string json_str; + Poco::StreamCopier::copyToString(rs, json_str); + + Poco::JSON::Parser parser; + Poco::Dynamic::Var res_json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = res_json.extract(); + + AccessToken token; + token.token = object->get("access_token").extract(); + + if (object->has("expires_in")) + { + Int64 expires_in = object->getValue("expires_in"); + token.expires_at = std::chrono::system_clock::now() + std::chrono::seconds(expires_in - 300); + } + + return token; +} + +OneLakeCatalog::OneLakeCatalog( + const std::string & warehouse_, + const std::string & base_url_, + const std::string & onelake_tenant_id, + const std::string & onelake_client_id, + const std::string & onelake_client_secret, + const std::string & auth_scope_, + const std::string & oauth_server_uri_, + bool oauth_server_use_request_body_, + DB::ContextPtr context_) + : RestCatalog(warehouse_, base_url_, auth_scope_, oauth_server_uri_, oauth_server_use_request_body_, context_) + , tenant_id(onelake_tenant_id) +{ + client_id = onelake_client_id; + client_secret = onelake_client_secret; + update_token_if_expired = true; + if (!client_id.empty() && !client_secret.empty()) + { + access_token = retrieveAccessToken(); + } + config = loadConfig(); +} + +DB::HTTPHeaderEntries OneLakeCatalog::getAuthHeaders(bool update_token) const +{ + /// User provided grant_type, client_id and client_secret. 
/// We would make OAuthClientCredentialsRequest /// https://github.com/apache/iceberg/blob/3badfe0c1fcf0c0adfc7aa4a10f0b50365c48cf9/open-api/rest-catalog-open-api.yaml#L3498C5-L3498C34 if (!client_id.empty()) { - if (!access_token.has_value() || update_token) + if (!access_token.has_value() || update_token || access_token->isExpired()) { access_token = retrieveAccessToken(); } DB::HTTPHeaderEntries headers; - headers.emplace_back("Authorization", "Bearer " + access_token.value()); + headers.emplace_back("Authorization", "Bearer " + access_token->token); return headers; } - return {}; + return RestCatalog::getAuthHeaders(update_token); } -std::string RestCatalog::retrieveAccessToken() const +AccessToken OneLakeCatalog::retrieveAccessToken() const { static constexpr auto oauth_tokens_endpoint = "oauth/tokens"; @@ -312,7 +435,257 @@ std::string RestCatalog::retrieveAccessToken() const Poco::Dynamic::Var res_json = parser.parse(json_str); const Poco::JSON::Object::Ptr & object = res_json.extract(); - return object->get("access_token").extract(); + AccessToken token; + token.token = object->get("access_token").extract(); + + if (object->has("expires_in")) + { + Int64 expires_in = object->getValue("expires_in"); + token.expires_at = std::chrono::system_clock::now() + std::chrono::seconds(expires_in - 300); /// 5 minutes buffer + } + + return token; +} + +BigLakeCatalog::BigLakeCatalog( + const std::string & warehouse_, + const std::string & base_url_, + const std::string & google_project_id_, + const std::string & google_service_account_, + const std::string & google_metadata_service_, + const std::string & google_adc_client_id_, + const std::string & google_adc_client_secret_, + const std::string & google_adc_refresh_token_, + const std::string & google_adc_quota_project_id_, + DB::ContextPtr context_) + : RestCatalog(warehouse_, base_url_, "", "", false, context_) + , google_project_id(google_project_id_) + , google_service_account(google_service_account_) + , 
google_metadata_service(google_metadata_service_) + , google_adc_client_id(google_adc_client_id_) + , google_adc_client_secret(google_adc_client_secret_) + , google_adc_refresh_token(google_adc_refresh_token_) + , google_adc_quota_project_id(google_adc_quota_project_id_) +{ + update_token_if_expired = true; + if (!google_project_id.empty() || !google_adc_client_id.empty()) + { + access_token = retrieveGoogleCloudAccessToken(); + } + config = loadConfig(); +} + +DB::HTTPHeaderEntries BigLakeCatalog::getAuthHeaders(bool update_token) const +{ + if (!google_project_id.empty() || !google_adc_client_id.empty()) + { + if (!access_token.has_value() || update_token || access_token->isExpired()) + { + access_token = retrieveGoogleCloudAccessToken(); + } + + DB::HTTPHeaderEntries headers; + headers.emplace_back("Authorization", "Bearer " + access_token->token); + + std::string project_id = google_project_id; + if (project_id.empty() && !google_adc_quota_project_id.empty()) + { + project_id = google_adc_quota_project_id; + } + + if (!project_id.empty()) + { + headers.emplace_back("x-goog-user-project", project_id); + } + + return headers; + } + + return RestCatalog::getAuthHeaders(update_token); +} + +AccessToken BigLakeCatalog::retrieveGoogleCloudAccessTokenFromRefreshToken() const +{ + static constexpr auto GOOGLE_OAUTH2_TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"; + + if (google_adc_client_id.empty() || google_adc_client_secret.empty() || google_adc_refresh_token.empty()) + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Invalid ADC credentials: client_id, client_secret, and refresh_token are required"); + } + + Poco::URI url(GOOGLE_OAUTH2_TOKEN_ENDPOINT); + + String encoded_refresh_token; + String encoded_client_id; + String encoded_client_secret; + Poco::URI::encode(google_adc_refresh_token, google_adc_refresh_token, encoded_refresh_token); + Poco::URI::encode(google_adc_client_id, google_adc_client_id, encoded_client_id); + 
Poco::URI::encode(google_adc_client_secret, google_adc_client_secret, encoded_client_secret); + String body = fmt::format( + "grant_type=refresh_token&client_id={}&client_secret={}&refresh_token={}", + encoded_client_id, encoded_client_secret, encoded_refresh_token + ); + + const auto & context = getContext(); + auto timeouts = DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()); + DB::HTTPSessionPtr session; + + for (size_t i = 0; i < 5; ++i) + { + try + { + session = makeHTTPSession(DB::HTTPConnectionGroupType::HTTP, url, timeouts, {}); + break; + } + catch (...) + { + DB::tryLogCurrentException(log); + } + } + if (!session) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Session for big lake catalog is not initialized"); + + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, url.getPathAndQuery(), + Poco::Net::HTTPMessage::HTTP_1_1); + request.setContentType("application/x-www-form-urlencoded"); + request.setContentLength(body.size()); + request.set("Accept", "application/json"); + + LOG_DEBUG(log, "Token endpoint host={} path={}", url.getHost(), url.getPathAndQuery()); + + std::ostream & os = session->sendRequest(request); + os << body; + + Poco::Net::HTTPResponse response; + std::istream & rs = session->receiveResponse(response); + + String token_json_raw; + Poco::StreamCopier::copyToString(rs, token_json_raw); + + if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Failed to request Google Cloud access token using refresh token: {} (status: {})", + response.getReason(), + static_cast(response.getStatus())); + } + + Poco::JSON::Parser parser; + auto object = parser.parse(token_json_raw).extract(); + + if (!object->has("access_token") || !object->has("token_type")) + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Unexpected structure of Google Cloud token response. 
Response should have fields: 'access_token', 'token_type'"); + } + + auto token_type = object->getValue("token_type"); + if (token_type != "Bearer") + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Unexpected token type in Google Cloud response. Expected Bearer token, got {}", + token_type); + } + + AccessToken token; + token.token = object->getValue("access_token"); + + if (object->has("expires_in")) + { + Int64 expires_in = object->getValue("expires_in"); + token.expires_at = std::chrono::system_clock::now() + std::chrono::seconds(expires_in - 300); /// 5 minutes buffer + } + + return token; +} + +AccessToken BigLakeCatalog::retrieveGoogleCloudAccessToken() const +{ + if (!google_adc_client_id.empty() && !google_adc_client_secret.empty() && !google_adc_refresh_token.empty()) + { + try + { + return retrieveGoogleCloudAccessTokenFromRefreshToken(); + } + catch (const DB::Exception & e) + { + LOG_DEBUG(log, "Failed to use ADC credentials, falling back to metadata service: {}", e.what()); + } + } + + /// Fallback to GCP metadata service (works inside GCP infrastructure) + /// https://cloud.google.com/compute/docs/metadata/overview + static constexpr auto DEFAULT_REQUEST_TOKEN_PATH = "/computeMetadata/v1/instance/service-accounts"; + + Poco::URI url; + url.setScheme("http"); + url.setHost(google_metadata_service); + url.setPath(fmt::format("{}/{}/token", DEFAULT_REQUEST_TOKEN_PATH, google_service_account)); + + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.toString(), Poco::Net::HTTPRequest::HTTP_1_1); + request.add("metadata-flavor", "Google"); + + LOG_DEBUG(log, "Requesting Google Cloud access token from metadata service: {}", url.toString()); + + const auto & context = getContext(); + auto timeouts = DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()); + auto session = makeHTTPSession(DB::HTTPConnectionGroupType::HTTP, url, timeouts, {}); + + if (!session) + throw 
DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Can not create HTTP session"); + session->sendRequest(request); + + Poco::Net::HTTPResponse response; + auto & in = session->receiveResponse(response); + + if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Failed to request Google Cloud bearer token from metadata service: {} (status: {})", + response.getReason(), + static_cast(response.getStatus())); + } + + String token_json_raw; + Poco::StreamCopier::copyToString(in, token_json_raw); + + LOG_DEBUG(log, "Received Google Cloud token response from metadata service"); + + Poco::JSON::Parser parser; + auto object = parser.parse(token_json_raw).extract(); + + if (!object->has("access_token") || !object->has("expires_in") || !object->has("token_type")) + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Unexpected structure of Google Cloud token response. Response should have fields: 'access_token', 'expires_in', 'token_type'"); + } + + auto token_type = object->getValue("token_type"); + if (token_type != "Bearer") + { + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Unexpected token type in Google Cloud response. 
Expected Bearer token, got {}", + token_type); + } + + AccessToken token; + token.token = object->getValue("access_token"); + + if (object->has("expires_in")) + { + Int64 expires_in = object->getValue("expires_in"); + token.expires_at = std::chrono::system_clock::now() + std::chrono::seconds(expires_in - 300); + } + + return token; } std::optional RestCatalog::getStorageType() const @@ -523,6 +896,11 @@ RestCatalog::Namespaces RestCatalog::parseNamespaces(DB::ReadBuffer & buf, const const int idx = static_cast(current_namespace_array->size()) - 1; const auto current_namespace = current_namespace_array->get(idx).extract(); + /// Biglake for some unknown reason can return `base_namespace` as child of `base_namespace` + if (getCatalogType() == DB::DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE && !base_namespace.empty() && current_namespace == base_namespace) + { + continue; + } const auto full_namespace = base_namespace.empty() ? current_namespace : base_namespace + "." + current_namespace; diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index f9ce53729ce4..7dd7e2a5d823 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -8,6 +8,7 @@ #include #include #include +#include #include namespace DB @@ -18,7 +19,20 @@ class ReadBuffer; namespace DataLake { -class RestCatalog final : public ICatalog, private DB::WithContext +struct AccessToken +{ + std::string token; + std::optional expires_at; + + bool isExpired() const + { + if (!expires_at.has_value()) + return false; + return std::chrono::system_clock::now() >= expires_at.value(); + } +}; + +class RestCatalog : public ICatalog, public DB::WithContext { public: explicit RestCatalog( @@ -31,17 +45,6 @@ class RestCatalog final : public ICatalog, private DB::WithContext bool oauth_server_use_request_body_, DB::ContextPtr context_); - explicit RestCatalog( - const std::string & warehouse_, - const std::string & base_url_, - const std::string & 
onelake_tenant_id, - const std::string & onelake_client_id, - const std::string & onelake_client_secret, - const std::string & auth_scope_, - const std::string & oauth_server_uri_, - bool oauth_server_use_request_body_, - DB::ContextPtr context_); - ~RestCatalog() override = default; bool empty() const override; @@ -64,9 +67,7 @@ class RestCatalog final : public ICatalog, private DB::WithContext DB::DatabaseDataLakeCatalogType getCatalogType() const override { - if (tenant_id.empty()) - return DB::DatabaseDataLakeCatalogType::ICEBERG_REST; - return DB::DatabaseDataLakeCatalogType::ICEBERG_ONELAKE; + return DB::DatabaseDataLakeCatalogType::ICEBERG_REST; } void createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr metadata_content) const override; @@ -77,11 +78,18 @@ class RestCatalog final : public ICatalog, private DB::WithContext void dropTable(const String & namespace_name, const String & table_name) const override; - String getTenantId() const { return tenant_id; } String getClientId() const { return client_id; } String getClientSecret() const { return client_secret; } -private: +protected: + RestCatalog( + const std::string & warehouse_, + const std::string & base_url_, + const std::string & auth_scope_, + const std::string & oauth_server_uri_, + bool oauth_server_use_request_body_, + DB::ContextPtr context_); + void createNamespaceIfNotExists(const String & namespace_name, const String & location) const; struct Config @@ -105,15 +113,14 @@ class RestCatalog final : public ICatalog, private DB::WithContext /// Auth headers of format: "Authorization": " " std::optional auth_header; - /// Parameters for OAuth. + /// Parameters for OAuth (common for REST catalog). 
/// Iceberg REST catalog flavour for Google BigLake.
/// Overrides authentication: requests carry a Google Cloud access token obtained either
/// through the ADC refresh-token flow or from the GCP metadata service.
class BigLakeCatalog : public RestCatalog
{
public:
    /// Stores the Google settings; the arguments map one-to-one onto the `google_*` members below.
    explicit BigLakeCatalog(
        const std::string & warehouse_,
        const std::string & base_url_,
        const std::string & google_project_id_,
        const std::string & google_service_account_,
        const std::string & google_metadata_service_,
        const std::string & google_adc_client_id_,
        const std::string & google_adc_client_secret_,
        const std::string & google_adc_refresh_token_,
        const std::string & google_adc_quota_project_id_,
        DB::ContextPtr context_);

    DB::DatabaseDataLakeCatalogType getCatalogType() const override
    {
        return DB::DatabaseDataLakeCatalogType::ICEBERG_BIGLAKE;
    }

    /// Returns the "Authorization: Bearer <token>" header and, when a project id is known,
    /// the "x-goog-user-project" header. Falls back to RestCatalog::getAuthHeaders when neither
    /// `google_project_id` nor `google_adc_client_id` is set.
    DB::HTTPHeaderEntries getAuthHeaders(bool update_token) const override;

private:
    /// Sent as "x-goog-user-project"; when empty, `google_adc_quota_project_id` is used instead.
    const std::string google_project_id;
    /// Service account name placed into the metadata-service token path.
    const std::string google_service_account;
    /// Host of the metadata service the token is requested from.
    const std::string google_metadata_service;
    /// ADC values; the refresh-token flow is attempted only when all three are non-empty.
    const std::string google_adc_client_id;
    const std::string google_adc_client_secret;
    const std::string google_adc_refresh_token;
    /// Fallback value for "x-goog-user-project" when `google_project_id` is empty.
    const std::string google_adc_quota_project_id;

    /// Tries the refresh-token flow first (if ADC values are set), then the metadata service.
    AccessToken retrieveGoogleCloudAccessToken() const;
    /// Exchanges the ADC refresh token for an access token at https://oauth2.googleapis.com/token.
    AccessToken retrieveGoogleCloudAccessTokenFromRefreshToken() const;
};