 libavformat/udp.c | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 129 insertions(+), 4 deletions(-)

diff --git a/libavformat/udp.c b/libavformat/udp.c
index ea80e52..ff676f4 100644
--- a/libavformat/udp.c
+++ b/libavformat/udp.c
@@ -92,6 +92,7 @@ typedef struct UDPContext {
     int circular_buffer_size;
     AVFifoBuffer *fifo;
     int circular_buffer_error;
+    int packet_gap; /* delay between transmitted packets */
 #if HAVE_PTHREAD_CANCEL
     pthread_t circular_buffer_thread;
     pthread_mutex_t mutex;
@@ -112,6 +113,7 @@ typedef struct UDPContext {
 #define E AV_OPT_FLAG_ENCODING_PARAM
 static const AVOption options[] = {
     { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
+    { "packet_gap",     "Delay between packets, in usec",                  OFFSET(packet_gap),     AV_OPT_TYPE_INT,    { .i64 = 0  },     0, INT_MAX, .flags = E },
     { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
     { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
     { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
@@ -486,7 +488,7 @@ static int udp_get_file_handle(URLContext *h)
 }
 
 #if HAVE_PTHREAD_CANCEL
-static void *circular_buffer_task( void *_URLContext)
+static void *circular_buffer_task_rx( void *_URLContext)
 {
     URLContext *h = _URLContext;
     UDPContext *s = h->priv_data;
@@ -499,7 +501,7 @@ static void *circular_buffer_task( void *_URLContext)
         s->circular_buffer_error = AVERROR(EIO);
         goto end;
     }
-    while(1) {
+    for(;;) {
         int len;
 
         pthread_mutex_unlock(&s->mutex);
@@ -542,6 +544,81 @@ end:
     pthread_mutex_unlock(&s->mutex);
     return NULL;
 }
+
+static void do_udp_write(void *arg, void *buf, int size) {
+    URLContext *h = arg;
+    UDPContext *s = h->priv_data;
+
+    int ret;
+
+    if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
+        ret = ff_network_wait_fd(s->udp_fd, 1);
+        if (ret < 0) {
+            s->circular_buffer_error=ret;
+            return;
+        }
+    }
+
+    if (!s->is_connected) {
+        ret = sendto (s->udp_fd, buf, size, 0,
+                      (struct sockaddr *) &s->dest_addr,
+                      s->dest_addr_len);
+    } else
+        ret = send(s->udp_fd, buf, size, 0);
+
+    s->circular_buffer_error=ret;
+}
+
+static void *circular_buffer_task_tx( void *_URLContext)
+{
+    URLContext *h = _URLContext;
+    UDPContext *s = h->priv_data;
+    int old_cancelstate;
+
+    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+
+    for(;;) {
+        int len;
+        uint8_t tmp[4];
+
+        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
+
+        av_usleep(s->packet_gap);
+
+        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
+
+        pthread_mutex_lock(&s->mutex);
+
+        len=av_fifo_size(s->fifo);
+
+        while (len<4) {
+            if (pthread_cond_wait(&s->cond, &s->mutex) < 0) {
+                goto end;
+            }
+            len=av_fifo_size(s->fifo);
+        }
+
+        av_fifo_generic_peek(s->fifo, tmp, 4, NULL);
+        len=AV_RL32(tmp);
+
+        if (len>0 && av_fifo_size(s->fifo)>=len) {
+            av_fifo_drain(s->fifo, 4); /* skip packet length */
+            av_fifo_generic_read(s->fifo, h, len, do_udp_write); /* use function for write from fifo buffer */
+            if (s->circular_buffer_error == len) {
+              /* all ok - reset error */
+              s->circular_buffer_error=0;
+            }
+        }
+
+        pthread_mutex_unlock(&s->mutex);
+    }
+
+end:
+    pthread_mutex_unlock(&s->mutex);
+    return NULL;
+}
+
+
 #endif
 
 static int parse_source_list(char *buf, char **sources, int *num_sources,
@@ -650,6 +727,13 @@ static int udp_open(URLContext *h, const char *uri, int flags)
                        "'circular_buffer_size' option was set but it is not supported "
                        "on this build (pthread support is required)\n");
         }
+        if (av_find_info_tag(buf, sizeof(buf), "packet_gap", p)) {
+            s->packet_gap = strtol(buf, NULL, 10);
+            if (!HAVE_PTHREAD_CANCEL)
+                av_log(h, AV_LOG_WARNING,
+                       "'packet_gap' option was set but it is not supported "
+                       "on this build (pthread support is required)\n");
+        }
         if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
             av_strlcpy(localaddr, buf, sizeof(localaddr));
         }
@@ -829,7 +913,18 @@ static int udp_open(URLContext *h, const char *uri, int flags)
     s->udp_fd = udp_fd;
 
 #if HAVE_PTHREAD_CANCEL
-    if (!is_output && s->circular_buffer_size) {
+    /*
+      Create thread in case of:
+      1. Input and circular_buffer_size is set
+      2. Output and packet_gap and circular_buffer_size is set
+    */
+
+    if (is_output && s->packet_gap && !s->circular_buffer_size) {
+      /* Warn user in case of 'circular_buffer_size' is not set */
+      av_log(h, AV_LOG_WARNING,"'packet_gap' option was set but 'circular_buffer_size' is not, but required\n");
+    }
+
+    if ((!is_output && s->circular_buffer_size) || (is_output && s->packet_gap && s->circular_buffer_size)) {
         int ret;
 
         /* start the task going */
@@ -844,7 +939,7 @@ static int udp_open(URLContext *h, const char *uri, int flags)
             av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
             goto cond_fail;
         }
-        ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
+        ret = pthread_create(&s->circular_buffer_thread, NULL, is_output?circular_buffer_task_tx:circular_buffer_task_rx, h);
         if (ret != 0) {
             av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
             goto thread_fail;
@@ -945,6 +1040,36 @@ static int udp_write(URLContext *h, const uint8_t *buf, int size)
     UDPContext *s = h->priv_data;
     int ret;
 
+#if HAVE_PTHREAD_CANCEL
+    if (s->fifo) {
+        uint8_t tmp[4];
+
+        pthread_mutex_lock(&s->mutex);
+
+        /*
+          Return error if last tx failed.
+          Here we can't know on which packet error was, but it needs to know that error exists.
+        */
+        if (s->circular_buffer_error<0) {
+            int err=s->circular_buffer_error;
+            s->circular_buffer_error=0;
+            pthread_mutex_unlock(&s->mutex);
+            return err;
+        }
+
+        if(av_fifo_space(s->fifo) < size + 4) {
+            /* What about a partial packet tx ? */
+            pthread_mutex_unlock(&s->mutex);
+            return AVERROR(ENOMEM);
+        }
+        AV_WL32(tmp, size);
+        av_fifo_generic_write(s->fifo, tmp, 4, NULL); /* size of packet */
+        av_fifo_generic_write(s->fifo, (uint8_t *)buf, size, NULL); /* the data */
+        pthread_cond_signal(&s->cond);
+        pthread_mutex_unlock(&s->mutex);
+        return size;
+    }
+#endif
     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
         ret = ff_network_wait_fd(s->udp_fd, 1);
         if (ret < 0)
