[ https://issues.apache.org/jira/browse/GEODE-2736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15951203#comment-15951203 ]
ASF GitHub Bot commented on GEODE-2736: --------------------------------------- Github user PivotalSarge commented on a diff in the pull request: https://github.com/apache/geode-native/pull/82#discussion_r109194938 --- Diff: src/cppcache/integration-test/testThinClientPoolExecuteFunctionThrowsException.cpp --- @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#define ROOT_NAME "testThinClientPoolExecuteFunctionThrowsException" + +#include "fw_dunit.hpp" +#include "ThinClientHelper.hpp" +#include "testobject/VariousPdxTypes.hpp" + +#include <thread> +#include <chrono> + +using namespace PdxTests; +/* This is to test +1- funtion execution on pool + */ + +#define CLIENT1 s1p1 +#define LOCATOR1 s2p1 +#define SERVER s2p2 + +bool isLocalServer = false; +bool isLocator = false; +bool isPoolWithEndpoint = false; + +const char* locHostPort = + CacheHelper::getLocatorHostPort(isLocator, isLocalServer, 1); +const char* poolRegNames[] = {"partition_region", "PoolRegion2"}; + +const char* serverGroup = "ServerGroup1"; + +char* getFuncIName = (char*)"MultiGetFunctionI"; +char* putFuncIName = (char*)"MultiPutFunctionI"; +char* getFuncName = (char*)"MultiGetFunction"; +char* putFuncName = (char*)"MultiPutFunction"; +char* rjFuncName = (char*)"RegionOperationsFunction"; +char* exFuncName = (char*)"ExceptionHandlingFunction"; +char* exFuncNameSendException = (char*)"executeFunction_SendException"; +char* exFuncNamePdxType = (char*)"PdxFunctionTest"; +char* FEOnRegionPrSHOP = (char*)"FEOnRegionPrSHOP"; +char* FEOnRegionPrSHOP_OptimizeForWrite = + (char*)"FEOnRegionPrSHOP_OptimizeForWrite"; +char* FETimeOut = (char*)"FunctionExecutionTimeOut"; + +#define verifyGetResults() \ + bool found = false; \ + for (int j = 0; j < 34; j++) { \ + if (j % 2 == 0) continue; \ + sprintf(buf, "VALUE--%d", j); \ + if (strcmp(buf, dynCast<CacheableStringPtr>(resultList->operator[](i)) \ + ->asChar()) == 0) { \ + LOGINFO( \ + "buf = %s " \ + "dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar() " \ + "= %s ", \ + buf, \ + dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar()); \ + found = true; \ + break; \ + } \ + } \ + ASSERT(found, "this returned value is invalid"); + +#define verifyGetKeyResults() \ + bool found = false; \ + for (int j = 0; j < 34; j++) { \ + if (j % 2 == 0) continue; \ + sprintf(buf, "KEY--%d", j); \ + if 
(strcmp(buf, dynCast<CacheableStringPtr>(resultList->operator[](i)) \ + ->asChar()) == 0) { \ + LOGINFO( \ + "buf = %s " \ + "dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar() " \ + "= %s ", \ + buf, \ + dynCast<CacheableStringPtr>(resultList->operator[](i))->asChar()); \ + found = true; \ + break; \ + } \ + } \ + ASSERT(found, "this returned KEY is invalid"); + +#define verifyPutResults() \ + bool found = false; \ + for (int j = 0; j < 34; j++) { \ + if (j % 2 == 0) continue; \ + sprintf(buf, "KEY--%d", j); \ + if (strcmp(buf, value->asChar()) == 0) { \ + found = true; \ + break; \ + } \ + } \ + ASSERT(found, "this returned value is invalid"); +class MyResultCollector : public ResultCollector { + public: + MyResultCollector() + : m_resultList(CacheableVector::create()), + m_isResultReady(false), + m_endResultCount(0), + m_addResultCount(0), + m_getResultCount(0) {} + ~MyResultCollector() {} + CacheableVectorPtr getResult(uint32_t timeout) { + m_getResultCount++; + if (m_isResultReady == true) { + return m_resultList; + } else { + for (uint32_t i = 0; i < timeout; i++) { + SLEEP(1); + if (m_isResultReady == true) return m_resultList; + } + throw FunctionExecutionException( + "Result is not ready, endResults callback is called before invoking " + "getResult() method"); + } + } + + void addResult(CacheablePtr& resultItem) { + m_addResultCount++; + if (resultItem == NULLPTR) { + return; + } + try { + CacheableArrayListPtr result = dynCast<CacheableArrayListPtr>(resultItem); + for (int32_t i = 0; i < result->size(); i++) { + m_resultList->push_back(result->operator[](i)); + } + } catch (ClassCastException) { + UserFunctionExecutionExceptionPtr result = + dynCast<UserFunctionExecutionExceptionPtr>(resultItem); + m_resultList->push_back(result); + } + } + void endResults() { + m_isResultReady = true; + m_endResultCount++; + } + uint32_t getEndResultCount() { return m_endResultCount; } + uint32_t getAddResultCount() { return m_addResultCount; } + uint32_t 
getGetResultCount() { return m_getResultCount; } + + private: + CacheableVectorPtr m_resultList; + volatile bool m_isResultReady; + uint32_t m_endResultCount; + uint32_t m_addResultCount; + uint32_t m_getResultCount; +}; +typedef SharedPtr<MyResultCollector> MyResultCollectorPtr; + +DUNIT_TASK_DEFINITION(LOCATOR1, StartLocator1) +{ + // starting locator + if (isLocator) { + CacheHelper::initLocator(1); + LOG("Locator1 started"); + } +} +END_TASK_DEFINITION + +DUNIT_TASK_DEFINITION(SERVER, StartS12) +{ + const char* lhp = NULL; + if (!isPoolWithEndpoint) lhp = locHostPort; + if (isLocalServer) { + CacheHelper::initServer(1, "func_cacheserver1_pool.xml", lhp); + } + if (isLocalServer) { + CacheHelper::initServer(2, "func_cacheserver2_pool.xml", lhp); + } +} +END_TASK_DEFINITION + +DUNIT_TASK_DEFINITION(CLIENT1, StartC1) +{ + // initClient(true); + initClientWithPool(true, NULL, locHostPort, serverGroup, NULLPTR, 0, true, + -1, -1, 60000, /*singlehop*/ true, + /*threadLocal*/ true); + // createPool(poolName, locHostPort,serverGroup, NULL, 0, true ); + // createRegionAndAttachPool(poolRegNames[0],USE_ACK, poolName); + + RegionPtr regPtr0 = + createRegionAndAttachPool(poolRegNames[0], USE_ACK, NULL); + ; // getHelper()->createRegion( poolRegNames[0], USE_ACK); + regPtr0->registerAllKeys(); + + LOG("Clnt1Init complete."); +} +END_TASK_DEFINITION + +DUNIT_TASK_DEFINITION(CLIENT1, Client1OpTest) +{ + RegionPtr regPtr0 = getHelper()->getRegion(poolRegNames[0]); + char buf[128]; + + for (int i = 0; i < 34; i++) { + sprintf(buf, "VALUE--%d", i); + CacheablePtr value(CacheableString::create(buf)); + + sprintf(buf, "KEY--%d", i); + CacheableKeyPtr key = CacheableKey::create(buf); + regPtr0->put(key, value); + } + std::this_thread::sleep_for(std::chrono::seconds(10)); // let the put finish + + //-----------------------Test with sendException + // onRegion-------------------------------// + + for (int i = 1; i <= 200; i++) { + CacheablePtr value(CacheableInt32::create(i)); + + 
sprintf(buf, "execKey-%d", i); + CacheableKeyPtr key = CacheableKey::create(buf); + regPtr0->put(key, value); + } + LOG("Put for execKey's on region complete."); + + LOG("Adding filter"); + CacheableArrayListPtr arrList = CacheableArrayList::create(); + for (int i = 100; i < 120; i++) { + sprintf(buf, "execKey-%d", i); + CacheableKeyPtr key = CacheableKey::create(buf); + arrList->push_back(key); + } + + CacheableVectorPtr filter = CacheableVector::create(); + for (int i = 100; i < 120; i++) { + sprintf(buf, "execKey-%d", i); + CacheableKeyPtr key = CacheableKey::create(buf); + filter->push_back(key); + } + LOG("Adding filter done."); + + CacheablePtr args = CacheableBoolean::create(1); + + ExecutionPtr funcExec = FunctionService::onRegion(regPtr0); + ASSERT(funcExec != NULLPTR, "onRegion Returned NULL"); + + ResultCollectorPtr collector = + funcExec->withArgs(args)->withFilter(filter)->execute( + exFuncNameSendException, 15); + ASSERT(collector != NULLPTR, "onRegion collector NULL"); + + CacheableVectorPtr result = collector->getResult(); + + if (result == NULLPTR) { + ASSERT(false, "echo String : result is NULL"); + } else { + try { + for (int i = 0; i < result->size(); i++) { + UserFunctionExecutionExceptionPtr uFEPtr = + dynCast<UserFunctionExecutionExceptionPtr>( + result->operator[](i)); + ASSERT(uFEPtr != NULLPTR, "uFEPtr exception is NULL"); + LOGINFO("Done casting to uFEPtr"); + LOGINFO("Read expected uFEPtr exception %s ", + uFEPtr->getMessage()->asChar()); + } + } catch (ClassCastException& ex) { + std::string logmsg = ""; + logmsg += ex.getName(); + logmsg += ": "; + logmsg += ex.getMessage(); + LOG(logmsg.c_str()); + ex.printStackTrace(); + FAIL( + "exFuncNameSendException casting to string for bool argument " + "exception."); + } catch (...) 
{ + FAIL( + "exFuncNameSendException casting to string for bool argument " + "Unknown exception."); + } + } + + LOG("exFuncNameSendException done for bool argument."); + + collector = funcExec->withArgs(arrList)->withFilter(filter)->execute( + exFuncNameSendException, 15); + ASSERT(collector != NULLPTR, "onRegion collector for arrList NULL"); + std::this_thread::sleep_for(std::chrono::seconds(2)); + + try { + CacheableVectorPtr fil = CacheableVector::create(); + fil->push_back(CacheableInt32::create(1)); + ExecutionPtr exe = FunctionService::onRegion(regPtr0); + + LOGINFO("Executing the exception test it is expected to throw."); + CacheableVectorPtr executeFunctionResult3 = + funcExec->withArgs(arrList)->withFilter(filter)->execute("ThinClientRegionExceptionTest", 15)->getResult(); --- End diff -- Is the expectation that this method invocation will throw an exception? If so, it probably should be followed in the try block by a FAIL() to catch the cases where it doesn't. > Native client thread pool get stuck sometimes during rolling restart > -------------------------------------------------------------------- > > Key: GEODE-2736 > URL: https://issues.apache.org/jira/browse/GEODE-2736 > Project: Geode > Issue Type: Bug > Components: native client > Reporter: Mark Hanson > > A user has a function which executes on all nodes hosting the region. During a > rolling restart of the Geode servers, occasionally all the threads in the > Geode native client thread pool get stuck when certain exceptions > are thrown. > The user thinks it is due to possibly incorrect exception handling inside the > Geode native client code. > This leads to application threads getting stuck, as the thread pool is not > available. -- This message was sent by Atlassian JIRA (v6.3.15#6346)