Browse Source

pulse-server: split out message handling

Part of !776.
lines
Barnabás Pőcze 1 year ago
parent
commit
b2ec1fb60a
6 changed files with 147 additions and 83 deletions
  1. +1
    -0
      src/modules/meson.build
  2. +2
    -0
      src/modules/module-protocol-pulse/defs.h
  3. +2
    -0
      src/modules/module-protocol-pulse/ext-stream-restore.c
  4. +64
    -39
      src/modules/module-protocol-pulse/message.c
  5. +77
    -0
      src/modules/module-protocol-pulse/message.h
  6. +1
    -44
      src/modules/module-protocol-pulse/pulse-server.c

+ 1
- 0
src/modules/meson.build View File

@ -138,6 +138,7 @@ pipewire_module_protocol_pulse_sources = [
'module-protocol-pulse/format.c',
'module-protocol-pulse/manager.c',
'module-protocol-pulse/media-roles.c',
'module-protocol-pulse/message.c',
'module-protocol-pulse/pending-sample.c',
'module-protocol-pulse/pulse-server.c',
'module-protocol-pulse/sample.c',


+ 2
- 0
src/modules/module-protocol-pulse/defs.h View File

@ -25,6 +25,8 @@
#ifndef PULSE_SERVER_DEFS_H
#define PULSE_SERVER_DEFS_H
#include <pipewire/node.h>
#define FLAG_SHMDATA 0x80000000LU
#define FLAG_SHMDATA_MEMFD_BLOCK 0x20000000LU
#define FLAG_SHMRELEASE 0x40000000LU


+ 2
- 0
src/modules/module-protocol-pulse/ext-stream-restore.c View File

@ -26,6 +26,8 @@
#include <spa/utils/string.h>
#include "media-roles.h"
static const struct extension_sub ext_stream_restore[];
static int do_extension_stream_restore_test(struct client *client, uint32_t command, uint32_t tag, struct message *m)


+ 64
- 39
src/modules/module-protocol-pulse/message.c View File

@ -22,9 +22,21 @@
* DEALINGS IN THE SOFTWARE.
*/
#include <arpa/inet.h>
#include <math.h>
#include <spa/debug/buffer.h>
#include <spa/utils/defs.h>
#include <spa/utils/string.h>
#include <pipewire/keys.h>
#include <pipewire/log.h>
#include "defs.h"
#include "format.h"
#include "internal.h"
#include "media-roles.h"
#include "message.h"
#include "volume.h"
#define VOLUME_MUTED ((uint32_t) 0U)
#define VOLUME_NORM ((uint32_t) 0x10000U)
@ -49,7 +61,7 @@ static inline float volume_to_linear(uint32_t vol)
return v * v * v;
}
const struct str_map key_table[] = {
static const struct str_map key_table[] = {
{ PW_KEY_DEVICE_BUS_PATH, "device.bus_path" },
{ PW_KEY_DEVICE_FORM_FACTOR, "device.form_factor" },
{ PW_KEY_DEVICE_ICON_NAME, "device.icon_name" },
@ -63,41 +75,6 @@ const struct str_map key_table[] = {
{ NULL, NULL },
};
enum {
TAG_INVALID = 0,
TAG_STRING = 't',
TAG_STRING_NULL = 'N',
TAG_U32 = 'L',
TAG_U8 = 'B',
TAG_U64 = 'R',
TAG_S64 = 'r',
TAG_SAMPLE_SPEC = 'a',
TAG_ARBITRARY = 'x',
TAG_BOOLEAN_TRUE = '1',
TAG_BOOLEAN_FALSE = '0',
TAG_BOOLEAN = TAG_BOOLEAN_TRUE,
TAG_TIMEVAL = 'T',
TAG_USEC = 'U' /* 64bit unsigned */,
TAG_CHANNEL_MAP = 'm',
TAG_CVOLUME = 'v',
TAG_PROPLIST = 'P',
TAG_VOLUME = 'V',
TAG_FORMAT_INFO = 'f',
};
struct message {
struct spa_list link;
struct stats *stat;
uint32_t extra[4];
uint32_t channel;
uint32_t allocated;
uint32_t length;
uint32_t offset;
uint8_t *data;
};
static int message_get(struct message *m, ...);
static int read_u8(struct message *m, uint8_t *val)
{
if (m->offset + 1 > m->length)
@ -292,7 +269,7 @@ static int read_format_info(struct message *m, struct format_info *info)
return res;
}
static int message_get(struct message *m, ...)
int message_get(struct message *m, ...)
{
va_list va;
int res = 0;
@ -614,7 +591,7 @@ static void write_format_info(struct message *m, struct format_info *info)
write_dict(m, info->props ? &info->props->dict : NULL, false);
}
static int message_put(struct message *m, ...)
int message_put(struct message *m, ...)
{
va_list va;
@ -684,7 +661,7 @@ static int message_put(struct message *m, ...)
return 0;
}
static int message_dump(enum spa_log_level level, struct message *m)
int message_dump(enum spa_log_level level, struct message *m)
{
int res;
uint32_t i, offset = m->offset, o;
@ -844,3 +821,51 @@ static int message_dump(enum spa_log_level level, struct message *m)
return 0;
}
struct message *message_alloc(struct impl *impl, uint32_t channel, uint32_t size)
{
struct message *msg;
if (!spa_list_is_empty(&impl->free_messages)) {
msg = spa_list_first(&impl->free_messages, struct message, link);
spa_list_remove(&msg->link);
pw_log_trace("using recycled message %p", msg);
} else {
if ((msg = calloc(1, sizeof(*msg))) == NULL)
return NULL;
pw_log_trace("new message %p", msg);
msg->stat = &impl->stat;
msg->stat->n_allocated++;
msg->stat->n_accumulated++;
}
if (ensure_size(msg, size) < 0) {
message_free(impl, msg, false, true);
return NULL;
}
spa_zero(msg->extra);
msg->channel = channel;
msg->offset = 0;
msg->length = size;
return msg;
}
void message_free(struct impl *impl, struct message *msg, bool dequeue, bool destroy)
{
if (dequeue)
spa_list_remove(&msg->link);
if (destroy) {
pw_log_trace("destroy message %p", msg);
msg->stat->n_allocated--;
msg->stat->allocated -= msg->allocated;
free(msg->data);
free(msg);
} else {
pw_log_trace("recycle message %p", msg);
spa_list_append(&impl->free_messages, &msg->link);
}
}

+ 77
- 0
src/modules/module-protocol-pulse/message.h View File

@ -0,0 +1,77 @@
/* PipeWire
*
* Copyright © 2020 Wim Taymans
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice (including the next
* paragraph) shall be included in all copies or substantial portions of the
* Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
#ifndef PULSE_SERVER_MESSAGE_H
#define PULSE_SERVER_MESSAGE_H
#include <stdbool.h>
#include <stdint.h>
#include <spa/utils/list.h>
#include <spa/support/log.h>
struct impl;
struct client;
struct stats;
struct message {
struct spa_list link;
struct stats *stat;
uint32_t extra[4];
uint32_t channel;
uint32_t allocated;
uint32_t length;
uint32_t offset;
uint8_t *data;
};
enum {
TAG_INVALID = 0,
TAG_STRING = 't',
TAG_STRING_NULL = 'N',
TAG_U32 = 'L',
TAG_U8 = 'B',
TAG_U64 = 'R',
TAG_S64 = 'r',
TAG_SAMPLE_SPEC = 'a',
TAG_ARBITRARY = 'x',
TAG_BOOLEAN_TRUE = '1',
TAG_BOOLEAN_FALSE = '0',
TAG_BOOLEAN = TAG_BOOLEAN_TRUE,
TAG_TIMEVAL = 'T',
TAG_USEC = 'U' /* 64bit unsigned */,
TAG_CHANNEL_MAP = 'm',
TAG_CVOLUME = 'v',
TAG_PROPLIST = 'P',
TAG_VOLUME = 'V',
TAG_FORMAT_INFO = 'f',
};
struct message *message_alloc(struct impl *impl, uint32_t channel, uint32_t size);
void message_free(struct impl *impl, struct message *msg, bool dequeue, bool destroy);
int message_get(struct message *m, ...);
int message_put(struct message *m, ...);
int message_dump(enum spa_log_level level, struct message *m);
#endif /* PULSE_SERVER_MESSAGE_H */

+ 1
- 44
src/modules/module-protocol-pulse/pulse-server.c View File

@ -77,6 +77,7 @@
#include "defs.h"
#include "format.h"
#include "internal.h"
#include "message.h"
#include "pending-sample.h"
#include "sample.h"
#include "sample-play.h"
@ -97,7 +98,6 @@
#define MAX_FORMATS 32
#define MAX_CLIENTS 64
#include "message.c"
#include "manager.h"
static bool debug_messages = false;
@ -138,49 +138,6 @@ static struct sample *find_sample(struct impl *impl, uint32_t idx, const char *n
return NULL;
}
static void message_free(struct impl *impl, struct message *msg, bool dequeue, bool destroy)
{
if (dequeue)
spa_list_remove(&msg->link);
if (destroy) {
pw_log_trace("destroy message %p", msg);
msg->stat->n_allocated--;
msg->stat->allocated -= msg->allocated;
free(msg->data);
free(msg);
} else {
pw_log_trace("recycle message %p", msg);
spa_list_append(&impl->free_messages, &msg->link);
}
}
static struct message *message_alloc(struct impl *impl, uint32_t channel, uint32_t size)
{
struct message *msg;
if (!spa_list_is_empty(&impl->free_messages)) {
msg = spa_list_first(&impl->free_messages, struct message, link);
spa_list_remove(&msg->link);
pw_log_trace("using recycled message %p", msg);
} else {
if ((msg = calloc(1, sizeof(struct message))) == NULL)
return NULL;
pw_log_trace("new message %p", msg);
msg->stat = &impl->stat;
msg->stat->n_allocated++;
msg->stat->n_accumulated++;
}
if (ensure_size(msg, size) < 0) {
message_free(impl, msg, false, true);
return NULL;
}
spa_zero(msg->extra);
msg->channel = channel;
msg->offset = 0;
msg->length = size;
return msg;
}
static int flush_messages(struct client *client)
{
struct impl *impl = client->impl;


Loading…
Cancel
Save