From c86cd23da11c0f9cc3b8bef7682fac4703d41260 Mon Sep 17 00:00:00 2001
From: Georg Chini
Date: Mon, 19 Feb 2018 15:50:59 +0100
Subject: [PATCH] bluez5-device: Rewrite thread function and reduce send buffer size for a2dp sink

This patch is intended to fix lag problems seen with a2dp sink.
---
 src/modules/bluetooth/module-bluez5-device.c | 319 +++++++++++++++++----------
 1 file changed, 205 insertions(+), 114 deletions(-)

diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c
index a3315c03..0b4da102 100644
--- a/src/modules/bluetooth/module-bluez5-device.c
+++ b/src/modules/bluetooth/module-bluez5-device.c
@@ -680,6 +680,35 @@ static int a2dp_process_push(struct userdata *u) {
     return ret;
 }
 
+static void update_buffer_size(struct userdata *u) { /* Sets SO_SNDBUF on u->stream_fd to a small multiple of u->write_link_mtu; getsockopt/setsockopt failures are only logged as warnings */
+    int old_bufsize;
+    socklen_t len = sizeof(int);
+    int ret;
+
+    ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len);
+    if (ret == -1) {
+        pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno));
+    } else {
+        unsigned n;
+        int new_bufsize;
+
+        /* Set send buffer size as small as possible. The minimum value is 1024 according to
+         * the socket man page, so let's use the next multiple of write_link_mtu which is
+         * larger than 1024. If write_link_mtu is larger than 1024, use 2 * write_link_mtu */
+        n = 1024 / u->write_link_mtu + 1; /* NOTE(review): assumes write_link_mtu != 0; confirm it is always set before this runs */
+        if (n == 1)
+            n = 2;
+
+        new_bufsize = n * u->write_link_mtu;
+
+        ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len);
+        if (ret == -1)
+            pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize, new_bufsize, pa_cstrerror(errno));
+        else
+            pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize, new_bufsize);
+    }
+}
+
 /* Run from I/O thread */
 static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
     struct sbc_info *sbc_info;
@@ -714,6 +743,8 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) {
     pa_sink_set_max_request_within_thread(u->sink, u->write_block_size);
     pa_sink_set_fixed_latency_within_thread(u->sink,
             FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec));
+
+    update_buffer_size(u);
 }
 
 /* Run from I/O thread */
@@ -879,13 +910,15 @@ static void setup_stream(struct userdata *u) {
     if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
         a2dp_set_bitpool(u, u->sbc_info.max_bitpool);
 
+    update_buffer_size(u); /* NOTE(review): a2dp_set_bitpool() now also ends with this call, so the A2DP sink path may run it twice; confirm this is intended */
+
     u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1);
     pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL);
     pollfd->fd = u->stream_fd;
     pollfd->events = pollfd->revents = 0;
 
     u->read_index = u->write_index = 0;
-    u->started_at = 0;
+    u->started_at = pa_rtclock_now(); /* timing reference starts at stream setup instead of being left at 0 */
     u->stream_setup_done = true;
 
     if (u->source)
@@ -1451,12 +1484,32 @@ static int init_profile(struct userdata *u) {
     return r;
 }
 
+static int write_block(struct userdata *u) { /* Calls the render function for the current profile; returns its result, or -1 if it reported an error */
+    int n_written;
+
+    if (u->write_index <= 0) /* nothing written yet: (re)start the clock that the time-based sink scheduling measures against */
+        u->started_at = pa_rtclock_now();
+
+    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
+        if ((n_written = a2dp_process_render(u)) < 0)
+            return -1;
+    } else {
+        if ((n_written = sco_process_render(u)) < 0)
+            return -1;
+    }
+
+    if (n_written == 0)
+        pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!");
+
+    return n_written;
+}
+
+
 /* I/O thread function */
 static void thread_func(void *userdata) {
     struct userdata *u = userdata;
-    unsigned do_write = 0;
-    unsigned pending_read_bytes = 0;
-    bool writable = false;
+    unsigned blocks_to_write = 0; /* number of whole blocks still to be written */
+    unsigned bytes_to_write = 0; /* bytes read so far that do not yet add up to a full write block */
 
     pa_assert(u);
     pa_assert(u->transport);
@@ -1476,160 +1529,198 @@ static void thread_func(void *userdata) {
         struct pollfd *pollfd;
         int ret;
         bool disable_timer = true;
+        bool writable = false;
+        bool have_source = u->source ? PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false;
+        bool have_sink = u->sink ? PA_SINK_IS_LINKED(u->sink->thread_info.state) : false;
 
         pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL;
 
-        if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) {
-            pa_log_info("FD error: %s%s%s%s",
-                        pollfd->revents & POLLERR ? "POLLERR " :"",
-                        pollfd->revents & POLLHUP ? "POLLHUP " :"",
-                        pollfd->revents & POLLPRI ? "POLLPRI " :"",
-                        pollfd->revents & POLLNVAL ? "POLLNVAL " :"");
-
-            if (pollfd->revents & POLLHUP) {
-                pollfd = NULL;
-                teardown_stream(u);
-                do_write = 0;
-                pending_read_bytes = 0;
-                writable = false;
-                pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL);
-            } else
-                goto fail;
+        /* If there is a pollfd, the stream is set up and we need to do something */
+        if (pollfd) {
+
+            /* Check for stream error or close */
+            if (pollfd->revents & ~(POLLOUT|POLLIN)) {
+                pa_log_info("FD error: %s%s%s%s",
+                            pollfd->revents & POLLERR ? "POLLERR " :"",
+                            pollfd->revents & POLLHUP ? "POLLHUP " :"",
+                            pollfd->revents & POLLPRI ? "POLLPRI " :"",
+                            pollfd->revents & POLLNVAL ? "POLLNVAL " :"");
+
+                if (pollfd->revents & POLLHUP) {
+                    pollfd = NULL;
+                    teardown_stream(u);
+                    blocks_to_write = 0;
+                    bytes_to_write = 0;
+                    writable = false;
+                    pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL);
+                } else
+                    goto fail;
+            }
         }
 
-        if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) {
+        if (pollfd) {
 
-            /* We should send two blocks to the device before we expect
-             * a response. */
+            /* Handle source if present */
+            if (have_source) {
 
-            if (u->write_index == 0 && u->read_index <= 0)
-                do_write = 2;
+                /* We should send two blocks to the device before we expect a response. */
+                if (u->write_index == 0 && u->read_index <= 0)
+                    blocks_to_write = 2;
 
-            if (pollfd && (pollfd->revents & POLLIN)) {
-                int n_read;
+                /* If we got woken up by POLLIN let's do some reading */
+                if (pollfd->revents & POLLIN) {
+                    int n_read;
 
-                if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
-                    n_read = a2dp_process_push(u);
-                else
-                    n_read = sco_process_push(u);
+                    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE)
+                        n_read = a2dp_process_push(u);
+                    else
+                        n_read = sco_process_push(u);
 
-                if (n_read < 0)
-                    goto fail;
+                    if (n_read < 0)
+                        goto fail;
 
-                if (n_read > 0) {
-                    /* We just read something, so we are supposed to write something, too */
-                    pending_read_bytes += n_read;
-                    do_write += pending_read_bytes / u->write_block_size;
-                    pending_read_bytes = pending_read_bytes % u->write_block_size;
+                    if (n_read > 0) {
+                        /* We just read something, so we are supposed to write something, too */
+                        bytes_to_write += n_read;
+                        blocks_to_write += bytes_to_write / u->write_block_size;
+                        bytes_to_write = bytes_to_write % u->write_block_size;
+                    }
                 }
             }
-        }
 
-        if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) {
+            /* Handle sink if present */
+            if (have_sink) {
 
-            if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
-                pa_sink_process_rewind(u->sink, 0);
+                /* Process rewinds */
+                if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+                    pa_sink_process_rewind(u->sink, 0);
 
-            if (pollfd) {
+                /* Test if the stream is writable */
                 if (pollfd->revents & POLLOUT)
                     writable = true;
 
-                if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) {
-                    pa_usec_t time_passed;
-                    pa_usec_t audio_sent;
+                /* If we have a source, we let the source determine the timing
+                 * for the sink */
+                if (have_source) {
 
-                    /* Hmm, there is no input stream we could synchronize
-                     * to. So let's do things by time */
+                    if (writable && blocks_to_write > 0) {
+                        int result;
 
-                    time_passed = pa_rtclock_now() - u->started_at;
-                    audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
-
-                    if (audio_sent <= time_passed) {
-                        pa_usec_t audio_to_send = time_passed - audio_sent;
-
-                        /* Never try to catch up for more than 100ms */
-                        if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) {
-                            pa_usec_t skip_usec;
-                            uint64_t skip_bytes;
-
-                            skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC;
-                            skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec);
+                        if ((result = write_block(u)) < 0)
+                            goto fail;
 
-                            if (skip_bytes > 0) {
-                                pa_memchunk tmp;
+                        blocks_to_write -= result;
+                        writable = false;
+                    }
 
-                                pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream",
-                                            (unsigned long long) skip_usec,
-                                            (unsigned long long) skip_bytes);
+                /* There is no source, we have to use the system clock for timing */
+                } else {
+                    bool have_written = false;
 
-                                pa_sink_render_full(u->sink, skip_bytes, &tmp);
-                                pa_memblock_unref(tmp.memblock);
-                                u->write_index += skip_bytes;
+                    /* If the stream is writable, send some data if necessary */
+                    if (writable) {
+                        pa_usec_t time_passed;
+                        pa_usec_t audio_sent;
 
-                                if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
-                                    a2dp_reduce_bitpool(u);
+                        time_passed = pa_rtclock_now() - u->started_at;
+                        audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec);
+
+                        /* A new block needs to be sent. */
+                        if (audio_sent <= time_passed) {
+                            size_t bytes_to_send = pa_usec_to_bytes(time_passed - audio_sent, &u->sample_spec);
+
+                            /* There is more than one block that needs to be written.
+                             * We cannot catch up, therefore discard everything older
+                             * than two block sizes */
+                            if (bytes_to_send > 2 * u->write_block_size) {
+                                uint64_t skip_bytes;
+
+                                skip_bytes = bytes_to_send - 2 * u->write_block_size;
+
+                                if (skip_bytes > 0) {
+                                    pa_memchunk tmp;
+                                    size_t mempool_max_block_size = pa_mempool_block_size_max(u->core->mempool);
+                                    pa_usec_t skip_usec = pa_bytes_to_usec(skip_bytes, &u->sample_spec);
+
+                                    pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream",
+                                                (unsigned long long) skip_usec,
+                                                (unsigned long long) skip_bytes);
+
+                                    while (skip_bytes > 0) { /* render and drop the skipped audio in chunks no larger than mempool_max_block_size */
+                                        size_t bytes_to_render;
+
+                                        if (skip_bytes > mempool_max_block_size)
+                                            bytes_to_render = mempool_max_block_size;
+                                        else
+                                            bytes_to_render = skip_bytes;
+
+                                        pa_sink_render_full(u->sink, bytes_to_render, &tmp);
+                                        pa_memblock_unref(tmp.memblock);
+                                        u->write_index += bytes_to_render;
+                                        skip_bytes -= bytes_to_render;
+                                    }
+
+                                    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK)
+                                        a2dp_reduce_bitpool(u);
+                                }
                             }
-                        }
 
-                        do_write = 1;
-                        pending_read_bytes = 0;
-                    }
-                }
+                            blocks_to_write = 1;
+                        }
 
-                if (writable && do_write > 0) {
-                    int n_written;
+                        /* Send block */
+                        if (blocks_to_write > 0) {
+                            int result;
 
-                    if (u->write_index <= 0)
-                        u->started_at = pa_rtclock_now();
+                            if ((result = write_block(u)) < 0)
+                                goto fail;
 
-                    if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) {
-                        if ((n_written = a2dp_process_render(u)) < 0)
-                            goto fail;
-                    } else {
-                        if ((n_written = sco_process_render(u)) < 0)
-                            goto fail;
+                            blocks_to_write -= result;
+                            writable = false;
+                            have_written = true;
+                        }
                     }
 
-                    if (n_written == 0)
-                        pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!");
-
-                    do_write -= n_written;
-                    writable = false;
+                    /* If nothing was written during this iteration, either the pipe
+                     * is not writable or there was no write pending. Set up a timer
+                     * that will wake up the thread when the next data needs
+                     * to be written. */
+                    if (!have_written) {
+                        pa_usec_t sleep_for;
+                        pa_usec_t time_passed, next_write_at;
+
+                        if (writable) {
+                            /* There was no write pending on this iteration of the loop.
+                             * Let's estimate when we need to wake up next */
+                            time_passed = pa_rtclock_now() - u->started_at;
+                            next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec);
+                            sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0;
+                            /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
+                        } else
+                            /* We could not write because the stream was not ready. Let's try
+                             * again in 500 ms and drop audio if we still can't write. The
+                             * thread will also be woken up when we can write again. */
+                            sleep_for = PA_USEC_PER_MSEC * 500;
+
+                        pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for);
+                        disable_timer = false;
+                    }
                 }
+            }
 
-                if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0) {
-                    pa_usec_t sleep_for;
-                    pa_usec_t time_passed, next_write_at;
+            /* Set events to wake up the thread */
+            pollfd->events = (short) (((have_sink && !writable) ? POLLOUT : 0) | (have_source ? POLLIN : 0));
 
-                    if (writable) {
-                        /* Hmm, there is no input stream we could synchronize
-                         * to. So let's estimate when we need to wake up the latest */
-                        time_passed = pa_rtclock_now() - u->started_at;
-                        next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec);
-                        sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0;
-                        /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */
-                    } else
-                        /* drop stream every 500 ms */
-                        sleep_for = PA_USEC_PER_MSEC * 500;
-
-                    pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for);
-                    disable_timer = false;
-                }
-            }
         }
 
         if (disable_timer)
             pa_rtpoll_set_timer_disabled(u->rtpoll);
 
-        /* Hmm, nothing to do. Let's sleep */
-        if (pollfd)
-            pollfd->events = (short) (((u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state) && !writable) ? POLLOUT : 0) |
-                                      (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state) ? POLLIN : 0));
-
         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) {
             pa_log_debug("pa_rtpoll_run failed with: %d", ret);
             goto fail;
         }
+
         if (ret == 0) {
             pa_log_debug("IO thread shutdown requested, stopping cleanly");
             transport_release(u);
-- 
2.14.1