correct port number differences due to NAT. This implementation functions correctly, but may not be the best integration into the live555 architecture. --- groupsock/NetAddress.cpp | 18 +++++ groupsock/include/NetAddress.hh | 5 +- liveMedia/OnDemandServerMediaSubsession.cpp | 69 ++++++++++++++++++-- liveMedia/RTCP.cpp | 33 ++++++++++ liveMedia/include/OnDemandServerMediaSubsession.hh | 17 +++-- 5 files changed, 133 insertions(+), 9 deletions(-)
diff --git a/groupsock/NetAddress.cpp b/groupsock/NetAddress.cpp index ee2e605..66832dd 100644 --- a/groupsock/NetAddress.cpp +++ b/groupsock/NetAddress.cpp @@ -261,6 +261,11 @@ Boolean AddressPortLookupTable::Remove(netAddressBits address1, return fTable->Remove((char*)key); } +Boolean AddressPortLookupTable::IsEmpty() +{ + return fTable->IsEmpty(); +} + AddressPortLookupTable::Iterator::Iterator(AddressPortLookupTable& table) : fIter(HashTable::Iterator::create(*(table.fTable))) { } @@ -274,6 +279,19 @@ void* AddressPortLookupTable::Iterator::next() { return fIter->next(key); } +void* AddressPortLookupTable::Iterator::next(netAddressBits &address1, + netAddressBits &address2, + portNumBits &port) +{ + const char *key; + const int *k; + void *v = fIter->next(key); + k = (const int *)key; + address1 = k[0]; + address2 = k[1]; + port = k[2]; + return v; +} ////////// isMulticastAddress() implementation ////////// diff --git a/groupsock/include/NetAddress.hh b/groupsock/include/NetAddress.hh index 5b47c03..21b7588 100644 --- a/groupsock/include/NetAddress.hh +++ b/groupsock/include/NetAddress.hh @@ -122,6 +122,7 @@ class AddressPortLookupTable { void* Lookup(netAddressBits address1, netAddressBits address2, Port port); // Returns 0 if not found + Boolean IsEmpty(); // Used to iterate through the entries in the table class Iterator { @@ -129,8 +130,10 @@ class AddressPortLookupTable { Iterator(AddressPortLookupTable& table); virtual ~Iterator(); - void* next(); // NULL iff none + void *next(); // NULL iff none + void *next(netAddressBits &address1, netAddressBits &address2, + portNumBits &port); private: HashTable::Iterator* fIter; }; diff --git a/liveMedia/OnDemandServerMediaSubsession.cpp b/liveMedia/OnDemandServerMediaSubsession.cpp index 978a965..da81a0a 100644 --- a/liveMedia/OnDemandServerMediaSubsession.cpp +++ b/liveMedia/OnDemandServerMediaSubsession.cpp @@ -72,6 +72,37 @@ OnDemandServerMediaSubsession::sdpLines() { return fSDPLines; } +void 
StreamState::incomingRTP() +{ + struct sockaddr_in source; + int freei = fDestsCount; + unsigned i, id; + + readSocket(fRTPSink->envir(), fRTPgs->socketNum(), + (unsigned char *)&id, sizeof(id), source); + + for (i = 0; i < fDestsCount; i++) { + if (fDests[i].addr.sin_addr.s_addr == source.sin_addr.s_addr + && fDests[i].addr.sin_port == source.sin_port) + return; + if (fDests[i].addr.sin_addr.s_addr == 0) + freei = i; + } + i = freei; + if (i == fDestsCount) { + ++fDestsCount; + fDests = (struct Dests_t *)realloc(fDests, fDestsCount * sizeof(*fDests)); + } + fDests[i].id = id; + fDests[i].addr = source; +} + +static void _incomingRTP(void *instance, int) +{ + StreamState *stream = (StreamState *)instance; + stream->incomingRTP(); +} + void OnDemandServerMediaSubsession ::getStreamParameters(unsigned clientSessionId, netAddressBits clientAddress, @@ -177,9 +208,9 @@ void OnDemandServerMediaSubsession // Record these destinations as being for this client session id: Destinations* destinations; if (tcpSocketNum < 0) { // UDP - destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort); + destinations = new Destinations(destinationAddr, clientRTPPort, clientRTCPPort, clientSessionId); } else { // TCP - destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId); + destinations = new Destinations(tcpSocketNum, rtpChannelId, rtcpChannelId, clientSessionId); } fDestinationsHashTable->Add((char const*)clientSessionId, destinations); } @@ -415,7 +446,12 @@ StreamState::StreamState(OnDemandServerMediaSubsession& master, fServerRTPPort(serverRTPPort), fServerRTCPPort(serverRTCPPort), fRTPSink(rtpSink), fUDPSink(udpSink), fStreamDuration(master.duration()), fTotalBW(totalBW), fRTCPInstance(NULL) /* created later */, - fMediaSource(mediaSource), fStartNPT(0.0), fRTPgs(rtpGS), fRTCPgs(rtcpGS) { + fMediaSource(mediaSource), fStartNPT(0.0), fRTPgs(rtpGS), fRTCPgs(rtcpGS), + fDests(NULL), fDestsCount(0) { + + 
fRTPSink->envir().taskScheduler().setBackgroundHandling + (fRTPgs->socketNum(), SOCKET_READABLE|SOCKET_EXCEPTION, + (TaskScheduler::BackgroundHandlerProc *)_incomingRTP, this); } StreamState::~StreamState() { @@ -455,7 +491,30 @@ void StreamState } else { // Tell the RTP and RTCP 'groupsocks' about this destination // (in case they don't already have it): - if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort); + if (fRTPgs != NULL) { + unsigned i; + for (i = 0; i < fDestsCount; i++) + if (dests->addr.s_addr == fDests[i].addr.sin_addr.s_addr + && (dests->rtpPort.num() == fDests[i].addr.sin_port || + fDests[i].id == dests->sessionId)) + break; + if (i == fDestsCount) + for (i = 0; i < fDestsCount; i++) + if (dests->addr.s_addr == fDests[i].addr.sin_addr.s_addr) + break; + if (i < fDestsCount) { + if (dests->rtpPort.num() != fDests[i].addr.sin_port) { + fprintf(stderr, "Received RTP for session %x, " + "with different ports, changing %hu -> %hu\n", + dests->sessionId, ntohs(dests->rtpPort.num()), + ntohs(fDests[i].addr.sin_port)); + dests->rtpPort = ntohs(fDests[i].addr.sin_port); + } + fDests[i].addr.sin_addr.s_addr = 0; + } + + fRTPgs->addDestination(dests->addr, dests->rtpPort); + } if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort); if (fRTCPInstance != NULL) { fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort, @@ -519,6 +578,7 @@ void StreamState::endPlaying(Destinations* dests) { } void StreamState::reclaim() { + fRTPSink->envir().taskScheduler().turnOffBackgroundReadHandling(fRTPgs->socketNum()); // Delete allocated media objects Medium::close(fRTCPInstance) /* will send a RTCP BYE */; fRTCPInstance = NULL; Medium::close(fRTPSink); fRTPSink = NULL; @@ -529,4 +589,5 @@ void StreamState::reclaim() { delete fRTPgs; fRTPgs = NULL; delete fRTCPgs; fRTCPgs = NULL; + free(fDests); } diff --git a/liveMedia/RTCP.cpp b/liveMedia/RTCP.cpp index fce4624..dfe07d3 100644 --- a/liveMedia/RTCP.cpp +++ 
b/liveMedia/RTCP.cpp @@ -169,6 +169,8 @@ RTCPInstance::RTCPInstance(UsageEnvironment& env, Groupsock* RTCPgs, } struct RRHandlerRecord { + portNumBits origPortNum; + Boolean seen; TaskFunc* rrHandlerTask; void* rrHandlerClientData; }; @@ -258,6 +260,8 @@ void RTCPInstance } RRHandlerRecord* rrHandler = new RRHandlerRecord; + rrHandler->seen = False; + rrHandler->origPortNum = fromPort.num(); rrHandler->rrHandlerTask = handlerTask; rrHandler->rrHandlerClientData = clientData; if (fSpecificRRHandlerTable == NULL) { @@ -503,9 +507,38 @@ void RTCPInstance::incomingReportHandler1() { RRHandlerRecord* rrHandler = (RRHandlerRecord*)(fSpecificRRHandlerTable->Lookup(fromAddr, (~0), fromPort)); if (rrHandler != NULL) { + rrHandler->seen = True; if (rrHandler->rrHandlerTask != NULL) { (*(rrHandler->rrHandlerTask))(rrHandler->rrHandlerClientData); } + } else if (tcpReadStreamSocketNum < 0 && !fIsSSMSource && + !fSpecificRRHandlerTable->IsEmpty()) { + AddressPortLookupTable::Iterator i = *fSpecificRRHandlerTable; + netAddressBits a1, a2; + portNumBits p; + while(1) { + RRHandlerRecord *r = (RRHandlerRecord *)i.next(a1, a2, p); + if (r == NULL) + break; + if (a1 != fromAddr) + continue; + if (r->seen && p == r->origPortNum) + continue; + fSpecificRRHandlerTable->Remove(a1, a2, ntohs(p)); + fSpecificRRHandlerTable->Add(a1, (~0), fromPort, r); + + struct in_addr ia1; + ia1.s_addr = a1; + RTCPgs()->removeDestination(ia1, ntohs(p)); + RTCPgs()->addDestination(ia1, fromPort); + fprintf(stderr, "RTCP, changing port from %hu to %hu\n", + ntohs(p), fromPortNum); + + r->seen = True; + if (r->rrHandlerTask != NULL) + (*(r->rrHandlerTask))(r->rrHandlerClientData); + break; + } } } diff --git a/liveMedia/include/OnDemandServerMediaSubsession.hh b/liveMedia/include/OnDemandServerMediaSubsession.hh index bdfc228..dbd8aa2 100644 --- a/liveMedia/include/OnDemandServerMediaSubsession.hh +++ b/liveMedia/include/OnDemandServerMediaSubsession.hh @@ -121,12 +121,14 @@ class Destinations { public: 
Destinations(struct in_addr const& destAddr, Port const& rtpDestPort, - Port const& rtcpDestPort) - : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), rtcpPort(rtcpDestPort) { + Port const& rtcpDestPort, unsigned sessionId) + : isTCP(False), addr(destAddr), rtpPort(rtpDestPort), rtcpPort(rtcpDestPort), + sessionId(sessionId) { } - Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char rtcpChanId) + Destinations(int tcpSockNum, unsigned char rtpChanId, unsigned char rtcpChanId, unsigned sessionId) : isTCP(True), rtpPort(0) /*dummy*/, rtcpPort(0) /*dummy*/, - tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), rtcpChannelId(rtcpChanId) { + tcpSocketNum(tcpSockNum), rtpChannelId(rtpChanId), + rtcpChannelId(rtcpChanId), sessionId(sessionId) { } public: @@ -136,6 +138,7 @@ public: Port rtcpPort; int tcpSocketNum; unsigned char rtpChannelId, rtcpChannelId; + unsigned sessionId; }; class StreamState { @@ -154,6 +157,7 @@ public: void pause(); void endPlaying(Destinations* destinations); void reclaim(); + void incomingRTP(); unsigned& referenceCount() { return fReferenceCount; } @@ -186,6 +190,11 @@ private: Groupsock* fRTPgs; Groupsock* fRTCPgs; + struct Dests_t { + unsigned id; + struct sockaddr_in addr; + } *fDests; + unsigned fDestsCount; }; #endif -- 1.7.10.4 _______________________________________________ live-devel mailing list live-devel@lists.live555.com http://lists.live555.com/mailman/listinfo/live-devel