From 6084127b2918bbb5b6a0986692e7f0f0349eed13 Mon Sep 17 00:00:00 2001 From: Georg Chini Date: Tue, 20 Feb 2018 21:45:01 +0100 Subject: [PATCH] bluez5-device: Rewrite of thread function, reduce send buffer size for a2dp sink The rewrite of the thread function does not change functionality much; most of it is only cleanup, minor bug fixing and documentation work. This patch also changes the send buffer size for a2dp sink to avoid lags after temporary connection drops, following the proof-of-concept patch posted by Dmitry Kalyanov. Bug-Link: https://bugs.freedesktop.org/show_bug.cgi?id=58746 --- src/modules/bluetooth/module-bluez5-device.c | 290 +++++++++++++++++---------- 1 file changed, 188 insertions(+), 102 deletions(-) diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c index 7970dda7..1c23335f 100644 --- a/src/modules/bluetooth/module-bluez5-device.c +++ b/src/modules/bluetooth/module-bluez5-device.c @@ -56,7 +56,6 @@ PA_MODULE_LOAD_ONCE(false); PA_MODULE_USAGE("path=" "autodetect_mtu="); -#define MAX_PLAYBACK_CATCH_UP_USEC (100 * PA_USEC_PER_MSEC) #define FIXED_LATENCY_PLAYBACK_A2DP (25 * PA_USEC_PER_MSEC) #define FIXED_LATENCY_PLAYBACK_SCO (125 * PA_USEC_PER_MSEC) #define FIXED_LATENCY_RECORD_A2DP (25 * PA_USEC_PER_MSEC) @@ -660,6 +659,35 @@ static int a2dp_process_push(struct userdata *u) { return ret; } +static void update_buffer_size(struct userdata *u) { + int old_bufsize; + socklen_t len = sizeof(int); + int ret; + + ret = getsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &old_bufsize, &len); + if (ret == -1) { + pa_log_warn("Changing bluetooth buffer size: Failed to getsockopt(SO_SNDBUF): %s", pa_cstrerror(errno)); + } else { + unsigned n; + int new_bufsize; + + /* Set send buffer size as small as possible. The minimum value is 1024 according to + * the socket man page, so let's use the next multiple of write_link_mtu which is + * larger than 1024. 
If write_link_mtu is larger than 1024, use 2 * write_link_mtu */ + n = 1024 / u->write_link_mtu + 1; + if (n == 1) + n = 2; + + new_bufsize = n * u->write_link_mtu; + + ret = setsockopt(u->stream_fd, SOL_SOCKET, SO_SNDBUF, &new_bufsize, len); + if (ret == -1) + pa_log_warn("Changing bluetooth buffer size: Failed to change from %d to %d: %s", old_bufsize, new_bufsize, pa_cstrerror(errno)); + else + pa_log_info("Changing bluetooth buffer size: Changed from %d to %d", old_bufsize, new_bufsize); + } +} + /* Run from I/O thread */ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) { struct sbc_info *sbc_info; @@ -694,6 +722,8 @@ static void a2dp_set_bitpool(struct userdata *u, uint8_t bitpool) { pa_sink_set_max_request_within_thread(u->sink, u->write_block_size); pa_sink_set_fixed_latency_within_thread(u->sink, FIXED_LATENCY_PLAYBACK_A2DP + pa_bytes_to_usec(u->write_block_size, &u->sample_spec)); + + update_buffer_size(u); } /* Run from I/O thread */ @@ -852,8 +882,10 @@ static void setup_stream(struct userdata *u) { pa_log_debug("Stream properly set up, we're ready to roll!"); - if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) + if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) { a2dp_set_bitpool(u, u->sbc_info.max_bitpool); + update_buffer_size(u); + } u->rtpoll_item = pa_rtpoll_item_new(u->rtpoll, PA_RTPOLL_NEVER, 1); pollfd = pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL); @@ -861,7 +893,7 @@ static void setup_stream(struct userdata *u) { pollfd->events = pollfd->revents = 0; u->read_index = u->write_index = 0; - u->started_at = 0; + u->started_at = pa_rtclock_now(); u->stream_setup_done = true; if (u->source) @@ -1407,12 +1439,32 @@ static int init_profile(struct userdata *u) { return r; } +static int write_block(struct userdata *u) { + int n_written; + + if (u->write_index <= 0) + u->started_at = pa_rtclock_now(); + + if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) { + if ((n_written = a2dp_process_render(u)) < 0) + return -1; + } else { + if 
((n_written = sco_process_render(u)) < 0) + return -1; + } + + if (n_written == 0) + pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!"); + + return n_written; +} + + /* I/O thread function */ static void thread_func(void *userdata) { struct userdata *u = userdata; - unsigned do_write = 0; - unsigned pending_read_bytes = 0; - bool writable = false; + unsigned blocks_to_write = 0; + unsigned bytes_to_write = 0; pa_assert(u); pa_assert(u->transport); @@ -1432,9 +1484,13 @@ static void thread_func(void *userdata) { struct pollfd *pollfd; int ret; bool disable_timer = true; + bool writable = false; + bool have_source = u->source ? PA_SOURCE_IS_LINKED(u->source->thread_info.state) : false; + bool have_sink = u->sink ? PA_SINK_IS_LINKED(u->sink->thread_info.state) : false; pollfd = u->rtpoll_item ? pa_rtpoll_item_get_pollfd(u->rtpoll_item, NULL) : NULL; + /* Check for stream error or close */ if (pollfd && (pollfd->revents & ~(POLLOUT|POLLIN))) { pa_log_info("FD error: %s%s%s%s", pollfd->revents & POLLERR ? "POLLERR " :"", @@ -1445,147 +1501,177 @@ static void thread_func(void *userdata) { if (pollfd->revents & POLLHUP) { pollfd = NULL; teardown_stream(u); - do_write = 0; - pending_read_bytes = 0; + blocks_to_write = 0; + bytes_to_write = 0; writable = false; pa_asyncmsgq_post(pa_thread_mq_get()->outq, PA_MSGOBJECT(u->msg), BLUETOOTH_MESSAGE_STREAM_FD_HUP, NULL, 0, NULL, NULL); } else goto fail; } - if (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state)) { + /* If there is a pollfd, the stream is set up and we need to do something */ + if (pollfd) { - /* We should send two blocks to the device before we expect - * a response. */ + /* Handle source if present */ + if (have_source) { - if (u->write_index == 0 && u->read_index <= 0) - do_write = 2; + /* We should send two blocks to the device before we expect a response. 
*/ + if (u->write_index == 0 && u->read_index <= 0) + blocks_to_write = 2; - if (pollfd && (pollfd->revents & POLLIN)) { - int n_read; + /* If we got woken up by POLLIN let's do some reading */ + if (pollfd->revents & POLLIN) { + int n_read; - if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE) - n_read = a2dp_process_push(u); - else - n_read = sco_process_push(u); + if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SOURCE) + n_read = a2dp_process_push(u); + else + n_read = sco_process_push(u); - if (n_read < 0) - goto fail; + if (n_read < 0) + goto fail; - if (n_read > 0) { - /* We just read something, so we are supposed to write something, too */ - pending_read_bytes += n_read; - do_write += pending_read_bytes / u->write_block_size; - pending_read_bytes = pending_read_bytes % u->write_block_size; + if (n_read > 0) { + /* We just read something, so we are supposed to write something, too */ + bytes_to_write += n_read; + blocks_to_write += bytes_to_write / u->write_block_size; + bytes_to_write = bytes_to_write % u->write_block_size; + } } } - } - if (u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state)) { + /* Handle sink if present */ + if (have_sink) { - if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) - pa_sink_process_rewind(u->sink, 0); + /* Process rewinds */ + if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) + pa_sink_process_rewind(u->sink, 0); - if (pollfd) { + /* Test if the stream is writable */ if (pollfd->revents & POLLOUT) writable = true; - if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0 && writable) { - pa_usec_t time_passed; - pa_usec_t audio_sent; - - /* Hmm, there is no input stream we could synchronize - * to. 
So let's do things by time */ + /* If we have a source, we let the source determine the timing + * for the sink */ + if (have_source) { - time_passed = pa_rtclock_now() - u->started_at; - audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec); + if (writable && blocks_to_write > 0) { + int result; - if (audio_sent <= time_passed) { - pa_usec_t audio_to_send = time_passed - audio_sent; - - /* Never try to catch up for more than 100ms */ - if (u->write_index > 0 && audio_to_send > MAX_PLAYBACK_CATCH_UP_USEC) { - pa_usec_t skip_usec; - uint64_t skip_bytes; - - skip_usec = audio_to_send - MAX_PLAYBACK_CATCH_UP_USEC; - skip_bytes = pa_usec_to_bytes(skip_usec, &u->sample_spec); + if ((result = write_block(u)) < 0) + goto fail; - if (skip_bytes > 0) { - pa_memchunk tmp; + blocks_to_write -= result; + writable = false; + } - pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream", - (unsigned long long) skip_usec, - (unsigned long long) skip_bytes); + /* There is no source, we have to use the system clock for timing */ + } else { + bool have_written = false; - pa_sink_render_full(u->sink, skip_bytes, &tmp); - pa_memblock_unref(tmp.memblock); - u->write_index += skip_bytes; + /* If the stream is writable, send some data if necessary */ + if (writable) { + pa_usec_t time_passed; + pa_usec_t audio_sent; - if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) - a2dp_reduce_bitpool(u); + time_passed = pa_rtclock_now() - u->started_at; + audio_sent = pa_bytes_to_usec(u->write_index, &u->sample_spec); + + /* A new block needs to be sent. */ + if (audio_sent <= time_passed) { + size_t bytes_to_send = pa_usec_to_bytes(time_passed - audio_sent, &u->sample_spec); + + /* There is more than one block that needs to be written. 
+ * We cannot catch up, therefore discard everything older + * than two block sizes */ + if (bytes_to_send > 2 * u->write_block_size) { + uint64_t skip_bytes; + + skip_bytes = bytes_to_send - 2 * u->write_block_size; + + if (skip_bytes > 0) { + pa_memchunk tmp; + size_t mempool_max_block_size = pa_mempool_block_size_max(u->core->mempool); + pa_usec_t skip_usec = pa_bytes_to_usec(skip_bytes, &u->sample_spec); + + pa_log_warn("Skipping %llu us (= %llu bytes) in audio stream", + (unsigned long long) skip_usec, + (unsigned long long) skip_bytes); + + while (skip_bytes > 0) { + size_t bytes_to_render; + + if (skip_bytes > mempool_max_block_size) + bytes_to_render = mempool_max_block_size; + else + bytes_to_render = skip_bytes; + + pa_sink_render_full(u->sink, bytes_to_render, &tmp); + pa_memblock_unref(tmp.memblock); + u->write_index += bytes_to_render; + skip_bytes -= bytes_to_render; + } + + if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) + a2dp_reduce_bitpool(u); + } } - } - do_write = 1; - pending_read_bytes = 0; - } - } + blocks_to_write = 1; + } - if (writable && do_write > 0) { - int n_written; + /* Send block */ + if (blocks_to_write > 0) { + int result; - if (u->write_index <= 0) - u->started_at = pa_rtclock_now(); + if ((result = write_block(u)) < 0) + goto fail; - if (u->profile == PA_BLUETOOTH_PROFILE_A2DP_SINK) { - if ((n_written = a2dp_process_render(u)) < 0) - goto fail; - } else { - if ((n_written = sco_process_render(u)) < 0) - goto fail; + blocks_to_write -= result; + writable = false; + have_written = true; + } } - if (n_written == 0) - pa_log("Broken kernel: we got EAGAIN on write() after POLLOUT!"); - - do_write -= n_written; - writable = false; + /* If nothing was written during this iteration, either the stream + * is not writable or there was no write pending. Set up a timer that + * will wake up the thread when the next data needs to be written. 
*/ + if (!have_written) { + pa_usec_t sleep_for; + pa_usec_t time_passed, next_write_at; + + if (writable) { + /* There was no write pending on this iteration of the loop. + * Let's estimate when we need to wake up next */ + time_passed = pa_rtclock_now() - u->started_at; + next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec); + sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0; + /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */ + } else + /* We could not write because the stream was not ready. Let's try + * again in 500 ms and drop audio if we still can't write. The + * thread will also be woken up when we can write again. */ + sleep_for = PA_USEC_PER_MSEC * 500; + + pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for); + disable_timer = false; + } } + } - if ((!u->source || !PA_SOURCE_IS_LINKED(u->source->thread_info.state)) && do_write <= 0) { - pa_usec_t sleep_for; - pa_usec_t time_passed, next_write_at; + /* Set events to wake up the thread */ + pollfd->events = (short) (((have_sink && !writable) ? POLLOUT : 0) | (have_source ? POLLIN : 0)); - if (writable) { - /* Hmm, there is no input stream we could synchronize - * to. So let's estimate when we need to wake up the latest */ - time_passed = pa_rtclock_now() - u->started_at; - next_write_at = pa_bytes_to_usec(u->write_index, &u->sample_spec); - sleep_for = time_passed < next_write_at ? next_write_at - time_passed : 0; - /* pa_log("Sleeping for %lu; time passed %lu, next write at %lu", (unsigned long) sleep_for, (unsigned long) time_passed, (unsigned long)next_write_at); */ - } else - /* drop stream every 500 ms */ - sleep_for = PA_USEC_PER_MSEC * 500; - - pa_rtpoll_set_timer_relative(u->rtpoll, sleep_for); - disable_timer = false; - } - } } if (disable_timer) pa_rtpoll_set_timer_disabled(u->rtpoll); - /* Hmm, nothing to do. 
Let's sleep */ - if (pollfd) - pollfd->events = (short) (((u->sink && PA_SINK_IS_LINKED(u->sink->thread_info.state) && !writable) ? POLLOUT : 0) | - (u->source && PA_SOURCE_IS_LINKED(u->source->thread_info.state) ? POLLIN : 0)); - if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) { pa_log_debug("pa_rtpoll_run failed with: %d", ret); goto fail; } + if (ret == 0) { pa_log_debug("IO thread shutdown requested, stopping cleanly"); transport_release(u); -- 2.14.1