diff --git a/spa/plugins/bluez5/iso-io.c b/spa/plugins/bluez5/iso-io.c index f53da2f4d..2cc65a2bf 100644 --- a/spa/plugins/bluez5/iso-io.c +++ b/spa/plugins/bluez5/iso-io.c @@ -41,6 +41,9 @@ SPA_LOG_TOPIC_DEFINE_STATIC(log_topic, "spa.bluez5.iso"); #define ISO_BUFFERING_AVG_PERIOD (50 * SPA_NSEC_PER_MSEC) #define ISO_BUFFERING_RATE_DIFF_MAX 0.05 +#define FLUSH_WAIT 3 +#define MIN_FILL 1 + struct clock_sync { /** Reference monotonic time for streams in the group */ int64_t base_time; @@ -67,9 +70,12 @@ struct group { int64_t next; int64_t duration_tx; int64_t duration_rx; - bool flush; + uint32_t flush; bool started; + struct spa_bt_ptp kernel_imbalance; + struct spa_bt_ptp stream_imbalance; + struct clock_sync rx_sync; }; @@ -134,21 +140,18 @@ static void stream_unlink(struct stream *stream) spa_assert_se(res == 0); } -static int stream_silence(struct stream *stream) +static int stream_silence_buf(struct stream *stream, uint8_t *buf, size_t max_size) { static uint8_t empty[EMPTY_BUF_SIZE] = {0}; - const size_t max_size = sizeof(stream->this.buf); int res, used, need_flush; size_t encoded; - stream->idle = true; - - res = used = stream->codec->start_encode(stream->this.codec_data, stream->this.buf, max_size, 0, 0); + res = used = stream->codec->start_encode(stream->this.codec_data, buf, max_size, 0, 0); if (res < 0) return res; res = stream->codec->encode(stream->this.codec_data, empty, stream->block_size, - SPA_PTROFF(stream->this.buf, used, void), max_size - used, &encoded, &need_flush); + SPA_PTROFF(buf, used, void), max_size - used, &encoded, &need_flush); if (res < 0) return res; @@ -157,7 +160,21 @@ static int stream_silence(struct stream *stream) if (!need_flush) return -EINVAL; - stream->this.size = used; + return used; +} + +static int stream_silence(struct stream *stream) +{ + const size_t max_size = sizeof(stream->this.buf); + int res; + + stream->idle = true; + + res = stream_silence_buf(stream, stream->this.buf, max_size); + if (res < 0) + return res; + + stream->this.size = res; return 0; } @@ -200,55 +217,114 @@ static void drop_rx(int fd) } while (res >= 0); } +static void reset_imbalance(struct group *group) +{ + spa_bt_ptp_init(&group->kernel_imbalance, 2*LATENCY_PERIOD, LATENCY_PERIOD); + spa_bt_ptp_init(&group->stream_imbalance, 2*LATENCY_PERIOD, LATENCY_PERIOD); +} + static bool group_latency_check(struct group *group) { struct stream *stream; - int32_t min_latency = INT32_MAX, max_latency = INT32_MIN; - unsigned int kernel_queue = UINT_MAX; + int32_t min_min = INT32_MAX, max_min = INT32_MIN; + int32_t min_kernel = INT32_MAX, max_kernel = INT32_MIN; - spa_list_for_each(stream, &group->streams, link) { - if (!stream->sink) - continue; - if (!stream->tx_latency.enabled) - return false; + /* + * Packet transport eg. over USB and in kernel (where there is no delay guarantee) + * can introduce delays in controller receiving the packets, and this may desync + * stream playback. From measurements, in steady state kernel+USB introduce +- 3 ms + * jitter. + * + * Since there's currently no way to sync to controller HW clock (as of kernel + * 6.18) and we cannot provide packet timestamps, controllers appear to fall back + * to guessing, and seem to sometimes get stuck in a state where streams are + * desynchronized. + * + * It appears many controllers also have bad implementations of the LE Read ISO TX + * Sync command and always return 0 timestamp, so it is not even possible to + * provide valid packet timestamps on such broken hardware. + * + * Kernel (as of 6.18) does not do any stream synchronization, and its packet + * scheduler can also introduce desync on socket buffer level if controller + * buffers are full. + * + * Consequently, there's currently no fully reliable way to sync even two + * channels. We have to try work around this mess by attempting to detect desyncs, + * and resynchronize if: + * + * - if socket queues are out of balance (kernel packet scheduler out of sync) + * - if controller is reporting packet completion times that seem off between + * different streams, controller is likely out of sync. No way to know, really, + * but let's flush then and hope for the best. + * + * In addition, we have to keep minimal fill level in the controller to avoid it + * running out of packets, as that triggers desyncs on Intel controllers. + */ - if (kernel_queue == UINT_MAX) - kernel_queue = stream->tx_latency.kernel_queue; + /* Check for ongoing flush */ + if (group->flush) { + spa_list_for_each(stream, &group->streams, link) { + if (!stream->sink) + continue; - if (group->flush && stream->tx_latency.queue) { - spa_log_debug(group->log, "%p: ISO group:%d latency skip: flushing", + if (stream->tx_latency.queue) { + spa_log_trace(group->log, "%p: ISO group:%d resync pause: flushing", + group, group->id); + return true; + } + } + + if (--group->flush) { + spa_log_trace(group->log, "%p: ISO group:%d resync pause: flushing wait", group, group->id); return true; } - if (stream->tx_latency.kernel_queue != kernel_queue) { - /* Streams out of sync, try to correct if it persists */ - spa_log_debug(group->log, "%p: ISO group:%d latency skip: imbalance", - group, group->id); - group->flush = true; - return true; - } } - group->flush = false; - + /* Evaluate TX imbalances */ spa_list_for_each(stream, &group->streams, link) { - if (!stream->sink) + if (!stream->sink || stream->idle) continue; - if (!stream->tx_latency.valid) + if (!stream->tx_latency.enabled || !stream->tx_latency.valid) return false; - min_latency = SPA_MIN(min_latency, stream->tx_latency.ptp.min); - max_latency = SPA_MAX(max_latency, stream->tx_latency.ptp.max); + min_kernel = SPA_MIN(stream->tx_latency.kernel_queue * group->duration_tx, min_kernel); + max_kernel = SPA_MAX(stream->tx_latency.kernel_queue * group->duration_tx, max_kernel); + + min_min = SPA_MIN(min_min, stream->tx_latency.ptp.min); + max_min = SPA_MAX(max_min, stream->tx_latency.ptp.min); } - if (max_latency > MAX_LATENCY) { - spa_log_debug(group->log, "%p: ISO group:%d latency skip: latency %d ms", - group, group->id, (int)(max_latency / SPA_NSEC_PER_MSEC)); - group->flush = true; - return true; + /* Update values */ + if (min_min > max_min || min_kernel > max_kernel) + return false; + + spa_bt_ptp_update(&group->kernel_imbalance, max_kernel - min_kernel, group->duration_tx); + spa_bt_ptp_update(&group->stream_imbalance, max_min - min_min, group->duration_tx); + + /* Check latencies */ + if (!spa_bt_ptp_valid(&group->kernel_imbalance) || !spa_bt_ptp_valid(&group->stream_imbalance)) + return false; + + if (max_min > MAX_LATENCY) { + spa_log_info(group->log, "%p: ISO group:%d resync pause: too big latency %d ms", + group, group->id, (int)(max_min / SPA_NSEC_PER_MSEC)); + group->flush = FLUSH_WAIT; } - return false; + if (group->kernel_imbalance.min >= group->duration_tx/2) { + spa_log_info(group->log, "%p: ISO group:%d resync pause: kernel desync %d ms", + group, group->id, (int)(group->kernel_imbalance.min / SPA_NSEC_PER_MSEC)); + group->flush = FLUSH_WAIT; + } + + if (group->stream_imbalance.min >= group->duration_tx*4/5) { + spa_log_info(group->log, "%p: ISO group:%d resync pause: stream desync %d ms", + group, group->id, (int)(group->stream_imbalance.min / SPA_NSEC_PER_MSEC)); + group->flush = FLUSH_WAIT; + } + + return group->flush; } static void group_on_timeout(struct spa_source *source) @@ -260,6 +336,7 @@ static void group_on_timeout(struct spa_source *source) bool debug_mono = false; uint64_t exp; uint64_t now_realtime; + unsigned int fill_count; int res; if ((res = spa_system_timerfd_read(group->data_system, group->timerfd, &exp)) < 0) { @@ -303,6 +380,7 @@ static void group_on_timeout(struct spa_source *source) if (group_latency_check(group)) { spa_list_for_each(stream, &group->streams, link) spa_bt_latency_reset(&stream->tx_latency); + reset_imbalance(group); goto done; } @@ -330,6 +408,48 @@ static void group_on_timeout(struct spa_source *source) } } + /* Ensure controller fill level */ + fill_count = UINT_MAX; + spa_list_for_each(stream, &group->streams, link) { + if (!stream->sink || !group->started) + continue; + if (stream->tx_latency.queue < MIN_FILL) + fill_count = SPA_MIN(fill_count, MIN_FILL - stream->tx_latency.queue); + } + if (fill_count == UINT_MAX) + fill_count = 0; + spa_list_for_each(stream, &group->streams, link) { + uint64_t now; + unsigned int i; + + if (!stream->sink || !group->started) + continue; + + /* Ensure buffer level on controller side */ + for (i = 0; i < fill_count; ++i) { + uint8_t buf[4096]; + int size; + + size = stream_silence_buf(stream, buf, sizeof(buf)); + if (size < 0) { + fail = true; + break; + } + + spa_log_debug(group->log, "%p: ISO group:%u fill fd:%d", + group, group->id, stream->fd); + now = get_time_ns(group->data_system, CLOCK_REALTIME); + res = spa_bt_send(stream->fd, buf, size, &stream->tx_latency, now); + if (res < 0) { + res = -errno; + fail = true; + break; + } + } + } + if (fail) + goto done; + /* Produce output */ spa_list_for_each(stream, &group->streams, link) { int res = 0; @@ -358,7 +478,6 @@ static void group_on_timeout(struct spa_source *source) if (res < 0) { res = -errno; fail = true; - group->flush = true; } spa_log_trace(group->log, "%p: ISO group:%u sent fd:%d size:%u ts:%u idle:%d res:%d latency:%d..%d%sus queue:%u", @@ -371,10 +490,12 @@ static void group_on_timeout(struct spa_source *source) stream->this.size = 0; } - if (fail) - spa_log_debug(group->log, "%p: ISO group:%d send failure", group, group->id); - done: + if (fail) { + spa_log_debug(group->log, "%p: ISO group:%d send failure", group, group->id); + group->flush = FLUSH_WAIT; + } + /* Pull data for the next interval */ group->next += exp * group->duration_tx; @@ -425,6 +546,8 @@ static struct group *group_create(struct spa_bt_transport *t, spa_list_init(&group->streams); + reset_imbalance(group); + group->timerfd = spa_system_timerfd_create(group->data_system, CLOCK_MONOTONIC, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK); if (group->timerfd < 0) {