diff --git a/src/Makefile.am b/src/Makefile.am index a621a30..abbd870 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1037,7 +1037,8 @@ librtp_la_LIBADD = $(AM_LIBADD) libpulsecore-@PA_MAJORMINOR@.la libpulsecommon-@ libraop_la_SOURCES = \ modules/raop/raop_client.c modules/raop/raop_client.h \ - modules/raop/base64.c modules/raop/base64.h + modules/raop/base64.c modules/raop/base64.h \ + modules/raop/raop_packet_buffer.h modules/raop/raop_packet_buffer.c libraop_la_CFLAGS = $(AM_CFLAGS) $(OPENSSL_CFLAGS) -I$(top_srcdir)/src/modules/rtp libraop_la_LDFLAGS = $(AM_LDFLAGS) -avoid-version libraop_la_LIBADD = $(AM_LIBADD) $(OPENSSL_LIBS) libpulsecore-@PA_MAJORMINOR@.la librtp.la libpulsecommon-@PA_MAJORMINOR@.la libpulse.la diff --git a/src/modules/raop/raop_client.c b/src/modules/raop/raop_client.c index 0bc40ef..0b7615d 100644 --- a/src/modules/raop/raop_client.c +++ b/src/modules/raop/raop_client.c @@ -59,6 +59,8 @@ #include "rtsp_client.h" #include "base64.h" +#include "raop_packet_buffer.h" + #define FRAMES_PER_PACKET 352 #define AES_CHUNKSIZE 16 @@ -76,6 +78,8 @@ #define DEFAULT_CONTROL_PORT 6001 #define DEFAULT_TIMING_PORT 6002 +#define DEFAULT_PKT_BUF_SIZE 1000 + typedef enum { PAYLOAD_TIMING_REQUEST = 0x52, PAYLOAD_TIMING_RESPONSE = 0x53, @@ -128,6 +132,8 @@ struct pa_raop_client { pa_raop_client_disconnected_cb_t disconnected_callback; void *disconnected_userdata; + + packet_buffer_t *packet_buffer; }; /* Timming packet header (8x8): @@ -162,6 +168,17 @@ static const uint8_t audio_header[12] = { 0x00, 0x00, 0x00, 0x00 }; +/* Retransmit request packet header (8x8): + * [0] RTP v2: 0x80 + * [1] Payload type: 0x55 | marker bit: 0x80 + * [2,3] (Request) Sequence number: 0x0000 (to be set) + * [4,5] First sequence number of packet to retransmit: 0x0000 (to be set) + * [6,7] Number of missed packets to retransmit: 0x0000 (to be set) */ +static const uint8_t retransmit_header[8] = { + 0x80, 0xd5, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00 +}; + static inline void rtrimchar(char *str, char rc) { char *sp; @@ -536,24 +553,28 @@ static int send_sync_packet(pa_raop_client *c, uint32_t stamp) { return rv; } -static int send_audio_packet(pa_raop_client *c, uint32_t *buffer, size_t size, ssize_t *written) { +static int send_audio_packet(pa_raop_client *c, uint32_t *buffer, size_t size, uint16_t retrans_seq_num, ssize_t *written) { ssize_t length = 0; int rv = 1; memcpy(buffer, audio_header, sizeof(audio_header)); if (c->first_packet == TRUE) buffer[0] |= ((uint32_t) 0x80) << 8; - buffer[0] |= htonl((uint32_t) c->seq); + if (retrans_seq_num) { + buffer[0] = (uint32_t) 0x5580; + buffer[0] |= htonl((uint32_t) retrans_seq_num); + } else + buffer[0] |= htonl((uint32_t) c->seq); buffer[1] = htonl(c->rtptime); buffer[2] = htonl(c->ssrc); length = pa_loop_write(c->stream_fd, buffer, size, NULL); if (length == ((ssize_t) size)) rv = 0; - if (written != NULL) *written = length; - c->seq++; + if (!(retrans_seq_num)) + c->seq++; return rv; } @@ -930,6 +951,8 @@ pa_raop_client* pa_raop_client_new(pa_core *core, const char *host, pa_sample_sp /* Packet sync interval should be around 1s. */ c->sync_interval = spec.rate / FRAMES_PER_PACKET; c->sync_count = 0; + + c->packet_buffer = pb_init(DEFAULT_PKT_BUF_SIZE); } return c; @@ -945,6 +968,8 @@ void pa_raop_client_free(pa_raop_client *c) { pa_xfree(c->sid); pa_xfree(c->host); pa_xfree(c); + + pb_free(c->packet_buffer); } int pa_raop_client_connect(pa_raop_client *c) { @@ -1053,14 +1078,47 @@ int pa_raop_client_handle_timing_packet(pa_raop_client *c, const uint8_t packet[ return rv; } +static int resend_packets(pa_raop_client *c, uint16_t seq_num, uint16_t num_packets) { + int rv = -1; + uint32_t *packet_data = NULL; + ssize_t packet_length = 0; + ssize_t written = 0; + int i = 0; + + pa_assert(c); + pa_assert(num_packets); + pa_assert(c->packet_buffer); + + /* If the requested packet is too old (seq_num below first seq number in buffer) + or too young (seq_num greater than current seq number), do nothing and return */ + if ((seq_num < c->packet_buffer->first_seq_num) || (seq_num > c->seq)) + return rv; + for (i = seq_num; i < (seq_num + num_packets); i++) { + packet_data = malloc((FRAMES_PER_PACKET * 2 * 2) + sizeof(audio_header)); + rv = pb_get_packet(c->packet_buffer, i, packet_data, &packet_length); + + pa_assert(packet_length); + + if (rv == 0) + rv = send_audio_packet(c, packet_data, packet_length, seq_num, &written); + + free(packet_data); + } + + return rv; +} + int pa_raop_client_handle_control_packet(pa_raop_client *c, const uint8_t packet[], ssize_t size) { uint8_t playload = 0; int rv = 0; + uint16_t seq_num; + uint16_t num_packets; + pa_assert(c); pa_assert(packet); - if (size != 20 || packet[0] != 0x80) + if ((size != 20 && size != 8) || packet[0] != 0x80) { pa_log_debug("Received an invalid control packet."); return 1; @@ -1070,10 +1128,22 @@ int pa_raop_client_handle_control_packet(pa_raop_client *c, const uint8_t packet playload = packet[1] ^ 0x80; switch (playload) { case PAYLOAD_RETRANSMIT_REQUEST: - /* Packet retransmission not implemented yet... */ - /* rv = ... */ + pa_assert(size == 8); + + /* Requested start sequence number */ + seq_num = ((uint16_t) packet[4]) << 8; + seq_num |= (uint16_t) packet[5]; + /* Number of requested packets starting at requested seq. number */ + num_packets = (uint16_t) packet[6] << 8; + num_packets |= (uint16_t) packet[7]; + pa_log_debug("Resending %d packets starting at %d", num_packets, seq_num); + rv = resend_packets(c, seq_num, num_packets); break; + case PAYLOAD_RETRANSMIT_REPLY: + pa_log_debug("Received a retransmit reply packet on control port (this should never happen)"); + break; + default: pa_log_debug("Got an unexpected payload type on control channel !"); return 1; @@ -1133,12 +1203,18 @@ int pa_raop_client_send_audio_packet(pa_raop_client *c, pa_memchunk *block, ssiz } buf = (uint32_t *) pa_memblock_acquire(block->memblock); - if (buf != NULL && block->length > 0) - rv = send_audio_packet(c, buf + block->index, block->length, &length); + if (buf != NULL && block->length > 0) { + rv = send_audio_packet(c, buf + block->index, block->length, 0, &length); + + /* Store packet for resending in the packet buffer */ + pb_write_packet(c->packet_buffer, c->seq, buf + block->index, block->length); + } pa_memblock_release(block->memblock); + block->index += length; block->length -= length; - if (written != NULL) + + if (written != NULL) *written = length; if (c->first_packet == TRUE) diff --git a/src/modules/raop/raop_packet_buffer.c b/src/modules/raop/raop_packet_buffer.c new file mode 100644 index 0000000..52823a9 --- /dev/null +++ b/src/modules/raop/raop_packet_buffer.c @@ -0,0 +1,140 @@ +/*** + Circular buffer for RTP audio packets with random access support by RTP sequence number. + + Copyright 2013 Matthias Wabersich + + This is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + This is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + USA. + +***/ + +#include +#include + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include "raop_client.h" + +#include "raop_packet_buffer.h" + +packet_buffer_t *pb_init(int size) { + packet_buffer_t *pb = calloc(1, sizeof(packet_buffer_t)); + pb->size = size; + pb->start = 0; + pb->count = 0; + pb->first_seq_num = 0; + pb->packets = (packet_element_t *)calloc(pb->size, sizeof(packet_element_t)); + return pb; +} + + +void pb_free(packet_buffer_t *pb) { + free(pb->packets); + free(pb); +} + +int pb_is_full(packet_buffer_t *pb) { + return pb->count == pb->size; +} + +int pb_is_empty(packet_buffer_t *pb) { + return pb->count == 0; +} + +int pb_write(packet_buffer_t *pb, packet_element_t *packet) { + int end = (pb->start + pb->count) % pb->size; + int ov = 0; + + + /* Set first packet sequence number in buffer if buffer is empty */ + if (pb_is_empty(pb)) + pb->first_seq_num = packet->seq_num; + + pb->packets[end] = *packet; + if (pb->count == pb->size) { + pb->start = (pb->start + 1) % pb->size; /* full, overwrite */ + + /* Set first packet sequence number in buffer to new start packet sequence number */ + packet_element_t *start_packet = malloc(sizeof(packet_element_t)); + *start_packet = pb->packets[pb->start]; + pb->first_seq_num = start_packet->seq_num; + free(start_packet); + ov = 1; + } else { + ++ pb->count; + ov = 0; + } +} + +int pb_read(packet_buffer_t *pb, packet_element_t *packet) { + int rv = -1; + + if (pb_is_empty(pb)) + rv = -1; + else { + *packet = pb->packets[pb->start]; + pb->start = (pb->start + 1) % pb->size; + -- pb->count; + rv = 0; + } + if (pb_is_empty(pb)) + pb->first_seq_num = 0; + return rv; +} + +/* Write packet data to packet buffer */ +int pb_write_packet(packet_buffer_t *pb, uint16_t seq_num, uint32_t *packet_data, ssize_t packet_length) { + packet_element_t *packet; + + packet = malloc(sizeof(packet_element_t)); + packet->seq_num = seq_num; + packet->packet_length = packet_length; + packet->resent = FALSE; + memcpy(&packet->packet, packet_data, packet->packet_length); + + pb_write(pb, packet); + free(packet); +} + +/* Random access to packet from buffer by sequence number for (re-)sending. */ +int pb_get_packet(packet_buffer_t *pb, uint16_t seq_num, uint32_t *packet_data, ssize_t *packet_length) { + int rv = -1; + int index = 0; /* Index of requested packet */ + packet_element_t *packet = malloc(sizeof(packet_element_t)); + + /* If the buffer is empty, there is no use in calculating indices */ + if (pb_is_empty(pb)) + return rv; + + /* Index of the requested packet in the buffer is calculated using the first sequence number + stored in the buffer. The offset (seq_num - first_seq_num) is used to access the array. + */ + if (seq_num == pb->first_seq_num) + index = 0; + else + index = (seq_num - pb->first_seq_num); + + *packet = pb->packets[index]; + + packet->resent = TRUE; + packet_data = packet->packet; + *packet_length = packet->packet_length; + free(packet); + rv = 0; + return rv; +} diff --git a/src/modules/raop/raop_packet_buffer.h b/src/modules/raop/raop_packet_buffer.h new file mode 100644 index 0000000..1cb50e9 --- /dev/null +++ b/src/modules/raop/raop_packet_buffer.h @@ -0,0 +1,46 @@ +/*** + Circular buffer for RTP audio packets with random access support by RTP sequence number. + + Copyright 2013 Matthias Wabersich + + This is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published + by the Free Software Foundation; either version 2.1 of the License, + or (at your option) any later version. + + This is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with PulseAudio; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + USA. + +***/ + +#define PACKET_SIZE_MAX 1422 /* FRAMES_PER_PACKET * 2 * 2 + sizeof(rtp_header), unencoded */ + +/* Packet element */ +typedef struct { + uint16_t seq_num; /* RTP sequence number */ + ssize_t packet_length; /* Actual packet length, bounded to PACKET_SIZE_MAX */ + bool resent; /* Resend Flag (TRUE == packet has been re-sent) */ + uint32_t packet[PACKET_SIZE_MAX]; /* Packet data including RTP header */ +} packet_element_t; + +/* Buffer struct */ +typedef struct { + int size; /* max number of packets in buffer */ + int start; /* index of oldest packet */ + int count; /* number of packets in buffer */ + uint16_t first_seq_num; /* Sequence number of first packet in buffer */ + packet_element_t *packets; /* Packet element pointer */ +} packet_buffer_t; + +packet_buffer_t *pb_init(int size); +void pb_free(packet_buffer_t *pb); + +int pb_write_packet(packet_buffer_t *pb, uint16_t seq_num, uint32_t *packet_data, ssize_t packet_length); +int pb_get_packet(packet_buffer_t *pb, uint16_t seq_num, uint32_t *packet_data, ssize_t *packet_length);