Thanks for you advice.I wil read your tell me content.



tabish121 wrote:
> 
>> 
>> 
>> follow program,why cann't receive data?
>> How should I modify code?
> 
> Looks like you produce before the durable consumer has been connected
> once. 
> 
> See this link:
> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
> 
> A durable consumer must have connected and then disconnected before
> messages are persisted in anticipation of the consumer reconnecting.
> That's why its important to use the same subscription name when
> reconnecting, so that the broker know that the consumer that was
> subscribed as durable is now back and it can deliver the messages that
> were stored in its absence.  
> 
>> 
>> #include <activemq/concurrent/Thread.h>
>> #include <activemq/concurrent/Runnable.h>
>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> #include <activemq/util/Integer.h>
>> #include <cms/Connection.h>
>> #include <cms/Session.h>
>> #include <cms/TextMessage.h>
>> #include <cms/ExceptionListener.h>
>> #include <cms/MessageListener.h>
>> #include <stdlib.h>
>> 
>> using namespace activemq::core;
>> using namespace activemq::util;
>> using namespace activemq::concurrent;
>> using namespace cms;
>> using namespace std;
>> 
>> class HelloWorldProducer : public Runnable {
>> private:
>> 
>>      Connection* connection;
>>      Session* session;
>>      Topic* destination;
>>      MessageProducer* producer;
>>      int numMessages;
>> 
>> public:
>> 
>>      HelloWorldProducer( int numMessages ){
>>              connection = NULL;
>>      session = NULL;
>>      destination = NULL;
>>      producer = NULL;
>>      this->numMessages = numMessages;
>>      }
>> 
>>      virtual ~HelloWorldProducer(){
>>              cleanup();
>>      }
>> 
>>     virtual void run() {
>>         try {
>>                      string user,passwd,sID;
>>                      user="default";
>>                      passwd="";
>>                      sID="lsgID";
>>             ActiveMQConnectionFactory* connectionFactory = new
>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>> 
>>             connection =
>> connectionFactory->createConnection(user,passwd,sID);
>>             connection->start();
>> 
>>                      string sss=connection->getClientId();
>>                      cout << sss << endl;
>> 
>>             session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE
>> );
>>                      destination = session->createTopic( "mytopic" );
>> 
>>                      producer = session->createProducer( destination
> );
>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>> 
>>                      producer->setTimeToLive(100000000);
>>             string threadIdStr = Integer::toString( Thread::getId() );
>> 
>>             // Create a messages
>>             string text = (string)"Hello world! from thread " +
>> threadIdStr;
>> 
>>             for( int ix=0; ix<numMessages; ++ix ){
>>                  TextMessage* message = session->createTextMessage(
> text
>> );
>> 
>>                              string messageID="messageID";
>>                              message->setCMSExpiration(10000000000);
>>                              message->setCMSMessageId(messageID);
>> 
>>              // Tell the producer to send the message
>>                  printf( "Sent message from thread %s\n",
>> threadIdStr.c_str() );
>>                      producer->send( message );
>> 
>>              delete message;
>>             }
>> 
>>         }catch ( CMSException& e ) {
>>             e.printStackTrace();
>>         }
>>     }
>> 
>> private:
>> 
>>     void cleanup(){
>> 
>>                      // Destroy resources.
>>                      try{
>>              if( destination != NULL ) delete destination;
>>                      }catch ( CMSException& e ) {}
>>                      destination = NULL;
>> 
>>                      try{
>>                  if( producer != NULL ) delete producer;
>>                      }catch ( CMSException& e ) {}
>>                      producer = NULL;
>> 
>>              // Close open resources.
>>              try{
>>                      if( session != NULL ) session->close();
>>                      if( connection != NULL ) connection->close();
>>                      }catch ( CMSException& e ) {}
>> 
>>                      try{
>>              if( session != NULL ) delete session;
>>                      }catch ( CMSException& e ) {}
>>                      session = NULL;
>> 
>>             try{
>>              if( connection != NULL ) delete connection;
>>                      }catch ( CMSException& e ) {}
>>              connection = NULL;
>>     }
>> };
>> 
>> class HelloWorldConsumer : public ExceptionListener,
>>                            public MessageListener,
>>                            public Runnable {
>> 
>> private:
>> 
>>      Connection* connection;
>>      Session* session;
>>      Topic* destination;
>>      MessageConsumer* consumer;
>>      long waitMillis;
>> 
>> public:
>> 
>>      HelloWorldConsumer( long waitMillis ){
>>              connection = NULL;
>>      session = NULL;
>>      destination = NULL;
>>      consumer = NULL;
>>      this->waitMillis = waitMillis;
>>      }
>>     virtual ~HelloWorldConsumer(){
>>      cleanup();
>>     }
>> 
>>     virtual void run() {
>> 
>>         try {
>> 
>>                      string user,passwd,sID;
>>                      user="default";
>>                      passwd="";
>>                      sID="lsgID";
>>             // Create a ConnectionFactory
>>             ActiveMQConnectionFactory* connectionFactory =
>>                 new ActiveMQConnectionFactory(
>> "tcp://localhost:61613",user,passwd,sID);
>> 
>>             // Create a Connection
>>             connection =
>> connectionFactory->createConnection();//user,passwd,sID);
>>             delete connectionFactory;
>>             connection->start();
>> 
>>             connection->setExceptionListener(this);
>> 
>>             // Create a Session
>>             session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE
>> );
>>                      destination = session->createTopic( "mytopic" );
>>                      consumer = session->createDurableConsumer(
> destination ,
>> user ,
>> "",false);
>> 
>>             consumer->setMessageListener( this );
>> 
>>             Thread::sleep( waitMillis );
>> 
>>         } catch (CMSException& e) {
>>             e.printStackTrace();
>>         }
>>     }
>> 
>>     virtual void onMessage( const Message* message ){
>> 
>>         try
>>         {
>>          const TextMessage* textMessage =
>>                 dynamic_cast< const TextMessage* >( message );
>>             string text = textMessage->getText();
>>             printf( "Received: %s\n", text.c_str() );
>>         } catch (CMSException& e) {
>>             e.printStackTrace();
>>         }
>>     }
>> 
>>     virtual void onException( const CMSException& ex ) {
>>         printf("JMS Exception occured.  Shutting down client.\n");
>>     }
>> 
>> private:
>> 
>>     void cleanup(){
>> 
>>              // Destroy resources.
>>              try{
>>              if( destination != NULL ) delete destination;
>>              }catch (CMSException& e) {}
>>              destination = NULL;
>> 
>>              try{
>>             if( consumer != NULL ) delete consumer;
>>              }catch (CMSException& e) {}
>>              consumer = NULL;
>> 
>>              // Close open resources.
>>              try{
>>                      if( session != NULL ) session->close();
>>                      if( connection != NULL ) connection->close();
>>              }catch (CMSException& e) {}
>> 
>>         try{
>>              if( session != NULL ) delete session;
>>              }catch (CMSException& e) {}
>>              session = NULL;
>> 
>>              try{
>>              if( connection != NULL ) delete connection;
>>              }catch (CMSException& e) {}
>>              connection = NULL;
>>     }
>> };
>>   void Produce()
>>  {
>>      HelloWorldProducer producer( 2 );
>>      Thread producerThread( &producer );
>>      producerThread.start();
>>      producerThread.join();
>>  }
>>  void Consumer()
>>  {
>>       HelloWorldConsumer consumer( 10000 );
>>       Thread consumerThread( &consumer );
>>       consumerThread.start();
>>       consumerThread.join();
>>  }
>>  int main(int argc, char* argv[])
>>  {
>>       Produce();
>>       Consumer();
>>  }
>> --
>> View this message in context: http://www.nabble.com/above-topic-
>> tf2641566.html#a7373622
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> 
> 
> 

-- 
View this message in context: 
http://www.nabble.com/above-topic-tf2641566.html#a7392444
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to