Attached is an update of the sources. It is working for audio and video now.
Comments are welcome.
Bernhard

----- Original Message ----- From: "Bernhard Feiten" <[EMAIL PROTECTED]>
To: "LIVE555 Streaming Media - development & use" <[EMAIL PROTECTED]>
Sent: Monday, August 11, 2008 8:39 AM
Subject: Re: [Live-devel] Media stream duplication on a streaming relay


Dear Ross, all,

attached you find a first, incomplete version of the StreamDup classes.
Perhaps you have some comments. Is this, in principle, the way to do it?
I'm quite unsure what needs to be considered regarding the getNextFrame /
afterGetting filter-chain mechanism.

Audio is working, but when I also activate the video branch it crashes in
H264VideoRTPSink. It must be an error there, or perhaps somewhere else in
my code.

Thank you for your comments,

Bernhard




----- Original Message ----- From: "Bernhard Feiten" <[EMAIL PROTECTED]>
To: "LIVE555 Streaming Media - development & use"
<[EMAIL PROTECTED]>
Sent: Thursday, August 07, 2008 7:59 AM
Subject: Re: [Live-devel] Media stream duplication on a streaming relay


Dear Ross, all,

as nobody answered I started to write a  StreamDup class following your
hints from the e-mail below.

Related to that, I have two questions:
Would the sequence numbers and timestamps of the duplicates need to be
newly initialized?
Would RTCP also need to be considered then?

Thank you in advance,
Bernhard


----- Original Message ----- From: "Bernhard Feiten" <[EMAIL PROTECTED]>
To: "LIVE555 Streaming Media - development & use"
<[EMAIL PROTECTED]>
Sent: Thursday, June 26, 2008 7:28 AM
Subject: Re: [Live-devel] Media stream duplication on a streaming relay


Dear Ross, all,

has somebody already written the StreamDuplication class you proposed
below?

Do you have a hint on how the getNextFrame functions could be synchronized?

Thank you very much,
Bernhard




----- Original Message ----- From: "Ross Finlayson" <[EMAIL PROTECTED]>
To: "LIVE555 Streaming Media - development & use"
<[EMAIL PROTECTED]>
Sent: Thursday, December 08, 2005 8:57 PM
Subject: Re: [Live-devel] Media stream duplication on a streaming relay


As you noticed, you can't duplicate a stream by having each recipient
read from a single object, because "getNextFrame()" can't be called more
than once on the same object concurrently.

The solution, instead, is to create a separate object (of some
"FramedSource" subclass) for each recipient.  This new class would
implement the "doGetNextFrame()" virtual function by somehow
'registering' with the data source object - to request a copy of the
next incoming frame.

The data source object (which would *not* be a subclass of
"FramedSource", and so would not implement "doGetNextFrame()") would
handle these requests by delivering copies of each incoming frame to
each recipient, and not ask for a new incoming frame (from its upstream
data source) until it has finished delivering data to each downstream
recipient.

For an example of code that is similar to this, note the relationship
between the "MPEG1or2Demux" and "MPEG1or2DemuxedElementaryStream"
classes. (In this case, however, we are demultiplexing data to
downstream recipients, rather than duplicating it.)  Note in particular
that (i) "MPEG1or2Demux" is subclassed from "Medium", not
"FramedSource", and (ii) "MPEG1or2Demux" implements a "getNextFrame()"
function that is similar to, but different from
"FramedSource::getNextFrame()".

(This functionality (data duplication) is something that should probably
be added to the library someday.  Until then, however, you will need to
implement it yourself.)
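Stripped of the liveMedia types, the delivery logic described above can be sketched as a synchronous simplification (all names here are illustrative, not part of the library): each recipient passes its own frame counter; the first request of a new round triggers exactly one upstream read, and later requests in the same round are served from a cached copy.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Synchronous sketch of the duplication pattern (illustrative names only):
// one upstream read per "round", cached and copied to every recipient.
class FrameCache {
public:
    using Upstream  = std::function<std::string()>;
    using Recipient = std::function<void(const std::string&)>;

    explicit FrameCache(Upstream up) : fUpstream(std::move(up)) {}

    // A recipient passes its own frame counter: the first recipient of a
    // new round triggers one upstream read; recipients still on the
    // current round are served the cached copy instead.
    void getNextFrame(unsigned long recipientFrameCount, Recipient deliver) {
        if (recipientFrameCount > fSourceFrameCount) {
            fCached = fUpstream();       // exactly one upstream read per round
            ++fSourceFrameCount;
        }
        deliver(fCached);                // a copy for this recipient
    }

private:
    Upstream      fUpstream;
    std::string   fCached;
    unsigned long fSourceFrameCount = 0; // rounds completed so far
};
```

In the real, event-driven code the delivery happens asynchronously in the after-getting callback, but the counter comparison against the source's frame count plays the same role.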


Ross Finlayson
Live Networks, Inc. (LIVE555.COM)
<http://www.live555.com/>

_______________________________________________
live-devel mailing list
live-devel@lists.live555.com
http://lists.live555.com/mailman/listinfo/live-devel







/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
**********/
// "liveMedia"
// Copyright (c) 1996-2007 Live Networks, Inc.  All rights reserved.
// Duplication of Streams
// C++ header

// Author: Bernhard Feiten, 11.8.2008, based on MPEG1or2Demux

#ifndef _STREAM_DUP_HH
#define _STREAM_DUP_HH

#ifndef _FRAMED_SOURCE_HH
#include "FramedSource.hh"
#endif

class StreamDup: public Medium {
public:
   typedef void (afterGettingFunc)(void* clientData, unsigned frameSize,
                                   unsigned numTruncatedBytes,
                                   struct timeval presentationTime,
                                   unsigned durationInMicroseconds);
   typedef void (onCloseFunc)(void* clientData);

   static StreamDup* createNew( UsageEnvironment& env,
                                FramedSource* inputSource,
                                Boolean reclaimWhenLastReferenceDies = False);
   // If "reclaimWhenLastReferenceDies" is True, the StreamDup object is
   // deleted when all of the duplicate streams that we created get deleted.
   FramedSource* registerDuplicateStream();
   void unregisterDuplicateStream(unsigned int streamId);

   void getNextFrame(unsigned int streamId,
                     unsigned long frameCount,
                     unsigned char* to, unsigned maxSize,
                     StreamDup::afterGettingFunc* afterGettingFunc,
                     void* afterGettingClientData,
                     StreamDup::onCloseFunc* onCloseFunc,
                     void* onCloseClientData);

   void doGetNextFrame();

   static void afterGettingFrame(void* clientData, unsigned frameSize,
                                 unsigned numTruncatedBytes,
                                 struct timeval presentationTime,
                                 unsigned durationInMicroseconds);
   void afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
                           struct timeval presentationTime,
                           unsigned durationInMicroseconds);

   int NumOfStreamDups() { return fNumOfStreamDups; }

   static void handleClosure(void* clientData);
   FramedSource* inputSource() const { return fInputSource; }

private:
   StreamDup(UsageEnvironment& env, FramedSource* inputSource,
             Boolean reclaimWhenLastReferenceDies); // called only by createNew()
   virtual ~StreamDup();

private:
   FramedSource*  fInputSource;

   unsigned char  fTo[65535];                // in
   static const unsigned  fMaxSize = 65535;  // in
   unsigned       fFrameSize;                // out
   unsigned       fNumTruncatedBytes;        // out
   struct timeval fPresentationTime;         // out
   unsigned       fDurationInMicroseconds;   // out

   unsigned long  fSourceFrameCount;
   Boolean             fReclaimWhenLastReferenceDies;
   unsigned       fNumOfStreamDups;
   unsigned       fNumPendingReads;

private:
   // callback state for the current read from our input source:
   afterGettingFunc* fAfterGettingFunc;
   void*             fAfterGettingClientData;
   onCloseFunc*      fOnCloseFunc;
   void*             fOnCloseClientData;
   Boolean           fIsCurrentlyAwaitingData;

   // A descriptor for each possible duplicate stream:
   typedef struct OutputDescriptor {
       // input parameters
       unsigned char* to;
       unsigned maxSize;
       FramedSource::afterGettingFunc* fAfterGettingFunc;
       void* afterGettingClientData;
       FramedSource::onCloseFunc* fOnCloseFunc;
       void* onCloseClientData;

       // status parameters
       Boolean isPotentiallyReadable;
       Boolean isCurrentlyActive;
       Boolean isCurrentlyAwaitingData;
   } OutputDescriptor_t;
   OutputDescriptor_t fOutput[256];

};

#endif
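For context, a hypothetical sketch of how these classes would be wired up on a relay (not from the attachment: `env`, `inputSource`, the sinks, and `afterPlaying` are placeholders; `startPlaying()` is the standard liveMedia MediaSink call):

```cpp
// One input source, duplicated into two branches, each driven by its own sink.
StreamDup* dup = StreamDup::createNew(*env, inputSource);
FramedSource* branch1 = dup->registerDuplicateStream();
FramedSource* branch2 = dup->registerDuplicateStream();
sink1->startPlaying(*branch1, afterPlaying, sink1);
sink2->startPlaying(*branch2, afterPlaying, sink2);
```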
/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
**********/
// "liveMedia"
// Copyright (c) 1996-2007 Live Networks, Inc.  All rights reserved.
// Duplication of Streams
// Implementation

// Author: Bernhard Feiten, 11.8.2008, based on MPEG1or2Demux

#include "StreamDup.hh"

////////// StreamDupFrame ///////////////////////////
// Instantiated for each duplicated stream; read from by e.g. the different
// RTPSink objects.
class StreamDupFrame: public FramedSource {
public:
   StreamDup& source() const { return fOurSource; }

private: // We are created only by a StreamDup (a friend)
   StreamDupFrame(UsageEnvironment& env, StreamDup& source,
                  unsigned int streamId);
   virtual ~StreamDupFrame();

private:
   // redefined virtual functions:
   virtual void doGetNextFrame();
//    virtual void doStopGettingFrames();
   virtual char const* MIMEtype() const;
   virtual unsigned maxFrameSize() const;

private:
   static void afterGettingFrame(void* clientData,
                                 unsigned frameSize, unsigned numTruncatedBytes,
                                 struct timeval presentationTime,
                                 unsigned durationInMicroseconds);

   void afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
                           struct timeval presentationTime,
                           unsigned durationInMicroseconds);

private:
   StreamDup&    fOurSource;
   unsigned int  fStreamId;
   unsigned long fFrameCount;
   char const*   fMIMEtype;

   friend class StreamDup;
};

///////// StreamDupFrame implementation ///////
StreamDupFrame::StreamDupFrame(UsageEnvironment& env, StreamDup& source,
                               unsigned int streamId)
   : FramedSource(env), fOurSource(source), fStreamId(streamId),
     fFrameCount(0) {
   // Set our MIME type string for known media types:
   fMIMEtype = MediaSource::MIMEtype();
}

StreamDupFrame::~StreamDupFrame() {
   fOurSource.unregisterDuplicateStream(fStreamId);
}

void StreamDupFrame::doGetNextFrame() {
   // Arrange to read data directly into the client's buffer:
   fFrameCount++;
   fOurSource.getNextFrame(fStreamId, fFrameCount, fTo, fMaxSize,
                           afterGettingFrame, this,
                           handleClosure, this);
}

//void StreamDupFrame::doStopGettingFrames() {
//    fOurSource.stopGettingFrames();
//}

char const* StreamDupFrame::MIMEtype() const {
 return fMIMEtype;
}

unsigned StreamDupFrame::maxFrameSize() const {
   return 65535;
}

void StreamDupFrame::afterGettingFrame(void* clientData,
                                       unsigned frameSize, unsigned numTruncatedBytes,
                                       struct timeval presentationTime,
                                       unsigned durationInMicroseconds) {
   StreamDupFrame* stream = (StreamDupFrame*)clientData;
   stream->afterGettingFrame1(frameSize, numTruncatedBytes,
                              presentationTime, durationInMicroseconds);
}

void StreamDupFrame::afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
                                        struct timeval presentationTime,
                                        unsigned durationInMicroseconds) {
   fFrameSize = frameSize;
   fNumTruncatedBytes = numTruncatedBytes; // was missing; pass truncation downstream
   fPresentationTime = presentationTime;
   fDurationInMicroseconds = durationInMicroseconds;

   FramedSource::afterGetting(this);
}
//////////////// end  StreamDupFrame implementation //////////////////

//////////////// StreamDup implementation /////////////////////////////
StreamDup* StreamDup::createNew(UsageEnvironment& env,
                                FramedSource* inputSource,
                                Boolean reclaimWhenLastReferenceDies) {
   // Need to add source type checking here???  #####
   return new StreamDup(env, inputSource, reclaimWhenLastReferenceDies);
}

StreamDup::StreamDup(UsageEnvironment& env, FramedSource* inputSource,
                     Boolean reclaimWhenLastReferenceDies)
   : Medium(env),
     fInputSource(inputSource),
     fSourceFrameCount(0),
     fReclaimWhenLastReferenceDies(reclaimWhenLastReferenceDies),
     fNumOfStreamDups(0),
     fNumPendingReads(0),
     fIsCurrentlyAwaitingData(False) {
   for (unsigned i = 0; i < 256; ++i) {
      fOutput[i].isPotentiallyReadable = False;
      fOutput[i].isCurrentlyActive = False;
      fOutput[i].isCurrentlyAwaitingData = False;
   }
}

StreamDup::~StreamDup() {
   Medium::close(fInputSource);
}


FramedSource* StreamDup::registerDuplicateStream() {
   fNumOfStreamDups++;
   return new StreamDupFrame(envir(), *this, fNumOfStreamDups - 1); // Where is it deleted?
}

void StreamDup::unregisterDuplicateStream(unsigned int streamId)
{
   if (fNumOfStreamDups > 0) fNumOfStreamDups--;
   fOutput[streamId].isCurrentlyActive       = False;
   fOutput[streamId].isCurrentlyAwaitingData = False;
}

void StreamDup::getNextFrame(unsigned int streamId,
                             unsigned long frameCount,
                             unsigned char* to, unsigned maxSize,
                             StreamDup::afterGettingFunc* afterGettingFunc,
                             void* afterGettingClientData,
                             StreamDup::onCloseFunc* onCloseFunc,
                             void* onCloseClientData) {
   // Register the request in the output-stream array:
// envir() << "FramedSource[" << this << "]::getNextFrame() streamId: " << streamId << "\n";
   fOutput[streamId].to                      = to;
   fOutput[streamId].maxSize                 = maxSize;
   fOutput[streamId].fAfterGettingFunc       = afterGettingFunc;
   fOutput[streamId].afterGettingClientData  = afterGettingClientData;
   fOutput[streamId].fOnCloseFunc            = onCloseFunc;
   fOutput[streamId].onCloseClientData       = onCloseClientData;
   fOutput[streamId].isCurrentlyActive       = True;
   fOutput[streamId].isCurrentlyAwaitingData = True;

   if (fIsCurrentlyAwaitingData) return;

   // Check whether a new frame needs to be fetched from the source:
   if (frameCount > fSourceFrameCount) {
      doGetNextFrame();
   } else {
      // This recipient has not yet seen the current frame; deliver a copy of it:
      memmove(fOutput[streamId].to, fTo, fFrameSize);
      fOutput[streamId].isCurrentlyAwaitingData = False;
      (*fOutput[streamId].fAfterGettingFunc)(fOutput[streamId].afterGettingClientData,
                                             fFrameSize, fNumTruncatedBytes,
                                             fPresentationTime,
                                             fDurationInMicroseconds);
   }
}


void StreamDup::doGetNextFrame() {
   if (fIsCurrentlyAwaitingData) return;

   fIsCurrentlyAwaitingData = True;
   fInputSource->getNextFrame(fTo, fMaxSize, afterGettingFrame, this,
                              handleClosure, this);
}


void StreamDup::afterGettingFrame(void* clientData, unsigned frameSize,
                                  unsigned numTruncatedBytes,
                                  struct timeval presentationTime,
                                  unsigned durationInMicroseconds) {
   StreamDup* stdp = (StreamDup*)clientData;
   if (stdp == NULL) return;

   stdp->afterGettingFrame1(frameSize, numTruncatedBytes,
                            presentationTime, durationInMicroseconds);
}


void StreamDup::afterGettingFrame1(unsigned frameSize, unsigned numTruncatedBytes,
                                   struct timeval presentationTime,
                                   unsigned durationInMicroseconds) {
   fSourceFrameCount++;
   fFrameSize = frameSize;
   fNumTruncatedBytes = numTruncatedBytes;
   fPresentationTime = presentationTime;
   fDurationInMicroseconds = durationInMicroseconds;
   fIsCurrentlyAwaitingData = False;

   // Deliver a copy of the new frame to each recipient that is waiting for it:
   for (int i = 0; i < 256; i++) {
      if (fOutput[i].isCurrentlyAwaitingData) {
         memmove(fOutput[i].to, fTo, fFrameSize);
         fOutput[i].isCurrentlyAwaitingData = False;
         (*fOutput[i].fAfterGettingFunc)(fOutput[i].afterGettingClientData,
                                         fFrameSize, 0 /* numTruncatedBytes */,
                                         fPresentationTime,
                                         fDurationInMicroseconds);
      }
   }
}


void StreamDup::handleClosure(void* clientData) {
   FramedSource* source = (FramedSource*)clientData;
// source->fIsCurrentlyAwaitingData = False; // because we got a close instead
// if (source->fOnCloseFunc != NULL) (*(source->fOnCloseFunc))(source->fOnCloseClientData);
}
