Thanks for you advice.I wil read your tell me content.
tabish121 wrote: > >> >> >> follow program,why cann't receive data? >> How should I modify code? > > Looks like you produce before the durable consumer has been connected > once. > > See this link: > http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html > > A durable consumer must have connected and then disconnected before > messages are persisted in anticipation of the consumer reconnecting. > That's why its important to use the same subscription name when > reconnecting, so that the broker know that the consumer that was > subscribed as durable is now back and it can deliver the messages that > were stored in its absence. > >> >> #include <activemq/concurrent/Thread.h> >> #include <activemq/concurrent/Runnable.h> >> #include <activemq/core/ActiveMQConnectionFactory.h> >> #include <activemq/util/Integer.h> >> #include <cms/Connection.h> >> #include <cms/Session.h> >> #include <cms/TextMessage.h> >> #include <cms/ExceptionListener.h> >> #include <cms/MessageListener.h> >> #include <stdlib.h> >> >> using namespace activemq::core; >> using namespace activemq::util; >> using namespace activemq::concurrent; >> using namespace cms; >> using namespace std; >> >> class HelloWorldProducer : public Runnable { >> private: >> >> Connection* connection; >> Session* session; >> Topic* destination; >> MessageProducer* producer; >> int numMessages; >> >> public: >> >> HelloWorldProducer( int numMessages ){ >> connection = NULL; >> session = NULL; >> destination = NULL; >> producer = NULL; >> this->numMessages = numMessages; >> } >> >> virtual ~HelloWorldProducer(){ >> cleanup(); >> } >> >> virtual void run() { >> try { >> string user,passwd,sID; >> user="default"; >> passwd=""; >> sID="lsgID"; >> ActiveMQConnectionFactory* connectionFactory = new >> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); >> >> connection = >> connectionFactory->createConnection(user,passwd,sID); >> connection->start(); >> >> string sss=connection->getClientId(); >> cout << sss << endl; >> >> session = connection->createSession( > Session::AUTO_ACKNOWLEDGE >> ); >> destination = session->createTopic( "mytopic" ); >> >> producer = session->createProducer( destination > ); >> producer->setDeliveryMode( DeliveryMode::PERSISTANT ); >> >> producer->setTimeToLive(100000000); >> string threadIdStr = Integer::toString( Thread::getId() ); >> >> // Create a messages >> string text = (string)"Hello world! from thread " + >> threadIdStr; >> >> for( int ix=0; ix<numMessages; ++ix ){ >> TextMessage* message = session->createTextMessage( > text >> ); >> >> string messageID="messageID"; >> message->setCMSExpiration(10000000000); >> message->setCMSMessageId(messageID); >> >> // Tell the producer to send the message >> printf( "Sent message from thread %s\n", >> threadIdStr.c_str() ); >> producer->send( message ); >> >> delete message; >> } >> >> }catch ( CMSException& e ) { >> e.printStackTrace(); >> } >> } >> >> private: >> >> void cleanup(){ >> >> // Destroy resources. >> try{ >> if( destination != NULL ) delete destination; >> }catch ( CMSException& e ) {} >> destination = NULL; >> >> try{ >> if( producer != NULL ) delete producer; >> }catch ( CMSException& e ) {} >> producer = NULL; >> >> // Close open resources. >> try{ >> if( session != NULL ) session->close(); >> if( connection != NULL ) connection->close(); >> }catch ( CMSException& e ) {} >> >> try{ >> if( session != NULL ) delete session; >> }catch ( CMSException& e ) {} >> session = NULL; >> >> try{ >> if( connection != NULL ) delete connection; >> }catch ( CMSException& e ) {} >> connection = NULL; >> } >> }; >> >> class HelloWorldConsumer : public ExceptionListener, >> public MessageListener, >> public Runnable { >> >> private: >> >> Connection* connection; >> Session* session; >> Topic* destination; >> MessageConsumer* consumer; >> long waitMillis; >> >> public: >> >> HelloWorldConsumer( long waitMillis ){ >> connection = NULL; >> session = NULL; >> destination = NULL; >> consumer = NULL; >> this->waitMillis = waitMillis; >> } >> virtual ~HelloWorldConsumer(){ >> cleanup(); >> } >> >> virtual void run() { >> >> try { >> >> string user,passwd,sID; >> user="default"; >> passwd=""; >> sID="lsgID"; >> // Create a ConnectionFactory >> ActiveMQConnectionFactory* connectionFactory = >> new ActiveMQConnectionFactory( >> "tcp://localhost:61613",user,passwd,sID); >> >> // Create a Connection >> connection = >> connectionFactory->createConnection();//user,passwd,sID); >> delete connectionFactory; >> connection->start(); >> >> connection->setExceptionListener(this); >> >> // Create a Session >> session = connection->createSession( > Session::AUTO_ACKNOWLEDGE >> ); >> destination = session->createTopic( "mytopic" ); >> consumer = session->createDurableConsumer( > destination , >> user , >> "",false); >> >> consumer->setMessageListener( this ); >> >> Thread::sleep( waitMillis ); >> >> } catch (CMSException& e) { >> e.printStackTrace(); >> } >> } >> >> virtual void onMessage( const Message* message ){ >> >> try >> { >> const TextMessage* textMessage = >> dynamic_cast< const TextMessage* >( message ); >> string text = textMessage->getText(); >> printf( "Received: %s\n", text.c_str() ); >> } catch (CMSException& e) { >> e.printStackTrace(); >> } >> } >> >> virtual void onException( const CMSException& ex ) { >> printf("JMS Exception occured. Shutting down client.\n"); >> } >> >> private: >> >> void cleanup(){ >> >> // Destroy resources. >> try{ >> if( destination != NULL ) delete destination; >> }catch (CMSException& e) {} >> destination = NULL; >> >> try{ >> if( consumer != NULL ) delete consumer; >> }catch (CMSException& e) {} >> consumer = NULL; >> >> // Close open resources. >> try{ >> if( session != NULL ) session->close(); >> if( connection != NULL ) connection->close(); >> }catch (CMSException& e) {} >> >> try{ >> if( session != NULL ) delete session; >> }catch (CMSException& e) {} >> session = NULL; >> >> try{ >> if( connection != NULL ) delete connection; >> }catch (CMSException& e) {} >> connection = NULL; >> } >> }; >> void Produce() >> { >> HelloWorldProducer producer( 2 ); >> Thread producerThread( &producer ); >> producerThread.start(); >> producerThread.join(); >> } >> void Consumer() >> { >> HelloWorldConsumer consumer( 10000 ); >> Thread consumerThread( &consumer ); >> consumerThread.start(); >> consumerThread.join(); >> } >> int main(int argc, char* argv[]) >> { >> Produce(); >> Consumer(); >> } >> -- >> View this message in context: http://www.nabble.com/above-topic- >> tf2641566.html#a7373622 >> Sent from the ActiveMQ - User mailing list archive at Nabble.com. > > > -- View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7392444 Sent from the ActiveMQ - User mailing list archive at Nabble.com.
