correct port numbers differences due to NAT. This implementation functions
correctly, but may not be the best integration into live architecture.
---
 groupsock/NetAddress.cpp                           |   18 +++++
 groupsock/include/NetAddress.hh                    |    5 +-
 liveMedia/OnDemandServerMediaSubsession.cpp        |   69 ++++++++++++++++++--
 liveMedia/RTCP.cpp                                 |   33 ++++++++++
 liveMedia/include/OnDemandServerMediaSubsession.hh |   17 +++--
 5 files changed, 133 insertions(+), 9 deletions(-)

diff --git a/groupsock/NetAddress.cpp b/groupsock/NetAddress.cpp
index ee2e605..66832dd 100644
--- a/groupsock/NetAddress.cpp
+++ b/groupsock/NetAddress.cpp
@@ -261,6 +261,11 @@ Boolean AddressPortLookupTable::Remove(netAddressBits 
address1,
   return fTable->Remove((char*)key);
 }
 
+Boolean AddressPortLookupTable::IsEmpty()
+{
+  return fTable->IsEmpty();
+}
+
 AddressPortLookupTable::Iterator::Iterator(AddressPortLookupTable& table)
   : fIter(HashTable::Iterator::create(*(table.fTable))) {
 }
@@ -274,6 +279,19 @@ void* AddressPortLookupTable::Iterator::next() {
   return fIter->next(key);
 }
 
+void* AddressPortLookupTable::Iterator::next(netAddressBits &address1, 
+                                       netAddressBits &address2,
+                                       portNumBits &port)
+{
+  const char *key;
+  const int *k;
+  void *v = fIter->next(key);
+  k = (const int *)key;
+  address1 = k[0];
+  address2 = k[1];
+  port = k[2];
+  return v;
+}
 
 ////////// isMulticastAddress() implementation //////////
 
diff --git a/groupsock/include/NetAddress.hh b/groupsock/include/NetAddress.hh
index 5b47c03..21b7588 100644
--- a/groupsock/include/NetAddress.hh
+++ b/groupsock/include/NetAddress.hh
@@ -122,6 +122,7 @@ class AddressPortLookupTable {
        void* Lookup(netAddressBits address1, netAddressBits address2,
                     Port port);
                // Returns 0 if not found
+       Boolean IsEmpty();
 
        // Used to iterate through the entries in the table
        class Iterator {
@@ -129,8 +130,10 @@ class AddressPortLookupTable {
                Iterator(AddressPortLookupTable& table);
                virtual ~Iterator();
 
-               void* next(); // NULL iff none
+               void *next(); // NULL iff none
 
+               void *next(netAddressBits &address1, netAddressBits &address2,
+                       portNumBits &port);
            private:
                HashTable::Iterator* fIter;
        };
diff --git a/liveMedia/OnDemandServerMediaSubsession.cpp 
b/liveMedia/OnDemandServerMediaSubsession.cpp
index 978a965..da81a0a 100644
--- a/liveMedia/OnDemandServerMediaSubsession.cpp
+++ b/liveMedia/OnDemandServerMediaSubsession.cpp
@@ -72,6 +72,37 @@ OnDemandServerMediaSubsession::sdpLines() {
   return fSDPLines;
 }
 
+void StreamState::incomingRTP()
+{
+  struct sockaddr_in source;
+  int freei = fDestsCount;
+  unsigned i, id;
+  
+  readSocket(fRTPSink->envir(), fRTPgs->socketNum(), 
+           (unsigned char *)&id, sizeof(id), source);
+
+  for (i = 0; i < fDestsCount; i++) {
+    if (fDests[i].addr.sin_addr.s_addr == source.sin_addr.s_addr
+       && fDests[i].addr.sin_port == source.sin_port)
+      return;
+    if (fDests[i].addr.sin_addr.s_addr == 0)
+      freei = i;
+  }
+  i = freei;
+  if (i == fDestsCount) {
+    ++fDestsCount;
+    fDests = (struct Dests_t *)realloc(fDests, fDestsCount * sizeof(*fDests));
+  }
+  fDests[i].id = id;
+  fDests[i].addr = source;
+}
+
+static void _incomingRTP(void *instance, int)
+{
+  StreamState *stream = (StreamState *)instance;
+  stream->incomingRTP();
+}
+
 void OnDemandServerMediaSubsession
 ::getStreamParameters(unsigned clientSessionId,
                      netAddressBits clientAddress,
@@ -177,9 +208,9 @@ void OnDemandServerMediaSubsession
   // Record these destinations as being for this client session id:
   Destinations* destinations;
   if (tcpSocketNum < 0) { // UDP
-    destinations = new Destinations(destinationAddr, clientRTPPort, 
clientRTCPPort);
+    destinations = new Destinations(destinationAddr, clientRTPPort, 
clientRTCPPort, clientSessionId);
   } else { // TCP
-    destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId);
+    destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId, 
clientSessionId);
   }
   fDestinationsHashTable->Add((char const*)clientSessionId, destinations);
 }
@@ -415,7 +446,12 @@ StreamState::StreamState(OnDemandServerMediaSubsession& 
master,
     fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort),
     fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(master.duration()),
     fTotalBW(totalBW), fRTCPInstance(NULL) /* created later */,
-    fMediaSource(mediaSource), fStartNPT(0.0), fRTPgs(rtpGS), fRTCPgs(rtcpGS) {
+    fMediaSource(mediaSource), fStartNPT(0.0), fRTPgs(rtpGS), fRTCPgs(rtcpGS), 
+    fDests(NULL), fDestsCount(0) {
+  
+  fRTPSink->envir().taskScheduler().setBackgroundHandling
+    (fRTPgs->socketNum(), SOCKET_READABLE|SOCKET_EXCEPTION,
+      (TaskScheduler::BackgroundHandlerProc *)_incomingRTP, this);
 }
 
 StreamState::~StreamState() {
@@ -455,7 +491,30 @@ void StreamState
   } else {
     // Tell the RTP and RTCP 'groupsocks' about this destination
     // (in case they don't already have it):
-    if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
+    if (fRTPgs != NULL) {
+      unsigned i;
+      for (i = 0; i < fDestsCount; i++)
+       if (dests->addr.s_addr == fDests[i].addr.sin_addr.s_addr
+           && (dests->rtpPort.num() == fDests[i].addr.sin_port ||
+               fDests[i].id == dests->sessionId))
+         break;
+      if (i == fDestsCount)
+       for (i = 0; i < fDestsCount; i++)
+         if (dests->addr.s_addr == fDests[i].addr.sin_addr.s_addr)
+           break;
+      if (i < fDestsCount) {
+       if (dests->rtpPort.num() != fDests[i].addr.sin_port) {
+         fprintf(stderr, "Received RTP for session %x, "
+                 "with different ports, changing %hu -> %hu\n", 
+                 dests->sessionId, ntohs(dests->rtpPort.num()), 
+                 ntohs(fDests[i].addr.sin_port));
+         dests->rtpPort = ntohs(fDests[i].addr.sin_port);
+       }
+       fDests[i].addr.sin_addr.s_addr = 0;
+      }
+
+      fRTPgs->addDestination(dests->addr, dests->rtpPort);
+    }
     if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
     if (fRTCPInstance != NULL) {
       fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
@@ -519,6 +578,7 @@ void StreamState::endPlaying(Destinations* dests) {
 }
 
 void StreamState::reclaim() {
+  
fRTPSink->envir().taskScheduler().turnOffBackgroundReadHandling(fRTPgs->socketNum());
   // Delete allocated media objects
   Medium::close(fRTCPInstance) /* will send a RTCP BYE */; fRTCPInstance = 
NULL;
   Medium::close(fRTPSink); fRTPSink = NULL;
@@ -529,4 +589,5 @@ void StreamState::reclaim() {
 
   delete fRTPgs; fRTPgs = NULL;
   delete fRTCPgs; fRTCPgs = NULL;
+  free(fDests);
 }
diff --git a/liveMedia/RTCP.cpp b/liveMedia/RTCP.cpp
index fce4624..dfe07d3 100644
--- a/liveMedia/RTCP.cpp
+++ b/liveMedia/RTCP.cpp
@@ -169,6 +169,8 @@ RTCPInstance::RTCPInstance(UsageEnvironment& env, 
Groupsock* RTCPgs,
 }
 
 struct RRHandlerRecord {
+  portNumBits origPortNum;
+  Boolean seen;
   TaskFunc* rrHandlerTask;
   void* rrHandlerClientData;
 };
@@ -258,6 +260,8 @@ void RTCPInstance
   }
 
   RRHandlerRecord* rrHandler = new RRHandlerRecord;
+  rrHandler->seen = False;
+  rrHandler->origPortNum = fromPort.num();
   rrHandler->rrHandlerTask = handlerTask;
   rrHandler->rrHandlerClientData = clientData;
   if (fSpecificRRHandlerTable == NULL) {
@@ -503,9 +507,38 @@ void RTCPInstance::incomingReportHandler1() {
              RRHandlerRecord* rrHandler
                = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, 
(~0), fromPort));
              if (rrHandler != NULL) {
+               rrHandler->seen = True;
                if (rrHandler->rrHandlerTask != NULL) {
                  (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData);
                }
+             } else if (tcpReadStreamSocketNum < 0 && !fIsSSMSource && 
+                       !fSpecificRRHandlerTable->IsEmpty()) {
+               AddressPortLookupTable::Iterator i = *fSpecificRRHandlerTable;
+               netAddressBits a1, a2;
+               portNumBits p;
+               while(1) {
+                 RRHandlerRecord *r = (RRHandlerRecord *)i.next(a1, a2, p);
+                 if (r == NULL)
+                   break;
+                 if (a1 != fromAddr)
+                   continue;
+                 if (r->seen && p == r->origPortNum)
+                   continue;
+                 fSpecificRRHandlerTable->Remove(a1, a2, ntohs(p));
+                 fSpecificRRHandlerTable->Add(a1, (~0), fromPort, r);
+
+                 struct in_addr ia1;
+                 ia1.s_addr = a1;
+                 RTCPgs()->removeDestination(ia1, ntohs(p));
+                 RTCPgs()->addDestination(ia1, fromPort);
+                 fprintf(stderr, "RTCP, changing port from %hu to %hu\n",
+                       ntohs(p), fromPortNum);
+
+                 r->seen = True;
+                 if (r->rrHandlerTask != NULL)
+                   (*(r->rrHandlerTask))(r->rrHandlerClientData);
+                 break;
+               }
              }
            }
 
diff --git a/liveMedia/include/OnDemandServerMediaSubsession.hh 
b/liveMedia/include/OnDemandServerMediaSubsession.hh
index bdfc228..dbd8aa2 100644
--- a/liveMedia/include/OnDemandServerMediaSubsession.hh
+++ b/liveMedia/include/OnDemandServerMediaSubsession.hh
@@ -121,12 +121,14 @@ class Destinations {
 public:
   Destinations(struct in_addr const& destAddr,
                Port const& rtpDestPort,
-               Port const& rtcpDestPort)
-    : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), 
rtcpPort(rtcpDestPort) {
+             Port const& rtcpDestPort, unsigned sessionId)
+    : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), 
rtcpPort(rtcpDestPort),
+      sessionId(sessionId) {
   }
-  Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char 
rtcpChanId)
+  Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char 
rtcpChanId, unsigned sessionId)
     : isTCP(True), rtpPort(0) /*dummy*/, rtcpPort(0) /*dummy*/,
-      tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), 
rtcpChannelId(rtcpChanId) {
+      tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), 
+      rtcpChannelId(rtcpChanId), sessionId(sessionId) {
   }
 
 public:
@@ -136,6 +138,7 @@ public:
   Port rtcpPort;
   int tcpSocketNum;
   unsigned char rtpChannelId, rtcpChannelId;
+  unsigned sessionId;
 };
 
 class StreamState {
@@ -154,6 +157,7 @@ public:
   void pause();
   void endPlaying(Destinations* destinations);
   void reclaim();
+  void incomingRTP();
 
   unsigned& referenceCount() { return fReferenceCount; }
 
@@ -186,6 +190,11 @@ private:
 
   Groupsock* fRTPgs;
   Groupsock* fRTCPgs;
+  struct Dests_t {
+    unsigned id;
+    struct sockaddr_in addr;
+  } *fDests;
+  unsigned fDestsCount;
 };
 
 #endif
-- 
1.7.10.4

_______________________________________________
live-devel mailing list
live-devel@lists.live555.com
http://lists.live555.com/mailman/listinfo/live-devel

Reply via email to