On Tue, 22 Jan 2013 10:08:49 -0800, Ross Finlayson wrote
> OK, so I'll make that change (changing the order of the calls to 
> "onSourceClosure(this);" and "stopPlaying();" in 
> "FileSink::afterGettingFrame()") in the next release of the software.
> 
> > The main difference - regarding this issue - between our server and the
> > testReplicator app is that, after an error occurs trying to write a video 
> > file,
> > we schedule the writing to start again at a later time. We do that by 
> > overriding
> > the MediaSink::stopPlaying method in our own FileSink class:
> > 
> > virtual void stopPlaying() {
> >  MediaSink::stopPlaying();
> > 
> >  // schedule to restart in 10 secs
> >  envir() << "OurFileSink stopped! Scheduling to restart in 10 secs.\n";
> >  envir().taskScheduler().scheduleDelayedTask(10000000,
> >      (TaskFunc *) OurFileSink::startPlaying, this);
> > }
> 
> No - that's a bad idea.  You shouldn't be redefining a virtual 
> function to do something completely unrelated to what the rest of the 
> code - that calls this virtual function - expects.  (That's why you 
> keep running into trouble :-)
> 
> The right place to be scheduling a new 'playing' task is in your 
> 'after playing' function - i.e., the function that you passed as a 
> parameter when you called "startPlaying()" on your "FileSink" 
> subclass.  That 'after playing' function will get called,
>  automatically, when writes to the file fail (or if the input stream 
> closes) - as a result of the call to "onSourceClosure(this);".  So 
> that's where you should be scheduling your new task.

Initially we were scheduling in the afterPlaying function, but with your
suggested reordering, fAfterFunc is set to NULL in MediaSink::stopPlaying()
before it gets called in MediaSink::onSourceClosure().
We think the correct behavior would be to call stopGettingFrames() on fSource
(which will call doStopGettingFrames() on the specific replica) and also have
the afterPlaying() function get called, so that it can do the scheduling.


> > I'm attaching the StreamReplicator class with our patches
> 
> I didn't see any attachment!

You're right, I'm sorry. Sending the files now.


> > 2.
> > Also, we needed to change the visibility of the FileSink::continuePlaying()
> > method to protected
> 
> OK, I'll make that change (along with the change to the implementation 
> of "FileSink::afterGettingFrame()" - noted earlier) in the next 
> release of the software.

Thank you.



-- 
Living Data - Sistemas de Informação e Apoio à Decisão, Lda.

LxFactory - Rua Rodrigues de Faria, 103,
edifício I - 4º piso                  Phone:  +351 213622163
1300-501 LISBOA                       Fax:    +351 213622165
Portugal                              URL: www.livingdata.pt

/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
**********/
// "liveMedia"
// Copyright (c) 1996-2013 Live Networks, Inc.  All rights reserved.
// A class that can be used to create (possibly multiple) 'replicas' of an incoming stream.
// Implementation.

#include "StreamReplicator.hh"

////////// Definition of "StreamReplica": The class that implements each stream replica //////////

// A single replica of the replicator's input stream.  Each replica is itself a "FramedSource"; its requests to get (or to
// stop getting) frames are forwarded to the "StreamReplicator" that created it.
class StreamReplica: public FramedSource {
protected:
  friend class StreamReplicator;
  StreamReplica(StreamReplicator& ourReplicator, const char * ID); // called only by "StreamReplicator::createStreamReplica()"
  virtual ~StreamReplica();

private: // redefined virtual functions:
  virtual void doGetNextFrame();
  virtual void doStopGettingFrames();

private:
  // Copies the frame (data, sizes and timing) held by "fromReplica" into "toReplica", truncating it if "toReplica"'s
  // buffer is too small:
  static void copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica);

private:
  StreamReplicator& fOurReplicator;
  int fFrameIndex; // 0 or 1, depending upon which frame we're currently requesting; could also be -1 if we've stopped playing

  // Replicas that are currently awaiting data are kept in a (singly-linked) list:
  StreamReplica* fNext;

  char * myID; // our own copy (made with "strDup()") of the ID string given to the constructor; printed in debugging output
  bool justActivated; // debugging aid: after construction, it is modified only if DEBUG_REPLICATOR is defined
  bool receivingFrame; // True only while the replicator is completing delivery of a frame to us
};


////////// StreamReplicator implementation //////////

// Factory function: allocates a new replicator that will read from "inputSource".
StreamReplicator* StreamReplicator::createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) {
  StreamReplicator* const newReplicator = new StreamReplicator(env, inputSource, deleteWhenLastReplicaDies);
  return newReplicator;
}

// Constructor: records the input source and the deletion policy, and starts out with no replicas, no deliveries made,
// no 'master' replica, and both 'awaiting frame' lists empty.
StreamReplicator::StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies)
  : Medium(env),
    fInputSource(inputSource),
    fDeleteWhenLastReplicaDies(deleteWhenLastReplicaDies),
    fInputSourceHasClosed(False),
    fNumReplicas(0),
    fNumActiveReplicas(0),
    fNumDeliveriesMadeSoFar(0),
    fFrameIndex(0),
    fMasterReplica(NULL),
    fReplicasAwaitingCurrentFrame(NULL),
    fReplicasAwaitingNextFrame(NULL) {
}

// Destructor: closes our input source (by passing it to "Medium::close()").
StreamReplicator::~StreamReplicator() {
  Medium::close(fInputSource);
}

// Creates (and counts) a new replica of our input stream.  "replicaID" is a label for the replica; it's passed on to
// the replica's constructor.
FramedSource* StreamReplicator::createStreamReplica(const char * replicaID) {
  ++fNumReplicas; // the new replica is counted before it is constructed
  StreamReplica* const newReplica = new StreamReplica(*this, replicaID);
  return newReplica;
}

// Called (via "StreamReplica::doGetNextFrame()") when "replica" wants its next frame.  Depending on our current state,
// the replica becomes the 'master' replica (into whose buffer we read from the input source), or is queued to await
// the current frame or the next frame.
void StreamReplicator::getNextFrame(StreamReplica* replica) {
  // If our input source has already closed, then all we can do is report the closure:
  if (fInputSourceHasClosed) {
    FramedSource::handleClosure(replica);
    return;
  }

  // A frame index of -1 means that this replica had stopped playing (or had just been created).
  // It's now actively reading, so note this:
  if (replica->fFrameIndex == -1) {
    replica->fFrameIndex = fFrameIndex;
    ++fNumActiveReplicas;
#ifdef DEBUG_REPLICATOR
    replica->justActivated = true;
    envir() << "StreamReplicator DEBUG: (re)activating replica " << replica->myID << "\n";
#endif
  }

  // Case 1: Nobody has requested the next unread frame yet.  This replica becomes the 'master' replica: the frame is
  // read into its buffer, and later copied from there into the other replicas' buffers.
  if (fMasterReplica == NULL) {
    fMasterReplica = replica;
    if (fInputSource == NULL) return;

    fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
                               afterGettingFrame, this, onSourceClosure, this);
    return;
  }

  // Case 2: This replica has already received the current frame, so it's really asking for the next one.  Enqueue it:
  if (replica->fFrameIndex != fFrameIndex) {
    replica->fNext = fReplicasAwaitingNextFrame;
    fReplicasAwaitingNextFrame = replica;
    return;
  }

  // Case 3: This replica is asking for the current frame.  Enqueue it:
  replica->fNext = fReplicasAwaitingCurrentFrame;
  fReplicasAwaitingCurrentFrame = replica;

  // If the input source is no longer awaiting data, then the current frame has already arrived; deliver it now:
  if (fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) {
    deliverReceivedFrame();
  }
}

// Called when "replicaBeingDeactivated" stops reading (or is being removed).  Updates the count of active replicas,
// replaces the 'master' replica if necessary (restarting or copying the current read as appropriate), unlinks the
// replica from the 'awaiting frame' lists, and stops the input source once no active replicas remain.
void StreamReplicator::deactivateStreamReplica(StreamReplica* replicaBeingDeactivated) {
  // Assert: fNumActiveReplicas > 0
  // Note that it's the count of *active* replicas that we check here.  "fNumReplicas" can legitimately be 0 at this
  // point, because "removeStreamReplica()" decrements it before calling us.
  if (fNumActiveReplicas == 0) {
    envir() << "StreamReplicator::deactivateStreamReplica() Internal Error!\n"; // should not happen
  } else {
    --fNumActiveReplicas;
  }

  // If we're being called while a delivery to this replica is being completed, then undo the count of that delivery:
  if (replicaBeingDeactivated->receivingFrame) --fNumDeliveriesMadeSoFar;

#ifdef DEBUG_REPLICATOR
  envir() << "StreamReplicator DEBUG: deactivating replica " << replicaBeingDeactivated->myID << "\n";
#endif

  // Check whether the replica being deactivated is the 'master' replica, or is enqueued awaiting a frame:
  if (replicaBeingDeactivated == fMasterReplica) {
#ifdef DEBUG_REPLICATOR
    envir() << "StreamReplicator DEBUG: " << replicaBeingDeactivated->myID << " was master replica\n";
#endif
    // We need to replace the 'master replica', if we can:
    if (fReplicasAwaitingCurrentFrame == NULL) {
      // There's currently no replacement 'master replica'
      fMasterReplica = NULL;
#ifdef DEBUG_REPLICATOR
      envir() << "StreamReplicator DEBUG: there is no master replica replacement\n";
#endif
    } else {
      // There's another replica that we can use as a replacement 'master replica':
      fMasterReplica = fReplicasAwaitingCurrentFrame;
      fReplicasAwaitingCurrentFrame = fReplicasAwaitingCurrentFrame->fNext;
      fMasterReplica->fNext = NULL;
#ifdef DEBUG_REPLICATOR
      envir() << "StreamReplicator DEBUG: " << fMasterReplica->myID << " is the new master replica\n";
#endif
    }

    // Check whether the read into the old master replica's buffer is still pending, or has completed:
    if (fInputSource != NULL) {
      if (fInputSource->isCurrentlyAwaitingData()) {
        // We have a pending read into the old master replica's buffer.
        // We need to stop it, and retry the read with a new master (if available)
        fInputSource->stopGettingFrames();

        if (fMasterReplica != NULL) {
          fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
                                     afterGettingFrame, this, onSourceClosure, this);
        }
      } else if (fMasterReplica != NULL) {
        // The read into the old master replica's buffer has already completed.  Copy the data to the new master replica:
        StreamReplica::copyReceivedFrame(fMasterReplica, replicaBeingDeactivated);
      }
      // (Otherwise we don't have a new master replica, so we can't copy the received frame to any new replica that
      //  might ask for it.  Fortunately this should be a very rare occurrence.)
    }
  } else {
    // The replica that's being removed was not our 'master replica', but make sure it's not on either of our queues.
    // In each loop, "link" points at the pointer (list head, or some element's "fNext") that refers to the element
    // being examined, so the head and non-head cases are unlinked the same way.
    StreamReplica** link;
    for (link = &fReplicasAwaitingCurrentFrame; *link != NULL; link = &((*link)->fNext)) {
      if (*link == replicaBeingDeactivated) {
        *link = replicaBeingDeactivated->fNext;
        replicaBeingDeactivated->fNext = NULL;
        break;
      }
    }
    for (link = &fReplicasAwaitingNextFrame; *link != NULL; link = &((*link)->fNext)) {
      if (*link == replicaBeingDeactivated) {
        *link = replicaBeingDeactivated->fNext;
        replicaBeingDeactivated->fNext = NULL;
        break;
      }
    }
  }

  if (fNumActiveReplicas == 0 && fInputSource != NULL) fInputSource->stopGettingFrames(); // tell our source to stop too
}

// Called from a replica's destructor.  Uncounts the replica, deletes this replicator if that was the last replica
// (and we were set up to do so), and otherwise deactivates the replica if it was still active.
void StreamReplicator::removeStreamReplica(StreamReplica* replicaBeingRemoved) {
  // Assert: fNumReplicas > 0
  if (fNumReplicas == 0) {
    envir() << "StreamReplicator::removeStreamReplica() Internal Error!\n"; // should not happen
  }
  --fNumReplicas;

  // If this was the last replica, then delete ourselves (if we were set up to do so).
  // (Nothing of ours may be touched after the "delete", so we return right away.)
  bool const wasLastReplica = (fNumReplicas == 0);
  if (wasLastReplica && fDeleteWhenLastReplicaDies) {
    delete this;
    return;
  }

  // A frame index of -1 means that the replica has already been deactivated; there's nothing more to do in that case:
  if (replicaBeingRemoved->fFrameIndex == -1) return;

  // Otherwise, handle the replica that's being removed the same way that we would if it were merely being deactivated:
  deactivateStreamReplica(replicaBeingRemoved);
}

void StreamReplicator::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,
					 struct timeval presentationTime, unsigned durationInMicroseconds) {
  ((StreamReplicator*)clientData)->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
}

void StreamReplicator::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes,
					 struct timeval presentationTime, unsigned durationInMicroseconds) {
  // The frame was read into our master replica's buffer.  Update the master replica's state, but don't complete delivery to it
  // just yet.  We do that later, after we're sure that we've delivered it to all other replicas.
  fMasterReplica->fFrameSize = frameSize;
  fMasterReplica->fNumTruncatedBytes = numTruncatedBytes;
  fMasterReplica->fPresentationTime = presentationTime;
  fMasterReplica->fDurationInMicroseconds = durationInMicroseconds;

  deliverReceivedFrame();
}

void StreamReplicator::onSourceClosure(void* clientData) {
  ((StreamReplicator*)clientData)->onSourceClosure();
}

void StreamReplicator::onSourceClosure() {
  fInputSourceHasClosed = True;

  // Signal the closure to each replica that is currently awaiting a frame:
  StreamReplica* replica;
  while ((replica = fReplicasAwaitingCurrentFrame) != NULL) {
    fReplicasAwaitingCurrentFrame = replica->fNext;
    replica->fNext = NULL;
    FramedSource::handleClosure(replica);
  }
  while ((replica = fReplicasAwaitingNextFrame) != NULL) {
    fReplicasAwaitingNextFrame = replica->fNext;
    replica->fNext = NULL;
    FramedSource::handleClosure(replica);
  }
  if ((replica = fMasterReplica) != NULL) {
    fMasterReplica = NULL;
    FramedSource::handleClosure(replica);
  }
}

// Delivers the frame that's currently held in the 'master' replica's buffer.  Called when a new frame arrives from the
// input source, and also when a replica asks for the current frame after it has already arrived.
void StreamReplicator::deliverReceivedFrame() {
  // The 'master replica' has received its copy of the current frame.
  // Copy it (and complete delivery) to any other replica that has requested this frame.
  // Then, if no more requests for this frame are expected, complete delivery to the 'master replica' itself.
  StreamReplica* replica;
  while ((replica = fReplicasAwaitingCurrentFrame) != NULL) {
    fReplicasAwaitingCurrentFrame = replica->fNext;
    replica->fNext = NULL;

    // Mark this replica as being in the middle of a delivery.  "deactivateStreamReplica()" checks this flag, and - if
    // it's set - takes back the delivery that we count below:
    replica->receivingFrame = true;

#ifdef DEBUG_REPLICATOR
    if (replica->justActivated == true) {
      envir() << "StreamReplicator DEBUG: copying frame to (re)activated replica " << replica->myID << "\n";
      replica->justActivated = false;
    }
#endif

    // Assert: fMasterReplica != NULL
    // NOTE(review): if this assertion fails, we only log the error; the "copyReceivedFrame()" call below still
    // dereferences "fMasterReplica".
    if (fMasterReplica == NULL)
      // fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 1!\n"); // shouldn't happen
      envir() << "StreamReplicator::deliverReceivedFrame() Internal Error 1!\n"; // shouldn't happen
    StreamReplica::copyReceivedFrame(replica, fMasterReplica);
    replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame
    ++fNumDeliveriesMadeSoFar;

    // Assert: fNumDeliveriesMadeSoFar < fNumActiveReplicas; // because we still have the 'master replica' to deliver to
    if (!(fNumDeliveriesMadeSoFar < fNumActiveReplicas))
      // fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 2(%d,%d)!\n", fNumDeliveriesMadeSoFar, fNumActiveReplicas); // should not happen
      envir() << "StreamReplicator::deliverReceivedFrame() Internal Error 2(" << fNumDeliveriesMadeSoFar << "," << fNumActiveReplicas << ")!\n"; // should not happen

    // Complete delivery to this replica:
    // NOTE(review): presumably the replica's reader can stop - or even delete - the replica from within this call.
    // If it deletes the replica, then the write to "receivingFrame" below touches freed memory - TODO confirm.
    FramedSource::afterGetting(replica);

    replica->receivingFrame = false;
  }

  // The master replica gets its delivery last, once every other active replica has been delivered to:
  if (fNumDeliveriesMadeSoFar == fNumActiveReplicas - 1 && fMasterReplica != NULL) {
    // No more requests for this frame are expected, so complete delivery to the 'master replica':
#ifdef DEBUG_REPLICATOR
    if (fMasterReplica->justActivated == true) {
      envir() << "StreamReplicator DEBUG: completing delivery to (re)activated master replica " << fMasterReplica->myID << "\n";
      fMasterReplica->justActivated = false;
    }
#endif

    // Note that all of our state is updated for the next frame *before* the final "afterGetting()" call at the end of
    // this block; nothing of ours is accessed after that call.
    replica = fMasterReplica;
    fMasterReplica = NULL;
    replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame
    fFrameIndex = 1 - fFrameIndex; // toggle it (0<->1) for the next frame
    fNumDeliveriesMadeSoFar = 0; // reset for the next frame

    if (fReplicasAwaitingNextFrame != NULL) {
      // One of the other replicas has already requested the next frame, so make it the next 'master replica':
      fMasterReplica = fReplicasAwaitingNextFrame;
      fReplicasAwaitingNextFrame = fReplicasAwaitingNextFrame->fNext;
      fMasterReplica->fNext = NULL;

      // Arrange to read the next frame into this replica's buffer:
      if (fInputSource != NULL) fInputSource->getNextFrame(fMasterReplica->fTo, fMasterReplica->fMaxSize,
							   afterGettingFrame, this, onSourceClosure, this);
    }      

    // Move any other replicas that had already requested the next frame to the 'requesting current frame' list:
    // Assert: fReplicasAwaitingCurrentFrame == NULL;
    if (!(fReplicasAwaitingCurrentFrame == NULL))
      // fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 3!\n"); // should not happen
      envir() << "StreamReplicator::deliverReceivedFrame() Internal Error 3!\n"; // should not happen
    fReplicasAwaitingCurrentFrame = fReplicasAwaitingNextFrame;
    fReplicasAwaitingNextFrame = NULL;
    
    FramedSource::afterGetting(replica);
  }
}


////////// StreamReplica implementation //////////

// Constructor.  A new replica starts out inactive (frame index -1), not on any list, and with its own copy of "ID".
StreamReplica::StreamReplica(StreamReplicator& ourReplicator, const char * ID)
  : FramedSource(ourReplicator.envir()),
    fOurReplicator(ourReplicator),
    fFrameIndex(-1/*we haven't started playing yet*/),
    fNext(NULL),
    myID(strDup(ID)),
    justActivated(false),
    receivingFrame(false) {
#ifdef DEBUG_REPLICATOR
  envir() << "StreamReplicator DEBUG: creating stream replica " << myID << "\n";
#endif
}

// Destructor: tells our replicator that we're going away, and then frees our copy of the ID string.
StreamReplica::~StreamReplica() {
#ifdef DEBUG_REPLICATOR
  envir() << "StreamReplicator DEBUG: deleting stream replica " << myID << "\n";
#endif
  // Note: The replicator might delete itself during this call (if we were its last replica, and it was set up to do
  // so), so "fOurReplicator" must not be used after this point:
  fOurReplicator.removeStreamReplica(this);
  delete [] myID; // done last, because the replicator's debugging output (during the call above) prints "myID"
}

// Redefined virtual function: asks our replicator for the next frame on our behalf.
void StreamReplica::doGetNextFrame() {
#ifdef DEBUG_REPLICATOR
  // envir() << "StreamReplicator DEBUG: doGetNextFrame() called for " << myID << " replica\n";
#endif
  fOurReplicator.getNextFrame(this);
}

// Redefined virtual function: marks this replica as inactive, and tells our replicator to deactivate it.
void StreamReplica::doStopGettingFrames() {
#ifdef DEBUG_REPLICATOR
  // envir() << "StreamReplicator DEBUG: doStopGettingFrames() called for " << myID << " replica\n";
#endif
  // A frame index of -1 means that we're already inactive (we were never started, or were stopped earlier).  In that
  // case the replicator isn't counting us as active, so deactivating again would corrupt its count of active
  // replicas.  ("StreamReplicator::removeStreamReplica()" makes the same check before deactivating a replica.)
  if (fFrameIndex == -1) return;

  fFrameIndex = -1; // When we start reading again, this will tell the replicator that we were previously inactive.
  fOurReplicator.deactivateStreamReplica(this);
}

// Copies the frame held by "fromReplica" (data, size, truncation count, presentation time and duration) into
// "toReplica".  If "toReplica"'s buffer is too small for the whole frame, the copy is cut short, and the bytes that
// didn't fit are added to "toReplica"'s truncated-byte count.
void StreamReplica::copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica) {
  unsigned const sourceFrameSize = fromReplica->fFrameSize;
  unsigned const destinationCapacity = toReplica->fMaxSize;

  // Figure out how many bytes (if any) won't fit in the destination buffer:
  unsigned numBytesThatDontFit = 0;
  if (destinationCapacity < sourceFrameSize) {
    numBytesThatDontFit = sourceFrameSize - destinationCapacity;
  }

  // Record the frame's parameters in the destination replica:
  toReplica->fFrameSize = sourceFrameSize - numBytesThatDontFit;
  toReplica->fNumTruncatedBytes = fromReplica->fNumTruncatedBytes + numBytesThatDontFit;
  toReplica->fPresentationTime = fromReplica->fPresentationTime;
  toReplica->fDurationInMicroseconds = fromReplica->fDurationInMicroseconds;

  // Then copy the frame data itself:
  memmove(toReplica->fTo, fromReplica->fTo, toReplica->fFrameSize);
}
/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
**********/
// Copyright (c) 1996-2013, Live Networks, Inc.  All rights reserved
// A demo application that reads a MPEG Transport Stream file, replicates it (using the "StreamReplicator" class),
// and transmits one replica stream to a (multicast or unicast) address & port,
// and writes the other replica stream to a file.
//
// main program

#include <liveMedia.hh>
#include "BasicUsageEnvironment.hh"
#include "GroupsockHelper.hh"
#include "OutputFile.hh"

UsageEnvironment* env;

// To receive a "source-specific multicast" (SSM) stream, uncomment this:
//#define USE_SSM 1


class OurFileSink : public FileSink {
  public:
    static OurFileSink * createNew(UsageEnvironment & env, StreamReplicator * replicator,
            char const* outputFileName, unsigned bufferSize = 100000) {
        do {
            FILE* fid;
            fid = OpenOutputFile(env, outputFileName);
            if (fid == NULL) break;

            return new OurFileSink(env, fid, replicator, bufferSize);
        } while (0);
        return NULL;
    }

    virtual void stopPlaying() {
        MediaSink::stopPlaying();

        // schedule to restart in 10 secs
        envir() << "OurFileSink has stopped! Scheduling to restart 10 secs from now.\n";
        envir().taskScheduler().scheduleDelayedTask(10000000, (TaskFunc *) OurFileSink::startPlaying, this);
    }

    static void startPlaying(void * data) {
        OurFileSink * fileSink = reinterpret_cast<OurFileSink *>(data);
        MediaSink * mediaSink = reinterpret_cast<MediaSink *>(data);

        *env << "OurFileSink trying to start...\n";
        mediaSink->startPlaying(*(fileSink->source), NULL, NULL);
    }

  private:
    OurFileSink(UsageEnvironment & env, FILE* fid, StreamReplicator * replicator, unsigned bufferSize)
        : FileSink(env, fid, bufferSize, NULL), replicator(replicator), source(NULL) {
            // Begin by creating an input stream from our replicator:
            source = replicator->createStreamReplica("OurFileSink");
    }

    StreamReplicator * replicator;
    FramedSource * source;
};


void startReplicaUDPSink(StreamReplicator* replicator, char const* outputAddressStr, portNumBits outputPortNum); // forward
// void startReplicaFileSink(StreamReplicator* replicator, char const* outputFileName); // forward
// void fileSinkAfterPlaying(void* clientData);
// void fileSinkStartPlaying(void* clientData);


// Reads a Transport Stream file, replicates it, sends one replica to a UDP destination, and writes another replica
// to a file (using "OurFileSink").  Returns nonzero if the input file or the output file could not be set up.
int main(int argc, char** argv) {
  // Begin by setting up our usage environment:
  TaskScheduler* scheduler = BasicTaskScheduler::createNew();
  env = BasicUsageEnvironment::createNew(*scheduler);

#define TRANSPORT_PACKET_SIZE 188
#define TRANSPORT_PACKETS_PER_NETWORK_PACKET 7
  // Create the video source:
  unsigned const inputDataChunkSize
    = TRANSPORT_PACKETS_PER_NETWORK_PACKET*TRANSPORT_PACKET_SIZE;
  char const* const inputFileName = "bipbop-gear1-all.ts";
  ByteStreamFileSource* fileSource
    = ByteStreamFileSource::createNew(*env, inputFileName, inputDataChunkSize);
  if (fileSource == NULL) {
    // Report the failure with a nonzero exit status.  (The previous "return NULL;" meant "return 0;" - i.e., success.)
    *env << "Unable to open file \"" << inputFileName << "\" as a byte-stream file source: "
         << env->getResultMsg() << "\n";
    return 1;
  }
  MPEG2TransportStreamFramer* source = MPEG2TransportStreamFramer::createNew(*env, fileSource);

  // And feed this into a 'stream replicator':
  StreamReplicator* replicator = StreamReplicator::createNew(*env, source);

  // Then create a network (UDP) 'sink' object to receive a replica of the input stream, and start it.
  // If you wish, you can duplicate this line - with different network addresses and ports - to create multiple output UDP streams:
  startReplicaUDPSink(replicator, "239.255.43.43", 4444);

  // Then create a file 'sink' object to receive a replica of the input stream, and start it.
  // If you wish, you can duplicate this line - with a different file name - to create multiple output files:
  // startReplicaFileSink(replicator, "test.out");

  // do the above but with OurFileSink (starts playing immediately)
  char const* const outputFileName = "test.out";
  OurFileSink * fileSink = OurFileSink::createNew(*env, replicator, outputFileName);
  if (fileSink == NULL) {
    // "createNew()" returns NULL if the output file can't be opened; don't try to start a sink that doesn't exist:
    *env << "Unable to create a file sink for \"" << outputFileName << "\": " << env->getResultMsg() << "\n";
    return 1;
  }
  OurFileSink::startPlaying(fileSink);

  // Finally, enter the 'event loop' (which is where most of the 'real work' in a LIVE555-based application gets done):
  env->taskScheduler().doEventLoop(); // does not return

  return 0; // only to prevent compiler warning
}

// Creates a new replica of "replicator"'s stream, and starts sending it - as UDP packets - to the given destination
// address and port.
void startReplicaUDPSink(StreamReplicator* replicator, char const* outputAddressStr, portNumBits outputPortNum) {
  // Our input is a new replica, created from the replicator:
  FramedSource* replicaSource = replicator->createStreamReplica("UDPSink");

  // Set up the destination (address, port and TTL), and create a 'groupsock' for it:
  struct in_addr destinationAddress;
  destinationAddress.s_addr = our_inet_addr(outputAddressStr);
  Port const destinationPort(outputPortNum);
  unsigned char const ttl = 255;
  Groupsock* destinationGroupsock = new Groupsock(*env, destinationAddress, destinationPort, ttl);

  // Wrap the groupsock in a liveMedia 'sink' object:
  unsigned const maxPacketSize = 65536; // allow for large UDP packets
  MediaSink* udpSink = BasicUDPSink::createNew(*env, destinationGroupsock, maxPacketSize);

  // Finally, start playing, feeding the sink from the replica.  (No 'after playing' function is given.)
  udpSink->startPlaying(*replicaSource, NULL, NULL);
}

/*
void startReplicaFileSink(StreamReplicator* replicator, char const* outputFileName) {
  // Begin by creating an input stream from our replicator:
  fileSinkSource = replicator->createStreamReplica("FileSink");

  // Then create a 'file sink' object to receive this replica stream:
  fileSink = FileSink::createNew(*env, outputFileName);

  // Now, start playing, feeding the sink object from the source:
  fileSink->startPlaying(*fileSinkSource, fileSinkAfterPlaying, fileSink);
}
*/

/*
void fileSinkAfterPlaying(void* clientData) {
    *env << "FileSink stopped! Please check upstream source read task schedule\n";

    env->taskScheduler().scheduleDelayedTask(10000000,
                (TaskFunc *) &fileSinkStartPlaying, clientData);
}
*/

/*
void fileSinkStartPlaying(void* clientData) {
    *env << "FileSink restarting\n";

    fileSink->startPlaying(*fileSinkSource, fileSinkAfterPlaying, fileSink);
}
*/
_______________________________________________
live-devel mailing list
live-devel@lists.live555.com
http://lists.live555.com/mailman/listinfo/live-devel

Reply via email to