slfan1989 commented on code in PR #5169:
URL: https://github.com/apache/hadoop/pull/5169#discussion_r1043266394
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java:
##########
@@ -1353,45 +1386,426 @@ public Connection getConn() {
return conn;
}
+ /**
+ * SQLFederationStateStore Supports Store New MasterKey.
+ *
+ * @param request The request contains RouterMasterKey, which is an
abstraction for DelegationKey.
+ * @return routerMasterKeyResponse, the response contains the
RouterMasterKey.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest
request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2: Parse the parameters and serialize the DelegationKey as a string.
+ DelegationKey delegationKey = convertMasterKeyToDelegationKey(request);
+ int keyId = delegationKey.getKeyId();
+ String delegationKeyStr =
FederationStateStoreUtils.encodeWritable(delegationKey);
+
+ // Step3. store data in database.
+ try {
+
+ FederationSQLOutParameter<Integer> rowCountOUT =
+ new FederationSQLOutParameter<>("rowCount_OUT",
java.sql.Types.INTEGER, Integer.class);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ Integer rowCount = getRowCountByProcedureSQL(CALL_SP_ADD_MASTERKEY,
keyId,
+ delegationKeyStr, rowCountOUT);
+ long stopTime = clock.getTime();
+
+ // We hope that 1 record can be written to the database.
+ // If the number of records is not 1, it means that the data was written
incorrectly.
+ if (rowCount != 1) {
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Wrong behavior during the insertion of masterKey, keyId = %s. " +
+ "please check the records of the database.",
String.valueOf(keyId));
+ }
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to insert the newly masterKey, keyId = %s.",
String.valueOf(keyId));
+ }
+
+ // Step4. Query Data from the database and return the result.
+ return getMasterKeyByDelegationKey(request);
}
+ /**
+ * SQLFederationStateStore Supports Remove MasterKey.
+ *
+ * Defined the sp_deleteMasterKey procedure.
+ * This procedure requires 1 input parameter and 1 output parameter.
+ * Input parameters
+ * 1. IN keyId_IN int
+ * Output parameters
+ * 2. OUT rowCount_OUT int
+ *
+ * @param request The request contains RouterMasterKey, which is an
abstraction for DelegationKey
+ * @return routerMasterKeyResponse, the response contains the
RouterMasterKey.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest
request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2: Parse parameters and get KeyId.
+ RouterMasterKey paramMasterKey = request.getRouterMasterKey();
+ int paramKeyId = paramMasterKey.getKeyId();
+
+ // Step3. Clear data from database.
+ try {
+
+ // Execute the query
+ long startTime = clock.getTime();
+ FederationSQLOutParameter<Integer> rowCountOUT =
+ new FederationSQLOutParameter<>("rowCount_OUT",
java.sql.Types.INTEGER, Integer.class);
+ Integer rowCount = getRowCountByProcedureSQL(CALL_SP_DELETE_MASTERKEY,
+ paramKeyId, rowCountOUT);
+ long stopTime = clock.getTime();
+
+ // if it is equal to 0 it means the call
+ // did not delete the masterKey from FederationStateStore
+ if (rowCount == 0) {
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "masterKeyId = %s does not exist.", String.valueOf(paramKeyId));
+ } else if (rowCount != 1) {
+ // if it is different from 1 it means the call
+ // had a wrong behavior. Maybe the database is not set correctly.
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Wrong behavior during deleting the keyId %s. " +
+ "The database is expected to delete 1 record, " +
+ "but the number of deleted records returned by the database is
greater than 1, " +
+ "indicating that a duplicate masterKey occurred during the
deletion process.",
+ paramKeyId);
+ }
+
+ LOG.info("Delete from the StateStore the keyId: {}.", paramKeyId);
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+ return RouterMasterKeyResponse.newInstance(paramMasterKey);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to delete the keyId %s.", paramKeyId);
+ }
+
+ throw new YarnException("Unable to delete the masterKey, keyId = " +
paramKeyId);
}
+ /**
+ * SQLFederationStateStore Supports Get MasterKey.
+ *
+ * Defined the sp_getMasterKey procedure.
+ * this procedure requires 2 parameters.
+ * Input parameters:
+ * 1. IN keyId_IN int
+ * Output parameters:
+ * 2. OUT masterKey_OUT varchar(1024)
+ *
+ * @param request The request contains RouterMasterKey, which is an
abstraction for DelegationKey
+ * @return routerMasterKeyResponse, the response contains the
RouterMasterKey.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterMasterKeyResponse
getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2: Parse parameters and get KeyId.
+ RouterMasterKey paramMasterKey = request.getRouterMasterKey();
+ int paramKeyId = paramMasterKey.getKeyId();
+
+ // Step3: Call the stored procedure to get the result.
+ try {
+
+ FederationQueryRunner runner = new FederationQueryRunner();
+ FederationSQLOutParameter<String> masterKeyOUT =
+ new FederationSQLOutParameter<>("masterKey_OUT",
java.sql.Types.VARCHAR, String.class);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ RouterMasterKey routerMasterKey = runner.execute(
+ conn, CALL_SP_GET_MASTERKEY, new RouterMasterKeyHandler(),
paramKeyId, masterKeyOUT);
+ long stopTime = clock.getTime();
+
+ LOG.info("Got the information about the specified masterKey = {}
according to keyId = {}.",
+ routerMasterKey, paramKeyId);
+
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+
+ // Return query result.
+ return RouterMasterKeyResponse.newInstance(routerMasterKey);
+
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to obtain the masterKey information according to %s.",
+ String.valueOf(paramKeyId));
+ }
+
+ // Throw exception information
+ throw new YarnException(
+ "Unable to obtain the masterKey information according to " +
paramKeyId);
}
+ /**
+ * SQLFederationStateStore Supports Store RMDelegationTokenIdentifier.
+ *
+ * Defined the sp_addDelegationToken procedure.
+ * This procedure requires 4 input parameters and 1 output parameter.
+ * Input parameters:
+ * 1. IN sequenceNum_IN int
+ * 2. IN tokenIdent_IN varchar(1024)
+ * 3. IN token_IN varchar(1024)
+ * 4. IN renewDate_IN bigint
+ * Output parameters:
+ * 5. OUT rowCount_OUT int
+ *
+ * @param request The request contains RouterRMToken
(RMDelegationTokenIdentifier and renewDate)
+ * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2. store data in database.
+ try {
+ long duration = addOrUpdateToken(request, true);
+ FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ throw new YarnException(e);
+ }
+
+ // Step3. Query Data from the database and return the result.
+ return getTokenByRouterStoreToken(request);
}
+ /**
+ * SQLFederationStateStore Supports Update RMDelegationTokenIdentifier.
+ *
+ * Defined the sp_updateDelegationToken procedure.
+ * This procedure requires 4 input parameters and 1 output parameter.
+ * Input parameters:
+ * 1. IN sequenceNum_IN int
+ * 2. IN tokenIdent_IN varchar(1024)
+ * 3. IN token_IN varchar(1024)
+ * 4. IN renewDate_IN bigint
+ * Output parameters:
+ * 5. OUT rowCount_OUT int
+ *
+ * @param request The request contains RouterRMToken
(RMDelegationTokenIdentifier and renewDate)
+ * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2. update data in database.
+ try {
+ long duration = addOrUpdateToken(request, false);
+ FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ throw new YarnException(e);
+ }
+
+ // Step3. Query Data from the database and return the result.
+ return getTokenByRouterStoreToken(request);
}
+ /**
+ * Add Or Update RMDelegationTokenIdentifier.
+ *
+ * @param request The request contains RouterRMToken
(RMDelegationTokenIdentifier and renewDate)
+ * @param isAdd true, addData; false, updateData.
+ * @return method operation time.
+ * @throws IOException An IO Error occurred.
+ * @throws SQLException An SQL Error occurred.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ */
+ private long addOrUpdateToken(RouterRMTokenRequest request, boolean isAdd)
+ throws IOException, SQLException, YarnException {
+
+ // Parse the request: token identifier, token info, renew date and sequence number.
+ RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+ YARNDelegationTokenIdentifier identifier =
routerStoreToken.getTokenIdentifier();
+ String tokenIdentifier =
FederationStateStoreUtils.encodeWritable(identifier);
+ String tokenInfo = routerStoreToken.getTokenInfo();
+ long renewDate = routerStoreToken.getRenewDate();
+ int sequenceNum = identifier.getSequenceNumber();
+
+ FederationQueryRunner runner = new FederationQueryRunner();
+ FederationSQLOutParameter<Integer> rowCountOUT =
+ new FederationSQLOutParameter<>("rowCount_OUT",
java.sql.Types.INTEGER, Integer.class);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ String procedure = isAdd ? CALL_SP_ADD_DELEGATIONTOKEN :
CALL_SP_UPDATE_DELEGATIONTOKEN;
+ Integer rowCount = runner.execute(conn, procedure, new
RowCountHandler("rowCount_OUT"),
+ sequenceNum, tokenIdentifier, tokenInfo, renewDate, rowCountOUT);
+ long stopTime = clock.getTime();
+
+ // Get rowCount
+ // In the process of updating the code, rowCount may be 0 or 1;
+ // if rowCount=1, it is as expected, indicating that we have updated the
Token correctly;
+ // if rowCount=0, it is not as expected,
+ // indicating that we have not updated the Token correctly.
+ if (rowCount != 1) {
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Wrong behavior during the insertion of delegationToken, tokenId =
%s. " +
+ "Please check the records of the database.",
String.valueOf(sequenceNum));
+ }
+
+ // return execution time
+ return (stopTime - startTime);
+ }
+
+ /**
+ * SQLFederationStateStore Supports Remove RMDelegationTokenIdentifier.
+ *
+ * Defined the sp_deleteDelegationToken procedure.
+ * This procedure requires 1 input parameter and 1 output parameter.
+ * Input parameters:
+ * 1. IN sequenceNum_IN bigint
+ * Output parameters:
+ * 2. OUT rowCount_OUT int
+ *
+ * @param request The request contains RouterRMToken
(RMDelegationTokenIdentifier and renewDate)
+ * @return routerRMTokenResponse, the response contains the RouterStoreToken.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2: Parse parameters and get the token sequence number.
+ RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+ YARNDelegationTokenIdentifier identifier =
routerStoreToken.getTokenIdentifier();
+ int sequenceNum = identifier.getSequenceNumber();
+
+ try {
+
+ FederationSQLOutParameter<Integer> rowCountOUT =
+ new FederationSQLOutParameter<>("rowCount_OUT",
java.sql.Types.INTEGER, Integer.class);
+
+ // Execute the query
+ long startTime = clock.getTime();
+ Integer rowCount =
getRowCountByProcedureSQL(CALL_SP_DELETE_DELEGATIONTOKEN,
+ sequenceNum, rowCountOUT);
+ long stopTime = clock.getTime();
+
+ // if it is equal to 0 it means the call
+ // did not delete the delegationToken from FederationStateStore
+ if (rowCount == 0) {
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "TokenId %s does not exist", String.valueOf(sequenceNum));
+ } else if (rowCount != 1) {
+ // if it is different from 1 it means the call
+ // had a wrong behavior. Maybe the database is not set correctly.
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Wrong behavior during deleting the delegationToken %s. " +
+ "The database is expected to delete 1 record, " +
+ "but the number of deleted records returned by the database is
greater than 1, " +
+ "indicating that a duplicate tokenId occurred during the deletion
process.",
+ String.valueOf(sequenceNum));
+ }
+
+ LOG.info("Delete from the StateStore the delegationToken, tokenId =
{}.", sequenceNum);
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime -
startTime);
+ return RouterRMTokenResponse.newInstance(routerStoreToken);
+ } catch (SQLException e) {
+ FederationStateStoreClientMetrics.failedStateStoreCall();
+ FederationStateStoreUtils.logAndThrowRetriableException(e, LOG,
+ "Unable to delete the delegationToken, tokenId = %s.", sequenceNum);
+ }
+ throw new YarnException("Unable to delete the delegationToken, tokenId = "
+ sequenceNum);
}
+ /**
+ * The Router Supports GetTokenByRouterStoreToken.
+ *
+ * @param request The request contains RouterRMToken
(RMDelegationTokenIdentifier and renewDate)
+ * @return RouterRMTokenResponse.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ * @throws IOException An IO Error occurred.
+ */
@Override
public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest
request)
throws YarnException, IOException {
- throw new NotImplementedException("Code is not implemented");
+ // Step1: Verify parameters to ensure that key fields are not empty.
+ FederationRouterRMTokenInputValidator.validate(request);
+
+ // Step2: Parse parameters and get the token sequence number.
+ RouterStoreToken routerStoreToken = request.getRouterStoreToken();
+ YARNDelegationTokenIdentifier identifier =
routerStoreToken.getTokenIdentifier();
+ int sequenceNum = identifier.getSequenceNumber();
+
+ try {
+ FederationQueryRunner runner = new FederationQueryRunner();
+ FederationSQLOutParameter<String> tokenIdentOUT =
+ new FederationSQLOutParameter<>("tokenIdent_OUT",
java.sql.Types.VARCHAR, String.class);
Review Comment:
Thank you very much for helping to review the code, I will fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]