Request real-time priority from the operating system for audio pipeline (#866)

* Bound requested frames between both min and max.

* Request real-time scheduling for pipeline threads (currently macOS-only).

* Ensure pipeline threads never execute at the same time as audio threads.

* macOS: ask for 5ms audio blocks.

* Temporarily re-enable stress test.

* Suppress codespell for .mm files.

* Swap times/buffer sizes.

* Fix compiler error.

* Go back to previous frame size and time.

* Re-disable stress test.

* Try adjusting time quantum.

* Move audio priority handling to audio layer.

* Set helper thread to real-time on Windows.

* Temporarily disable audio workgroup code.

* Disable periodicity in macOS RT request

* Forgot to define variable.

* Only allow a max of 50% CPU time for audio helpers.

* More tuning of the duty cycle.

* Make sure we sleep some amount of time every time through the loop.

* Go back to smaller time quantum.

* Another quantum tweak.

* Increase audio sample block size to 20ms.

* Use 2.5ms for audio block time.

* Try increasing FIFO sizes for RX.

* Oops, forgot to comment out the / 1000 too.

* Get faster turnaround for macOS GH builds.

* Revert FIFO changes.

* Add additional debugging.

* Fix compiler error.

* Fix typo.

* Use semaphores to notify TX/RX worker threads.

* Try dividing duty cycle by 2 to avoid starvation.

* Reenable audio workgroups.

* No point in having parallel threads if only one mode is active.

* Ensure that ParallelStep gets added to the audio workgroup too.

* Anything for GUI consumption shouldn't be in the RT path.

* Go back to 10ms audio blocks to see if reporting tests can more reliably pass.

* Fix issue causing audio to not work on certain Macs.

* Support real-time threading on Linux when using PulseAudio/pipewire.

* Fix misspelling.

* dbus needs to be installed in the environment.

* macOS: try a smaller block size again.

* Revert "macOS: try a smaller block size again."

This reverts commit 1d21ad6934.

* Try architecture-specific audio block times.

* rtkit itself also needs to be installed in the environment.

* Revert ordering changes to macOS CI builds.

* GH user needs to be added to rtkit group.

* Implement semaphores on Windows.

* Don't exit the render/capture threads unless requested by higher level code.

* Don't take address of a pointer.

* macOS: set thread QoS for RT threads.

* Move RADE RX/TX memory allocations out of real-time path.

* Moving additional memory allocations outside of RT context.

* Move all remaining allocations in pipeline outside of RT except for legacy FreeDV modes.

* Longer audio block times don't seem to be making a difference.

* Move legacy mode memory allocations out of RT context.

* Prevent unbounded mutex locks inside pipeline.

* Windows: fallback to a simple sleep if we can't lock the semaphore.

* Increase maximum wait time to match what Windows returns for buffer size.

* PulseAudio: fallback to simple sleep if we can't wait on semaphore.

* Prevent unbounded locking in TapStep.
pull/869/head
Mooneer Salem 2025-04-23 00:18:24 -07:00 committed by GitHub
parent 6bf0f311b1
commit 439e54035f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
52 changed files with 1383 additions and 313 deletions

View File

@ -25,11 +25,12 @@ jobs:
run: | run: |
sudo apt-get update sudo apt-get update
sudo apt-get upgrade -y sudo apt-get upgrade -y
sudo apt-get install codespell libpulse-dev libspeexdsp-dev libsamplerate0-dev sox git libwxgtk3.2-dev portaudio19-dev libhamlib-dev libasound2-dev libao-dev libgsm1-dev libsndfile-dev xvfb pipewire pulseaudio-utils pipewire-pulse wireplumber metacity dbus-x11 at-spi2-core rtkit octave octave-signal sudo apt-get install codespell libpulse-dev libspeexdsp-dev libsamplerate0-dev sox git libwxgtk3.2-dev portaudio19-dev libhamlib-dev libasound2-dev libao-dev libgsm1-dev libsndfile-dev xvfb pipewire pulseaudio-utils pipewire-pulse wireplumber metacity dbus-x11 at-spi2-core rtkit octave octave-signal libdbus-1-dev
sudo usermod -a -G rtkit $(whoami)
- name: Spellcheck codebase - name: Spellcheck codebase
shell: bash shell: bash
run: codespell --ignore-words-list=caf,radae,rade,inout,nin,ontop,parm,tthe,ue `find src -name '*.c*' -o -name '*.h' -o -name '*.mm' | grep -v 3rdparty` run: codespell --ignore-words-list=caf,radae,rade,inout,nin,ontop,parm,tthe,ue `find src -name '*.c*' -o -name '*.h' | grep -v 3rdparty`
- name: Install Python required modules - name: Install Python required modules
shell: bash shell: bash

View File

@ -205,6 +205,18 @@ if(USE_STATIC_DEPS)
set(USE_STATIC_SPEEXDSP TRUE FORCE) set(USE_STATIC_SPEEXDSP TRUE FORCE)
endif(USE_STATIC_DEPS) endif(USE_STATIC_DEPS)
#
# Find DBus (Linux only)--this is so that we can use
# rtkit to request real-time scheduling for our audio
# threads.
if (LINUX)
message(STATUS "Looking for DBus...")
find_package(DBus)
if (DBUS_FOUND)
message(STATUS "Found DBus, will attempt to use rtkit for real-time threading")
endif (DBUS_FOUND)
endif (LINUX)
# #
# Find wxWidgets # Find wxWidgets
# #

View File

@ -0,0 +1,59 @@
# - Try to find DBus
# Once done, this will define
#
# DBUS_FOUND - system has DBus
# DBUS_INCLUDE_DIRS - the DBus include directories
# DBUS_LIBRARIES - link these to use DBus
#
# Copyright (C) 2012 Raphael Kubo da Costa <rakuco@webkit.org>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND ITS CONTRIBUTORS ``AS
# IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
FIND_PACKAGE(PkgConfig)
PKG_CHECK_MODULES(PC_DBUS QUIET dbus-1)
FIND_LIBRARY(DBUS_LIBRARIES
NAMES dbus-1
HINTS ${PC_DBUS_LIBDIR}
${PC_DBUS_LIBRARY_DIRS}
)
FIND_PATH(DBUS_INCLUDE_DIR
NAMES dbus/dbus.h
HINTS ${PC_DBUS_INCLUDEDIR}
${PC_DBUS_INCLUDE_DIRS}
)
GET_FILENAME_COMPONENT(_DBUS_LIBRARY_DIR ${DBUS_LIBRARIES} PATH)
FIND_PATH(DBUS_ARCH_INCLUDE_DIR
NAMES dbus/dbus-arch-deps.h
HINTS ${PC_DBUS_INCLUDEDIR}
${PC_DBUS_INCLUDE_DIRS}
${_DBUS_LIBRARY_DIR}
${DBUS_INCLUDE_DIR}
PATH_SUFFIXES include
)
SET(DBUS_INCLUDE_DIRS ${DBUS_INCLUDE_DIR} ${DBUS_ARCH_INCLUDE_DIR})
INCLUDE(FindPackageHandleStandardArgs)
FIND_PACKAGE_HANDLE_STANDARD_ARGS(DBUS REQUIRED_VARS DBUS_INCLUDE_DIRS DBUS_LIBRARIES)

View File

@ -10,9 +10,14 @@ if(NATIVE_AUDIO_AVAILABLE)
WASAPIAudioDevice.cpp WASAPIAudioDevice.cpp
) )
elseif(LINUX) elseif(LINUX)
if (DBUS_FOUND)
set(RTKIT_FILES rtkit.c)
endif (DBUS_FOUND)
set(AUDIO_ENGINE_LIBRARY_SPECIFIC_FILES set(AUDIO_ENGINE_LIBRARY_SPECIFIC_FILES
PulseAudioDevice.cpp PulseAudioDevice.cpp
PulseAudioEngine.cpp PulseAudioEngine.cpp
${RTKIT_FILES}
) )
endif() endif()
else() else()
@ -37,6 +42,10 @@ if(APPLE AND NATIVE_AUDIO_AVAILABLE)
"-framework CoreAudio") "-framework CoreAudio")
elseif(WIN32 AND NATIVE_AUDIO_AVAILABLE) elseif(WIN32 AND NATIVE_AUDIO_AVAILABLE)
target_link_libraries(fdv_audio PRIVATE uuid avrt) target_link_libraries(fdv_audio PRIVATE uuid avrt)
elseif(LINUX AND NATIVE_AUDIO_AVAILABLE AND DBUS_FOUND)
target_include_directories(fdv_audio PRIVATE ${DBUS_INCLUDE_DIRS})
target_compile_definitions(fdv_audio PRIVATE -DUSE_RTKIT)
target_link_libraries(fdv_audio PRIVATE ${DBUS_LIBRARIES})
endif() endif()
if(BOOTSTRAP_WXWIDGETS) if(BOOTSTRAP_WXWIDGETS)

View File

@ -26,9 +26,15 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <functional> #include <functional>
#include "AudioDeviceSpecification.h" #include <thread>
#include <chrono>
class IAudioDevice #include "AudioDeviceSpecification.h"
#include "../util/IRealtimeHelper.h"
using namespace std::chrono_literals;
class IAudioDevice : public IRealtimeHelper
{ {
public: public:
typedef std::function<void(IAudioDevice&, void*, size_t, void*)> AudioDataCallbackFn; typedef std::function<void(IAudioDevice&, void*, size_t, void*)> AudioDataCallbackFn;
@ -47,6 +53,21 @@ public:
virtual int getLatencyInMicroseconds() = 0; virtual int getLatencyInMicroseconds() = 0;
// Configures current thread for real-time priority. This should be
// called from the thread that will be operating on received audio.
virtual void setHelperRealTime() override { /* empty */ }
// Lets audio system know that we're beginning to do work with the
// received audio.
virtual void startRealTimeWork() override { /* empty */ }
// Lets audio system know that we're done with the work on the received
// audio.
virtual void stopRealTimeWork() override { std::this_thread::sleep_for(10ms); }
// Reverts real-time priority for current thread.
virtual void clearHelperRealTime() override { /* empty */ }
// Sets user friendly description of device. Not used by all engines. // Sets user friendly description of device. Not used by all engines.
void setDescription(std::string desc); void setDescription(std::string desc);

View File

@ -23,6 +23,9 @@
#ifndef MAC_AUDIO_DEVICE_H #ifndef MAC_AUDIO_DEVICE_H
#define MAC_AUDIO_DEVICE_H #define MAC_AUDIO_DEVICE_H
#include <thread>
#include <dispatch/dispatch.h>
#include "../util/ThreadedObject.h" #include "../util/ThreadedObject.h"
#include "IAudioEngine.h" #include "IAudioEngine.h"
#include "IAudioDevice.h" #include "IAudioDevice.h"
@ -42,6 +45,21 @@ public:
virtual int getLatencyInMicroseconds() override; virtual int getLatencyInMicroseconds() override;
// Configures current thread for real-time priority. This should be
// called from the thread that will be operating on received audio.
virtual void setHelperRealTime() override;
// Lets audio system know that we're beginning to do work with the
// received audio.
virtual void startRealTimeWork() override;
// Lets audio system know that we're done with the work on the received
// audio.
virtual void stopRealTimeWork() override;
// Reverts real-time priority for current thread.
virtual void clearHelperRealTime() override;
protected: protected:
friend class MacAudioEngine; friend class MacAudioEngine;
@ -56,6 +74,11 @@ private:
void* player_; // actually AVAudioPlayerNode void* player_; // actually AVAudioPlayerNode
short* inputFrames_; short* inputFrames_;
static thread_local void* workgroup_;
static thread_local void* joinToken_;
dispatch_semaphore_t sem_;
}; };
#endif // MAC_AUDIO_DEVICE_H #endif // MAC_AUDIO_DEVICE_H

View File

@ -25,6 +25,26 @@
#include <future> #include <future>
#import <AVFoundation/AVFoundation.h> #import <AVFoundation/AVFoundation.h>
#import <AudioToolbox/AudioToolbox.h>
#include <mach/mach_time.h>
#include <mach/mach_init.h>
#include <mach/thread_policy.h>
#include <sched.h>
#include <pthread.h>
thread_local void* MacAudioDevice::workgroup_ = nullptr;
thread_local void* MacAudioDevice::joinToken_ = nullptr;
// One nanosecond in seconds.
constexpr static double kOneNanosecond = 1.0e9;
// The I/O interval time in seconds.
//#if __aarch64__
constexpr static double AUDIO_SAMPLE_BLOCK_SEC = 0.010;
//#else
//constexpr static double AUDIO_SAMPLE_BLOCK_SEC = 0.020;
//#endif // __aarch64__
static OSStatus GetIOBufferFrameSizeRange(AudioObjectID inDeviceID, static OSStatus GetIOBufferFrameSizeRange(AudioObjectID inDeviceID,
UInt32* outMinimum, UInt32* outMinimum,
@ -74,6 +94,8 @@ MacAudioDevice::MacAudioDevice(int coreAudioId, IAudioEngine::AudioDirection dir
, inputFrames_(nullptr) , inputFrames_(nullptr)
{ {
log_info("Create MacAudioDevice with ID %d, channels %d and sample rate %d", coreAudioId, numChannels, sampleRate); log_info("Create MacAudioDevice with ID %d, channels %d and sample rate %d", coreAudioId, numChannels, sampleRate);
sem_ = dispatch_semaphore_create(0);
} }
MacAudioDevice::~MacAudioDevice() MacAudioDevice::~MacAudioDevice()
@ -82,6 +104,8 @@ MacAudioDevice::~MacAudioDevice()
{ {
stop(); stop();
} }
dispatch_release(sem_);
} }
int MacAudioDevice::getNumChannels() int MacAudioDevice::getNumChannels()
@ -132,12 +156,13 @@ void MacAudioDevice::start()
// reduces dropouts on marginal hardware. // reduces dropouts on marginal hardware.
UInt32 minFrameSize = 0; UInt32 minFrameSize = 0;
UInt32 maxFrameSize = 0; UInt32 maxFrameSize = 0;
UInt32 desiredFrameSize = 512; UInt32 desiredFrameSize = pow(2, ceil(log(AUDIO_SAMPLE_BLOCK_SEC * sampleRate_) / log(2))); // next power of two
GetIOBufferFrameSizeRange(coreAudioId_, &minFrameSize, &maxFrameSize); GetIOBufferFrameSizeRange(coreAudioId_, &minFrameSize, &maxFrameSize);
if (minFrameSize != 0 && maxFrameSize != 0) if (minFrameSize != 0 && maxFrameSize != 0)
{ {
log_info("Frame sizes of %d to %d are supported for audio device ID %d", minFrameSize, maxFrameSize, coreAudioId_); log_info("Frame sizes of %d to %d are supported for audio device ID %d", minFrameSize, maxFrameSize, coreAudioId_);
desiredFrameSize = std::min(maxFrameSize, (UInt32)2048); // TBD: investigate why we need a significantly higher than default. desiredFrameSize = std::min(maxFrameSize, desiredFrameSize);
desiredFrameSize = std::max(minFrameSize, desiredFrameSize);
if (SetCurrentIOBufferFrameSize(coreAudioId_, desiredFrameSize) != noErr) if (SetCurrentIOBufferFrameSize(coreAudioId_, desiredFrameSize) != noErr)
{ {
log_warn("Could not set IO frame size to %d for audio device ID %d", desiredFrameSize, coreAudioId_); log_warn("Could not set IO frame size to %d for audio device ID %d", desiredFrameSize, coreAudioId_);
@ -224,6 +249,9 @@ void MacAudioDevice::start()
onAudioDataFunction(*this, inputFrames_, frameCount, onAudioDataState); onAudioDataFunction(*this, inputFrames_, frameCount, onAudioDataState);
} }
dispatch_semaphore_signal(sem_);
return OSStatus(noErr); return OSStatus(noErr);
}; };
@ -406,7 +434,7 @@ int MacAudioDevice::getLatencyInMicroseconds()
0, 0,
nullptr, nullptr,
&size, &size,
&streams); streams);
if (result == noErr) if (result == noErr)
{ {
propertyAddress.mSelector = kAudioStreamPropertyLatency; propertyAddress.mSelector = kAudioStreamPropertyLatency;
@ -429,3 +457,173 @@ int MacAudioDevice::getLatencyInMicroseconds()
}); });
return fut.get(); return fut.get();
} }
void MacAudioDevice::setHelperRealTime()
{
// Set thread QoS to "user interactive"
pthread_set_qos_class_self_np(QOS_CLASS_USER_INTERACTIVE, 0);
// Adapted from Chromium project. May need to adjust parameters
// depending on behavior.
// Get current thread ID
auto currentThreadId = pthread_mach_thread_np(pthread_self());
// Increase thread priority to real-time.
// Please note that the thread_policy_set() calls may fail in
// rare cases if the kernel decides the system is under heavy load
// and is unable to handle boosting the thread priority.
// In these cases we just return early and go on with life.
// Make thread fixed priority.
thread_extended_policy_data_t policy;
policy.timeshare = 0; // Set to 1 for a non-fixed thread.
kern_return_t result =
thread_policy_set(currentThreadId,
THREAD_EXTENDED_POLICY,
reinterpret_cast<thread_policy_t>(&policy),
THREAD_EXTENDED_POLICY_COUNT);
if (result != KERN_SUCCESS)
{
log_warn("Could not set current thread to real-time: %d");
return;
}
// Set to relatively high priority.
thread_precedence_policy_data_t precedence;
precedence.importance = 63;
result = thread_policy_set(currentThreadId,
THREAD_PRECEDENCE_POLICY,
reinterpret_cast<thread_policy_t>(&precedence),
THREAD_PRECEDENCE_POLICY_COUNT);
if (result != KERN_SUCCESS)
{
log_warn("Could not increase thread priority");
return;
}
// Most important, set real-time constraints.
// Define the guaranteed and max fraction of time for the audio thread.
// These "duty cycle" values can range from 0 to 1. A value of 0.5
// means the scheduler would give half the time to the thread.
// These values have empirically been found to yield good behavior.
// Good means that audio performance is high and other threads won't starve.
const double kGuaranteedAudioDutyCycle = 0.75;
const double kMaxAudioDutyCycle = 0.85;
// Define constants determining how much time the audio thread can
// use in a given time quantum. All times are in milliseconds.
//auto sampleBuffer = pow(2, ceil(log(0.01 * sampleRate_) / log(2))); // next power of two
const double kTimeQuantum = AUDIO_SAMPLE_BLOCK_SEC * 1000;
// Time guaranteed each quantum.
const double kAudioTimeNeeded = kGuaranteedAudioDutyCycle * kTimeQuantum;
// Maximum time each quantum.
const double kMaxTimeAllowed = kMaxAudioDutyCycle * kTimeQuantum;
// Get the conversion factor from milliseconds to absolute time
// which is what the time-constraints call needs.
mach_timebase_info_data_t tbInfo;
mach_timebase_info(&tbInfo);
double ms_to_abs_time =
(static_cast<double>(tbInfo.denom) / tbInfo.numer) * 1000000;
thread_time_constraint_policy_data_t timeConstraints;
timeConstraints.period = kTimeQuantum * ms_to_abs_time;
timeConstraints.computation = kAudioTimeNeeded * ms_to_abs_time;
timeConstraints.constraint = kMaxTimeAllowed * ms_to_abs_time;
timeConstraints.preemptible = 1;
result =
thread_policy_set(currentThreadId,
THREAD_TIME_CONSTRAINT_POLICY,
reinterpret_cast<thread_policy_t>(&timeConstraints),
THREAD_TIME_CONSTRAINT_POLICY_COUNT);
if (result != KERN_SUCCESS)
{
log_warn("Could not set time constraint policy");
return;
}
// Join Core Audio workgroup
workgroup_ = nullptr;
joinToken_ = nullptr;
if (@available(macOS 11.0, *))
{
UInt32 size = sizeof(os_workgroup_t);
os_workgroup_t* wgMem = new os_workgroup_t;
assert(wgMem != nullptr);
os_workgroup_join_token_s* wgToken = new os_workgroup_join_token_s;
assert(wgToken != nullptr);
AudioObjectPropertyAddress propertyAddress = {
.mSelector = kAudioDevicePropertyIOThreadOSWorkgroup,
.mScope = kAudioObjectPropertyScopeGlobal,
.mElement = kAudioObjectPropertyElementMain
};
OSStatus osResult = AudioObjectGetPropertyData(
coreAudioId_,
&propertyAddress,
0,
nullptr,
&size,
wgMem);
if (osResult != noErr)
{
log_warn("Could not get audio workgroup");
workgroup_ = nullptr;
joinToken_ = nullptr;
delete wgMem;
delete wgToken;
return;
}
else
{
workgroup_ = wgMem;
joinToken_ = wgToken;
}
auto workgroupResult = os_workgroup_join(*wgMem, wgToken);
if (workgroupResult != 0)
{
log_warn("Could not join Core Audio workgroup (err = %d)", workgroupResult);
workgroup_ = nullptr;
joinToken_ = nullptr;
delete wgMem;
delete wgToken;
}
}
}
void MacAudioDevice::startRealTimeWork()
{
// empty
}
void MacAudioDevice::stopRealTimeWork()
{
dispatch_semaphore_wait(sem_, dispatch_time(DISPATCH_TIME_NOW, AUDIO_SAMPLE_BLOCK_SEC * kOneNanosecond));
}
void MacAudioDevice::clearHelperRealTime()
{
if (@available(macOS 11.0, *))
{
if (workgroup_ != nullptr)
{
os_workgroup_t* wgMem = (os_workgroup_t*)workgroup_;
os_workgroup_join_token_s* wgToken = (os_workgroup_join_token_s*)joinToken_;
os_workgroup_leave(*wgMem, wgToken);
delete wgMem;
delete wgToken;
workgroup_ = nullptr;
joinToken_ = nullptr;
}
}
}

View File

@ -23,9 +23,15 @@
#include <cstring> #include <cstring>
#include <cstdio> #include <cstdio>
#include <chrono> #include <chrono>
#include <sched.h>
#include <sys/resource.h>
#include "PulseAudioDevice.h" #include "PulseAudioDevice.h"
#if defined(USE_RTKIT)
#include "rtkit.h"
#endif // defined(USE_RTKIT)
#include "../util/logging/ulog.h" #include "../util/logging/ulog.h"
using namespace std::chrono_literals; using namespace std::chrono_literals;
@ -139,6 +145,12 @@ void PulseAudioDevice::start()
} }
else else
{ {
// Set up semaphore for signaling workers
if (sem_init(&sem_, 0, 0) < 0)
{
log_warn("Could not set up semaphore (errno = %d)", errno);
}
// Start data collection thread. This thread // Start data collection thread. This thread
// is necessary in order to ensure that we can // is necessary in order to ensure that we can
// provide data to PulseAudio at a rate expected // provide data to PulseAudio at a rate expected
@ -191,6 +203,7 @@ void PulseAudioDevice::start()
{ {
onAudioDataFunction(*this, data, currentLength / getNumChannels(), onAudioDataState); onAudioDataFunction(*this, data, currentLength / getNumChannels(), onAudioDataState);
} }
sem_post(&sem_);
// Sleep up to the number of milliseconds corresponding to the data received // Sleep up to the number of milliseconds corresponding to the data received
int numMilliseconds = 1000.0 * ((double)currentLength / getNumChannels()) / (double)getSampleRate(); int numMilliseconds = 1000.0 * ((double)currentLength / getNumChannels()) / (double)getSampleRate();
@ -307,6 +320,8 @@ void PulseAudioDevice::stop()
delete inputPendingThread_; delete inputPendingThread_;
inputPendingThread_ = nullptr; inputPendingThread_ = nullptr;
} }
sem_destroy(&sem_);
} }
} }
@ -321,6 +336,78 @@ int PulseAudioDevice::getLatencyInMicroseconds()
return (int)latency; return (int)latency;
} }
void PulseAudioDevice::setHelperRealTime()
{
// Set RLIMIT_RTTIME, required for rtkit
struct rlimit rlim;
memset(&rlim, 0, sizeof(rlim));
rlim.rlim_cur = 100000ULL; // 100ms
rlim.rlim_max = rlim.rlim_cur;
if ((setrlimit(RLIMIT_RTTIME, &rlim) < 0))
{
log_warn("Could not set RLIMIT_RTTIME (errno = %d)", errno);
}
// Prerequisite: SCHED_RR and SCHED_RESET_ON_FORK need to be set.
struct sched_param p;
memset(&p, 0, sizeof(p));
p.sched_priority = 3;
if (sched_setscheduler(0, SCHED_RR|SCHED_RESET_ON_FORK, &p) < 0 && errno == EPERM)
{
#if defined(USE_RTKIT)
DBusError error;
DBusConnection* bus = nullptr;
int result = 0;
dbus_error_init(&error);
if (!(bus = dbus_bus_get(DBUS_BUS_SYSTEM, &error)))
{
log_warn("Could not connect to system bus: %s", error.message);
}
else if ((result = rtkit_make_realtime(bus, 0, p.sched_priority)) < 0)
{
log_warn("rtkit could not make real-time: %s", strerror(-result));
}
if (bus != nullptr)
{
dbus_connection_unref(bus);
}
#else
log_warn("No permission to make real-time");
#endif // defined(USE_RTKIT)
}
}
void PulseAudioDevice::stopRealTimeWork()
{
struct timespec ts;
if (clock_gettime(CLOCK_REALTIME, &ts) == -1)
{
// Fallback to simple sleep.
IAudioDevice::stopRealTimeWork();
return;
}
ts.tv_nsec += 10000000;
ts.tv_sec += (ts.tv_nsec / 1000000000);
ts.tv_nsec = ts.tv_nsec % 1000000000;
if (sem_timedwait(&sem_, &ts) < 0 && errno != ETIMEDOUT)
{
// Fallback to simple sleep.
IAudioDevice::stopRealTimeWork();
}
}
void PulseAudioDevice::clearHelperRealTime()
{
IAudioDevice::clearHelperRealTime();
}
void PulseAudioDevice::StreamReadCallback_(pa_stream *s, size_t length, void *userdata) void PulseAudioDevice::StreamReadCallback_(pa_stream *s, size_t length, void *userdata)
{ {
const void* data = nullptr; const void* data = nullptr;

View File

@ -28,6 +28,8 @@
#include <condition_variable> #include <condition_variable>
#include <wx/string.h> #include <wx/string.h>
#include <pulse/pulseaudio.h> #include <pulse/pulseaudio.h>
#include <semaphore.h>
#include "IAudioEngine.h" #include "IAudioEngine.h"
#include "IAudioDevice.h" #include "IAudioDevice.h"
@ -46,6 +48,17 @@ public:
virtual int getLatencyInMicroseconds() override; virtual int getLatencyInMicroseconds() override;
// Configures current thread for real-time priority. This should be
// called from the thread that will be operating on received audio.
virtual void setHelperRealTime() override;
// Lets audio system know that we're done with the work on the received
// audio.
virtual void stopRealTimeWork() override;
// Reverts real-time priority for current thread.
virtual void clearHelperRealTime() override;
protected: protected:
// PulseAudioDevice cannot be created directly, only via PulseAudioEngine. // PulseAudioDevice cannot be created directly, only via PulseAudioEngine.
friend class PulseAudioEngine; friend class PulseAudioEngine;
@ -77,6 +90,8 @@ private:
std::mutex streamStateMutex_; std::mutex streamStateMutex_;
std::condition_variable streamStateCondVar_; std::condition_variable streamStateCondVar_;
sem_t sem_;
static void StreamReadCallback_(pa_stream *s, size_t length, void *userdata); static void StreamReadCallback_(pa_stream *s, size_t length, void *userdata);
static void StreamWriteCallback_(pa_stream *s, size_t length, void *userdata); static void StreamWriteCallback_(pa_stream *s, size_t length, void *userdata);
static void StreamUnderflowCallback_(pa_stream *p, void *userdata); static void StreamUnderflowCallback_(pa_stream *p, void *userdata);

View File

@ -29,11 +29,13 @@
#include <avrt.h> #include <avrt.h>
#include "../util/logging/ulog.h" #include "../util/logging/ulog.h"
#define BLOCK_TIME_NS (40000000) #define BLOCK_TIME_NS (0)
// Nanoseconds per REFERENCE_TIME unit // Nanoseconds per REFERENCE_TIME unit
#define NS_PER_REFTIME (100) #define NS_PER_REFTIME (100)
thread_local HANDLE WASAPIAudioDevice::helperTask_ = nullptr;
WASAPIAudioDevice::WASAPIAudioDevice(IAudioClient* client, IAudioEngine::AudioDirection direction, int sampleRate, int numChannels) WASAPIAudioDevice::WASAPIAudioDevice(IAudioClient* client, IAudioEngine::AudioDirection direction, int sampleRate, int numChannels)
: client_(client) : client_(client)
, renderClient_(nullptr) , renderClient_(nullptr)
@ -47,6 +49,7 @@ WASAPIAudioDevice::WASAPIAudioDevice(IAudioClient* client, IAudioEngine::AudioDi
, latencyFrames_(0) , latencyFrames_(0)
, renderCaptureEvent_(nullptr) , renderCaptureEvent_(nullptr)
, isRenderCaptureRunning_(false) , isRenderCaptureRunning_(false)
, semaphore_(nullptr)
{ {
// empty // empty
} }
@ -254,6 +257,15 @@ void WASAPIAudioDevice::start()
} }
} }
// Create semaphore
semaphore_ = CreateSemaphore(nullptr, 0, 1, nullptr);
if (semaphore_ == nullptr)
{
std::stringstream ss;
ss << "Could not create semaphore (err = " << GetLastError() << ")";
log_warn(ss.str().c_str());
}
// Start render/capture // Start render/capture
hr = client_->Start(); hr = client_->Start();
if (FAILED(hr)) if (FAILED(hr))
@ -296,8 +308,9 @@ void WASAPIAudioDevice::start()
log_warn("Could not increase thread priority"); log_warn("Could not increase thread priority");
} }
while (isRenderCaptureRunning_ && WaitForSingleObject(renderCaptureEvent_, 100) == WAIT_OBJECT_0) while (isRenderCaptureRunning_)
{ {
WaitForSingleObject(renderCaptureEvent_, 100);
if (direction_ == IAudioEngine::AUDIO_ENGINE_OUT) if (direction_ == IAudioEngine::AUDIO_ENGINE_OUT)
{ {
renderAudio_(); renderAudio_();
@ -364,6 +377,12 @@ void WASAPIAudioDevice::stop()
captureClient_ = nullptr; captureClient_ = nullptr;
} }
if (semaphore_ != nullptr)
{
CloseHandle(semaphore_);
semaphore_ = nullptr;
}
prom->set_value(); prom->set_value();
}); });
fut.wait(); fut.wait();
@ -381,6 +400,49 @@ int WASAPIAudioDevice::getLatencyInMicroseconds()
return 1000000 * latencyFrames_ / sampleRate_; return 1000000 * latencyFrames_ / sampleRate_;
} }
void WASAPIAudioDevice::setHelperRealTime()
{
DWORD taskIndex = 0;
helperTask_ = AvSetMmThreadCharacteristics(TEXT("Pro Audio"), &taskIndex);
if (helperTask_ == nullptr)
{
log_warn("Could not increase thread priority");
}
}
void WASAPIAudioDevice::startRealTimeWork()
{
// empty
}
void WASAPIAudioDevice::stopRealTimeWork()
{
if (semaphore_ == nullptr)
{
// Fallback to base class behavior
IAudioDevice::stopRealTimeWork();
return;
}
// Wait a maximum of (bufferSize / sampleRate) seconds for the semaphore to return
DWORD result = WaitForSingleObject(semaphore_, (1000 * bufferFrameCount_) / sampleRate_);
if (result != WAIT_TIMEOUT && result != WAIT_OBJECT_0)
{
// Fallback to a simple sleep.
IAudioDevice::stopRealTimeWork();
}
}
void WASAPIAudioDevice::clearHelperRealTime()
{
if (helperTask_ != nullptr)
{
AvRevertMmThreadCharacteristics(helperTask_);
helperTask_ = nullptr;
}
}
void WASAPIAudioDevice::renderAudio_() void WASAPIAudioDevice::renderAudio_()
{ {
// If client is no longer available, abort // If client is no longer available, abort
@ -514,4 +576,10 @@ void WASAPIAudioDevice::captureAudio_()
return; return;
} }
} }
if (semaphore_ != nullptr)
{
// Notify worker threads
ReleaseSemaphore(semaphore_, 1, nullptr);
}
} }

View File

@ -50,6 +50,21 @@ public:
virtual int getLatencyInMicroseconds() override; virtual int getLatencyInMicroseconds() override;
// Configures current thread for real-time priority. This should be
// called from the thread that will be operating on received audio.
virtual void setHelperRealTime() override;
// Lets audio system know that we're beginning to do work with the
// received audio.
virtual void startRealTimeWork() override;
// Lets audio system know that we're done with the work on the received
// audio.
virtual void stopRealTimeWork() override;
// Reverts real-time priority for current thread.
virtual void clearHelperRealTime() override;
protected: protected:
friend class WASAPIAudioEngine; friend class WASAPIAudioEngine;
@ -69,9 +84,12 @@ private:
std::thread renderCaptureThread_; std::thread renderCaptureThread_;
HANDLE renderCaptureEvent_; HANDLE renderCaptureEvent_;
bool isRenderCaptureRunning_; bool isRenderCaptureRunning_;
HANDLE semaphore_;
void renderAudio_(); void renderAudio_();
void captureAudio_(); void captureAudio_();
static thread_local HANDLE helperTask_;
}; };
#endif // WASAPI_AUDIO_DEVICE_H #endif // WASAPI_AUDIO_DEVICE_H

308
src/audio/rtkit.c 100644
View File

@ -0,0 +1,308 @@
/*-*- Mode: C; c-basic-offset: 8 -*-*/
/***
Copyright 2009 Lennart Poettering
Copyright 2010 David Henningsson <diwic@ubuntu.com>
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation files
(the "Software"), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of the Software,
and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
***/
#include <errno.h>
#include "rtkit.h"
#ifdef __linux__
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
static pid_t _gettid(void) {
return (pid_t) syscall(SYS_gettid);
}
static int translate_error(const char *name) {
if (strcmp(name, DBUS_ERROR_NO_MEMORY) == 0)
return -ENOMEM;
if (strcmp(name, DBUS_ERROR_SERVICE_UNKNOWN) == 0 ||
strcmp(name, DBUS_ERROR_NAME_HAS_NO_OWNER) == 0)
return -ENOENT;
if (strcmp(name, DBUS_ERROR_ACCESS_DENIED) == 0 ||
strcmp(name, DBUS_ERROR_AUTH_FAILED) == 0)
return -EACCES;
return -EIO;
}
static long long rtkit_get_int_property(DBusConnection *connection, const char* propname, long long* propval) {
DBusMessage *m = NULL, *r = NULL;
DBusMessageIter iter, subiter;
dbus_int64_t i64;
dbus_int32_t i32;
DBusError error;
int current_type;
long long ret;
const char * interfacestr = "org.freedesktop.RealtimeKit1";
dbus_error_init(&error);
if (!(m = dbus_message_new_method_call(
RTKIT_SERVICE_NAME,
RTKIT_OBJECT_PATH,
"org.freedesktop.DBus.Properties",
"Get"))) {
ret = -ENOMEM;
goto finish;
}
if (!dbus_message_append_args(
m,
DBUS_TYPE_STRING, &interfacestr,
DBUS_TYPE_STRING, &propname,
DBUS_TYPE_INVALID)) {
ret = -ENOMEM;
goto finish;
}
if (!(r = dbus_connection_send_with_reply_and_block(connection, m, -1, &error))) {
ret = translate_error(error.name);
goto finish;
}
if (dbus_set_error_from_message(&error, r)) {
ret = translate_error(error.name);
goto finish;
}
ret = -EBADMSG;
dbus_message_iter_init(r, &iter);
while ((current_type = dbus_message_iter_get_arg_type (&iter)) != DBUS_TYPE_INVALID) {
if (current_type == DBUS_TYPE_VARIANT) {
dbus_message_iter_recurse(&iter, &subiter);
while ((current_type = dbus_message_iter_get_arg_type (&subiter)) != DBUS_TYPE_INVALID) {
if (current_type == DBUS_TYPE_INT32) {
dbus_message_iter_get_basic(&subiter, &i32);
*propval = i32;
ret = 0;
}
if (current_type == DBUS_TYPE_INT64) {
dbus_message_iter_get_basic(&subiter, &i64);
*propval = i64;
ret = 0;
}
dbus_message_iter_next (&subiter);
}
}
dbus_message_iter_next (&iter);
}
finish:
if (m)
dbus_message_unref(m);
if (r)
dbus_message_unref(r);
dbus_error_free(&error);
return ret;
}
int rtkit_get_max_realtime_priority(DBusConnection *connection) {
long long retval;
int err;
err = rtkit_get_int_property(connection, "MaxRealtimePriority", &retval);
return err < 0 ? err : retval;
}
int rtkit_get_min_nice_level(DBusConnection *connection, int* min_nice_level) {
long long retval;
int err;
err = rtkit_get_int_property(connection, "MinNiceLevel", &retval);
if (err >= 0)
*min_nice_level = retval;
return err;
}
long long rtkit_get_rttime_usec_max(DBusConnection *connection) {
long long retval;
int err;
err = rtkit_get_int_property(connection, "RTTimeUSecMax", &retval);
return err < 0 ? err : retval;
}
int rtkit_make_realtime(DBusConnection *connection, pid_t thread, int priority) {
DBusMessage *m = NULL, *r = NULL;
dbus_uint64_t u64;
dbus_uint32_t u32;
DBusError error;
int ret;
dbus_error_init(&error);
if (thread == 0)
thread = _gettid();
if (!(m = dbus_message_new_method_call(
RTKIT_SERVICE_NAME,
RTKIT_OBJECT_PATH,
"org.freedesktop.RealtimeKit1",
"MakeThreadRealtime"))) {
ret = -ENOMEM;
goto finish;
}
u64 = (dbus_uint64_t) thread;
u32 = (dbus_uint32_t) priority;
if (!dbus_message_append_args(
m,
DBUS_TYPE_UINT64, &u64,
DBUS_TYPE_UINT32, &u32,
DBUS_TYPE_INVALID)) {
ret = -ENOMEM;
goto finish;
}
if (!(r = dbus_connection_send_with_reply_and_block(connection, m, -1, &error))) {
ret = translate_error(error.name);
goto finish;
}
if (dbus_set_error_from_message(&error, r)) {
ret = translate_error(error.name);
goto finish;
}
ret = 0;
finish:
if (m)
dbus_message_unref(m);
if (r)
dbus_message_unref(r);
dbus_error_free(&error);
return ret;
}
int rtkit_make_high_priority(DBusConnection *connection, pid_t thread, int nice_level) {
DBusMessage *m = NULL, *r = NULL;
dbus_uint64_t u64;
dbus_int32_t s32;
DBusError error;
int ret;
dbus_error_init(&error);
if (thread == 0)
thread = _gettid();
if (!(m = dbus_message_new_method_call(
RTKIT_SERVICE_NAME,
RTKIT_OBJECT_PATH,
"org.freedesktop.RealtimeKit1",
"MakeThreadHighPriority"))) {
ret = -ENOMEM;
goto finish;
}
u64 = (dbus_uint64_t) thread;
s32 = (dbus_int32_t) nice_level;
if (!dbus_message_append_args(
m,
DBUS_TYPE_UINT64, &u64,
DBUS_TYPE_INT32, &s32,
DBUS_TYPE_INVALID)) {
ret = -ENOMEM;
goto finish;
}
if (!(r = dbus_connection_send_with_reply_and_block(connection, m, -1, &error))) {
ret = translate_error(error.name);
goto finish;
}
if (dbus_set_error_from_message(&error, r)) {
ret = translate_error(error.name);
goto finish;
}
ret = 0;
finish:
if (m)
dbus_message_unref(m);
if (r)
dbus_message_unref(r);
dbus_error_free(&error);
return ret;
}
#else
int rtkit_make_realtime(DBusConnection *connection, pid_t thread, int priority) {
return -ENOTSUP;
}
int rtkit_make_high_priority(DBusConnection *connection, pid_t thread, int nice_level) {
return -ENOTSUP;
}
int rtkit_get_max_realtime_priority(DBusConnection *connection) {
return -ENOTSUP;
}
int rtkit_get_min_nice_level(DBusConnection *connection, int* min_nice_level) {
return -ENOTSUP;
}
long long rtkit_get_rttime_usec_max(DBusConnection *connection) {
return -ENOTSUP;
}
#endif

79
src/audio/rtkit.h 100644
View File

@ -0,0 +1,79 @@
/*-*- Mode: C; c-basic-offset: 8 -*-*/
#ifndef foortkithfoo
#define foortkithfoo
/***
Copyright 2009 Lennart Poettering
Copyright 2010 David Henningsson <diwic@ubuntu.com>
Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation files
(the "Software"), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of the Software,
and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
***/
#include <sys/types.h>
#include <dbus/dbus.h>
#ifdef __cplusplus
extern "C" {
#endif
/* This is the reference implementation for a client for
* RealtimeKit. You don't have to use this, but if do, just copy these
* sources into your repository */
#define RTKIT_SERVICE_NAME "org.freedesktop.RealtimeKit1"
#define RTKIT_OBJECT_PATH "/org/freedesktop/RealtimeKit1"
/* This is mostly equivalent to sched_setparam(thread, SCHED_RR, {
* .sched_priority = priority }). 'thread' needs to be a kernel thread
* id as returned by gettid(), not a pthread_t! If 'thread' is 0 the
* current thread is used. The returned value is a negative errno
* style error code, or 0 on success. */
int rtkit_make_realtime(DBusConnection *system_bus, pid_t thread, int priority);
/* This is mostly equivalent to setpriority(PRIO_PROCESS, thread,
* nice_level). 'thread' needs to be a kernel thread id as returned by
* gettid(), not a pthread_t! If 'thread' is 0 the current thread is
* used. The returned value is a negative errno style error code, or 0
* on success.*/
int rtkit_make_high_priority(DBusConnection *system_bus, pid_t thread, int nice_level);
/* Return the maximum value of realtime priority available. Realtime requests
* above this value will fail. A negative value is an errno style error code.
*/
int rtkit_get_max_realtime_priority(DBusConnection *system_bus);
/* Retrieve the minimum value of nice level available. High prio requests
* below this value will fail. The returned value is a negative errno
* style error code, or 0 on success.*/
int rtkit_get_min_nice_level(DBusConnection *system_bus, int* min_nice_level);
/* Return the maximum value of RLIMIT_RTTIME to set before attempting a
* realtime request. A negative value is an errno style error code.
*/
long long rtkit_get_rttime_usec_max(DBusConnection *system_bus);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -146,7 +146,7 @@ float FreeDVInterface::GetMinimumSNR_(int mode)
void FreeDVInterface::start(int txMode, int fifoSizeMs, bool singleRxThread, bool usingReliableText) void FreeDVInterface::start(int txMode, int fifoSizeMs, bool singleRxThread, bool usingReliableText)
{ {
sync_ = 0; sync_ = 0;
singleRxThread_ = singleRxThread; singleRxThread_ = enabledModes_.size() > 1 ? singleRxThread : true;
modemStatsList_ = new MODEM_STATS[enabledModes_.size()]; modemStatsList_ = new MODEM_STATS[enabledModes_.size()];
for (int index = 0; index < (int)enabledModes_.size(); index++) for (int index = 0; index < (int)enabledModes_.size(); index++)
@ -757,6 +757,7 @@ IPipelineStep* FreeDVInterface::createTransmitPipeline(int inputSampleRate, int
modeFn, modeFn,
modeFn, modeFn,
parallelSteps, parallelSteps,
nullptr,
nullptr nullptr
); );
@ -769,7 +770,8 @@ IPipelineStep* FreeDVInterface::createReceivePipeline(
std::function<int()> getChannelNoiseFn, std::function<int()> getChannelNoiseFn,
std::function<int()> getChannelNoiseSnrFn, std::function<int()> getChannelNoiseSnrFn,
std::function<float()> getFreqOffsetFn, std::function<float()> getFreqOffsetFn,
std::function<float*()> getSigPwrAvgFn) std::function<float*()> getSigPwrAvgFn,
std::shared_ptr<IRealtimeHelper> realtimeHelper)
{ {
std::vector<IPipelineStep*> parallelSteps; std::vector<IPipelineStep*> parallelSteps;
@ -811,7 +813,8 @@ IPipelineStep* FreeDVInterface::createReceivePipeline(
state->preProcessFn, state->preProcessFn,
state->postProcessFn, state->postProcessFn,
parallelSteps, parallelSteps,
state state,
realtimeHelper
); );
return parallelStep; return parallelStep;

View File

@ -52,6 +52,8 @@ extern "C"
#include "lpcnet.h" #include "lpcnet.h"
} }
#include "util/IRealtimeHelper.h"
#include <samplerate.h> #include <samplerate.h>
class IPipelineStep; class IPipelineStep;
@ -131,7 +133,8 @@ public:
std::function<int()> getChannelNoiseFn, std::function<int()> getChannelNoiseFn,
std::function<int()> getChannelNoiseSnrFn, std::function<int()> getChannelNoiseSnrFn,
std::function<float()> getFreqOffsetFn, std::function<float()> getFreqOffsetFn,
std::function<float*()> getSigPwrAvgFn std::function<float*()> getSigPwrAvgFn,
std::shared_ptr<IRealtimeHelper> realtimeHelper
); );
void restartTxVocoder(); void restartTxVocoder();

View File

@ -3121,10 +3121,10 @@ void MainFrame::startRxStream()
g_rxUserdata = new paCallBackData; g_rxUserdata = new paCallBackData;
// create FIFOs used to interface between IAudioEngine and txRx // create FIFOs used to interface between IAudioEngine and txRx
// processing loop, which iterates about once every 20ms. // processing loop, which iterates about once every 10-40ms
// Sample rate conversion, stats for spectral plots, and // (depending on platform/audio library). Sample rate conversion,
// transmit processng are all performed in the tx/rxProcessing // stats for spectral plots, and transmit processng are all performed
// loop. // in the tx/rxProcessing loop.
int m_fifoSize_ms = wxGetApp().appConfiguration.fifoSizeMs; int m_fifoSize_ms = wxGetApp().appConfiguration.fifoSizeMs;
int soundCard1InFifoSizeSamples = m_fifoSize_ms*wxGetApp().appConfiguration.audioConfiguration.soundCard1In.sampleRate / 1000; int soundCard1InFifoSizeSamples = m_fifoSize_ms*wxGetApp().appConfiguration.audioConfiguration.soundCard1In.sampleRate / 1000;
@ -3430,7 +3430,7 @@ void MainFrame::startRxStream()
// start tx/rx processing thread // start tx/rx processing thread
if (txInSoundDevice && txOutSoundDevice) if (txInSoundDevice && txOutSoundDevice)
{ {
m_txThread = new TxRxThread(true, txInSoundDevice->getSampleRate(), txOutSoundDevice->getSampleRate(), wxGetApp().linkStep.get()); m_txThread = new TxRxThread(true, txInSoundDevice->getSampleRate(), txOutSoundDevice->getSampleRate(), wxGetApp().linkStep.get(), txInSoundDevice);
if ( m_txThread->Create() != wxTHREAD_NO_ERROR ) if ( m_txThread->Create() != wxTHREAD_NO_ERROR )
{ {
wxLogError(wxT("Can't create TX thread!")); wxLogError(wxT("Can't create TX thread!"));
@ -3469,7 +3469,7 @@ void MainFrame::startRxStream()
} }
} }
m_rxThread = new TxRxThread(false, rxInSoundDevice->getSampleRate(), rxOutSoundDevice->getSampleRate(), wxGetApp().linkStep.get()); m_rxThread = new TxRxThread(false, rxInSoundDevice->getSampleRate(), rxOutSoundDevice->getSampleRate(), wxGetApp().linkStep.get(), rxInSoundDevice);
if ( m_rxThread->Create() != wxTHREAD_NO_ERROR ) if ( m_rxThread->Create() != wxTHREAD_NO_ERROR )
{ {
wxLogError(wxT("Can't create RX thread!")); wxLogError(wxT("Can't create RX thread!"));

View File

@ -29,12 +29,17 @@ ComputeRfSpectrumStep::ComputeRfSpectrumStep(
: modemStatsFn_(modemStatsFn) : modemStatsFn_(modemStatsFn)
, getAvMagFn_(getAvMagFn) , getAvMagFn_(getAvMagFn)
{ {
// empty rxSpectrum_ = new float[MODEM_STATS_NSPEC];
assert(rxSpectrum_ != nullptr);
rxFdm_ = new COMP[FS];
assert(rxFdm_ != nullptr);
} }
ComputeRfSpectrumStep::~ComputeRfSpectrumStep() ComputeRfSpectrumStep::~ComputeRfSpectrumStep()
{ {
// empty delete[] rxSpectrum_;
delete[] rxFdm_;
} }
int ComputeRfSpectrumStep::getInputSampleRate() const int ComputeRfSpectrumStep::getInputSampleRate() const
@ -49,29 +54,23 @@ int ComputeRfSpectrumStep::getOutputSampleRate() const
std::shared_ptr<short> ComputeRfSpectrumStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> ComputeRfSpectrumStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
COMP* rx_fdm = new COMP[numInputSamples];
assert(rx_fdm != nullptr);
float rx_spec[MODEM_STATS_NSPEC];
auto inputSamplesPtr = inputSamples.get(); auto inputSamplesPtr = inputSamples.get();
for (int i = 0; i < numInputSamples; i++) for (int i = 0; i < numInputSamples; i++)
{ {
rx_fdm[i].real = inputSamplesPtr[i]; rxFdm_[i].real = inputSamplesPtr[i];
} }
modem_stats_get_rx_spectrum(modemStatsFn_(), rx_spec, rx_fdm, numInputSamples); modem_stats_get_rx_spectrum(modemStatsFn_(), rxSpectrum_, rxFdm_, numInputSamples);
// Average rx spectrum data using a simple IIR low pass filter // Average rx spectrum data using a simple IIR low pass filter
auto avMagPtr = getAvMagFn_(); auto avMagPtr = getAvMagFn_();
for(int i = 0; i < MODEM_STATS_NSPEC; i++) for(int i = 0; i < MODEM_STATS_NSPEC; i++)
{ {
avMagPtr[i] = BETA * avMagPtr[i] + (1.0 - BETA) * rx_spec[i]; avMagPtr[i] = BETA * avMagPtr[i] + (1.0 - BETA) * rxSpectrum_[i];
} }
// Tap only, no output. // Tap only, no output.
*numOutputSamples = 0; *numOutputSamples = 0;
delete[] rx_fdm;
return std::shared_ptr<short>((short*)nullptr, std::default_delete<short[]>()); return std::shared_ptr<short>((short*)nullptr, std::default_delete<short[]>());
} }

View File

@ -46,6 +46,8 @@ public:
private: private:
std::function<struct MODEM_STATS*()> modemStatsFn_; std::function<struct MODEM_STATS*()> modemStatsFn_;
std::function<float*()> getAvMagFn_; std::function<float*()> getAvMagFn_;
float* rxSpectrum_;
COMP* rxFdm_;
}; };
#endif // AUDIO_PIPELINE__COMPUTE_RF_SPECTRUM_STEP_H #endif // AUDIO_PIPELINE__COMPUTE_RF_SPECTRUM_STEP_H

View File

@ -22,6 +22,7 @@
#include "EqualizerStep.h" #include "EqualizerStep.h"
#include <algorithm>
#include <cstring> #include <cstring>
#include "../sox_biquad.h" #include "../sox_biquad.h"
#include <assert.h> #include <assert.h>
@ -34,7 +35,12 @@ EqualizerStep::EqualizerStep(int sampleRate, bool* enableFilter, void** bassFilt
, trebleFilter_(trebleFilter) , trebleFilter_(trebleFilter)
, volFilter_(volFilter) , volFilter_(volFilter)
{ {
// empty // Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
} }
EqualizerStep::~EqualizerStep() EqualizerStep::~EqualizerStep()
@ -54,31 +60,28 @@ int EqualizerStep::getOutputSampleRate() const
std::shared_ptr<short> EqualizerStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> EqualizerStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
short* outputSamples = new short[numInputSamples]; memcpy(outputSamples_.get(), inputSamples.get(), sizeof(short)*numInputSamples);
assert(outputSamples != nullptr);
memcpy(outputSamples, inputSamples.get(), sizeof(short)*numInputSamples);
if (*enableFilter_) if (*enableFilter_)
{ {
if (*bassFilter_) if (*bassFilter_)
{ {
sox_biquad_filter(*bassFilter_, outputSamples, outputSamples, numInputSamples); sox_biquad_filter(*bassFilter_, outputSamples_.get(), outputSamples_.get(), numInputSamples);
} }
if (*trebleFilter_) if (*trebleFilter_)
{ {
sox_biquad_filter(*trebleFilter_, outputSamples, outputSamples, numInputSamples); sox_biquad_filter(*trebleFilter_, outputSamples_.get(), outputSamples_.get(), numInputSamples);
} }
if (*midFilter_) if (*midFilter_)
{ {
sox_biquad_filter(*midFilter_, outputSamples, outputSamples, numInputSamples); sox_biquad_filter(*midFilter_, outputSamples_.get(), outputSamples_.get(), numInputSamples);
} }
if (*volFilter_) if (*volFilter_)
{ {
sox_biquad_filter(*volFilter_, outputSamples, outputSamples, numInputSamples); sox_biquad_filter(*volFilter_, outputSamples_.get(), outputSamples_.get(), numInputSamples);
} }
} }
*numOutputSamples = numInputSamples; *numOutputSamples = numInputSamples;
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>()); return outputSamples_;
} }

View File

@ -44,6 +44,7 @@ private:
void** midFilter_; void** midFilter_;
void** trebleFilter_; void** trebleFilter_;
void** volFilter_; void** volFilter_;
std::shared_ptr<short> outputSamples_;
}; };

View File

@ -42,10 +42,30 @@ FreeDVReceiveStep::FreeDVReceiveStep(struct freedv* dv)
rxFreqOffsetPhaseRectObjs_.real = cos(0.0); rxFreqOffsetPhaseRectObjs_.real = cos(0.0);
rxFreqOffsetPhaseRectObjs_.imag = sin(0.0); rxFreqOffsetPhaseRectObjs_.imag = sin(0.0);
// Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
inputBuf_ = new short[freedv_get_n_max_modem_samples(dv_)];
assert(inputBuf_ != nullptr);
rxFdm_ = new COMP[freedv_get_n_max_modem_samples(dv_)];
assert(rxFdm_ != nullptr);
rxFdmOffset_ = new COMP[freedv_get_n_max_modem_samples(dv_)];
assert(rxFdmOffset_ != nullptr);
} }
FreeDVReceiveStep::~FreeDVReceiveStep() FreeDVReceiveStep::~FreeDVReceiveStep()
{ {
delete[] inputBuf_;
delete[] rxFdm_;
delete[] rxFdmOffset_;
if (inputSampleFifo_ != nullptr) if (inputSampleFifo_ != nullptr)
{ {
codec2_fifo_free(inputSampleFifo_); codec2_fifo_free(inputSampleFifo_);
@ -65,19 +85,6 @@ int FreeDVReceiveStep::getOutputSampleRate() const
std::shared_ptr<short> FreeDVReceiveStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> FreeDVReceiveStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
*numOutputSamples = 0; *numOutputSamples = 0;
short* outputSamples = nullptr;
short* input_buf = new short[freedv_get_n_max_modem_samples(dv_)];
assert(input_buf != nullptr);
short* output_buf = new short[freedv_get_n_speech_samples(dv_)];
assert(output_buf != nullptr);
COMP* rx_fdm = new COMP[freedv_get_n_max_modem_samples(dv_)];
assert(rx_fdm != nullptr);
COMP* rx_fdm_offset = new COMP[freedv_get_n_max_modem_samples(dv_)];
assert(rx_fdm_offset != nullptr);
short* inputPtr = inputSamples.get(); short* inputPtr = inputSamples.get();
while (numInputSamples > 0 && inputPtr != nullptr) while (numInputSamples > 0 && inputPtr != nullptr)
@ -87,46 +94,29 @@ std::shared_ptr<short> FreeDVReceiveStep::execute(std::shared_ptr<short> inputSa
int nin = freedv_nin(dv_); int nin = freedv_nin(dv_);
int nout = 0; int nout = 0;
while (codec2_fifo_read(inputSampleFifo_, input_buf, nin) == 0) while (codec2_fifo_read(inputSampleFifo_, inputBuf_, nin) == 0)
{ {
assert(nin <= freedv_get_n_max_modem_samples(dv_)); assert(nin <= freedv_get_n_max_modem_samples(dv_));
// demod per frame processing // demod per frame processing
for(int i=0; i<nin; i++) { for(int i=0; i<nin; i++) {
rx_fdm[i].real = (float)input_buf[i]; rxFdm_[i].real = (float)inputBuf_[i];
rx_fdm[i].imag = 0.0; rxFdm_[i].imag = 0.0;
} }
// Optional channel noise // Optional channel noise
if (channelNoiseEnabled_) { if (channelNoiseEnabled_) {
fdmdv_simulate_channel(&sigPwrAvg_, rx_fdm, nin, channelNoiseSnr_); fdmdv_simulate_channel(&sigPwrAvg_, rxFdm_, nin, channelNoiseSnr_);
} }
// Optional frequency shifting // Optional frequency shifting
freq_shift_coh(rx_fdm_offset, rx_fdm, freqOffsetHz_, freedv_get_modem_sample_rate(dv_), &rxFreqOffsetPhaseRectObjs_, nin); freq_shift_coh(rxFdmOffset_, rxFdm_, freqOffsetHz_, freedv_get_modem_sample_rate(dv_), &rxFreqOffsetPhaseRectObjs_, nin);
nout = freedv_comprx(dv_, output_buf, rx_fdm_offset); nout = freedv_comprx(dv_, outputSamples_.get() + *numOutputSamples, rxFdmOffset_);
short* newOutputSamples = new short[*numOutputSamples + nout];
assert(newOutputSamples != nullptr);
if (outputSamples != nullptr)
{
memcpy(newOutputSamples, outputSamples, *numOutputSamples * sizeof(short));
delete[] outputSamples;
}
memcpy(newOutputSamples + *numOutputSamples, output_buf, nout * sizeof(short));
*numOutputSamples += nout; *numOutputSamples += nout;
outputSamples = newOutputSamples;
nin = freedv_nin(dv_); nin = freedv_nin(dv_);
} }
} }
delete[] input_buf; return outputSamples_;
delete[] output_buf;
delete[] rx_fdm;
delete[] rx_fdm_offset;
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>());
} }

View File

@ -63,6 +63,11 @@ private:
bool channelNoiseEnabled_; bool channelNoiseEnabled_;
int channelNoiseSnr_; int channelNoiseSnr_;
float freqOffsetHz_; float freqOffsetHz_;
std::shared_ptr<short> outputSamples_;
short* inputBuf_;
COMP* rxFdm_;
COMP* rxFdmOffset_;
}; };
#endif // AUDIO_PIPELINE__FREEDV_RECEIVE_STEP_H #endif // AUDIO_PIPELINE__FREEDV_RECEIVE_STEP_H

View File

@ -40,10 +40,35 @@ FreeDVTransmitStep::FreeDVTransmitStep(struct freedv* dv, std::function<float()>
txFreqOffsetPhaseRectObj_.real = cos(0.0); txFreqOffsetPhaseRectObj_.real = cos(0.0);
txFreqOffsetPhaseRectObj_.imag = sin(0.0); txFreqOffsetPhaseRectObj_.imag = sin(0.0);
// Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
codecInput_ = new short[maxSamples];
assert(codecInput_ != nullptr);
tmpOutput_ = new short[maxSamples];
assert(tmpOutput_ != nullptr);
int nfreedv = freedv_get_n_nom_modem_samples(dv_);
txFdm_ = new COMP[nfreedv];
assert(txFdm_ != nullptr);
txFdmOffset_ = new COMP[nfreedv];
assert(txFdmOffset_ != nullptr);
} }
FreeDVTransmitStep::~FreeDVTransmitStep() FreeDVTransmitStep::~FreeDVTransmitStep()
{ {
delete[] codecInput_;
delete[] txFdm_;
delete[] txFdmOffset_;
if (inputSampleFifo_ != nullptr) if (inputSampleFifo_ != nullptr)
{ {
codec2_fifo_free(inputSampleFifo_); codec2_fifo_free(inputSampleFifo_);
@ -62,8 +87,6 @@ int FreeDVTransmitStep::getOutputSampleRate() const
std::shared_ptr<short> FreeDVTransmitStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> FreeDVTransmitStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
short* outputSamples = nullptr;
int mode = freedv_get_mode(dv_); int mode = freedv_get_mode(dv_);
int samplesUsedForFifo = freedv_get_n_speech_samples(dv_); int samplesUsedForFifo = freedv_get_n_speech_samples(dv_);
int nfreedv = freedv_get_n_nom_modem_samples(dv_); int nfreedv = freedv_get_n_nom_modem_samples(dv_);
@ -78,54 +101,26 @@ std::shared_ptr<short> FreeDVTransmitStep::execute(std::shared_ptr<short> inputS
if (codec2_fifo_used(inputSampleFifo_) >= samplesUsedForFifo) if (codec2_fifo_used(inputSampleFifo_) >= samplesUsedForFifo)
{ {
short* codecInput = new short[samplesUsedForFifo]; codec2_fifo_read(inputSampleFifo_, codecInput_, samplesUsedForFifo);
assert(codecInput != nullptr);
short* tmpOutput = new short[nfreedv];
assert(tmpOutput != nullptr);
codec2_fifo_read(inputSampleFifo_, codecInput, samplesUsedForFifo);
if (mode == FREEDV_MODE_800XA) if (mode == FREEDV_MODE_800XA)
{ {
/* 800XA doesn't support complex output just yet */ /* 800XA doesn't support complex output just yet */
freedv_tx(dv_, tmpOutput, codecInput); freedv_tx(dv_, tmpOutput_, codecInput_);
} }
else else
{ {
COMP* tx_fdm = new COMP[nfreedv]; freedv_comptx(dv_, txFdm_, codecInput_);
assert(tx_fdm != nullptr);
COMP* tx_fdm_offset = new COMP[nfreedv]; freq_shift_coh(txFdmOffset_, txFdm_, getFreqOffsetFn_(), getOutputSampleRate(), &txFreqOffsetPhaseRectObj_, nfreedv);
assert(tx_fdm_offset != nullptr);
freedv_comptx(dv_, tx_fdm, codecInput);
freq_shift_coh(tx_fdm_offset, tx_fdm, getFreqOffsetFn_(), getOutputSampleRate(), &txFreqOffsetPhaseRectObj_, nfreedv);
for(int i = 0; i<nfreedv; i++) for(int i = 0; i<nfreedv; i++)
tmpOutput[i] = tx_fdm_offset[i].real; tmpOutput_[i] = txFdmOffset_[i].real;
delete[] tx_fdm_offset;
delete[] tx_fdm;
} }
short* newOutputSamples = new short[*numOutputSamples + nfreedv]; memcpy(outputSamples_.get() + *numOutputSamples, tmpOutput_, nfreedv * sizeof(short));
assert(newOutputSamples != nullptr);
if (outputSamples != nullptr)
{
memcpy(newOutputSamples, outputSamples, *numOutputSamples * sizeof(short));
delete[] outputSamples;
}
memcpy(newOutputSamples + *numOutputSamples, tmpOutput, nfreedv * sizeof(short));
*numOutputSamples += nfreedv; *numOutputSamples += nfreedv;
outputSamples = newOutputSamples;
delete[] codecInput;
delete[] tmpOutput;
} }
} }
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>()); return outputSamples_;
} }

View File

@ -50,6 +50,12 @@ private:
std::function<float()> getFreqOffsetFn_; std::function<float()> getFreqOffsetFn_;
struct FIFO* inputSampleFifo_; struct FIFO* inputSampleFifo_;
COMP txFreqOffsetPhaseRectObj_; COMP txFreqOffsetPhaseRectObj_;
COMP* txFdm_;
COMP* txFdmOffset_;
short* codecInput_;
short* tmpOutput_;
std::shared_ptr<short> outputSamples_;
}; };
#endif // AUDIO_PIPELINE__FREEDV_TRANSMIT_STEP_H #endif // AUDIO_PIPELINE__FREEDV_TRANSMIT_STEP_H

View File

@ -28,7 +28,12 @@ LevelAdjustStep::LevelAdjustStep(int sampleRate, std::function<double()> scaleFa
: scaleFactorFn_(scaleFactorFn) : scaleFactorFn_(scaleFactorFn)
, sampleRate_(sampleRate) , sampleRate_(sampleRate)
{ {
// empty // Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
} }
LevelAdjustStep::~LevelAdjustStep() LevelAdjustStep::~LevelAdjustStep()
@ -48,14 +53,13 @@ int LevelAdjustStep::getOutputSampleRate() const
std::shared_ptr<short> LevelAdjustStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> LevelAdjustStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
short* outputSamples = new short[numInputSamples];
double scaleFactor = scaleFactorFn_(); double scaleFactor = scaleFactorFn_();
for (int index = 0; index < numInputSamples; index++) for (int index = 0; index < numInputSamples; index++)
{ {
outputSamples[index] = inputSamples.get()[index] * scaleFactor; outputSamples_.get()[index] = inputSamples.get()[index] * scaleFactor;
} }
*numOutputSamples = numInputSamples; *numOutputSamples = numInputSamples;
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>()); return outputSamples_;
} }

View File

@ -40,6 +40,7 @@ public:
private: private:
std::function<double()> scaleFactorFn_; std::function<double()> scaleFactorFn_;
int sampleRate_; int sampleRate_;
std::shared_ptr<short> outputSamples_;
}; };
#endif // AUDIO_PIPELINE__LEVEL_ADJUST_STEP_H #endif // AUDIO_PIPELINE__LEVEL_ADJUST_STEP_H

View File

@ -33,24 +33,25 @@ LinkStep::LinkStep(int outputSampleRate, size_t numSamples)
// Create pipeline steps // Create pipeline steps
inputPipelineStep_ = std::make_shared<InputStep>(this); inputPipelineStep_ = std::make_shared<InputStep>(this);
outputPipelineStep_ = std::make_shared<OutputStep>(this); outputPipelineStep_ = std::make_shared<OutputStep>(this);
tmpBuffer_ = new short[numSamples];
assert(tmpBuffer_ != nullptr);
} }
LinkStep::~LinkStep() LinkStep::~LinkStep()
{ {
codec2_fifo_destroy(fifo_); codec2_fifo_destroy(fifo_);
fifo_ = nullptr; fifo_ = nullptr;
delete[] tmpBuffer_;
} }
void LinkStep::clearFifo() void LinkStep::clearFifo()
{ {
int numUsed = codec2_fifo_used(fifo_); int numUsed = codec2_fifo_used(fifo_);
short* tmp = new short[numUsed];
assert(tmp != nullptr);
// Read data and then promptly throw it out. // Read data and then promptly throw it out.
codec2_fifo_read(fifo_, tmp, numUsed); codec2_fifo_read(fifo_, tmpBuffer_, numUsed);
delete[] tmp;
} }
std::shared_ptr<short> LinkStep::InputStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> LinkStep::InputStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
@ -74,11 +75,8 @@ std::shared_ptr<short> LinkStep::OutputStep::execute(std::shared_ptr<short> inpu
if (*numOutputSamples > 0) if (*numOutputSamples > 0)
{ {
short* outputSamples = new short[*numOutputSamples]; codec2_fifo_read(fifo, outputSamples_.get(), *numOutputSamples);
assert(outputSamples != nullptr); return outputSamples_;
codec2_fifo_read(fifo, outputSamples, *numOutputSamples);
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>());
} }
else else
{ {

View File

@ -84,7 +84,12 @@ private:
OutputStep(LinkStep* parent) OutputStep(LinkStep* parent)
: parent_(parent) : parent_(parent)
{ {
// empty // Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
} }
virtual ~OutputStep() = default; virtual ~OutputStep() = default;
@ -105,12 +110,15 @@ private:
private: private:
LinkStep* parent_; LinkStep* parent_;
std::shared_ptr<short> outputSamples_;
}; };
int sampleRate_; int sampleRate_;
std::shared_ptr<IPipelineStep> inputPipelineStep_; std::shared_ptr<IPipelineStep> inputPipelineStep_;
std::shared_ptr<IPipelineStep> outputPipelineStep_; std::shared_ptr<IPipelineStep> outputPipelineStep_;
FIFO* fifo_; FIFO* fifo_;
short* tmpBuffer_;
}; };
#endif // AUDIO_PIPELINE__LINK_STEP_H #endif // AUDIO_PIPELINE__LINK_STEP_H

View File

@ -20,6 +20,7 @@
// //
//========================================================================= //=========================================================================
#include <algorithm>
#include <cassert> #include <cassert>
#include <cstring> #include <cstring>
#include "MuteStep.h" #include "MuteStep.h"
@ -27,7 +28,14 @@
MuteStep::MuteStep(int outputSampleRate) MuteStep::MuteStep(int outputSampleRate)
: sampleRate_(outputSampleRate) : sampleRate_(outputSampleRate)
{ {
// empty // Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
memset(outputSamples_.get(), 0, sizeof(short) * maxSamples);
} }
// Executes pipeline step. // Executes pipeline step.
@ -42,12 +50,7 @@ std::shared_ptr<short> MuteStep::execute(std::shared_ptr<short> inputSamples, in
if (*numOutputSamples > 0) if (*numOutputSamples > 0)
{ {
short* outputSamples = new short[*numOutputSamples]; return outputSamples_;
assert(outputSamples != nullptr);
memset(outputSamples, 0, sizeof(short) * (*numOutputSamples));
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>());
} }
else else
{ {

View File

@ -47,6 +47,7 @@ public:
private: private:
int sampleRate_; int sampleRate_;
std::shared_ptr<short> outputSamples_;
}; };
#endif // AUDIO_PIPELINE__MUTE_STEP_H #endif // AUDIO_PIPELINE__MUTE_STEP_H

View File

@ -32,13 +32,15 @@ ParallelStep::ParallelStep(
std::function<int(ParallelStep*)> inputRouteFn, std::function<int(ParallelStep*)> inputRouteFn,
std::function<int(ParallelStep*)> outputRouteFn, std::function<int(ParallelStep*)> outputRouteFn,
std::vector<IPipelineStep*> parallelSteps, std::vector<IPipelineStep*> parallelSteps,
std::shared_ptr<void> state) std::shared_ptr<void> state,
std::shared_ptr<IRealtimeHelper> realtimeHelper)
: inputSampleRate_(inputSampleRate) : inputSampleRate_(inputSampleRate)
, outputSampleRate_(outputSampleRate) , outputSampleRate_(outputSampleRate)
, runMultiThreaded_(runMultiThreaded) , runMultiThreaded_(runMultiThreaded)
, inputRouteFn_(inputRouteFn) , inputRouteFn_(inputRouteFn)
, outputRouteFn_(outputRouteFn) , outputRouteFn_(outputRouteFn)
, state_(state) , state_(state)
, realtimeHelper_(realtimeHelper)
{ {
for (auto& step : parallelSteps) for (auto& step : parallelSteps)
{ {
@ -51,7 +53,17 @@ ParallelStep::ParallelStep(
state->exitingThread = false; state->exitingThread = false;
state->thread = std::thread([this](ThreadInfo* threadState) state->thread = std::thread([this](ThreadInfo* threadState)
{ {
if (realtimeHelper_)
{
realtimeHelper_->setHelperRealTime();
}
executeRunnerThread_(threadState); executeRunnerThread_(threadState);
if (realtimeHelper_)
{
realtimeHelper_->clearHelperRealTime();
}
}, state); }, state);
threads_.push_back(state); threads_.push_back(state);

View File

@ -32,6 +32,8 @@
#include <queue> #include <queue>
#include <map> #include <map>
#include "../util/IRealtimeHelper.h"
class ParallelStep : public IPipelineStep class ParallelStep : public IPipelineStep
{ {
public: public:
@ -41,7 +43,8 @@ public:
std::function<int(ParallelStep*)> inputRouteFn, std::function<int(ParallelStep*)> inputRouteFn,
std::function<int(ParallelStep*)> outputRouteFn, std::function<int(ParallelStep*)> outputRouteFn,
std::vector<IPipelineStep*> parallelSteps, std::vector<IPipelineStep*> parallelSteps,
std::shared_ptr<void> state); std::shared_ptr<void> state,
std::shared_ptr<IRealtimeHelper> realtimeHelper);
virtual ~ParallelStep(); virtual ~ParallelStep();
virtual int getInputSampleRate() const; virtual int getInputSampleRate() const;
@ -81,6 +84,7 @@ private:
std::map<std::pair<int, int>, std::shared_ptr<ResampleStep>> resamplers_; std::map<std::pair<int, int>, std::shared_ptr<ResampleStep>> resamplers_;
std::vector<ThreadInfo*> threads_; std::vector<ThreadInfo*> threads_;
std::shared_ptr<void> state_; std::shared_ptr<void> state_;
std::shared_ptr<IRealtimeHelper> realtimeHelper_;
void executeRunnerThread_(ThreadInfo* threadState); void executeRunnerThread_(ThreadInfo* threadState);
std::future<TaskResult> enqueueTask_(ThreadInfo* taskQueueThread, IPipelineStep* step, std::shared_ptr<short> inputSamples, int numInputSamples); std::future<TaskResult> enqueueTask_(ThreadInfo* taskQueueThread, IPipelineStep* step, std::shared_ptr<short> inputSamples, int numInputSamples);

View File

@ -33,7 +33,12 @@ PlaybackStep::PlaybackStep(
, getSndFileFn_(getSndFileFn) , getSndFileFn_(getSndFileFn)
, fileCompleteFn_(fileCompleteFn) , fileCompleteFn_(fileCompleteFn)
{ {
// empty // Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
} }
PlaybackStep::~PlaybackStep() PlaybackStep::~PlaybackStep()
@ -57,19 +62,15 @@ std::shared_ptr<short> PlaybackStep::execute(std::shared_ptr<short> inputSamples
assert(playFile != nullptr); assert(playFile != nullptr);
unsigned int nsf = numInputSamples * getOutputSampleRate()/getInputSampleRate(); unsigned int nsf = numInputSamples * getOutputSampleRate()/getInputSampleRate();
short* outputSamples = nullptr;
*numOutputSamples = 0; *numOutputSamples = 0;
if (nsf > 0) if (nsf > 0)
{ {
outputSamples = new short[nsf]; *numOutputSamples = sf_read_short(playFile, outputSamples_.get(), nsf);
assert(outputSamples != nullptr);
*numOutputSamples = sf_read_short(playFile, outputSamples, nsf);
if ((unsigned)*numOutputSamples < nsf) if ((unsigned)*numOutputSamples < nsf)
{ {
fileCompleteFn_(); fileCompleteFn_();
} }
} }
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>()); return outputSamples_;
} }

View File

@ -44,6 +44,7 @@ private:
std::function<int()> fileSampleRateFn_; std::function<int()> fileSampleRateFn_;
std::function<SNDFILE*()> getSndFileFn_; std::function<SNDFILE*()> getSndFileFn_;
std::function<void()> fileCompleteFn_; std::function<void()> fileCompleteFn_;
std::shared_ptr<short> outputSamples_;
}; };
#endif // AUDIO_PIPELINE__PLAYBACK_STEP_H #endif // AUDIO_PIPELINE__PLAYBACK_STEP_H

View File

@ -48,10 +48,36 @@ RADEReceiveStep::RADEReceiveStep(struct rade* dv, FARGANState* fargan, rade_text
featuresFile_ = fopen((const char*)utRxFeatureFile.ToUTF8(), "wb"); featuresFile_ = fopen((const char*)utRxFeatureFile.ToUTF8(), "wb");
assert(featuresFile_ != nullptr); assert(featuresFile_ != nullptr);
} }
// Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
inputBufCplx_ = new RADE_COMP[rade_nin_max(dv_)];
assert(inputBufCplx_ != nullptr);
inputBuf_ = new short[rade_nin_max(dv_)];
assert(inputBuf_ != nullptr);
featuresOut_ = new float[rade_n_features_in_out(dv_)];
assert(featuresOut_ != nullptr);
eooOut_ = new float[rade_n_eoo_bits(dv_)];
assert(eooOut_ != nullptr);
pendingFeatures_.reserve(rade_n_features_in_out(dv_));
} }
RADEReceiveStep::~RADEReceiveStep() RADEReceiveStep::~RADEReceiveStep()
{ {
delete[] inputBuf_;
delete[] inputBufCplx_;
delete[] featuresOut_;
delete[] eooOut_;
if (featuresFile_ != nullptr) if (featuresFile_ != nullptr)
{ {
fclose(featuresFile_); fclose(featuresFile_);
@ -81,16 +107,6 @@ int RADEReceiveStep::getOutputSampleRate() const
std::shared_ptr<short> RADEReceiveStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> RADEReceiveStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
*numOutputSamples = 0; *numOutputSamples = 0;
short* outputSamples = nullptr;
RADE_COMP* input_buf_cplx = new RADE_COMP[rade_nin_max(dv_)];
assert(input_buf_cplx != nullptr);
short* input_buf = new short[rade_nin_max(dv_)];
assert(input_buf != nullptr);
float* features_out = new float[rade_n_features_in_out(dv_)];
assert(features_out != nullptr);
short* inputPtr = inputSamples.get(); short* inputPtr = inputSamples.get();
while (numInputSamples > 0 && inputPtr != nullptr) while (numInputSamples > 0 && inputPtr != nullptr)
@ -100,38 +116,36 @@ std::shared_ptr<short> RADEReceiveStep::execute(std::shared_ptr<short> inputSamp
int nin = rade_nin(dv_); int nin = rade_nin(dv_);
int nout = 0; int nout = 0;
while (codec2_fifo_read(inputSampleFifo_, input_buf, nin) == 0) while (codec2_fifo_read(inputSampleFifo_, inputBuf_, nin) == 0)
{ {
assert(nin <= rade_nin_max(dv_)); assert(nin <= rade_nin_max(dv_));
// demod per frame processing // demod per frame processing
for(int i=0; i<nin; i++) for(int i=0; i<nin; i++)
{ {
input_buf_cplx[i].real = input_buf[i] / 32767.0; inputBufCplx_[i].real = inputBuf_[i] / 32767.0;
input_buf_cplx[i].imag = 0.0; inputBufCplx_[i].imag = 0.0;
} }
// RADE processing (input signal->features). // RADE processing (input signal->features).
int hasEooOut = 0; int hasEooOut = 0;
float* eooOut = new float[rade_n_eoo_bits(dv_)];
assert(eooOut != nullptr);
nout = rade_rx(dv_, features_out, &hasEooOut, eooOut, input_buf_cplx); nout = rade_rx(dv_, featuresOut_, &hasEooOut, eooOut_, inputBufCplx_);
if (hasEooOut && textPtr_ != nullptr) if (hasEooOut && textPtr_ != nullptr)
{ {
// Handle RX of bits from EOO. // Handle RX of bits from EOO.
rade_text_rx(textPtr_, eooOut, rade_n_eoo_bits(dv_) / 2); rade_text_rx(textPtr_, eooOut_, rade_n_eoo_bits(dv_) / 2);
} }
else if (!hasEooOut) else if (!hasEooOut)
{ {
if (featuresFile_) if (featuresFile_)
{ {
fwrite(features_out, sizeof(float), nout, featuresFile_); fwrite(featuresOut_, sizeof(float), nout, featuresFile_);
} }
for (int i = 0; i < nout; i++) for (int i = 0; i < nout; i++)
{ {
pendingFeatures_.push_back(features_out[i]); pendingFeatures_.push_back(featuresOut_[i]);
} }
// FARGAN processing (features->analog audio) // FARGAN processing (features->analog audio)
@ -163,21 +177,14 @@ std::shared_ptr<short> RADEReceiveStep::execute(std::shared_ptr<short> inputSamp
} }
} }
delete[] eooOut;
nin = rade_nin(dv_); nin = rade_nin(dv_);
} }
} }
if (*numOutputSamples > 0) if (*numOutputSamples > 0)
{ {
outputSamples = new short[*numOutputSamples]; codec2_fifo_read(outputSampleFifo_, outputSamples_.get(), *numOutputSamples);
assert(outputSamples != nullptr);
codec2_fifo_read(outputSampleFifo_, outputSamples, *numOutputSamples);
} }
delete[] input_buf_cplx; return outputSamples_;
delete[] input_buf;
delete[] features_out;
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>());
} }

View File

@ -57,6 +57,12 @@ private:
std::vector<float> pendingFeatures_; std::vector<float> pendingFeatures_;
FILE* featuresFile_; FILE* featuresFile_;
rade_text_t textPtr_; rade_text_t textPtr_;
RADE_COMP* inputBufCplx_;
short* inputBuf_;
float* featuresOut_;
float* eooOut_;
std::shared_ptr<short> outputSamples_;
}; };
#endif // AUDIO_PIPELINE__RADE_RECEIVE_STEP_H #endif // AUDIO_PIPELINE__RADE_RECEIVE_STEP_H

View File

@ -48,10 +48,41 @@ RADETransmitStep::RADETransmitStep(struct rade* dv, LPCNetEncState* encState)
featuresFile_ = fopen((const char*)utTxFeatureFile.ToUTF8(), "wb"); featuresFile_ = fopen((const char*)utTxFeatureFile.ToUTF8(), "wb");
assert(featuresFile_ != nullptr); assert(featuresFile_ != nullptr);
} }
// Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
int numOutputSamples = rade_n_tx_out(dv_);
radeOut_ = new RADE_COMP[numOutputSamples];
assert(radeOut_ != nullptr);
radeOutShort_ = new short[numOutputSamples];
assert(radeOutShort_ != nullptr);
const int NUM_SAMPLES_SILENCE = 60 * getOutputSampleRate() / 1000;
int numEOOSamples = rade_n_tx_eoo_out(dv_);
eooOut_ = new RADE_COMP[numEOOSamples];
assert(eooOut_ != nullptr);
eooOutShort_ = new short[numEOOSamples + NUM_SAMPLES_SILENCE];
assert(eooOutShort_ != nullptr);
featureList_.reserve(rade_n_features_in_out(dv_));
} }
RADETransmitStep::~RADETransmitStep() RADETransmitStep::~RADETransmitStep()
{ {
delete[] radeOut_;
delete[] radeOutShort_;
delete[] eooOut_;
delete[] eooOutShort_;
if (featuresFile_ != nullptr) if (featuresFile_ != nullptr)
{ {
fclose(featuresFile_); fclose(featuresFile_);
@ -80,7 +111,6 @@ int RADETransmitStep::getOutputSampleRate() const
std::shared_ptr<short> RADETransmitStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> RADETransmitStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
short* outputSamples = nullptr;
*numOutputSamples = 0; *numOutputSamples = 0;
if (numInputSamples == 0) if (numInputSamples == 0)
@ -89,15 +119,12 @@ std::shared_ptr<short> RADETransmitStep::execute(std::shared_ptr<short> inputSam
*numOutputSamples = std::min(codec2_fifo_used(outputSampleFifo_), (int)(RADE_MODEM_SAMPLE_RATE * .02)); *numOutputSamples = std::min(codec2_fifo_used(outputSampleFifo_), (int)(RADE_MODEM_SAMPLE_RATE * .02));
if (*numOutputSamples > 0) if (*numOutputSamples > 0)
{ {
outputSamples = new short[*numOutputSamples]; codec2_fifo_read(outputSampleFifo_, outputSamples_.get(), *numOutputSamples);
assert(outputSamples != nullptr);
codec2_fifo_read(outputSampleFifo_, outputSamples, *numOutputSamples);
log_info("Returning %d EOO samples (remaining in FIFO: %d)", *numOutputSamples, codec2_fifo_used(outputSampleFifo_)); log_info("Returning %d EOO samples (remaining in FIFO: %d)", *numOutputSamples, codec2_fifo_used(outputSampleFifo_));
} }
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>());; return outputSamples_;
} }
short* inputPtr = inputSamples.get(); short* inputPtr = inputSamples.get();
@ -113,12 +140,6 @@ std::shared_ptr<short> RADETransmitStep::execute(std::shared_ptr<short> inputSam
short pcm[LPCNET_FRAME_SIZE]; short pcm[LPCNET_FRAME_SIZE];
float features[NB_TOTAL_FEATURES]; float features[NB_TOTAL_FEATURES];
RADE_COMP* radeOut = new RADE_COMP[numOutputSamples];
assert(radeOut != nullptr);
short* radeOutShort = new short[numOutputSamples];
assert(radeOutShort != nullptr);
int arch = opus_select_arch(); int arch = opus_select_arch();
// Feature extraction // Feature extraction
@ -138,7 +159,7 @@ std::shared_ptr<short> RADETransmitStep::execute(std::shared_ptr<short> inputSam
// RADE TX handling // RADE TX handling
while (featureList_.size() >= numRequiredFeaturesForRADE) while (featureList_.size() >= numRequiredFeaturesForRADE)
{ {
rade_tx(dv_, radeOut, &featureList_[0]); rade_tx(dv_, radeOut_, &featureList_[0]);
for (unsigned int index = 0; index < numRequiredFeaturesForRADE; index++) for (unsigned int index = 0; index < numRequiredFeaturesForRADE; index++)
{ {
featureList_.erase(featureList_.begin()); featureList_.erase(featureList_.begin());
@ -146,26 +167,20 @@ std::shared_ptr<short> RADETransmitStep::execute(std::shared_ptr<short> inputSam
for (int index = 0; index < numOutputSamples; index++) for (int index = 0; index < numOutputSamples; index++)
{ {
// We only need the real component for TX. // We only need the real component for TX.
radeOutShort[index] = radeOut[index].real * RADE_SCALING_FACTOR; radeOutShort_[index] = radeOut_[index].real * RADE_SCALING_FACTOR;
} }
codec2_fifo_write(outputSampleFifo_, radeOutShort, numOutputSamples); codec2_fifo_write(outputSampleFifo_, radeOutShort_, numOutputSamples);
} }
delete[] radeOutShort;
delete[] radeOut;
} }
} }
*numOutputSamples = codec2_fifo_used(outputSampleFifo_); *numOutputSamples = codec2_fifo_used(outputSampleFifo_);
if (*numOutputSamples > 0) if (*numOutputSamples > 0)
{ {
outputSamples = new short[*numOutputSamples]; codec2_fifo_read(outputSampleFifo_, outputSamples_.get(), *numOutputSamples);
assert(outputSamples != nullptr);
codec2_fifo_read(outputSampleFifo_, outputSamples, *numOutputSamples);
} }
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>()); return outputSamples_;
} }
void RADETransmitStep::restartVocoder() void RADETransmitStep::restartVocoder()
@ -174,26 +189,17 @@ void RADETransmitStep::restartVocoder()
const int NUM_SAMPLES_SILENCE = 60 * getOutputSampleRate() / 1000; const int NUM_SAMPLES_SILENCE = 60 * getOutputSampleRate() / 1000;
int numEOOSamples = rade_n_tx_eoo_out(dv_); int numEOOSamples = rade_n_tx_eoo_out(dv_);
RADE_COMP* eooOut = new RADE_COMP[numEOOSamples]; rade_tx_eoo(dv_, eooOut_);
assert(eooOut != nullptr);
short* eooOutShort = new short[numEOOSamples + NUM_SAMPLES_SILENCE]; memset(eooOutShort_, 0, sizeof(short) * (numEOOSamples + NUM_SAMPLES_SILENCE));
assert(eooOutShort != nullptr);
rade_tx_eoo(dv_, eooOut);
memset(eooOutShort, 0, sizeof(short) * (numEOOSamples + NUM_SAMPLES_SILENCE));
for (int index = 0; index < numEOOSamples; index++) for (int index = 0; index < numEOOSamples; index++)
{ {
eooOutShort[index] = eooOut[index].real * RADE_SCALING_FACTOR; eooOutShort_[index] = eooOut_[index].real * RADE_SCALING_FACTOR;
} }
log_info("Queueing %d EOO samples to output FIFO", numEOOSamples + NUM_SAMPLES_SILENCE); log_info("Queueing %d EOO samples to output FIFO", numEOOSamples + NUM_SAMPLES_SILENCE);
if (codec2_fifo_write(outputSampleFifo_, eooOutShort, numEOOSamples + NUM_SAMPLES_SILENCE) != 0) if (codec2_fifo_write(outputSampleFifo_, eooOutShort_, numEOOSamples + NUM_SAMPLES_SILENCE) != 0)
{ {
log_warn("Could not queue EOO samples (remaining space in FIFO = %d)", codec2_fifo_free(outputSampleFifo_)); log_warn("Could not queue EOO samples (remaining space in FIFO = %d)", codec2_fifo_free(outputSampleFifo_));
} }
delete[] eooOutShort;
delete[] eooOut;
} }

View File

@ -51,6 +51,12 @@ private:
std::vector<float> featureList_; std::vector<float> featureList_;
FILE* featuresFile_; FILE* featuresFile_;
std::shared_ptr<short> outputSamples_;
RADE_COMP* radeOut_;
short* radeOutShort_;
RADE_COMP* eooOut_;
short* eooOutShort_;
}; };
#endif // AUDIO_PIPELINE__RADE_TRANSMIT_STEP_H #endif // AUDIO_PIPELINE__RADE_TRANSMIT_STEP_H

View File

@ -36,24 +36,20 @@ static int resample_step(SRC_STATE *src,
int output_sample_rate, int output_sample_rate,
int input_sample_rate, int input_sample_rate,
int length_output_short, // maximum output array length in samples int length_output_short, // maximum output array length in samples
int length_input_short int length_input_short,
float *tmpInput,
float *tmpOutput
) )
{ {
SRC_DATA src_data; SRC_DATA src_data;
float* input = new float[length_input_short];
assert(input != nullptr);
float* output = new float[length_output_short];
assert(output != nullptr);
int ret; int ret;
assert(src != NULL); assert(src != NULL);
src_short_to_float_array(input_short, input, length_input_short); src_short_to_float_array(input_short, tmpInput, length_input_short);
src_data.data_in = input; src_data.data_in = tmpInput;
src_data.data_out = output; src_data.data_out = tmpOutput;
src_data.input_frames = length_input_short; src_data.input_frames = length_input_short;
src_data.output_frames = length_output_short; src_data.output_frames = length_output_short;
src_data.end_of_input = 0; src_data.end_of_input = 0;
@ -67,10 +63,7 @@ static int resample_step(SRC_STATE *src,
assert(ret == 0); assert(ret == 0);
assert(src_data.output_frames_gen <= length_output_short); assert(src_data.output_frames_gen <= length_output_short);
src_float_to_short_array(output, output_short, src_data.output_frames_gen); src_float_to_short_array(tmpOutput, output_short, src_data.output_frames_gen);
delete[] input;
delete[] output;
return src_data.output_frames_gen; return src_data.output_frames_gen;
} }
@ -82,11 +75,27 @@ ResampleStep::ResampleStep(int inputSampleRate, int outputSampleRate)
int src_error; int src_error;
resampleState_ = src_new(SRC_SINC_MEDIUM_QUALITY, 1, &src_error); resampleState_ = src_new(SRC_SINC_MEDIUM_QUALITY, 1, &src_error);
assert(resampleState_ != nullptr); assert(resampleState_ != nullptr);
// Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
tempInput_ = new float[std::max(inputSampleRate, outputSampleRate)];
assert(tempInput_ != nullptr);
tempOutput_ = new float[std::max(inputSampleRate, outputSampleRate)];
assert(tempOutput_ != nullptr);
} }
ResampleStep::~ResampleStep() ResampleStep::~ResampleStep()
{ {
src_delete(resampleState_); src_delete(resampleState_);
delete[] tempInput_;
delete[] tempOutput_;
} }
int ResampleStep::getInputSampleRate() const int ResampleStep::getInputSampleRate() const
@ -101,24 +110,20 @@ int ResampleStep::getOutputSampleRate() const
std::shared_ptr<short> ResampleStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> ResampleStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
short* outputSamples = nullptr;
if (numInputSamples > 0) if (numInputSamples > 0)
{ {
double scaleFactor = ((double)outputSampleRate_)/((double)inputSampleRate_); double scaleFactor = ((double)outputSampleRate_)/((double)inputSampleRate_);
int outputArraySize = std::max(numInputSamples, (int)(scaleFactor*numInputSamples)); int outputArraySize = std::max(numInputSamples, (int)(scaleFactor*numInputSamples));
assert(outputArraySize > 0); assert(outputArraySize > 0);
outputSamples = new short[outputArraySize];
assert(outputSamples != nullptr);
*numOutputSamples = resample_step( *numOutputSamples = resample_step(
resampleState_, outputSamples, inputSamples.get(), outputSampleRate_, resampleState_, outputSamples_.get(), inputSamples.get(), outputSampleRate_,
inputSampleRate_, outputArraySize, numInputSamples); inputSampleRate_, outputArraySize, numInputSamples, tempInput_, tempOutput_);
} }
else else
{ {
*numOutputSamples = 0; *numOutputSamples = 0;
} }
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>()); return outputSamples_;
} }

View File

@ -41,6 +41,10 @@ private:
int inputSampleRate_; int inputSampleRate_;
int outputSampleRate_; int outputSampleRate_;
SRC_STATE* resampleState_; SRC_STATE* resampleState_;
float* tempInput_;
float* tempOutput_;
std::shared_ptr<short> outputSamples_;
}; };
#endif // AUDIO_PIPELINE__RESAMPLE_STEP_H #endif // AUDIO_PIPELINE__RESAMPLE_STEP_H

View File

@ -41,6 +41,13 @@ SpeexStep::SpeexStep(int sampleRate)
// Set FIFO to be 2x the number of samples per run so we don't lose anything. // Set FIFO to be 2x the number of samples per run so we don't lose anything.
inputSampleFifo_ = codec2_fifo_create(numSamplesPerSpeexRun_ * 2); inputSampleFifo_ = codec2_fifo_create(numSamplesPerSpeexRun_ * 2);
assert(inputSampleFifo_ != nullptr); assert(inputSampleFifo_ != nullptr);
// Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
} }
SpeexStep::~SpeexStep() SpeexStep::~SpeexStep()
@ -61,17 +68,14 @@ int SpeexStep::getOutputSampleRate() const
std::shared_ptr<short> SpeexStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> SpeexStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
short* outputSamples = nullptr;
*numOutputSamples = 0; *numOutputSamples = 0;
int numSpeexRuns = (codec2_fifo_used(inputSampleFifo_) + numInputSamples) / numSamplesPerSpeexRun_; int numSpeexRuns = (codec2_fifo_used(inputSampleFifo_) + numInputSamples) / numSamplesPerSpeexRun_;
if (numSpeexRuns > 0) if (numSpeexRuns > 0)
{ {
*numOutputSamples = numSpeexRuns * numSamplesPerSpeexRun_; *numOutputSamples = numSpeexRuns * numSamplesPerSpeexRun_;
outputSamples = new short[*numOutputSamples];
assert(outputSamples != nullptr);
short* tmpOutput = outputSamples; short* tmpOutput = outputSamples_.get();
short* tmpInput = inputSamples.get(); short* tmpInput = inputSamples.get();
while (numInputSamples > 0 && tmpInput != nullptr) while (numInputSamples > 0 && tmpInput != nullptr)
@ -92,5 +96,5 @@ std::shared_ptr<short> SpeexStep::execute(std::shared_ptr<short> inputSamples, i
codec2_fifo_write(inputSampleFifo_, inputSamples.get(), numInputSamples); codec2_fifo_write(inputSampleFifo_, inputSamples.get(), numInputSamples);
} }
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>()); return outputSamples_;
} }

View File

@ -50,6 +50,8 @@ private:
SpeexPreprocessState* speexStateObj_; SpeexPreprocessState* speexStateObj_;
int numSamplesPerSpeexRun_; int numSamplesPerSpeexRun_;
struct FIFO* inputSampleFifo_; struct FIFO* inputSampleFifo_;
std::shared_ptr<short> outputSamples_;
}; };

View File

@ -24,9 +24,10 @@
#include <assert.h> #include <assert.h>
TapStep::TapStep(int sampleRate, IPipelineStep* tapStep) TapStep::TapStep(int sampleRate, IPipelineStep* tapStep, bool operateBackground)
: tapStep_(tapStep) : tapStep_(tapStep)
, sampleRate_(sampleRate) , sampleRate_(sampleRate)
, operateBackground_(operateBackground)
{ {
// empty // empty
} }
@ -48,9 +49,23 @@ int TapStep::getOutputSampleRate() const
std::shared_ptr<short> TapStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> TapStep::execute(std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
int temp = 0;
assert(tapStep_->getInputSampleRate() == sampleRate_); assert(tapStep_->getInputSampleRate() == sampleRate_);
if (operateBackground_)
{
// 5 millisecond timeout to queue to background thread.
// Since this is likely for the UI, it's fine if the step
// doesn't execute.
enqueue_([&, inputSamples, numInputSamples]() {
int temp = 0;
tapStep_->execute(inputSamples, numInputSamples, &temp); tapStep_->execute(inputSamples, numInputSamples, &temp);
}, 5);
}
else
{
int temp = 0;
tapStep_->execute(inputSamples, numInputSamples, &temp);
}
*numOutputSamples = numInputSamples; *numOutputSamples = numInputSamples;
return inputSamples; return inputSamples;

View File

@ -26,11 +26,12 @@
#include <memory> #include <memory>
#include "IPipelineStep.h" #include "IPipelineStep.h"
#include "util/ThreadedObject.h"
class TapStep : public IPipelineStep class TapStep : public IPipelineStep, public ThreadedObject
{ {
public: public:
TapStep(int inputSampleRate, IPipelineStep* tapStep); TapStep(int inputSampleRate, IPipelineStep* tapStep, bool operateBackground);
virtual ~TapStep(); virtual ~TapStep();
virtual int getInputSampleRate() const; virtual int getInputSampleRate() const;
@ -40,6 +41,7 @@ public:
private: private:
std::shared_ptr<IPipelineStep> tapStep_; std::shared_ptr<IPipelineStep> tapStep_;
int sampleRate_; int sampleRate_;
bool operateBackground_;
}; };
#endif // AUDIO_PIPELINE__TAP_STEP_H #endif // AUDIO_PIPELINE__TAP_STEP_H

View File

@ -38,7 +38,12 @@ ToneInterfererStep::ToneInterfererStep(
, toneAmplitudeFn_(toneAmplitudeFn) , toneAmplitudeFn_(toneAmplitudeFn)
, tonePhaseFn_(tonePhaseFn) , tonePhaseFn_(tonePhaseFn)
{ {
// empty // Pre-allocate buffers so we don't have to do so during real-time operation.
auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate());
outputSamples_ = std::shared_ptr<short>(
new short[maxSamples],
std::default_delete<short[]>());
assert(outputSamples_ != nullptr);
} }
ToneInterfererStep::~ToneInterfererStep() ToneInterfererStep::~ToneInterfererStep()
@ -59,11 +64,9 @@ int ToneInterfererStep::getOutputSampleRate() const
std::shared_ptr<short> ToneInterfererStep::execute( std::shared_ptr<short> ToneInterfererStep::execute(
std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples) std::shared_ptr<short> inputSamples, int numInputSamples, int* numOutputSamples)
{ {
short* outputSamples = new short[numInputSamples];
assert(outputSamples != nullptr);
*numOutputSamples = numInputSamples; *numOutputSamples = numInputSamples;
memcpy(outputSamples, inputSamples.get(), numInputSamples * sizeof(short)); memcpy(outputSamples_.get(), inputSamples.get(), numInputSamples * sizeof(short));
auto toneFrequency = toneFrequencyFn_(); auto toneFrequency = toneFrequencyFn_();
auto toneAmplitude = toneAmplitudeFn_(); auto toneAmplitude = toneAmplitudeFn_();
@ -72,10 +75,10 @@ std::shared_ptr<short> ToneInterfererStep::execute(
float w = 2.0 * M_PI * toneFrequency / sampleRate_; float w = 2.0 * M_PI * toneFrequency / sampleRate_;
for(int i = 0; i < numInputSamples; i++) { for(int i = 0; i < numInputSamples; i++) {
float s = (float)toneAmplitude * cos(*tonePhase); float s = (float)toneAmplitude * cos(*tonePhase);
outputSamples[i] += (int)s; outputSamples_.get()[i] += (int)s;
*tonePhase += w; *tonePhase += w;
} }
*tonePhase -= 2.0 * M_PI * floor(*tonePhase / (2.0 * M_PI)); *tonePhase -= 2.0 * M_PI * floor(*tonePhase / (2.0 * M_PI));
return std::shared_ptr<short>(outputSamples, std::default_delete<short[]>()); return outputSamples_;
} }

View File

@ -45,6 +45,7 @@ private:
std::function<float()> toneFrequencyFn_; std::function<float()> toneFrequencyFn_;
std::function<float()> toneAmplitudeFn_; std::function<float()> toneAmplitudeFn_;
std::function<float*()> tonePhaseFn_; std::function<float*()> tonePhaseFn_;
std::shared_ptr<short> outputSamples_;
}; };
#endif // AUDIO_PIPELINE__TONE_INTERFERER_STEP_H #endif // AUDIO_PIPELINE__TONE_INTERFERER_STEP_H

View File

@ -48,6 +48,7 @@ using namespace std::chrono_literals;
#include "LinkStep.h" #include "LinkStep.h"
#include "util/logging/ulog.h" #include "util/logging/ulog.h"
#include "os/os_interface.h"
#include <wx/stopwatch.h> #include <wx/stopwatch.h>
@ -125,7 +126,8 @@ void TxRxThread::initializePipeline_()
{ {
// Function definitions shared across both pipelines. // Function definitions shared across both pipelines.
auto callbackLockFn = []() { auto callbackLockFn = []() {
g_mutexProtectingCallbackData.Lock(); // Prevent priority inversions by bounding the time we can wait for a lock.
g_mutexProtectingCallbackData.LockTimeout(5);
}; };
auto callbackUnlockFn = []() { auto callbackUnlockFn = []() {
@ -148,7 +150,7 @@ void TxRxThread::initializePipeline_()
auto recordMicPipeline = new AudioPipeline(inputSampleRate_, inputSampleRate_); auto recordMicPipeline = new AudioPipeline(inputSampleRate_, inputSampleRate_);
recordMicPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(recordMicStep)); recordMicPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(recordMicStep));
auto recordMicTap = new TapStep(inputSampleRate_, recordMicPipeline); auto recordMicTap = new TapStep(inputSampleRate_, recordMicPipeline, false);
auto bypassRecordMic = new AudioPipeline(inputSampleRate_, inputSampleRate_); auto bypassRecordMic = new AudioPipeline(inputSampleRate_, inputSampleRate_);
auto eitherOrRecordMic = new EitherOrStep( auto eitherOrRecordMic = new EitherOrStep(
@ -215,7 +217,7 @@ void TxRxThread::initializePipeline_()
auto micAudioPipeline = new AudioPipeline(inputSampleRate_, equalizedMicAudioLink_->getSampleRate()); auto micAudioPipeline = new AudioPipeline(inputSampleRate_, equalizedMicAudioLink_->getSampleRate());
micAudioPipeline->appendPipelineStep(equalizedMicAudioLink_->getInputPipelineStep()); micAudioPipeline->appendPipelineStep(equalizedMicAudioLink_->getInputPipelineStep());
auto micAudioTap = std::make_shared<TapStep>(inputSampleRate_, micAudioPipeline); auto micAudioTap = std::make_shared<TapStep>(inputSampleRate_, micAudioPipeline, false);
pipeline_->appendPipelineStep(micAudioTap); pipeline_->appendPipelineStep(micAudioTap);
} }
@ -224,7 +226,7 @@ void TxRxThread::initializePipeline_()
auto resampleForPlotPipeline = new AudioPipeline(inputSampleRate_, resampleForPlotStep->getOutputSampleRate()); auto resampleForPlotPipeline = new AudioPipeline(inputSampleRate_, resampleForPlotStep->getOutputSampleRate());
resampleForPlotPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotStep)); resampleForPlotPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotStep));
auto resampleForPlotTap = new TapStep(inputSampleRate_, resampleForPlotPipeline); auto resampleForPlotTap = new TapStep(inputSampleRate_, resampleForPlotPipeline, true);
pipeline_->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotTap)); pipeline_->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotTap));
// FreeDV TX step (analog leg) // FreeDV TX step (analog leg)
@ -252,7 +254,7 @@ void TxRxThread::initializePipeline_()
auto recordModulatedPipeline = new AudioPipeline(outputSampleRate_, recordModulatedStep->getOutputSampleRate()); auto recordModulatedPipeline = new AudioPipeline(outputSampleRate_, recordModulatedStep->getOutputSampleRate());
recordModulatedPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(recordModulatedStep)); recordModulatedPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(recordModulatedStep));
auto recordModulatedTap = new TapStep(outputSampleRate_, recordModulatedPipeline); auto recordModulatedTap = new TapStep(outputSampleRate_, recordModulatedPipeline, false);
auto recordModulatedTapPipeline = new AudioPipeline(outputSampleRate_, outputSampleRate_); auto recordModulatedTapPipeline = new AudioPipeline(outputSampleRate_, outputSampleRate_);
recordModulatedTapPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(recordModulatedTap)); recordModulatedTapPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(recordModulatedTap));
@ -293,7 +295,7 @@ void TxRxThread::initializePipeline_()
auto recordRadioPipeline = new AudioPipeline(inputSampleRate_, inputSampleRate_); auto recordRadioPipeline = new AudioPipeline(inputSampleRate_, inputSampleRate_);
recordRadioPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(recordRadioStep)); recordRadioPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(recordRadioStep));
auto recordRadioTap = new TapStep(inputSampleRate_, recordRadioPipeline); auto recordRadioTap = new TapStep(inputSampleRate_, recordRadioPipeline, false);
auto bypassRecordRadio = new AudioPipeline(inputSampleRate_, inputSampleRate_); auto bypassRecordRadio = new AudioPipeline(inputSampleRate_, inputSampleRate_);
auto eitherOrRecordRadio = new EitherOrStep( auto eitherOrRecordRadio = new EitherOrStep(
@ -324,7 +326,8 @@ void TxRxThread::initializePipeline_()
auto eitherOrPlayRadioStep = new EitherOrStep( auto eitherOrPlayRadioStep = new EitherOrStep(
[]() { []() {
g_mutexProtectingCallbackData.Lock(); // Prevent priority inversions by bounding the time we can wait for a lock.
g_mutexProtectingCallbackData.LockTimeout(5);
auto result = g_playFileFromRadio && (g_sfPlayFileFromRadio != NULL); auto result = g_playFileFromRadio && (g_sfPlayFileFromRadio != NULL);
g_mutexProtectingCallbackData.Unlock(); g_mutexProtectingCallbackData.Unlock();
return result; return result;
@ -339,7 +342,7 @@ void TxRxThread::initializePipeline_()
auto resampleForPlotPipeline = new AudioPipeline(inputSampleRate_, resampleForPlotStep->getOutputSampleRate()); auto resampleForPlotPipeline = new AudioPipeline(inputSampleRate_, resampleForPlotStep->getOutputSampleRate());
resampleForPlotPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotStep)); resampleForPlotPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotStep));
auto resampleForPlotTap = new TapStep(inputSampleRate_, resampleForPlotPipeline); auto resampleForPlotTap = new TapStep(inputSampleRate_, resampleForPlotPipeline, true);
pipeline_->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotTap)); pipeline_->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotTap));
// Tone interferer step (optional) // Tone interferer step (optional)
@ -366,7 +369,7 @@ void TxRxThread::initializePipeline_()
inputSampleRate_, computeRfSpectrumStep->getOutputSampleRate()); inputSampleRate_, computeRfSpectrumStep->getOutputSampleRate());
computeRfSpectrumPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(computeRfSpectrumStep)); computeRfSpectrumPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(computeRfSpectrumStep));
auto computeRfSpectrumTap = new TapStep(inputSampleRate_, computeRfSpectrumPipeline); auto computeRfSpectrumTap = new TapStep(inputSampleRate_, computeRfSpectrumPipeline, true);
pipeline_->appendPipelineStep(std::shared_ptr<IPipelineStep>(computeRfSpectrumTap)); pipeline_->appendPipelineStep(std::shared_ptr<IPipelineStep>(computeRfSpectrumTap));
// RX demodulation step // RX demodulation step
@ -378,7 +381,8 @@ void TxRxThread::initializePipeline_()
[]() { return g_channel_noise; }, []() { return g_channel_noise; },
[]() { return wxGetApp().appConfiguration.noiseSNR; }, []() { return wxGetApp().appConfiguration.noiseSNR; },
[]() { return g_RxFreqOffsetHz; }, []() { return g_RxFreqOffsetHz; },
[]() { return &g_sig_pwr_av; } []() { return &g_sig_pwr_av; },
helper_
); );
rfDemodulationPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(rfDemodulationStep)); rfDemodulationPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(rfDemodulationStep));
@ -455,7 +459,7 @@ void TxRxThread::initializePipeline_()
auto resampleForPlotOutPipeline = new AudioPipeline(outputSampleRate_, resampleForPlotOutStep->getOutputSampleRate()); auto resampleForPlotOutPipeline = new AudioPipeline(outputSampleRate_, resampleForPlotOutStep->getOutputSampleRate());
resampleForPlotOutPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotOutStep)); resampleForPlotOutPipeline->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotOutStep));
auto resampleForPlotOutTap = new TapStep(outputSampleRate_, resampleForPlotOutPipeline); auto resampleForPlotOutTap = new TapStep(outputSampleRate_, resampleForPlotOutPipeline, true);
pipeline_->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotOutTap)); pipeline_->appendPipelineStep(std::shared_ptr<IPipelineStep>(resampleForPlotOutTap));
// Clear anything in the FIFO before resuming decode. // Clear anything in the FIFO before resuming decode.
@ -467,6 +471,9 @@ void* TxRxThread::Entry()
{ {
initializePipeline_(); initializePipeline_();
// Request real-time scheduling from the operating system.
helper_->setHelperRealTime();
while (m_run) while (m_run)
{ {
#if defined(__linux__) #if defined(__linux__)
@ -486,18 +493,23 @@ void* TxRxThread::Entry()
} }
#endif #endif
auto currentTime = std::chrono::steady_clock::now();
if (!m_run) break; if (!m_run) break;
//log_info("thread woken up: m_tx=%d", (int)m_tx);
helper_->startRealTimeWork();
if (m_tx) txProcessing_(); if (m_tx) txProcessing_();
else rxProcessing_(); else rxProcessing_();
std::this_thread::sleep_until(currentTime + 10ms); helper_->stopRealTimeWork();
} }
// Force pipeline to delete itself when we're done with the thread. // Force pipeline to delete itself when we're done with the thread.
pipeline_ = nullptr; pipeline_ = nullptr;
// Return to normal scheduling
helper_->clearHelperRealTime();
return NULL; return NULL;
} }
@ -534,19 +546,13 @@ void TxRxThread::clearFifos_()
auto used = codec2_fifo_used(cbData->outfifo1); auto used = codec2_fifo_used(cbData->outfifo1);
if (used > 0) if (used > 0)
{ {
short* temp = new short[used]; codec2_fifo_read(cbData->outfifo1, inputSamples_.get(), used);
assert(temp != nullptr);
codec2_fifo_read(cbData->outfifo1, temp, used);
delete[] temp;
} }
used = codec2_fifo_used(cbData->infifo2); used = codec2_fifo_used(cbData->infifo2);
if (used > 0) if (used > 0)
{ {
short* temp = new short[used]; codec2_fifo_read(cbData->infifo2, inputSamples_.get(), used);
assert(temp != nullptr);
codec2_fifo_read(cbData->infifo2, temp, used);
delete[] temp;
} }
} }
else else
@ -554,20 +560,14 @@ void TxRxThread::clearFifos_()
auto used = codec2_fifo_used(cbData->infifo1); auto used = codec2_fifo_used(cbData->infifo1);
if (used > 0) if (used > 0)
{ {
short* temp = new short[used]; codec2_fifo_read(cbData->infifo1, inputSamples_.get(), used);
assert(temp != nullptr);
codec2_fifo_read(cbData->infifo1, temp, used);
delete[] temp;
} }
auto outFifo = (g_nSoundCards == 1) ? cbData->outfifo1 : cbData->outfifo2; auto outFifo = (g_nSoundCards == 1) ? cbData->outfifo1 : cbData->outfifo2;
used = codec2_fifo_used(outFifo); used = codec2_fifo_used(outFifo);
if (used > 0) if (used > 0)
{ {
short* temp = new short[used]; codec2_fifo_read(outFifo, inputSamples_.get(), used);
assert(temp != nullptr);
codec2_fifo_read(outFifo, temp, used);
delete[] temp;
} }
} }
} }
@ -621,9 +621,6 @@ void TxRxThread::txProcessing_()
int nsam_in_48 = freedvInterface.getTxNumSpeechSamples() * ((float)inputSampleRate_ / (float)freedvInterface.getTxSpeechSampleRate()); int nsam_in_48 = freedvInterface.getTxNumSpeechSamples() * ((float)inputSampleRate_ / (float)freedvInterface.getTxSpeechSampleRate());
assert(nsam_in_48 > 0); assert(nsam_in_48 > 0);
short* insound_card = new short[nsam_in_48];
assert(insound_card != nullptr);
int nout; int nout;
@ -638,11 +635,11 @@ void TxRxThread::txProcessing_()
// again in the decoded audio at the other end. // again in the decoded audio at the other end.
// zero speech input just in case infifo2 underflows // zero speech input just in case infifo2 underflows
memset(insound_card, 0, nsam_in_48*sizeof(short)); memset(inputSamples_.get(), 0, nsam_in_48*sizeof(short));
// There may be recorded audio left to encode while ending TX. To handle this, // There may be recorded audio left to encode while ending TX. To handle this,
// we keep reading from the FIFO until we have less than nsam_in_48 samples available. // we keep reading from the FIFO until we have less than nsam_in_48 samples available.
int nread = codec2_fifo_read(cbData->infifo2, insound_card, nsam_in_48); int nread = codec2_fifo_read(cbData->infifo2, inputSamples_.get(), nsam_in_48);
if (nread != 0 && endingTx) if (nread != 0 && endingTx)
{ {
if (freedvInterface.getCurrentMode() >= FREEDV_MODE_RADE) if (freedvInterface.getCurrentMode() >= FREEDV_MODE_RADE)
@ -656,9 +653,7 @@ void TxRxThread::txProcessing_()
hasEooBeenSent_ = true; hasEooBeenSent_ = true;
} }
short* inputSamples = new short[1]; auto outputSamples = pipeline_->execute(inputSamples_, 0, &nout);
auto inputSamplesPtr = std::shared_ptr<short>(inputSamples, std::default_delete<short[]>());
auto outputSamples = pipeline_->execute(inputSamplesPtr, 0, &nout);
if (nout > 0 && outputSamples.get() != nullptr) if (nout > 0 && outputSamples.get() != nullptr)
{ {
log_debug("Injecting %d samples of resampled EOO into TX stream", nout); log_debug("Injecting %d samples of resampled EOO into TX stream", nout);
@ -680,11 +675,7 @@ void TxRxThread::txProcessing_()
hasEooBeenSent_ = false; hasEooBeenSent_ = false;
} }
short* inputSamples = new short[nsam_in_48]; auto outputSamples = pipeline_->execute(inputSamples_, nsam_in_48, &nout);
memcpy(inputSamples, insound_card, nsam_in_48 * sizeof(short));
auto inputSamplesPtr = std::shared_ptr<short>(inputSamples, std::default_delete<short[]>());
auto outputSamples = pipeline_->execute(inputSamplesPtr, nsam_in_48, &nout);
if (g_dump_fifo_state) { if (g_dump_fifo_state) {
log_info(" nout: %d", nout); log_info(" nout: %d", nout);
@ -696,7 +687,6 @@ void TxRxThread::txProcessing_()
} }
} }
delete[] insound_card;
txModeChangeMutex.Unlock(); txModeChangeMutex.Unlock();
} }
else else
@ -741,9 +731,6 @@ void TxRxThread::rxProcessing_()
int nsam = (int)(inputSampleRate_ * FRAME_DURATION); int nsam = (int)(inputSampleRate_ * FRAME_DURATION);
assert(nsam > 0); assert(nsam > 0);
short* insound_card = new short[nsam];
assert(insound_card != nullptr);
int nout; int nout;
@ -757,16 +744,12 @@ void TxRxThread::rxProcessing_()
} }
// while we have enough input samples available ... // while we have enough input samples available ...
while (codec2_fifo_read(cbData->infifo1, insound_card, nsam) == 0 && processInputFifo) { while (codec2_fifo_read(cbData->infifo1, inputSamples_.get(), nsam) == 0 && processInputFifo) {
// send latest squelch level to FreeDV API, as it handles squelch internally // send latest squelch level to FreeDV API, as it handles squelch internally
freedvInterface.setSquelch(g_SquelchActive, g_SquelchLevel); freedvInterface.setSquelch(g_SquelchActive, g_SquelchLevel);
short* inputSamples = new short[nsam]; auto outputSamples = pipeline_->execute(inputSamples_, nsam, &nout);
memcpy(inputSamples, insound_card, nsam * sizeof(short));
auto inputSamplesPtr = std::shared_ptr<short>(inputSamples, std::default_delete<short[]>());
auto outputSamples = pipeline_->execute(inputSamplesPtr, nsam, &nout);
auto outFifo = (g_nSoundCards == 1) ? cbData->outfifo1 : cbData->outfifo2; auto outFifo = (g_nSoundCards == 1) ? cbData->outfifo1 : cbData->outfifo2;
if (nout > 0 && outputSamples.get() != nullptr) if (nout > 0 && outputSamples.get() != nullptr)
@ -779,6 +762,4 @@ void TxRxThread::rxProcessing_()
(g_tx && wxGetApp().appConfiguration.monitorTxAudio) || (g_tx && wxGetApp().appConfiguration.monitorTxAudio) ||
(!g_voice_keyer_tx && ((g_half_duplex && !g_tx) || !g_half_duplex)); (!g_voice_keyer_tx && ((g_half_duplex && !g_tx) || !g_half_duplex));
} }
delete[] insound_card;
} }

View File

@ -29,6 +29,7 @@
#include <condition_variable> #include <condition_variable>
#include "AudioPipeline.h" #include "AudioPipeline.h"
#include "util/IRealtimeHelper.h"
// Forward declarations // Forward declarations
class LinkStep; class LinkStep;
@ -39,7 +40,7 @@ class LinkStep;
class TxRxThread : public wxThread class TxRxThread : public wxThread
{ {
public: public:
TxRxThread(bool tx, int inputSampleRate, int outputSampleRate, LinkStep* micAudioLink) TxRxThread(bool tx, int inputSampleRate, int outputSampleRate, LinkStep* micAudioLink, std::shared_ptr<IRealtimeHelper> helper)
: wxThread(wxTHREAD_JOINABLE) : wxThread(wxTHREAD_JOINABLE)
, m_tx(tx) , m_tx(tx)
, m_run(1) , m_run(1)
@ -48,9 +49,14 @@ public:
, outputSampleRate_(outputSampleRate) , outputSampleRate_(outputSampleRate)
, equalizedMicAudioLink_(micAudioLink) , equalizedMicAudioLink_(micAudioLink)
, hasEooBeenSent_(false) , hasEooBeenSent_(false)
, helper_(helper)
{ {
assert(inputSampleRate_ > 0); assert(inputSampleRate_ > 0);
assert(outputSampleRate_ > 0); assert(outputSampleRate_ > 0);
inputSamples_ = std::shared_ptr<short>(
new short[std::max(inputSampleRate_, outputSampleRate_)],
std::default_delete<short[]>());
} }
// thread execution starts here // thread execution starts here
@ -74,6 +80,8 @@ private:
int outputSampleRate_; int outputSampleRate_;
LinkStep* equalizedMicAudioLink_; LinkStep* equalizedMicAudioLink_;
bool hasEooBeenSent_; bool hasEooBeenSent_;
std::shared_ptr<IRealtimeHelper> helper_;
std::shared_ptr<short> inputSamples_;
void initializePipeline_(); void initializePipeline_();
void txProcessing_(); void txProcessing_();

View File

@ -19,7 +19,7 @@ public:
bool tapDataEqual() bool tapDataEqual()
{ {
PassThroughStep* step = new PassThroughStep; PassThroughStep* step = new PassThroughStep;
TapStep tapStep(8000, step); TapStep tapStep(8000, step, false);
int outputSamples = 0; int outputSamples = 0;
short* pData = new short[1]; short* pData = new short[1];

View File

@ -0,0 +1,46 @@
//=========================================================================
// Name: IRealtimeHelper.h
// Purpose: Defines the interface to a helper for real-time operation.
//
// Authors: Mooneer Salem
// License:
//
// All rights reserved.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 2.1,
// as published by the Free Software Foundation. This program is
// distributed in the hope that it will be useful, but WITHOUT ANY
// WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
// License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, see <http://www.gnu.org/licenses/>.
//
//=========================================================================
#ifndef I_REALTIME_HELPER_H
#define I_REALTIME_HELPER_H
class IRealtimeHelper
{
public:
// Configures current thread for real-time priority. This should be
// called from the thread that will be operating on received audio.
virtual void setHelperRealTime() = 0;
// Lets audio system know that we're beginning to do work with the
// received audio.
virtual void startRealTimeWork() = 0;
// Lets audio system know that we're done with the work on the received
// audio.
virtual void stopRealTimeWork() = 0;
// Reverts real-time priority for current thread.
virtual void clearHelperRealTime() = 0;
};
#endif

View File

@ -20,8 +20,11 @@
// //
//========================================================================= //=========================================================================
#include <chrono>
#include "ThreadedObject.h" #include "ThreadedObject.h"
using namespace std::chrono_literals;
ThreadedObject::ThreadedObject() ThreadedObject::ThreadedObject()
: isDestroying_(false) : isDestroying_(false)
{ {
@ -37,10 +40,41 @@ ThreadedObject::~ThreadedObject()
objectThread_.join(); objectThread_.join();
} }
void ThreadedObject::enqueue_(std::function<void()> fn) void ThreadedObject::enqueue_(std::function<void()> fn, int timeoutMilliseconds)
{ {
std::unique_lock<std::recursive_mutex> lk(eventQueueMutex_); std::unique_lock<std::recursive_mutex> lk(eventQueueMutex_, std::defer_lock_t());
if (timeoutMilliseconds == 0)
{
lk.lock();
}
else
{
auto beginTime = std::chrono::high_resolution_clock::now();
auto endTime = std::chrono::high_resolution_clock::now();
bool locked = false;
do
{
if (lk.try_lock())
{
locked = true;
break;
}
std::this_thread::sleep_for(1ms);
endTime = std::chrono::high_resolution_clock::now();
} while ((endTime - beginTime) < std::chrono::milliseconds(timeoutMilliseconds));
if (!locked)
{
// could not lock, so we're not bothering to enqueue.
return;
}
}
eventQueue_.push_back(fn); eventQueue_.push_back(fn);
lk.unlock();
eventQueueCV_.notify_one(); eventQueueCV_.notify_one();
} }

View File

@ -37,7 +37,9 @@ public:
protected: protected:
ThreadedObject(); ThreadedObject();
void enqueue_(std::function<void()> fn); // Enqueues some code to run on a different thread.
// @param timeoutMilliseconds Timeout to wait for lock. Note: if we can't get a lock within the timeout, the function doesn't run!
void enqueue_(std::function<void()> fn, int timeoutMilliseconds = 0);
private: private:
bool isDestroying_; bool isDestroying_;