Hi On Thu, Nov 10, 2016 at 6:47 AM Wei Wang <[email protected]> wrote:
> This patch enables a qemu server socket to be connected by multiple > client sockets. > > Thanks for sharing this early version of the series, I hope some early feedback will help you. I'll be waiting for a more complete implementation for detailed review. Is this patch necessary as a first step? I would rather start with a vhost-pci 1-1 Master-Slave series. Keep 1-n for a following improvement. This would also probably post-pone the discussion regarding connection-id, or uuid. In short, I think it would help if you can break your proposal in smaller independant steps. Signed-off-by: Wei Wang <[email protected]> > --- > include/sysemu/char.h | 64 ++++++- > qapi-schema.json | 3 +- > qemu-char.c | 512 > ++++++++++++++++++++++++++++++++++++++------------ > 3 files changed, 456 insertions(+), 123 deletions(-) > > diff --git a/include/sysemu/char.h b/include/sysemu/char.h > index ee7e554..ff5dda6 100644 > --- a/include/sysemu/char.h > +++ b/include/sysemu/char.h > @@ -58,17 +58,24 @@ struct ParallelIOArg { > > typedef void IOEventHandler(void *opaque, int event); > > +#define MAX_CLIENTS 256 > +#define ANONYMOUS_CLIENT (~((uint64_t)0)) > struct CharDriverState { > QemuMutex chr_write_lock; > void (*init)(struct CharDriverState *s); > int (*chr_write)(struct CharDriverState *s, const uint8_t *buf, int > len); > + int (*chr_write_n)(struct CharDriverState *s, uint64_t id, const > uint8_t *buf, int len); > int (*chr_sync_read)(struct CharDriverState *s, > const uint8_t *buf, int len); > + int (*chr_sync_read_n)(struct CharDriverState *s, uint64_t id, > + const uint8_t *buf, int len); > GSource *(*chr_add_watch)(struct CharDriverState *s, GIOCondition > cond); > void (*chr_update_read_handler)(struct CharDriverState *s); > int (*chr_ioctl)(struct CharDriverState *s, int cmd, void *arg); > int (*get_msgfds)(struct CharDriverState *s, int* fds, int num); > + int (*get_msgfds_n)(struct CharDriverState *s, uint64_t id, int* fds, > int num); > int (*set_msgfds)(struct CharDriverState *s, int *fds, int num); > + int (*set_msgfds_n)(struct CharDriverState *s, uint64_t id, int *fds, > int num); > int (*chr_add_client)(struct CharDriverState *chr, int fd); > int (*chr_wait_connected)(struct CharDriverState *chr, Error **errp); > IOEventHandler *chr_event; > @@ -77,6 +84,7 @@ struct CharDriverState { > void *handler_opaque; > void (*chr_close)(struct CharDriverState *chr); > void (*chr_disconnect)(struct CharDriverState *chr); > + void (*chr_disconnect_n)(struct CharDriverState *chr, uint64_t id); > void (*chr_accept_input)(struct CharDriverState *chr); > void (*chr_set_echo)(struct CharDriverState *chr, bool echo); > void (*chr_set_fe_open)(struct CharDriverState *chr, int fe_open); > @@ -91,7 +99,10 @@ struct CharDriverState { > int explicit_be_open; > int avail_connections; > int is_mux; > - guint fd_in_tag; > + guint fd_in_tag[MAX_CLIENTS]; > + uint64_t max_connections; > + unsigned long *conn_bitmap; > + uint64_t conn_id; > QemuOpts *opts; > bool replay; > QTAILQ_ENTRY(CharDriverState) next; > @@ -281,6 +292,20 @@ int qemu_chr_fe_write(CharDriverState *s, const > uint8_t *buf, int len); > int qemu_chr_fe_write_all(CharDriverState *s, const uint8_t *buf, int > len); > > /** > + * @qemu_chr_fe_write_all_n: > + * > + * Write data to the selected character backend from the front end. > + * > + * @id the connection id of the character backend > + * @buf the data > + * @len the number of bytes to send > + * > + * Returns: the number of bytes consumed > + */ > +int qemu_chr_fe_write_all_n(CharDriverState *s, uint64_t id, > + const uint8_t *buf, int len); > + > +/** > * @qemu_chr_fe_read_all: > * > * Read data to a buffer from the back end. > @@ -293,6 +318,20 @@ int qemu_chr_fe_write_all(CharDriverState *s, const > uint8_t *buf, int len); > int qemu_chr_fe_read_all(CharDriverState *s, uint8_t *buf, int len); > > /** > + * @qemu_chr_fe_read_all_n: > + * > + * Read data to a buffer from the selected back end. > + * > + * @id the connection id > + * @buf the data buffer > + * @len the number of bytes to read > + * > + * Returns: the number of bytes read > + */ > +int qemu_chr_fe_read_all_n(CharDriverState *s, uint64_t id, > + uint8_t *buf, int len); > + > +/** > * @qemu_chr_fe_ioctl: > * > * Issue a device specific ioctl to a backend. This function is > thread-safe. > @@ -331,6 +370,19 @@ int qemu_chr_fe_get_msgfd(CharDriverState *s); > */ > int qemu_chr_fe_get_msgfds(CharDriverState *s, int *fds, int num); > > + > +/** > + * @qemu_chr_fe_get_msgfds_n: > + * > + * The multi-client version of @qemu_chr_fe_get_msgfds. > + * > + * Returns: -1 if fd passing isn't supported or there are no pending file > + * descriptors. If file descriptors are returned, subsequent > calls to > + * this function will return -1 until a client sends a new set > of file > + * descriptors. > + */ > +int qemu_chr_fe_get_msgfds_n(CharDriverState *s, uint64_t id, int *fds, > int num); > + > /** > * @qemu_chr_fe_set_msgfds: > * > @@ -345,6 +397,16 @@ int qemu_chr_fe_get_msgfds(CharDriverState *s, int > *fds, int num); > int qemu_chr_fe_set_msgfds(CharDriverState *s, int *fds, int num); > > /** > + * @qemu_chr_fe_set_msgfds_n: > + * > + * The multi-client version of @qemu_chr_fe_set_msgfds. > + * > + * Returns: -1 if fd passing isn't supported. > + */ > +int qemu_chr_fe_set_msgfds_n(CharDriverState *s, uint64_t id, int *fds, > int num); > + > + > +/** > * @qemu_chr_fe_claim: > * > * Claim a backend before using it, should be called before calling > diff --git a/qapi-schema.json b/qapi-schema.json > index 5658723..9bb5d7d 100644 > --- a/qapi-schema.json > +++ b/qapi-schema.json > @@ -3327,7 +3327,8 @@ > '*wait' : 'bool', > '*nodelay' : 'bool', > '*telnet' : 'bool', > - '*reconnect' : 'int' }, > + '*reconnect' : 'int' , > + '*connections' : 'uint64' }, > 'base': 'ChardevCommon' } > > ## > diff --git a/qemu-char.c b/qemu-char.c > index 5f82ebb..dfad6d1 100644 > --- a/qemu-char.c > +++ b/qemu-char.c > @@ -265,6 +265,35 @@ static int qemu_chr_fe_write_buffer(CharDriverState > *s, const uint8_t *buf, int > return res; > } > > +static int qemu_chr_fe_write_buffer_n(CharDriverState *s, uint64_t id, > + const uint8_t *buf, int len, int > *offset) > +{ > + int res = 0; > + *offset = 0; > + > + qemu_mutex_lock(&s->chr_write_lock); > + while (*offset < len) { > + retry: > + res = s->chr_write_n(s, id, buf + *offset, len - *offset); > + if (res < 0 && errno == EAGAIN) { > + g_usleep(100); > + goto retry; > + } > + > + if (res <= 0) { > + break; > + } > + > + *offset += res; > + } > + if (*offset > 0) { > + qemu_chr_fe_write_log(s, buf, *offset); > + } > + qemu_mutex_unlock(&s->chr_write_lock); > + > + return res; > +} > + > int qemu_chr_fe_write(CharDriverState *s, const uint8_t *buf, int len) > { > int ret; > @@ -317,6 +346,31 @@ int qemu_chr_fe_write_all(CharDriverState *s, const > uint8_t *buf, int len) > return offset; > } > > +int qemu_chr_fe_write_all_n(CharDriverState *s, uint64_t id, > + const uint8_t *buf, int len) > +{ > + int offset; > + int res; > + > + if (s->replay && replay_mode == REPLAY_MODE_PLAY) { > + replay_char_write_event_load(&res, &offset); > + assert(offset <= len); > + qemu_chr_fe_write_buffer_n(s, id, buf, offset, &offset); > + return res; > + } > + > + res = qemu_chr_fe_write_buffer_n(s, id, buf, len, &offset); > + > + if (s->replay && replay_mode == REPLAY_MODE_RECORD) { > + replay_char_write_event_save(res, offset); > + } > + > + if (res < 0) { > + return res; > + } > + return offset; > +} > + > int qemu_chr_fe_read_all(CharDriverState *s, uint8_t *buf, int len) > { > int offset = 0, counter = 10; > @@ -325,7 +379,7 @@ int qemu_chr_fe_read_all(CharDriverState *s, uint8_t > *buf, int len) > if (!s->chr_sync_read) { > return 0; > } > - > + > if (s->replay && replay_mode == REPLAY_MODE_PLAY) { > return replay_char_read_all_load(buf); > } > @@ -362,6 +416,52 @@ int qemu_chr_fe_read_all(CharDriverState *s, uint8_t > *buf, int len) > return offset; > } > > +int qemu_chr_fe_read_all_n(CharDriverState *s, uint64_t id, > + uint8_t *buf, int len) > +{ > + int offset = 0, counter = 10; > + int res; > + > + if (!s->chr_sync_read_n) { > + return 0; > + } > + > + if (s->replay && replay_mode == REPLAY_MODE_PLAY) { > + return replay_char_read_all_load(buf); > + } > + > + while (offset < len) { > + retry: > + res = s->chr_sync_read_n(s, id, buf + offset, len - offset); > + if (res == -1 && errno == EAGAIN) { > + g_usleep(100); > + goto retry; > + } > + > + if (res == 0) { > + break; > + } > + > + if (res < 0) { > + if (s->replay && replay_mode == REPLAY_MODE_RECORD) { > + replay_char_read_all_save_error(res); > + } > + return res; > + } > + > + offset += res; > + > + if (!counter--) { > + break; > + } > + } > + > + if (s->replay && replay_mode == REPLAY_MODE_RECORD) { > + replay_char_read_all_save_buf(buf, offset); > + } > + return offset; > +} > + > int qemu_chr_fe_ioctl(CharDriverState *s, int cmd, void *arg) > { > int res; > @@ -417,11 +517,23 @@ int qemu_chr_fe_get_msgfds(CharDriverState *s, int > *fds, int len) > return s->get_msgfds ? s->get_msgfds(s, fds, len) : -1; > } > > +int qemu_chr_fe_get_msgfds_n(CharDriverState *s, > + uint64_t id, int *fds, int len) > +{ > + return s->get_msgfds_n ? s->get_msgfds_n(s, id, fds, len) : -1; > +} > + > int qemu_chr_fe_set_msgfds(CharDriverState *s, int *fds, int num) > { > return s->set_msgfds ? s->set_msgfds(s, fds, num) : -1; > } > > +int qemu_chr_fe_set_msgfds_n(CharDriverState *s, > + uint64_t id, int *fds, int num) > +{ > + return s->set_msgfds_n ? s->set_msgfds_n(s, id, fds, num) : -1; > +} > + > int qemu_chr_add_client(CharDriverState *s, int fd) > { > return s->chr_add_client ? s->chr_add_client(s, fd) : -1; > @@ -951,12 +1063,19 @@ static void io_remove_watch_poll(guint tag) > > static void remove_fd_in_watch(CharDriverState *chr) > { > - if (chr->fd_in_tag) { > - io_remove_watch_poll(chr->fd_in_tag); > - chr->fd_in_tag = 0; > + if (chr->fd_in_tag[0]) { > + io_remove_watch_poll(chr->fd_in_tag[0]); > + chr->fd_in_tag[0] = 0; > } > } > > +static void remove_fd_in_watch_n(CharDriverState *chr, uint64_t id) > +{ > + if (chr->fd_in_tag[id]) { > + io_remove_watch_poll(chr->fd_in_tag[id]); > + chr->fd_in_tag[id] = 0; > + } > +} > > static int io_channel_send_full(QIOChannel *ioc, > const void *buf, size_t len, > @@ -1063,7 +1182,7 @@ static void > fd_chr_update_read_handler(CharDriverState *chr) > > remove_fd_in_watch(chr); > if (s->ioc_in) { > - chr->fd_in_tag = io_add_watch_poll(s->ioc_in, > + chr->fd_in_tag[0] = io_add_watch_poll(s->ioc_in, > fd_chr_read_poll, > fd_chr_read, chr); > } > @@ -1410,8 +1529,8 @@ static void pty_chr_state(CharDriverState *chr, int > connected) > s->connected = 1; > s->open_tag = g_idle_add(qemu_chr_be_generic_open_func, chr); > } > - if (!chr->fd_in_tag) { > - chr->fd_in_tag = io_add_watch_poll(s->ioc, > + if (!chr->fd_in_tag[0]) { > + chr->fd_in_tag[0] = io_add_watch_poll(s->ioc, > pty_chr_read_poll, > pty_chr_read, chr); > } > @@ -2558,7 +2677,7 @@ static void > udp_chr_update_read_handler(CharDriverState *chr) > > remove_fd_in_watch(chr); > if (s->ioc) { > - chr->fd_in_tag = io_add_watch_poll(s->ioc, > + chr->fd_in_tag[0] = io_add_watch_poll(s->ioc, > udp_chr_read_poll, > udp_chr_read, chr); > } > @@ -2605,20 +2724,21 @@ static CharDriverState > *qemu_chr_open_udp(QIOChannelSocket *sioc, > /* TCP Net console */ > > typedef struct { > - QIOChannel *ioc; /* Client I/O channel */ > - QIOChannelSocket *sioc; /* Client master channel */ > + QIOChannel *ioc[MAX_CLIENTS]; /* Client I/O channels */ > + QIOChannelSocket *sioc[MAX_CLIENTS]; /* Client master channels */ > QIOChannelSocket *listen_ioc; > guint listen_tag; > QCryptoTLSCreds *tls_creds; > - int connected; > + int connected[MAX_CLIENTS]; > int max_size; > int do_telnetopt; > int do_nodelay; > int is_unix; > - int *read_msgfds; > - size_t read_msgfds_num; > - int *write_msgfds; > - size_t write_msgfds_num; > + int *read_msgfds[MAX_CLIENTS]; > + size_t read_msgfds_num[MAX_CLIENTS]; > + int *write_msgfds[MAX_CLIENTS]; > + size_t write_msgfds_num[MAX_CLIENTS]; > + uint64_t connections; > > SocketAddress *addr; > bool is_listen; > @@ -2634,7 +2754,7 @@ static gboolean socket_reconnect_timeout(gpointer > opaque); > static void qemu_chr_socket_restart_timer(CharDriverState *chr) > { > TCPCharDriver *s = chr->opaque; > - assert(s->connected == 0); > + assert(s->connected[0] == 0); > s->reconnect_timer = g_timeout_add_seconds(s->reconnect_time, > socket_reconnect_timeout, > chr); > } > @@ -2660,16 +2780,16 @@ static gboolean tcp_chr_accept(QIOChannel *chan, > static int tcp_chr_write(CharDriverState *chr, const uint8_t *buf, int > len) > { > TCPCharDriver *s = chr->opaque; > - if (s->connected) { > - int ret = io_channel_send_full(s->ioc, buf, len, > - s->write_msgfds, > - s->write_msgfds_num); > + if (s->connected[0]) { > + int ret = io_channel_send_full(s->ioc[0], buf, len, > + s->write_msgfds[0], > + s->write_msgfds_num[0]); > > /* free the written msgfds, no matter what */ > - if (s->write_msgfds_num) { > - g_free(s->write_msgfds); > - s->write_msgfds = 0; > - s->write_msgfds_num = 0; > + if (s->write_msgfds_num[0]) { > + g_free(s->write_msgfds[0]); > + s->write_msgfds[0] = 0; > + s->write_msgfds_num[0] = 0; > } > > return ret; > @@ -2679,11 +2799,41 @@ static int tcp_chr_write(CharDriverState *chr, > const uint8_t *buf, int len) > } > } > > +/* Called with chr_write_lock held. */ > +static int tcp_chr_write_n(CharDriverState *chr, uint64_t id, > + const uint8_t *buf, int len) > +{ > + TCPCharDriver *s = chr->opaque; > + if (s->connected[id]) { > + int ret = io_channel_send_full(s->ioc[id], buf, len, > + s->write_msgfds[id], > + s->write_msgfds_num[id]); > + > + /* free the written msgfds, no matter what */ > + if (s->write_msgfds_num[id]) { > + g_free(s->write_msgfds[id]); > + s->write_msgfds[id] = 0; > + s->write_msgfds_num[id] = 0; > + } > + > + return ret; > + } else { > + /* XXX: indicate an error ? */ > + return len; > + } > +} > + > static int tcp_chr_read_poll(void *opaque) > { > CharDriverState *chr = opaque; > TCPCharDriver *s = chr->opaque; > - if (!s->connected) > + uint64_t id; > + > + for (id = 0; id < s->connections; id++) { > + if (s->connected[id]) > + break; > + } > + if (id == s->connections) > return 0; > s->max_size = qemu_chr_be_can_write(chr); > return s->max_size; > @@ -2742,54 +2892,107 @@ static void > tcp_chr_process_IAC_bytes(CharDriverState *chr, > static int tcp_get_msgfds(CharDriverState *chr, int *fds, int num) > { > TCPCharDriver *s = chr->opaque; > - int to_copy = (s->read_msgfds_num < num) ? s->read_msgfds_num : num; > + int to_copy = (s->read_msgfds_num[0] < num) ? s->read_msgfds_num[0] : > num; > > assert(num <= TCP_MAX_FDS); > > if (to_copy) { > int i; > > - memcpy(fds, s->read_msgfds, to_copy * sizeof(int)); > + memcpy(fds, s->read_msgfds[0], to_copy * sizeof(int)); > > /* Close unused fds */ > - for (i = to_copy; i < s->read_msgfds_num; i++) { > - close(s->read_msgfds[i]); > + for (i = to_copy; i < s->read_msgfds_num[0]; i++) { > + close(s->read_msgfds[0][i]); > } > > - g_free(s->read_msgfds); > - s->read_msgfds = 0; > - s->read_msgfds_num = 0; > + g_free(s->read_msgfds[0]); > + s->read_msgfds[0] = 0; > + s->read_msgfds_num[0] = 0; > } > > return to_copy; > } > > +static int tcp_get_msgfds_n(CharDriverState *chr, uint64_t id, > + int *fds, int num) > +{ > + TCPCharDriver *s = chr->opaque; > + int to_copy = (s->read_msgfds_num[id] < num) ? s->read_msgfds_num[id] > : num; > + > + assert(num <= TCP_MAX_FDS); > + > + if (to_copy) { > + int i; > + > + memcpy(fds, s->read_msgfds[id], to_copy * sizeof(int)); > + > + /* Close unused fds */ > + for (i = to_copy; i < s->read_msgfds_num[id]; i++) { > + close(s->read_msgfds[id][i]); > + } > + > + g_free(s->read_msgfds[id]); > + s->read_msgfds[id] = 0; > + s->read_msgfds_num[id] = 0; > + } > + > + return to_copy; > +} > + > static int tcp_set_msgfds(CharDriverState *chr, int *fds, int num) > { > TCPCharDriver *s = chr->opaque; > > /* clear old pending fd array */ > - g_free(s->write_msgfds); > - s->write_msgfds = NULL; > - s->write_msgfds_num = 0; > + g_free(s->write_msgfds[0]); > + s->write_msgfds[0] = NULL; > + s->write_msgfds_num[0] = 0; > > - if (!s->connected || > - !qio_channel_has_feature(s->ioc, > + if (!s->connected[0] || > + !qio_channel_has_feature(s->ioc[0], > QIO_CHANNEL_FEATURE_FD_PASS)) { > return -1; > } > > if (num) { > - s->write_msgfds = g_new(int, num); > - memcpy(s->write_msgfds, fds, num * sizeof(int)); > + s->write_msgfds[0] = g_new(int, num); > + memcpy(s->write_msgfds[0], fds, num * sizeof(int)); > } > > - s->write_msgfds_num = num; > + s->write_msgfds_num[0] = num; > > return 0; > } > > -static ssize_t tcp_chr_recv(CharDriverState *chr, char *buf, size_t len) > +static int tcp_set_msgfds_n(CharDriverState *chr, uint64_t id, > + int *fds, int num) > +{ > + TCPCharDriver *s = chr->opaque; > + > + /* clear old pending fd array */ > + g_free(s->write_msgfds[id]); > + s->write_msgfds[id] = NULL; > + s->write_msgfds_num[id] = 0; > + > + if (!s->connected[id] || > + !qio_channel_has_feature(s->ioc[id], > + QIO_CHANNEL_FEATURE_FD_PASS)) { > + return -1; > + } > + > + if (num) { > + s->write_msgfds[id] = g_new(int, num); > + memcpy(s->write_msgfds[id], fds, num * sizeof(int)); > + } > + > + s->write_msgfds_num[id] = num; > + > + return 0; > +} > + > +static ssize_t tcp_chr_recv(CharDriverState *chr, uint64_t id, > + char *buf, size_t len) > { > TCPCharDriver *s = chr->opaque; > struct iovec iov = { .iov_base = buf, .iov_len = len }; > @@ -2798,12 +3001,12 @@ static ssize_t tcp_chr_recv(CharDriverState *chr, > char *buf, size_t len) > int *msgfds = NULL; > size_t msgfds_num = 0; > > - if (qio_channel_has_feature(s->ioc, QIO_CHANNEL_FEATURE_FD_PASS)) { > - ret = qio_channel_readv_full(s->ioc, &iov, 1, > + if (qio_channel_has_feature(s->ioc[id], QIO_CHANNEL_FEATURE_FD_PASS)) > { > + ret = qio_channel_readv_full(s->ioc[id], &iov, 1, > &msgfds, &msgfds_num, > NULL); > } else { > - ret = qio_channel_readv_full(s->ioc, &iov, 1, > + ret = qio_channel_readv_full(s->ioc[id], &iov, 1, > NULL, NULL, > NULL); > } > @@ -2817,20 +3020,20 @@ static ssize_t tcp_chr_recv(CharDriverState *chr, > char *buf, size_t len) > > if (msgfds_num) { > /* close and clean read_msgfds */ > - for (i = 0; i < s->read_msgfds_num; i++) { > - close(s->read_msgfds[i]); > + for (i = 0; i < s->read_msgfds_num[id]; i++) { > + close(s->read_msgfds[id][i]); > } > > - if (s->read_msgfds_num) { > - g_free(s->read_msgfds); > + if (s->read_msgfds_num[id]) { > + g_free(s->read_msgfds[id]); > } > > - s->read_msgfds = msgfds; > - s->read_msgfds_num = msgfds_num; > + s->read_msgfds[id] = msgfds; > + s->read_msgfds_num[id] = msgfds_num; > } > > - for (i = 0; i < s->read_msgfds_num; i++) { > - int fd = s->read_msgfds[i]; > + for (i = 0; i < s->read_msgfds_num[id]; i++) { > + int fd = s->read_msgfds[id][i]; > if (fd < 0) { > continue; > } > @@ -2849,47 +3052,47 @@ static ssize_t tcp_chr_recv(CharDriverState *chr, > char *buf, size_t len) > static GSource *tcp_chr_add_watch(CharDriverState *chr, GIOCondition cond) > { > TCPCharDriver *s = chr->opaque; > - return qio_channel_create_watch(s->ioc, cond); > + return qio_channel_create_watch(s->ioc[0], cond); > } > > -static void tcp_chr_free_connection(CharDriverState *chr) > +static void tcp_chr_free_connection(CharDriverState *chr, uint64_t id) > { > TCPCharDriver *s = chr->opaque; > int i; > > - if (!s->connected) { > + if (!s->connected[id]) { > return; > } > > - if (s->read_msgfds_num) { > - for (i = 0; i < s->read_msgfds_num; i++) { > - close(s->read_msgfds[i]); > + if (s->read_msgfds_num[id]) { > + for (i = 0; i < s->read_msgfds_num[id]; i++) { > + close(s->read_msgfds[id][i]); > } > - g_free(s->read_msgfds); > - s->read_msgfds = NULL; > - s->read_msgfds_num = 0; > + g_free(s->read_msgfds[id]); > + s->read_msgfds[id] = NULL; > + s->read_msgfds_num[id] = 0; > } > > - tcp_set_msgfds(chr, NULL, 0); > - remove_fd_in_watch(chr); > - object_unref(OBJECT(s->sioc)); > - s->sioc = NULL; > - object_unref(OBJECT(s->ioc)); > - s->ioc = NULL; > + tcp_set_msgfds_n(chr, id, NULL, 0); > + remove_fd_in_watch_n(chr, id); > + object_unref(OBJECT(s->sioc[id])); > + s->sioc[id] = NULL; > + object_unref(OBJECT(s->ioc[id])); > + s->ioc[id] = NULL; > g_free(chr->filename); > chr->filename = NULL; > - s->connected = 0; > + s->connected[id] = 0; > } > > -static void tcp_chr_disconnect(CharDriverState *chr) > +static void tcp_chr_disconnect_n(CharDriverState *chr, uint64_t id) > { > TCPCharDriver *s = chr->opaque; > > - if (!s->connected) { > + if (!s->connected[id]) { > return; > } > > - tcp_chr_free_connection(chr); > + tcp_chr_free_connection(chr, id); > > if (s->listen_ioc) { > s->listen_tag = qio_channel_add_watch( > @@ -2903,23 +3106,34 @@ static void tcp_chr_disconnect(CharDriverState > *chr) > } > } > > +static void tcp_chr_disconnect(CharDriverState *chr) > +{ > + tcp_chr_disconnect_n(chr, 0); > +} > + > static gboolean tcp_chr_read(QIOChannel *chan, GIOCondition cond, void > *opaque) > { > CharDriverState *chr = opaque; > TCPCharDriver *s = chr->opaque; > uint8_t buf[READ_BUF_LEN]; > int len, size; > + uint64_t id; > > - if (!s->connected || s->max_size <= 0) { > + for (id = 0; id < s->connections; id++) { > + if (s->ioc[id] == chan) > + break; > + } > + > + if ((id == s->connections) || !s->connected[id] || s->max_size <= 0) { > return TRUE; > } > len = sizeof(buf); > if (len > s->max_size) > len = s->max_size; > - size = tcp_chr_recv(chr, (void *)buf, len); > + size = tcp_chr_recv(chr, id, (void *)buf, len); > if (size == 0 || size == -1) { > /* connection closed */ > - tcp_chr_disconnect(chr); > + tcp_chr_disconnect_n(chr, id); > } else if (size > 0) { > if (s->do_telnetopt) > tcp_chr_process_IAC_bytes(chr, s, buf, &size); > @@ -2935,33 +3149,52 @@ static int tcp_chr_sync_read(CharDriverState *chr, > const uint8_t *buf, int len) > TCPCharDriver *s = chr->opaque; > int size; > > - if (!s->connected) { > + if (!s->connected[0]) { > + return 0; > + } > + > + size = tcp_chr_recv(chr, 0, (void *) buf, len); > + if (size == 0) { > + /* connection closed */ > + tcp_chr_disconnect_n(chr, 0); > + } > + > + return size; > +} > + > +static int tcp_chr_sync_read_n(CharDriverState *chr, uint64_t id, > + const uint8_t *buf, int len) > +{ > + TCPCharDriver *s = chr->opaque; > + int size; > + > + if (!s->connected[id]) { > return 0; > } > > - size = tcp_chr_recv(chr, (void *) buf, len); > + size = tcp_chr_recv(chr, id, (void *) buf, len); > if (size == 0) { > /* connection closed */ > - tcp_chr_disconnect(chr); > + tcp_chr_disconnect_n(chr, id); > } > > return size; > } > > -static void tcp_chr_connect(void *opaque) > +static void tcp_chr_connect(void *opaque, uint64_t id) > { > CharDriverState *chr = opaque; > TCPCharDriver *s = chr->opaque; > > g_free(chr->filename); > chr->filename = sockaddr_to_str( > - &s->sioc->localAddr, s->sioc->localAddrLen, > - &s->sioc->remoteAddr, s->sioc->remoteAddrLen, > + &s->sioc[id]->localAddr, s->sioc[id]->localAddrLen, > + &s->sioc[id]->remoteAddr, s->sioc[id]->remoteAddrLen, > s->is_listen, s->is_telnet); > > - s->connected = 1; > - if (s->ioc) { > - chr->fd_in_tag = io_add_watch_poll(s->ioc, > + s->connected[id] = 1; > + if (s->ioc[id]) { > + chr->fd_in_tag[id] = io_add_watch_poll(s->ioc[id], > tcp_chr_read_poll, > tcp_chr_read, chr); > } > @@ -2971,16 +3204,18 @@ static void tcp_chr_connect(void *opaque) > static void tcp_chr_update_read_handler(CharDriverState *chr) > { > TCPCharDriver *s = chr->opaque; > + uint64_t id; > > - if (!s->connected) { > - return; > - } > + for (id = 0; id < s->connections; id++) { > + if (!s->connected[id]) > + continue; > > - remove_fd_in_watch(chr); > - if (s->ioc) { > - chr->fd_in_tag = io_add_watch_poll(s->ioc, > - tcp_chr_read_poll, > - tcp_chr_read, chr); > + remove_fd_in_watch_n(chr, id); > + if (s->ioc[id]) { > + chr->fd_in_tag[id] = io_add_watch_poll(s->ioc[id], > + tcp_chr_read_poll, > + tcp_chr_read, chr); > + } > } > } > > @@ -3002,14 +3237,14 @@ static gboolean tcp_chr_telnet_init_io(QIOChannel > *ioc, > if (ret == QIO_CHANNEL_ERR_BLOCK) { > ret = 0; > } else { > - tcp_chr_disconnect(init->chr); > + tcp_chr_disconnect_n(init->chr, 0); > return FALSE; > } > } > init->buflen -= ret; > > if (init->buflen == 0) { > - tcp_chr_connect(init->chr); > + tcp_chr_connect(init->chr, 0); > return FALSE; > } > > @@ -3018,7 +3253,7 @@ static gboolean tcp_chr_telnet_init_io(QIOChannel > *ioc, > return TRUE; > } > > -static void tcp_chr_telnet_init(CharDriverState *chr) > +static void tcp_chr_telnet_init(CharDriverState *chr, uint64_t id) > { > TCPCharDriver *s = chr->opaque; > TCPCharDriverTelnetInit *init = > @@ -3045,7 +3280,7 @@ static void tcp_chr_telnet_init(CharDriverState *chr) > #undef IACSET > > qio_channel_add_watch( > - s->ioc, G_IO_OUT, > + s->ioc[id], G_IO_OUT, > tcp_chr_telnet_init_io, > init, NULL); > } > @@ -3059,18 +3294,18 @@ static void tcp_chr_tls_handshake(Object *source, > TCPCharDriver *s = chr->opaque; > > if (err) { > - tcp_chr_disconnect(chr); > + tcp_chr_disconnect_n(chr, 0); > } else { > if (s->do_telnetopt) { > - tcp_chr_telnet_init(chr); > + tcp_chr_telnet_init(chr, 0); > } else { > - tcp_chr_connect(chr); > + tcp_chr_connect(chr, 0); > } > } > } > > > -static void tcp_chr_tls_init(CharDriverState *chr) > +static void tcp_chr_tls_init(CharDriverState *chr, uint64_t id) > { > TCPCharDriver *s = chr->opaque; > QIOChannelTLS *tioc; > @@ -3078,21 +3313,21 @@ static void tcp_chr_tls_init(CharDriverState *chr) > > if (s->is_listen) { > tioc = qio_channel_tls_new_server( > - s->ioc, s->tls_creds, > + s->ioc[id], s->tls_creds, > NULL, /* XXX Use an ACL */ > &err); > } else { > tioc = qio_channel_tls_new_client( > - s->ioc, s->tls_creds, > + s->ioc[id], s->tls_creds, > s->addr->u.inet.data->host, > &err); > } > if (tioc == NULL) { > error_free(err); > - tcp_chr_disconnect(chr); > + tcp_chr_disconnect_n(chr, id); > } > - object_unref(OBJECT(s->ioc)); > - s->ioc = QIO_CHANNEL(tioc); > + object_unref(OBJECT(s->ioc[id])); > + s->ioc[id] = QIO_CHANNEL(tioc); > > qio_channel_tls_handshake(tioc, > tcp_chr_tls_handshake, > @@ -3100,36 +3335,52 @@ static void tcp_chr_tls_init(CharDriverState *chr) > NULL); > } > > +static int find_avail_ioc(TCPCharDriver *s, uint64_t *id) > +{ > + uint64_t i; > + > + for(i = 0; i < MAX_CLIENTS; i++) { > + if (s->ioc[i] == NULL) { > + *id = i; > + return 0; > + } > + } > + return -1; > +} > > static int tcp_chr_new_client(CharDriverState *chr, QIOChannelSocket > *sioc) > { > TCPCharDriver *s = chr->opaque; > - if (s->ioc != NULL) { > - return -1; > - } > + uint64_t id; > > - s->ioc = QIO_CHANNEL(sioc); > + if(find_avail_ioc(s, &id) < 0) > + return -1; > + > + s->ioc[id] = QIO_CHANNEL(sioc); > object_ref(OBJECT(sioc)); > - s->sioc = sioc; > + s->sioc[id] = sioc; > object_ref(OBJECT(sioc)); > + if(chr->conn_bitmap != NULL) > + set_bit(id, chr->conn_bitmap); > > - qio_channel_set_blocking(s->ioc, false, NULL); > + qio_channel_set_blocking(s->ioc[id], false, NULL); > > if (s->do_nodelay) { > - qio_channel_set_delay(s->ioc, false); > + qio_channel_set_delay(s->ioc[id], false); > } > +/* > if (s->listen_tag) { > g_source_remove(s->listen_tag); > s->listen_tag = 0; > } > - > +*/ > if (s->tls_creds) { > - tcp_chr_tls_init(chr); > + tcp_chr_tls_init(chr, id); > } else { > if (s->do_telnetopt) { > - tcp_chr_telnet_init(chr); > + tcp_chr_telnet_init(chr, id); > } else { > - tcp_chr_connect(chr); > + tcp_chr_connect(chr, id); > } > } > > @@ -3178,7 +3429,7 @@ static int tcp_chr_wait_connected(CharDriverState > *chr, Error **errp) > > /* It can't wait on s->connected, since it is set asynchronously > * in TLS and telnet cases, only wait for an accepted socket */ > - while (!s->ioc) { > + while (!s->ioc[0]) { > if (s->is_listen) { > fprintf(stderr, "QEMU waiting for connection on: %s\n", > chr->filename); > @@ -3211,9 +3462,11 @@ int qemu_chr_wait_connected(CharDriverState *chr, > Error **errp) > static void tcp_chr_close(CharDriverState *chr) > { > TCPCharDriver *s = chr->opaque; > + uint64_t id; > > - tcp_chr_free_connection(chr); > - > + for (id = 0; id < s->connections; id++) { > + tcp_chr_free_connection(chr, id); > + } > if (s->reconnect_timer) { > g_source_remove(s->reconnect_timer); > s->reconnect_timer = 0; > @@ -3721,6 +3974,7 @@ static void qemu_chr_parse_socket(QemuOpts *opts, > ChardevBackend *backend, > bool is_telnet = qemu_opt_get_bool(opts, "telnet", false); > bool do_nodelay = !qemu_opt_get_bool(opts, "delay", true); > int64_t reconnect = qemu_opt_get_number(opts, "reconnect", 0); > + uint64_t connections = qemu_opt_get_number(opts, "connections", 1); > const char *path = qemu_opt_get(opts, "path"); > const char *host = qemu_opt_get(opts, "host"); > const char *port = qemu_opt_get(opts, "port"); > @@ -3758,6 +4012,8 @@ static void qemu_chr_parse_socket(QemuOpts *opts, > ChardevBackend *backend, > sock->has_reconnect = true; > sock->reconnect = reconnect; > sock->tls_creds = g_strdup(tls_creds); > + sock->has_connections = true; > + sock->connections = connections; > > addr = g_new0(SocketAddress, 1); > if (path) { > @@ -4241,6 +4497,9 @@ QemuOptsList qemu_chardev_opts = { > },{ > .name = "logappend", > .type = QEMU_OPT_BOOL, > + },{ > + .name = "connections", > + .type = QEMU_OPT_NUMBER, > }, > { /* end of list */ } > }, > @@ -4413,6 +4672,7 @@ static CharDriverState > *qmp_chardev_open_socket(const char *id, > bool is_telnet = sock->has_telnet ? sock->telnet : false; > bool is_waitconnect = sock->has_wait ? sock->wait : false; > int64_t reconnect = sock->has_reconnect ? sock->reconnect : 0; > + uint64_t connections = sock->has_connections ? sock->connections : 1; > ChardevCommon *common = qapi_ChardevSocket_base(sock); > QIOChannelSocket *sioc = NULL; > > @@ -4426,6 +4686,7 @@ static CharDriverState > *qmp_chardev_open_socket(const char *id, > s->is_listen = is_listen; > s->is_telnet = is_telnet; > s->do_nodelay = do_nodelay; > + s->connections = connections; > if (sock->tls_creds) { > Object *creds; > creds = object_resolve_path_component( > @@ -4461,6 +4722,15 @@ static CharDriverState > *qmp_chardev_open_socket(const char *id, > > s->addr = QAPI_CLONE(SocketAddress, sock->addr); > > + if (sock->connections > 1) { > + chr->conn_bitmap = bitmap_new(sock->connections); > + chr->max_connections = sock->connections; > + chr->chr_write_n = tcp_chr_write_n; > + chr->chr_sync_read_n = tcp_chr_sync_read_n; > + chr->get_msgfds_n = tcp_get_msgfds_n; > + chr->set_msgfds_n = tcp_set_msgfds_n; > + chr->chr_disconnect_n = tcp_chr_disconnect_n; > + } > chr->opaque = s; > chr->chr_wait_connected = tcp_chr_wait_connected; > chr->chr_write = tcp_chr_write; > @@ -4478,10 +4748,12 @@ static CharDriverState > *qmp_chardev_open_socket(const char *id, > chr->filename = SocketAddress_to_str("disconnected:", > addr, is_listen, is_telnet); > > + chr->conn_id = ANONYMOUS_CLIENT; > if (is_listen) { > if (is_telnet) { > s->do_telnetopt = 1; > } > + chr->conn_id = 0; > } else if (reconnect > 0) { > s->reconnect_time = reconnect; > } > @@ -4502,11 +4774,9 @@ static CharDriverState > *qmp_chardev_open_socket(const char *id, > qemu_chr_wait_connected(chr, errp) < 0) { > goto error; > } > - if (!s->ioc) { > - s->listen_tag = qio_channel_add_watch( > + s->listen_tag = qio_channel_add_watch( > QIO_CHANNEL(s->listen_ioc), G_IO_IN, > tcp_chr_accept, chr, NULL); > - } > } else if (qemu_chr_wait_connected(chr, errp) < 0) { > goto error; > } > -- > 2.7.4 > > -- Marc-André Lureau
