From 439e54035f095958985d811eae8d59f6a4050bf7 Mon Sep 17 00:00:00 2001 From: Mooneer Salem Date: Wed, 23 Apr 2025 00:18:24 -0700 Subject: [PATCH] Request real-time priority from the operating system for audio pipeline (#866) * Bound requested frames between both min and max. * Request real-time scheduling for pipeline threads (currently macOS-only). * Ensure pipeline threads never execute at the same time as audio threads. * macOS: ask for 5ms audio blocks. * Temporarily re-enable stress test. * Suppress codespell for .mm files. * Swap times/buffer sizes. * Fix compiler error. * Go back to previous frame size and time. * Re-disable stress test. * Try adjusting time quantum. * Move audio priority handling to audio layer. * Set helper thread to real-time on Windows. * Temporarily disable audio workgroup code. * Disable periodicity in macOS RT request * Forgot to define variable. * Only allow a max of 50% CPU time for audio helpers. * More tuning of the duty cycle. * Make sure we sleep some amount of time every time through the loop. * Go back to smaller time quantum. * Another quantum tweak. * Increase audio sample block size to 20ms. * Use 2.5ms for audio block time. * Try increasing FIFO sizes for RX. * Oops, forgot to comment out the / 1000 too. * Get faster turnaround for macOS GH builds. * Revert FIFO changes. * Add additional debugging. * Fix compiler error. * Fix typo. * Use semaphores to notify TX/RX worker threads. * Try dividing duty cycle by 2 to avoid starvation. * Reenable audio workgroups. * No point in having parallel threads if only one mode is active. * Ensure that ParallelStep gets added to the audio workgroup too. * Anything for GUI consumption shouldn't be in the RT path. * Go back to 10ms audio blocks to see if reporting tests can more reliably pass. * Fix issue causing audio to not work on certain Macs. * Support real-time threading on Linux when using PulseAudio/pipewire. * Fix misspelling. * dbus needs to be installed in the environment. 
* macOS: try a smaller block size again. * Revert "macOS: try a smaller block size again." This reverts commit 1d21ad69344101326107c7d04904bdc5547b590e. * Try architecture-specific audio block times. * rtkit itself also needs to be installed in the environment. * Revert ordering changes to macOS CI builds. * GH user needs to be added to rtkit group. * Implement semaphores on Windows. * Don't exit the render/capture threads unless requested by higher level code. * Don't take address of a pointer. * macOS: set thread QoS for RT threads. * Move RADE RX/TX memory allocations out of real-time path. * Moving additional memory allocations outside of RT context. * Move all remaining allocations in pipeline outside of RT except for legacy FreeDV modes. * Longer audio block times don't seem to be making a difference. * Move legacy mode memory allocations out of RT context. * Prevent unbounded mutex locks inside pipeline. * Windows: fallback to a simple sleep if we can't lock the semaphore. * Increase maximum wait time to match what Windows returns for buffer size. * PulseAudio: fallback to simple sleep if we can't wait on semaphore. * Prevent unbounded locking in TapStep. 
--- .github/workflows/cmake-linux.yml | 5 +- CMakeLists.txt | 12 + cmake/FindDBus.cmake | 59 +++++ src/audio/CMakeLists.txt | 9 + src/audio/IAudioDevice.h | 25 +- src/audio/MacAudioDevice.h | 23 ++ src/audio/MacAudioDevice.mm | 204 +++++++++++++++- src/audio/PulseAudioDevice.cpp | 87 +++++++ src/audio/PulseAudioDevice.h | 17 +- src/audio/WASAPIAudioDevice.cpp | 72 +++++- src/audio/WASAPIAudioDevice.h | 18 ++ src/audio/rtkit.c | 308 +++++++++++++++++++++++++ src/audio/rtkit.h | 79 +++++++ src/freedv_interface.cpp | 9 +- src/freedv_interface.h | 5 +- src/main.cpp | 12 +- src/pipeline/ComputeRfSpectrumStep.cpp | 21 +- src/pipeline/ComputeRfSpectrumStep.h | 2 + src/pipeline/EqualizerStep.cpp | 23 +- src/pipeline/EqualizerStep.h | 1 + src/pipeline/FreeDVReceiveStep.cpp | 66 +++--- src/pipeline/FreeDVReceiveStep.h | 5 + src/pipeline/FreeDVTransmitStep.cpp | 75 +++--- src/pipeline/FreeDVTransmitStep.h | 6 + src/pipeline/LevelAdjustStep.cpp | 12 +- src/pipeline/LevelAdjustStep.h | 1 + src/pipeline/LinkStep.cpp | 18 +- src/pipeline/LinkStep.h | 12 +- src/pipeline/MuteStep.cpp | 17 +- src/pipeline/MuteStep.h | 1 + src/pipeline/ParallelStep.cpp | 14 +- src/pipeline/ParallelStep.h | 6 +- src/pipeline/PlaybackStep.cpp | 15 +- src/pipeline/PlaybackStep.h | 1 + src/pipeline/RADEReceiveStep.cpp | 63 ++--- src/pipeline/RADEReceiveStep.h | 6 + src/pipeline/RADETransmitStep.cpp | 78 ++++--- src/pipeline/RADETransmitStep.h | 6 + src/pipeline/ResampleStep.cpp | 47 ++-- src/pipeline/ResampleStep.h | 4 + src/pipeline/SpeexStep.cpp | 14 +- src/pipeline/SpeexStep.h | 2 + src/pipeline/TapStep.cpp | 21 +- src/pipeline/TapStep.h | 6 +- src/pipeline/ToneInterfererStep.cpp | 15 +- src/pipeline/ToneInterfererStep.h | 1 + src/pipeline/TxRxThread.cpp | 93 +++----- src/pipeline/TxRxThread.h | 10 +- src/pipeline/test/TapTest.cpp | 2 +- src/util/IRealtimeHelper.h | 46 ++++ src/util/ThreadedObject.cpp | 38 ++- src/util/ThreadedObject.h | 4 +- 52 files changed, 1383 insertions(+), 313 deletions(-) 
create mode 100644 cmake/FindDBus.cmake create mode 100644 src/audio/rtkit.c create mode 100644 src/audio/rtkit.h create mode 100644 src/util/IRealtimeHelper.h diff --git a/.github/workflows/cmake-linux.yml b/.github/workflows/cmake-linux.yml index 9e03cac0..a9bbd9be 100644 --- a/.github/workflows/cmake-linux.yml +++ b/.github/workflows/cmake-linux.yml @@ -25,11 +25,12 @@ jobs: run: | sudo apt-get update sudo apt-get upgrade -y - sudo apt-get install codespell libpulse-dev libspeexdsp-dev libsamplerate0-dev sox git libwxgtk3.2-dev portaudio19-dev libhamlib-dev libasound2-dev libao-dev libgsm1-dev libsndfile-dev xvfb pipewire pulseaudio-utils pipewire-pulse wireplumber metacity dbus-x11 at-spi2-core rtkit octave octave-signal + sudo apt-get install codespell libpulse-dev libspeexdsp-dev libsamplerate0-dev sox git libwxgtk3.2-dev portaudio19-dev libhamlib-dev libasound2-dev libao-dev libgsm1-dev libsndfile-dev xvfb pipewire pulseaudio-utils pipewire-pulse wireplumber metacity dbus-x11 at-spi2-core rtkit octave octave-signal libdbus-1-dev + sudo usermod -a -G rtkit $(whoami) - name: Spellcheck codebase shell: bash - run: codespell --ignore-words-list=caf,radae,rade,inout,nin,ontop,parm,tthe,ue `find src -name '*.c*' -o -name '*.h' -o -name '*.mm' | grep -v 3rdparty` + run: codespell --ignore-words-list=caf,radae,rade,inout,nin,ontop,parm,tthe,ue `find src -name '*.c*' -o -name '*.h' | grep -v 3rdparty` - name: Install Python required modules shell: bash diff --git a/CMakeLists.txt b/CMakeLists.txt index bbe55057..a018b3c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -205,6 +205,18 @@ if(USE_STATIC_DEPS) set(USE_STATIC_SPEEXDSP TRUE FORCE) endif(USE_STATIC_DEPS) +# +# Find DBus (Linux only)--this is so that we can use +# rtkit to request real-time scheduling for our audio +# threads. 
+if (LINUX) + message(STATUS "Looking for DBus...") + find_package(DBus) + if (DBUS_FOUND) + message(STATUS "Found DBus, will attempt to use rtkit for real-time threading") + endif (DBUS_FOUND) +endif (LINUX) + # # Find wxWidgets # diff --git a/cmake/FindDBus.cmake b/cmake/FindDBus.cmake new file mode 100644 index 00000000..4a1a1805 --- /dev/null +++ b/cmake/FindDBus.cmake @@ -0,0 +1,59 @@ +# - Try to find DBus +# Once done, this will define +# +# DBUS_FOUND - system has DBus +# DBUS_INCLUDE_DIRS - the DBus include directories +# DBUS_LIBRARIES - link these to use DBus +# +# Copyright (C) 2012 Raphael Kubo da Costa +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND ITS CONTRIBUTORS ``AS +# IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR ITS +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +FIND_PACKAGE(PkgConfig) +PKG_CHECK_MODULES(PC_DBUS QUIET dbus-1) + +FIND_LIBRARY(DBUS_LIBRARIES + NAMES dbus-1 + HINTS ${PC_DBUS_LIBDIR} + ${PC_DBUS_LIBRARY_DIRS} +) + +FIND_PATH(DBUS_INCLUDE_DIR + NAMES dbus/dbus.h + HINTS ${PC_DBUS_INCLUDEDIR} + ${PC_DBUS_INCLUDE_DIRS} +) + +GET_FILENAME_COMPONENT(_DBUS_LIBRARY_DIR ${DBUS_LIBRARIES} PATH) +FIND_PATH(DBUS_ARCH_INCLUDE_DIR + NAMES dbus/dbus-arch-deps.h + HINTS ${PC_DBUS_INCLUDEDIR} + ${PC_DBUS_INCLUDE_DIRS} + ${_DBUS_LIBRARY_DIR} + ${DBUS_INCLUDE_DIR} + PATH_SUFFIXES include +) + +SET(DBUS_INCLUDE_DIRS ${DBUS_INCLUDE_DIR} ${DBUS_ARCH_INCLUDE_DIR}) + +INCLUDE(FindPackageHandleStandardArgs) +FIND_PACKAGE_HANDLE_STANDARD_ARGS(DBUS REQUIRED_VARS DBUS_INCLUDE_DIRS DBUS_LIBRARIES) diff --git a/src/audio/CMakeLists.txt b/src/audio/CMakeLists.txt index 577666e6..171fc522 100644 --- a/src/audio/CMakeLists.txt +++ b/src/audio/CMakeLists.txt @@ -10,9 +10,14 @@ if(NATIVE_AUDIO_AVAILABLE) WASAPIAudioDevice.cpp ) elseif(LINUX) + if (DBUS_FOUND) + set(RTKIT_FILES rtkit.c) + endif (DBUS_FOUND) + set(AUDIO_ENGINE_LIBRARY_SPECIFIC_FILES PulseAudioDevice.cpp PulseAudioEngine.cpp + ${RTKIT_FILES} ) endif() else() @@ -37,6 +42,10 @@ if(APPLE AND NATIVE_AUDIO_AVAILABLE) "-framework CoreAudio") elseif(WIN32 AND NATIVE_AUDIO_AVAILABLE) target_link_libraries(fdv_audio PRIVATE uuid avrt) +elseif(LINUX AND NATIVE_AUDIO_AVAILABLE AND DBUS_FOUND) + target_include_directories(fdv_audio PRIVATE ${DBUS_INCLUDE_DIRS}) + target_compile_definitions(fdv_audio PRIVATE -DUSE_RTKIT) + target_link_libraries(fdv_audio PRIVATE ${DBUS_LIBRARIES}) endif() if(BOOTSTRAP_WXWIDGETS) diff --git a/src/audio/IAudioDevice.h b/src/audio/IAudioDevice.h index c350b26e..51840c0f 100644 --- a/src/audio/IAudioDevice.h +++ b/src/audio/IAudioDevice.h @@ -26,9 +26,15 @@ #include #include #include -#include "AudioDeviceSpecification.h" +#include +#include -class IAudioDevice +#include "AudioDeviceSpecification.h" +#include "../util/IRealtimeHelper.h" + +using namespace 
std::chrono_literals; + +class IAudioDevice : public IRealtimeHelper { public: typedef std::function AudioDataCallbackFn; @@ -46,6 +52,21 @@ public: virtual bool isRunning() = 0; virtual int getLatencyInMicroseconds() = 0; + + // Configures current thread for real-time priority. This should be + // called from the thread that will be operating on received audio. + virtual void setHelperRealTime() override { /* empty */ } + + // Lets audio system know that we're beginning to do work with the + // received audio. + virtual void startRealTimeWork() override { /* empty */ } + + // Lets audio system know that we're done with the work on the received + // audio. + virtual void stopRealTimeWork() override { std::this_thread::sleep_for(10ms); } + + // Reverts real-time priority for current thread. + virtual void clearHelperRealTime() override { /* empty */ } // Sets user friendly description of device. Not used by all engines. void setDescription(std::string desc); diff --git a/src/audio/MacAudioDevice.h b/src/audio/MacAudioDevice.h index dfa87037..ef8193dc 100644 --- a/src/audio/MacAudioDevice.h +++ b/src/audio/MacAudioDevice.h @@ -23,6 +23,9 @@ #ifndef MAC_AUDIO_DEVICE_H #define MAC_AUDIO_DEVICE_H +#include +#include + #include "../util/ThreadedObject.h" #include "IAudioEngine.h" #include "IAudioDevice.h" @@ -41,6 +44,21 @@ public: virtual bool isRunning() override; virtual int getLatencyInMicroseconds() override; + + // Configures current thread for real-time priority. This should be + // called from the thread that will be operating on received audio. + virtual void setHelperRealTime() override; + + // Lets audio system know that we're beginning to do work with the + // received audio. + virtual void startRealTimeWork() override; + + // Lets audio system know that we're done with the work on the received + // audio. + virtual void stopRealTimeWork() override; + + // Reverts real-time priority for current thread. 
+ virtual void clearHelperRealTime() override; protected: friend class MacAudioEngine; @@ -56,6 +74,11 @@ private: void* player_; // actually AVAudioPlayerNode short* inputFrames_; + + static thread_local void* workgroup_; + static thread_local void* joinToken_; + + dispatch_semaphore_t sem_; }; #endif // MAC_AUDIO_DEVICE_H diff --git a/src/audio/MacAudioDevice.mm b/src/audio/MacAudioDevice.mm index 3656da4c..973ba7e9 100644 --- a/src/audio/MacAudioDevice.mm +++ b/src/audio/MacAudioDevice.mm @@ -25,6 +25,26 @@ #include #import +#import + +#include +#include +#include +#include +#include + +thread_local void* MacAudioDevice::workgroup_ = nullptr; +thread_local void* MacAudioDevice::joinToken_ = nullptr; + +// One nanosecond in seconds. +constexpr static double kOneNanosecond = 1.0e9; + +// The I/O interval time in seconds. +//#if __aarch64__ +constexpr static double AUDIO_SAMPLE_BLOCK_SEC = 0.010; +//#else +//constexpr static double AUDIO_SAMPLE_BLOCK_SEC = 0.020; +//#endif // __aarch64__ static OSStatus GetIOBufferFrameSizeRange(AudioObjectID inDeviceID, UInt32* outMinimum, @@ -74,6 +94,8 @@ MacAudioDevice::MacAudioDevice(int coreAudioId, IAudioEngine::AudioDirection dir , inputFrames_(nullptr) { log_info("Create MacAudioDevice with ID %d, channels %d and sample rate %d", coreAudioId, numChannels, sampleRate); + + sem_ = dispatch_semaphore_create(0); } MacAudioDevice::~MacAudioDevice() @@ -82,6 +104,8 @@ MacAudioDevice::~MacAudioDevice() { stop(); } + + dispatch_release(sem_); } int MacAudioDevice::getNumChannels() @@ -132,12 +156,13 @@ void MacAudioDevice::start() // reduces dropouts on marginal hardware. 
UInt32 minFrameSize = 0; UInt32 maxFrameSize = 0; - UInt32 desiredFrameSize = 512; + UInt32 desiredFrameSize = pow(2, ceil(log(AUDIO_SAMPLE_BLOCK_SEC * sampleRate_) / log(2))); // next power of two GetIOBufferFrameSizeRange(coreAudioId_, &minFrameSize, &maxFrameSize); if (minFrameSize != 0 && maxFrameSize != 0) { log_info("Frame sizes of %d to %d are supported for audio device ID %d", minFrameSize, maxFrameSize, coreAudioId_); - desiredFrameSize = std::min(maxFrameSize, (UInt32)2048); // TBD: investigate why we need a significantly higher than default. + desiredFrameSize = std::min(maxFrameSize, desiredFrameSize); + desiredFrameSize = std::max(minFrameSize, desiredFrameSize); if (SetCurrentIOBufferFrameSize(coreAudioId_, desiredFrameSize) != noErr) { log_warn("Could not set IO frame size to %d for audio device ID %d", desiredFrameSize, coreAudioId_); @@ -224,6 +249,9 @@ void MacAudioDevice::start() onAudioDataFunction(*this, inputFrames_, frameCount, onAudioDataState); } + + dispatch_semaphore_signal(sem_); + return OSStatus(noErr); }; @@ -406,7 +434,7 @@ int MacAudioDevice::getLatencyInMicroseconds() 0, nullptr, &size, - &streams); + streams); if (result == noErr) { propertyAddress.mSelector = kAudioStreamPropertyLatency; @@ -429,3 +457,173 @@ int MacAudioDevice::getLatencyInMicroseconds() }); return fut.get(); } + +void MacAudioDevice::setHelperRealTime() +{ + // Set thread QoS to "user interactive" + pthread_set_qos_class_self_np(QOS_CLASS_USER_INTERACTIVE, 0); + + // Adapted from Chromium project. May need to adjust parameters + // depending on behavior. + + // Get current thread ID + auto currentThreadId = pthread_mach_thread_np(pthread_self()); + + // Increase thread priority to real-time. + // Please note that the thread_policy_set() calls may fail in + // rare cases if the kernel decides the system is under heavy load + // and is unable to handle boosting the thread priority. + // In these cases we just return early and go on with life. 
+ // Make thread fixed priority. + thread_extended_policy_data_t policy; + policy.timeshare = 0; // Set to 1 for a non-fixed thread. + kern_return_t result = + thread_policy_set(currentThreadId, + THREAD_EXTENDED_POLICY, + reinterpret_cast(&policy), + THREAD_EXTENDED_POLICY_COUNT); + if (result != KERN_SUCCESS) + { + log_warn("Could not set current thread to real-time: %d"); + return; + } + + // Set to relatively high priority. + thread_precedence_policy_data_t precedence; + precedence.importance = 63; + result = thread_policy_set(currentThreadId, + THREAD_PRECEDENCE_POLICY, + reinterpret_cast(&precedence), + THREAD_PRECEDENCE_POLICY_COUNT); + if (result != KERN_SUCCESS) + { + log_warn("Could not increase thread priority"); + return; + } + + // Most important, set real-time constraints. + // Define the guaranteed and max fraction of time for the audio thread. + // These "duty cycle" values can range from 0 to 1. A value of 0.5 + // means the scheduler would give half the time to the thread. + // These values have empirically been found to yield good behavior. + // Good means that audio performance is high and other threads won't starve. + const double kGuaranteedAudioDutyCycle = 0.75; + const double kMaxAudioDutyCycle = 0.85; + + // Define constants determining how much time the audio thread can + // use in a given time quantum. All times are in milliseconds. + //auto sampleBuffer = pow(2, ceil(log(0.01 * sampleRate_) / log(2))); // next power of two + const double kTimeQuantum = AUDIO_SAMPLE_BLOCK_SEC * 1000; + + // Time guaranteed each quantum. + const double kAudioTimeNeeded = kGuaranteedAudioDutyCycle * kTimeQuantum; + + // Maximum time each quantum. + const double kMaxTimeAllowed = kMaxAudioDutyCycle * kTimeQuantum; + + // Get the conversion factor from milliseconds to absolute time + // which is what the time-constraints call needs. 
+ mach_timebase_info_data_t tbInfo; + mach_timebase_info(&tbInfo); + double ms_to_abs_time = + (static_cast(tbInfo.denom) / tbInfo.numer) * 1000000; + thread_time_constraint_policy_data_t timeConstraints; + timeConstraints.period = kTimeQuantum * ms_to_abs_time; + timeConstraints.computation = kAudioTimeNeeded * ms_to_abs_time; + timeConstraints.constraint = kMaxTimeAllowed * ms_to_abs_time; + timeConstraints.preemptible = 1; + + result = + thread_policy_set(currentThreadId, + THREAD_TIME_CONSTRAINT_POLICY, + reinterpret_cast(&timeConstraints), + THREAD_TIME_CONSTRAINT_POLICY_COUNT); + if (result != KERN_SUCCESS) + { + log_warn("Could not set time constraint policy"); + return; + } + + // Join Core Audio workgroup + workgroup_ = nullptr; + joinToken_ = nullptr; + + if (@available(macOS 11.0, *)) + { + UInt32 size = sizeof(os_workgroup_t); + os_workgroup_t* wgMem = new os_workgroup_t; + assert(wgMem != nullptr); + os_workgroup_join_token_s* wgToken = new os_workgroup_join_token_s; + assert(wgToken != nullptr); + + AudioObjectPropertyAddress propertyAddress = { + .mSelector = kAudioDevicePropertyIOThreadOSWorkgroup, + .mScope = kAudioObjectPropertyScopeGlobal, + .mElement = kAudioObjectPropertyElementMain + }; + + OSStatus osResult = AudioObjectGetPropertyData( + coreAudioId_, + &propertyAddress, + 0, + nullptr, + &size, + wgMem); + if (osResult != noErr) + { + log_warn("Could not get audio workgroup"); + workgroup_ = nullptr; + joinToken_ = nullptr; + + delete wgMem; + delete wgToken; + return; + } + else + { + workgroup_ = wgMem; + joinToken_ = wgToken; + } + + auto workgroupResult = os_workgroup_join(*wgMem, wgToken); + if (workgroupResult != 0) + { + log_warn("Could not join Core Audio workgroup (err = %d)", workgroupResult); + workgroup_ = nullptr; + joinToken_ = nullptr; + delete wgMem; + delete wgToken; + } + } +} + +void MacAudioDevice::startRealTimeWork() +{ + // empty +} + +void MacAudioDevice::stopRealTimeWork() +{ + dispatch_semaphore_wait(sem_, 
dispatch_time(DISPATCH_TIME_NOW, AUDIO_SAMPLE_BLOCK_SEC * kOneNanosecond)); +} + +void MacAudioDevice::clearHelperRealTime() +{ + if (@available(macOS 11.0, *)) + { + if (workgroup_ != nullptr) + { + os_workgroup_t* wgMem = (os_workgroup_t*)workgroup_; + os_workgroup_join_token_s* wgToken = (os_workgroup_join_token_s*)joinToken_; + + os_workgroup_leave(*wgMem, wgToken); + + delete wgMem; + delete wgToken; + + workgroup_ = nullptr; + joinToken_ = nullptr; + } + } +} + diff --git a/src/audio/PulseAudioDevice.cpp b/src/audio/PulseAudioDevice.cpp index 29353e01..ec793d1b 100644 --- a/src/audio/PulseAudioDevice.cpp +++ b/src/audio/PulseAudioDevice.cpp @@ -23,9 +23,15 @@ #include #include #include +#include +#include #include "PulseAudioDevice.h" +#if defined(USE_RTKIT) +#include "rtkit.h" +#endif // defined(USE_RTKIT) + #include "../util/logging/ulog.h" using namespace std::chrono_literals; @@ -139,6 +145,12 @@ void PulseAudioDevice::start() } else { + // Set up semaphore for signaling workers + if (sem_init(&sem_, 0, 0) < 0) + { + log_warn("Could not set up semaphore (errno = %d)", errno); + } + // Start data collection thread. 
This thread // is necessary in order to ensure that we can // provide data to PulseAudio at a rate expected @@ -191,6 +203,7 @@ void PulseAudioDevice::start() { onAudioDataFunction(*this, data, currentLength / getNumChannels(), onAudioDataState); } + sem_post(&sem_); // Sleep up to the number of milliseconds corresponding to the data received int numMilliseconds = 1000.0 * ((double)currentLength / getNumChannels()) / (double)getSampleRate(); @@ -307,6 +320,8 @@ void PulseAudioDevice::stop() delete inputPendingThread_; inputPendingThread_ = nullptr; } + + sem_destroy(&sem_); } } @@ -321,6 +336,78 @@ int PulseAudioDevice::getLatencyInMicroseconds() return (int)latency; } +void PulseAudioDevice::setHelperRealTime() +{ + // Set RLIMIT_RTTIME, required for rtkit + struct rlimit rlim; + memset(&rlim, 0, sizeof(rlim)); + rlim.rlim_cur = 100000ULL; // 100ms + rlim.rlim_max = rlim.rlim_cur; + + if ((setrlimit(RLIMIT_RTTIME, &rlim) < 0)) + { + log_warn("Could not set RLIMIT_RTTIME (errno = %d)", errno); + } + + // Prerequisite: SCHED_RR and SCHED_RESET_ON_FORK need to be set. + struct sched_param p; + memset(&p, 0, sizeof(p)); + + p.sched_priority = 3; + if (sched_setscheduler(0, SCHED_RR|SCHED_RESET_ON_FORK, &p) < 0 && errno == EPERM) + { +#if defined(USE_RTKIT) + DBusError error; + DBusConnection* bus = nullptr; + int result = 0; + + dbus_error_init(&error); + if (!(bus = dbus_bus_get(DBUS_BUS_SYSTEM, &error))) + { + log_warn("Could not connect to system bus: %s", error.message); + } + else if ((result = rtkit_make_realtime(bus, 0, p.sched_priority)) < 0) + { + log_warn("rtkit could not make real-time: %s", strerror(-result)); + } + + if (bus != nullptr) + { + dbus_connection_unref(bus); + } +#else + log_warn("No permission to make real-time"); +#endif // defined(USE_RTKIT) + } +} + +void PulseAudioDevice::stopRealTimeWork() +{ + struct timespec ts; + + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) + { + // Fallback to simple sleep. 
+ IAudioDevice::stopRealTimeWork(); + return; + } + + ts.tv_nsec += 10000000; + ts.tv_sec += (ts.tv_nsec / 1000000000); + ts.tv_nsec = ts.tv_nsec % 1000000000; + + if (sem_timedwait(&sem_, &ts) < 0 && errno != ETIMEDOUT) + { + // Fallback to simple sleep. + IAudioDevice::stopRealTimeWork(); + } +} + +void PulseAudioDevice::clearHelperRealTime() +{ + IAudioDevice::clearHelperRealTime(); +} + void PulseAudioDevice::StreamReadCallback_(pa_stream *s, size_t length, void *userdata) { const void* data = nullptr; diff --git a/src/audio/PulseAudioDevice.h b/src/audio/PulseAudioDevice.h index a5492f39..4f115026 100644 --- a/src/audio/PulseAudioDevice.h +++ b/src/audio/PulseAudioDevice.h @@ -28,6 +28,8 @@ #include #include #include +#include + #include "IAudioEngine.h" #include "IAudioDevice.h" @@ -45,7 +47,18 @@ public: virtual bool isRunning() override; virtual int getLatencyInMicroseconds() override; - + + // Configures current thread for real-time priority. This should be + // called from the thread that will be operating on received audio. + virtual void setHelperRealTime() override; + + // Lets audio system know that we're done with the work on the received + // audio. + virtual void stopRealTimeWork() override; + + // Reverts real-time priority for current thread. + virtual void clearHelperRealTime() override; + protected: // PulseAudioDevice cannot be created directly, only via PulseAudioEngine. 
friend class PulseAudioEngine; @@ -77,6 +90,8 @@ private: std::mutex streamStateMutex_; std::condition_variable streamStateCondVar_; + sem_t sem_; + static void StreamReadCallback_(pa_stream *s, size_t length, void *userdata); static void StreamWriteCallback_(pa_stream *s, size_t length, void *userdata); static void StreamUnderflowCallback_(pa_stream *p, void *userdata); diff --git a/src/audio/WASAPIAudioDevice.cpp b/src/audio/WASAPIAudioDevice.cpp index 0ea01b8f..7acc1025 100644 --- a/src/audio/WASAPIAudioDevice.cpp +++ b/src/audio/WASAPIAudioDevice.cpp @@ -29,10 +29,12 @@ #include #include "../util/logging/ulog.h" -#define BLOCK_TIME_NS (40000000) +#define BLOCK_TIME_NS (0) // Nanoseconds per REFERENCE_TIME unit #define NS_PER_REFTIME (100) + +thread_local HANDLE WASAPIAudioDevice::helperTask_ = nullptr; WASAPIAudioDevice::WASAPIAudioDevice(IAudioClient* client, IAudioEngine::AudioDirection direction, int sampleRate, int numChannels) : client_(client) @@ -47,6 +49,7 @@ WASAPIAudioDevice::WASAPIAudioDevice(IAudioClient* client, IAudioEngine::AudioDi , latencyFrames_(0) , renderCaptureEvent_(nullptr) , isRenderCaptureRunning_(false) + , semaphore_(nullptr) { // empty } @@ -254,6 +257,15 @@ void WASAPIAudioDevice::start() } } + // Create semaphore + semaphore_ = CreateSemaphore(nullptr, 0, 1, nullptr); + if (semaphore_ == nullptr) + { + std::stringstream ss; + ss << "Could not create semaphore (err = " << GetLastError() << ")"; + log_warn(ss.str().c_str()); + } + // Start render/capture hr = client_->Start(); if (FAILED(hr)) @@ -296,8 +308,9 @@ void WASAPIAudioDevice::start() log_warn("Could not increase thread priority"); } - while (isRenderCaptureRunning_ && WaitForSingleObject(renderCaptureEvent_, 100) == WAIT_OBJECT_0) + while (isRenderCaptureRunning_) { + WaitForSingleObject(renderCaptureEvent_, 100); if (direction_ == IAudioEngine::AUDIO_ENGINE_OUT) { renderAudio_(); @@ -364,6 +377,12 @@ void WASAPIAudioDevice::stop() captureClient_ = nullptr; } + if 
(semaphore_ != nullptr) + { + CloseHandle(semaphore_); + semaphore_ = nullptr; + } + prom->set_value(); }); fut.wait(); @@ -381,6 +400,49 @@ int WASAPIAudioDevice::getLatencyInMicroseconds() return 1000000 * latencyFrames_ / sampleRate_; } +void WASAPIAudioDevice::setHelperRealTime() +{ + DWORD taskIndex = 0; + helperTask_ = AvSetMmThreadCharacteristics(TEXT("Pro Audio"), &taskIndex); + if (helperTask_ == nullptr) + { + log_warn("Could not increase thread priority"); + } +} + +void WASAPIAudioDevice::startRealTimeWork() +{ + // empty +} + +void WASAPIAudioDevice::stopRealTimeWork() +{ + if (semaphore_ == nullptr) + { + // Fallback to base class behavior + IAudioDevice::stopRealTimeWork(); + return; + } + + // Wait a maximum of (bufferSize / sampleRate) seconds for the semaphore to return + DWORD result = WaitForSingleObject(semaphore_, (1000 * bufferFrameCount_) / sampleRate_); + + if (result != WAIT_TIMEOUT && result != WAIT_OBJECT_0) + { + // Fallback to a simple sleep. + IAudioDevice::stopRealTimeWork(); + } +} + +void WASAPIAudioDevice::clearHelperRealTime() +{ + if (helperTask_ != nullptr) + { + AvRevertMmThreadCharacteristics(helperTask_); + helperTask_ = nullptr; + } +} + void WASAPIAudioDevice::renderAudio_() { // If client is no longer available, abort @@ -514,4 +576,10 @@ void WASAPIAudioDevice::captureAudio_() return; } } + + if (semaphore_ != nullptr) + { + // Notify worker threads + ReleaseSemaphore(semaphore_, 1, nullptr); + } } diff --git a/src/audio/WASAPIAudioDevice.h b/src/audio/WASAPIAudioDevice.h index c0a90403..e7ebad83 100644 --- a/src/audio/WASAPIAudioDevice.h +++ b/src/audio/WASAPIAudioDevice.h @@ -49,6 +49,21 @@ public: virtual bool isRunning() override; virtual int getLatencyInMicroseconds() override; + + // Configures current thread for real-time priority. This should be + // called from the thread that will be operating on received audio. 
+ virtual void setHelperRealTime() override; + + // Lets audio system know that we're beginning to do work with the + // received audio. + virtual void startRealTimeWork() override; + + // Lets audio system know that we're done with the work on the received + // audio. + virtual void stopRealTimeWork() override; + + // Reverts real-time priority for current thread. + virtual void clearHelperRealTime() override; protected: friend class WASAPIAudioEngine; @@ -69,9 +84,12 @@ private: std::thread renderCaptureThread_; HANDLE renderCaptureEvent_; bool isRenderCaptureRunning_; + HANDLE semaphore_; void renderAudio_(); void captureAudio_(); + + static thread_local HANDLE helperTask_; }; #endif // WASAPI_AUDIO_DEVICE_H diff --git a/src/audio/rtkit.c b/src/audio/rtkit.c new file mode 100644 index 00000000..a9806a8c --- /dev/null +++ b/src/audio/rtkit.c @@ -0,0 +1,308 @@ +/*-*- Mode: C; c-basic-offset: 8 -*-*/ + +/*** + Copyright 2009 Lennart Poettering + Copyright 2010 David Henningsson + + Permission is hereby granted, free of charge, to any person + obtaining a copy of this software and associated documentation files + (the "Software"), to deal in the Software without restriction, + including without limitation the rights to use, copy, modify, merge, + publish, distribute, sublicense, and/or sell copies of the Software, + and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +***/ + +#include + +#include "rtkit.h" + +#ifdef __linux__ + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include +#include +#include +#include + +static pid_t _gettid(void) { + return (pid_t) syscall(SYS_gettid); +} + +static int translate_error(const char *name) { + if (strcmp(name, DBUS_ERROR_NO_MEMORY) == 0) + return -ENOMEM; + if (strcmp(name, DBUS_ERROR_SERVICE_UNKNOWN) == 0 || + strcmp(name, DBUS_ERROR_NAME_HAS_NO_OWNER) == 0) + return -ENOENT; + if (strcmp(name, DBUS_ERROR_ACCESS_DENIED) == 0 || + strcmp(name, DBUS_ERROR_AUTH_FAILED) == 0) + return -EACCES; + + return -EIO; +} + +static long long rtkit_get_int_property(DBusConnection *connection, const char* propname, long long* propval) { + DBusMessage *m = NULL, *r = NULL; + DBusMessageIter iter, subiter; + dbus_int64_t i64; + dbus_int32_t i32; + DBusError error; + int current_type; + long long ret; + const char * interfacestr = "org.freedesktop.RealtimeKit1"; + + dbus_error_init(&error); + + if (!(m = dbus_message_new_method_call( + RTKIT_SERVICE_NAME, + RTKIT_OBJECT_PATH, + "org.freedesktop.DBus.Properties", + "Get"))) { + ret = -ENOMEM; + goto finish; + } + + if (!dbus_message_append_args( + m, + DBUS_TYPE_STRING, &interfacestr, + DBUS_TYPE_STRING, &propname, + DBUS_TYPE_INVALID)) { + ret = -ENOMEM; + goto finish; + } + + if (!(r = dbus_connection_send_with_reply_and_block(connection, m, -1, &error))) { + ret = translate_error(error.name); + goto finish; + } + + if (dbus_set_error_from_message(&error, r)) { + ret = translate_error(error.name); + goto finish; + } + + ret = -EBADMSG; + dbus_message_iter_init(r, &iter); + while ((current_type = dbus_message_iter_get_arg_type (&iter)) != DBUS_TYPE_INVALID) { + + if (current_type == 
DBUS_TYPE_VARIANT) { + dbus_message_iter_recurse(&iter, &subiter); + + while ((current_type = dbus_message_iter_get_arg_type (&subiter)) != DBUS_TYPE_INVALID) { + + if (current_type == DBUS_TYPE_INT32) { + dbus_message_iter_get_basic(&subiter, &i32); + *propval = i32; + ret = 0; + } + + if (current_type == DBUS_TYPE_INT64) { + dbus_message_iter_get_basic(&subiter, &i64); + *propval = i64; + ret = 0; + } + + dbus_message_iter_next (&subiter); + } + } + dbus_message_iter_next (&iter); + } + +finish: + + if (m) + dbus_message_unref(m); + + if (r) + dbus_message_unref(r); + + dbus_error_free(&error); + + return ret; +} + +int rtkit_get_max_realtime_priority(DBusConnection *connection) { + long long retval; + int err; + + err = rtkit_get_int_property(connection, "MaxRealtimePriority", &retval); + return err < 0 ? err : retval; +} + +int rtkit_get_min_nice_level(DBusConnection *connection, int* min_nice_level) { + long long retval; + int err; + + err = rtkit_get_int_property(connection, "MinNiceLevel", &retval); + if (err >= 0) + *min_nice_level = retval; + return err; +} + +long long rtkit_get_rttime_usec_max(DBusConnection *connection) { + long long retval; + int err; + + err = rtkit_get_int_property(connection, "RTTimeUSecMax", &retval); + return err < 0 ? 
err : retval; +} + +int rtkit_make_realtime(DBusConnection *connection, pid_t thread, int priority) { + DBusMessage *m = NULL, *r = NULL; + dbus_uint64_t u64; + dbus_uint32_t u32; + DBusError error; + int ret; + + dbus_error_init(&error); + + if (thread == 0) + thread = _gettid(); + + if (!(m = dbus_message_new_method_call( + RTKIT_SERVICE_NAME, + RTKIT_OBJECT_PATH, + "org.freedesktop.RealtimeKit1", + "MakeThreadRealtime"))) { + ret = -ENOMEM; + goto finish; + } + + u64 = (dbus_uint64_t) thread; + u32 = (dbus_uint32_t) priority; + + if (!dbus_message_append_args( + m, + DBUS_TYPE_UINT64, &u64, + DBUS_TYPE_UINT32, &u32, + DBUS_TYPE_INVALID)) { + ret = -ENOMEM; + goto finish; + } + + if (!(r = dbus_connection_send_with_reply_and_block(connection, m, -1, &error))) { + ret = translate_error(error.name); + goto finish; + } + + + if (dbus_set_error_from_message(&error, r)) { + ret = translate_error(error.name); + goto finish; + } + + ret = 0; + +finish: + + if (m) + dbus_message_unref(m); + + if (r) + dbus_message_unref(r); + + dbus_error_free(&error); + + return ret; +} + +int rtkit_make_high_priority(DBusConnection *connection, pid_t thread, int nice_level) { + DBusMessage *m = NULL, *r = NULL; + dbus_uint64_t u64; + dbus_int32_t s32; + DBusError error; + int ret; + + dbus_error_init(&error); + + if (thread == 0) + thread = _gettid(); + + if (!(m = dbus_message_new_method_call( + RTKIT_SERVICE_NAME, + RTKIT_OBJECT_PATH, + "org.freedesktop.RealtimeKit1", + "MakeThreadHighPriority"))) { + ret = -ENOMEM; + goto finish; + } + + u64 = (dbus_uint64_t) thread; + s32 = (dbus_int32_t) nice_level; + + if (!dbus_message_append_args( + m, + DBUS_TYPE_UINT64, &u64, + DBUS_TYPE_INT32, &s32, + DBUS_TYPE_INVALID)) { + ret = -ENOMEM; + goto finish; + } + + + + if (!(r = dbus_connection_send_with_reply_and_block(connection, m, -1, &error))) { + ret = translate_error(error.name); + goto finish; + } + + + if (dbus_set_error_from_message(&error, r)) { + ret = translate_error(error.name); + 
goto finish; + } + + ret = 0; + +finish: + + if (m) + dbus_message_unref(m); + + if (r) + dbus_message_unref(r); + + dbus_error_free(&error); + + return ret; +} + +#else + +int rtkit_make_realtime(DBusConnection *connection, pid_t thread, int priority) { + return -ENOTSUP; +} + +int rtkit_make_high_priority(DBusConnection *connection, pid_t thread, int nice_level) { + return -ENOTSUP; +} + +int rtkit_get_max_realtime_priority(DBusConnection *connection) { + return -ENOTSUP; +} + +int rtkit_get_min_nice_level(DBusConnection *connection, int* min_nice_level) { + return -ENOTSUP; +} + +long long rtkit_get_rttime_usec_max(DBusConnection *connection) { + return -ENOTSUP; +} + +#endif diff --git a/src/audio/rtkit.h b/src/audio/rtkit.h new file mode 100644 index 00000000..54882d4b --- /dev/null +++ b/src/audio/rtkit.h @@ -0,0 +1,79 @@ +/*-*- Mode: C; c-basic-offset: 8 -*-*/ + +#ifndef foortkithfoo +#define foortkithfoo + +/*** + Copyright 2009 Lennart Poettering + Copyright 2010 David Henningsson + + Permission is hereby granted, free of charge, to any person + obtaining a copy of this software and associated documentation files + (the "Software"), to deal in the Software without restriction, + including without limitation the rights to use, copy, modify, merge, + publish, distribute, sublicense, and/or sell copies of the Software, + and to permit persons to whom the Software is furnished to do so, + subject to the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +***/ + +#include <dbus/dbus.h> +#include <sys/types.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/* This is the reference implementation for a client for + * RealtimeKit. You don't have to use this, but if you do, just copy these + * sources into your repository */ + +#define RTKIT_SERVICE_NAME "org.freedesktop.RealtimeKit1" +#define RTKIT_OBJECT_PATH "/org/freedesktop/RealtimeKit1" + +/* This is mostly equivalent to sched_setparam(thread, SCHED_RR, { + * .sched_priority = priority }). 'thread' needs to be a kernel thread + * id as returned by gettid(), not a pthread_t! If 'thread' is 0 the + * current thread is used. The returned value is a negative errno + * style error code, or 0 on success. */ +int rtkit_make_realtime(DBusConnection *system_bus, pid_t thread, int priority); + +/* This is mostly equivalent to setpriority(PRIO_PROCESS, thread, + * nice_level). 'thread' needs to be a kernel thread id as returned by + * gettid(), not a pthread_t! If 'thread' is 0 the current thread is + * used. The returned value is a negative errno style error code, or 0 + * on success.*/ +int rtkit_make_high_priority(DBusConnection *system_bus, pid_t thread, int nice_level); + +/* Return the maximum value of realtime priority available. Realtime requests + * above this value will fail. A negative value is an errno style error code. + */ +int rtkit_get_max_realtime_priority(DBusConnection *system_bus); + +/* Retrieve the minimum value of nice level available. High prio requests + * below this value will fail. 
The returned value is a negative errno + * style error code, or 0 on success.*/ +int rtkit_get_min_nice_level(DBusConnection *system_bus, int* min_nice_level); + +/* Return the maximum value of RLIMIT_RTTIME to set before attempting a + * realtime request. A negative value is an errno style error code. + */ +long long rtkit_get_rttime_usec_max(DBusConnection *system_bus); + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/freedv_interface.cpp b/src/freedv_interface.cpp index cdce1a16..e3e53ef0 100644 --- a/src/freedv_interface.cpp +++ b/src/freedv_interface.cpp @@ -146,7 +146,7 @@ float FreeDVInterface::GetMinimumSNR_(int mode) void FreeDVInterface::start(int txMode, int fifoSizeMs, bool singleRxThread, bool usingReliableText) { sync_ = 0; - singleRxThread_ = singleRxThread; + singleRxThread_ = enabledModes_.size() > 1 ? singleRxThread : true; modemStatsList_ = new MODEM_STATS[enabledModes_.size()]; for (int index = 0; index < (int)enabledModes_.size(); index++) @@ -757,6 +757,7 @@ IPipelineStep* FreeDVInterface::createTransmitPipeline(int inputSampleRate, int modeFn, modeFn, parallelSteps, + nullptr, nullptr ); @@ -769,7 +770,8 @@ IPipelineStep* FreeDVInterface::createReceivePipeline( std::function getChannelNoiseFn, std::function getChannelNoiseSnrFn, std::function getFreqOffsetFn, - std::function getSigPwrAvgFn) + std::function getSigPwrAvgFn, + std::shared_ptr realtimeHelper) { std::vector parallelSteps; @@ -811,7 +813,8 @@ IPipelineStep* FreeDVInterface::createReceivePipeline( state->preProcessFn, state->postProcessFn, parallelSteps, - state + state, + realtimeHelper ); return parallelStep; diff --git a/src/freedv_interface.h b/src/freedv_interface.h index bb1b26c0..2f527d4e 100644 --- a/src/freedv_interface.h +++ b/src/freedv_interface.h @@ -52,6 +52,8 @@ extern "C" #include "lpcnet.h" } +#include "util/IRealtimeHelper.h" + #include class IPipelineStep; @@ -131,7 +133,8 @@ public: std::function getChannelNoiseFn, std::function getChannelNoiseSnrFn, 
std::function getFreqOffsetFn, - std::function getSigPwrAvgFn + std::function getSigPwrAvgFn, + std::shared_ptr realtimeHelper ); void restartTxVocoder(); diff --git a/src/main.cpp b/src/main.cpp index dc692c78..6919152b 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3121,10 +3121,10 @@ void MainFrame::startRxStream() g_rxUserdata = new paCallBackData; // create FIFOs used to interface between IAudioEngine and txRx - // processing loop, which iterates about once every 20ms. - // Sample rate conversion, stats for spectral plots, and - // transmit processng are all performed in the tx/rxProcessing - // loop. + // processing loop, which iterates about once every 10-40ms + // (depending on platform/audio library). Sample rate conversion, + // stats for spectral plots, and transmit processing are all performed + // in the tx/rxProcessing loop. int m_fifoSize_ms = wxGetApp().appConfiguration.fifoSizeMs; int soundCard1InFifoSizeSamples = m_fifoSize_ms*wxGetApp().appConfiguration.audioConfiguration.soundCard1In.sampleRate / 1000; @@ -3430,7 +3430,7 @@ void MainFrame::startRxStream() // start tx/rx processing thread if (txInSoundDevice && txOutSoundDevice) { - m_txThread = new TxRxThread(true, txInSoundDevice->getSampleRate(), txOutSoundDevice->getSampleRate(), wxGetApp().linkStep.get()); + m_txThread = new TxRxThread(true, txInSoundDevice->getSampleRate(), txOutSoundDevice->getSampleRate(), wxGetApp().linkStep.get(), txInSoundDevice); if ( m_txThread->Create() != wxTHREAD_NO_ERROR ) { wxLogError(wxT("Can't create TX thread!")); @@ -3469,7 +3469,7 @@ } } - m_rxThread = new TxRxThread(false, rxInSoundDevice->getSampleRate(), rxOutSoundDevice->getSampleRate(), wxGetApp().linkStep.get()); + m_rxThread = new TxRxThread(false, rxInSoundDevice->getSampleRate(), rxOutSoundDevice->getSampleRate(), wxGetApp().linkStep.get(), rxInSoundDevice); if ( m_rxThread->Create() != wxTHREAD_NO_ERROR ) { wxLogError(wxT("Can't create RX thread!")); diff --git 
a/src/pipeline/ComputeRfSpectrumStep.cpp b/src/pipeline/ComputeRfSpectrumStep.cpp index 792b9680..52ac735b 100644 --- a/src/pipeline/ComputeRfSpectrumStep.cpp +++ b/src/pipeline/ComputeRfSpectrumStep.cpp @@ -29,12 +29,17 @@ ComputeRfSpectrumStep::ComputeRfSpectrumStep( : modemStatsFn_(modemStatsFn) , getAvMagFn_(getAvMagFn) { - // empty + rxSpectrum_ = new float[MODEM_STATS_NSPEC]; + assert(rxSpectrum_ != nullptr); + + rxFdm_ = new COMP[FS]; + assert(rxFdm_ != nullptr); } ComputeRfSpectrumStep::~ComputeRfSpectrumStep() { - // empty + delete[] rxSpectrum_; + delete[] rxFdm_; } int ComputeRfSpectrumStep::getInputSampleRate() const @@ -49,29 +54,23 @@ int ComputeRfSpectrumStep::getOutputSampleRate() const std::shared_ptr ComputeRfSpectrumStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { - COMP* rx_fdm = new COMP[numInputSamples]; - assert(rx_fdm != nullptr); - - float rx_spec[MODEM_STATS_NSPEC]; - auto inputSamplesPtr = inputSamples.get(); for (int i = 0; i < numInputSamples; i++) { - rx_fdm[i].real = inputSamplesPtr[i]; + rxFdm_[i].real = inputSamplesPtr[i]; } - modem_stats_get_rx_spectrum(modemStatsFn_(), rx_spec, rx_fdm, numInputSamples); + modem_stats_get_rx_spectrum(modemStatsFn_(), rxSpectrum_, rxFdm_, numInputSamples); // Average rx spectrum data using a simple IIR low pass filter auto avMagPtr = getAvMagFn_(); for(int i = 0; i < MODEM_STATS_NSPEC; i++) { - avMagPtr[i] = BETA * avMagPtr[i] + (1.0 - BETA) * rx_spec[i]; + avMagPtr[i] = BETA * avMagPtr[i] + (1.0 - BETA) * rxSpectrum_[i]; } // Tap only, no output. 
*numOutputSamples = 0; - delete[] rx_fdm; return std::shared_ptr((short*)nullptr, std::default_delete()); } diff --git a/src/pipeline/ComputeRfSpectrumStep.h b/src/pipeline/ComputeRfSpectrumStep.h index 153506ba..7ef9d2b2 100644 --- a/src/pipeline/ComputeRfSpectrumStep.h +++ b/src/pipeline/ComputeRfSpectrumStep.h @@ -46,6 +46,8 @@ public: private: std::function modemStatsFn_; std::function getAvMagFn_; + float* rxSpectrum_; + COMP* rxFdm_; }; #endif // AUDIO_PIPELINE__COMPUTE_RF_SPECTRUM_STEP_H \ No newline at end of file diff --git a/src/pipeline/EqualizerStep.cpp b/src/pipeline/EqualizerStep.cpp index 45fa016d..e3453e9d 100644 --- a/src/pipeline/EqualizerStep.cpp +++ b/src/pipeline/EqualizerStep.cpp @@ -22,6 +22,7 @@ #include "EqualizerStep.h" +#include #include #include "../sox_biquad.h" #include @@ -34,7 +35,12 @@ EqualizerStep::EqualizerStep(int sampleRate, bool* enableFilter, void** bassFilt , trebleFilter_(trebleFilter) , volFilter_(volFilter) { - // empty + // Pre-allocate buffers so we don't have to do so during real-time operation. 
+ auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); } EqualizerStep::~EqualizerStep() @@ -54,31 +60,28 @@ int EqualizerStep::getOutputSampleRate() const std::shared_ptr EqualizerStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { - short* outputSamples = new short[numInputSamples]; - assert(outputSamples != nullptr); - - memcpy(outputSamples, inputSamples.get(), sizeof(short)*numInputSamples); + memcpy(outputSamples_.get(), inputSamples.get(), sizeof(short)*numInputSamples); if (*enableFilter_) { if (*bassFilter_) { - sox_biquad_filter(*bassFilter_, outputSamples, outputSamples, numInputSamples); + sox_biquad_filter(*bassFilter_, outputSamples_.get(), outputSamples_.get(), numInputSamples); } if (*trebleFilter_) { - sox_biquad_filter(*trebleFilter_, outputSamples, outputSamples, numInputSamples); + sox_biquad_filter(*trebleFilter_, outputSamples_.get(), outputSamples_.get(), numInputSamples); } if (*midFilter_) { - sox_biquad_filter(*midFilter_, outputSamples, outputSamples, numInputSamples); + sox_biquad_filter(*midFilter_, outputSamples_.get(), outputSamples_.get(), numInputSamples); } if (*volFilter_) { - sox_biquad_filter(*volFilter_, outputSamples, outputSamples, numInputSamples); + sox_biquad_filter(*volFilter_, outputSamples_.get(), outputSamples_.get(), numInputSamples); } } *numOutputSamples = numInputSamples; - return std::shared_ptr(outputSamples, std::default_delete()); + return outputSamples_; } \ No newline at end of file diff --git a/src/pipeline/EqualizerStep.h b/src/pipeline/EqualizerStep.h index 0a813914..a875e00c 100644 --- a/src/pipeline/EqualizerStep.h +++ b/src/pipeline/EqualizerStep.h @@ -44,6 +44,7 @@ private: void** midFilter_; void** trebleFilter_; void** volFilter_; + std::shared_ptr outputSamples_; }; diff --git a/src/pipeline/FreeDVReceiveStep.cpp 
b/src/pipeline/FreeDVReceiveStep.cpp index 769d45c5..359da987 100644 --- a/src/pipeline/FreeDVReceiveStep.cpp +++ b/src/pipeline/FreeDVReceiveStep.cpp @@ -42,10 +42,30 @@ FreeDVReceiveStep::FreeDVReceiveStep(struct freedv* dv) rxFreqOffsetPhaseRectObjs_.real = cos(0.0); rxFreqOffsetPhaseRectObjs_.imag = sin(0.0); + + // Pre-allocate buffers so we don't have to do so during real-time operation. + auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); + + inputBuf_ = new short[freedv_get_n_max_modem_samples(dv_)]; + assert(inputBuf_ != nullptr); + + rxFdm_ = new COMP[freedv_get_n_max_modem_samples(dv_)]; + assert(rxFdm_ != nullptr); + + rxFdmOffset_ = new COMP[freedv_get_n_max_modem_samples(dv_)]; + assert(rxFdmOffset_ != nullptr); } FreeDVReceiveStep::~FreeDVReceiveStep() { + delete[] inputBuf_; + delete[] rxFdm_; + delete[] rxFdmOffset_; + if (inputSampleFifo_ != nullptr) { codec2_fifo_free(inputSampleFifo_); @@ -65,19 +85,6 @@ int FreeDVReceiveStep::getOutputSampleRate() const std::shared_ptr FreeDVReceiveStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { *numOutputSamples = 0; - short* outputSamples = nullptr; - - short* input_buf = new short[freedv_get_n_max_modem_samples(dv_)]; - assert(input_buf != nullptr); - - short* output_buf = new short[freedv_get_n_speech_samples(dv_)]; - assert(output_buf != nullptr); - - COMP* rx_fdm = new COMP[freedv_get_n_max_modem_samples(dv_)]; - assert(rx_fdm != nullptr); - - COMP* rx_fdm_offset = new COMP[freedv_get_n_max_modem_samples(dv_)]; - assert(rx_fdm_offset != nullptr); short* inputPtr = inputSamples.get(); while (numInputSamples > 0 && inputPtr != nullptr) @@ -87,46 +94,29 @@ std::shared_ptr FreeDVReceiveStep::execute(std::shared_ptr inputSa int nin = freedv_nin(dv_); int nout = 0; - while (codec2_fifo_read(inputSampleFifo_, input_buf, nin) == 
0) + while (codec2_fifo_read(inputSampleFifo_, inputBuf_, nin) == 0) { assert(nin <= freedv_get_n_max_modem_samples(dv_)); // demod per frame processing for(int i=0; i(outputSamples, std::default_delete()); + + return outputSamples_; } diff --git a/src/pipeline/FreeDVReceiveStep.h b/src/pipeline/FreeDVReceiveStep.h index 3e400dc0..7e1d3a50 100644 --- a/src/pipeline/FreeDVReceiveStep.h +++ b/src/pipeline/FreeDVReceiveStep.h @@ -63,6 +63,11 @@ private: bool channelNoiseEnabled_; int channelNoiseSnr_; float freqOffsetHz_; + + std::shared_ptr outputSamples_; + short* inputBuf_; + COMP* rxFdm_; + COMP* rxFdmOffset_; }; #endif // AUDIO_PIPELINE__FREEDV_RECEIVE_STEP_H diff --git a/src/pipeline/FreeDVTransmitStep.cpp b/src/pipeline/FreeDVTransmitStep.cpp index 7cf4071f..4a955a59 100644 --- a/src/pipeline/FreeDVTransmitStep.cpp +++ b/src/pipeline/FreeDVTransmitStep.cpp @@ -40,10 +40,35 @@ FreeDVTransmitStep::FreeDVTransmitStep(struct freedv* dv, std::function txFreqOffsetPhaseRectObj_.real = cos(0.0); txFreqOffsetPhaseRectObj_.imag = sin(0.0); + + // Pre-allocate buffers so we don't have to do so during real-time operation. 
+ auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); + + codecInput_ = new short[maxSamples]; + assert(codecInput_ != nullptr); + + tmpOutput_ = new short[maxSamples]; + assert(tmpOutput_ != nullptr); + + int nfreedv = freedv_get_n_nom_modem_samples(dv_); + + txFdm_ = new COMP[nfreedv]; + assert(txFdm_ != nullptr); + + txFdmOffset_ = new COMP[nfreedv]; + assert(txFdmOffset_ != nullptr); } FreeDVTransmitStep::~FreeDVTransmitStep() { + delete[] codecInput_; + delete[] txFdm_; + delete[] txFdmOffset_; + if (inputSampleFifo_ != nullptr) { codec2_fifo_free(inputSampleFifo_); @@ -61,9 +86,7 @@ int FreeDVTransmitStep::getOutputSampleRate() const } std::shared_ptr FreeDVTransmitStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) -{ - short* outputSamples = nullptr; - +{ int mode = freedv_get_mode(dv_); int samplesUsedForFifo = freedv_get_n_speech_samples(dv_); int nfreedv = freedv_get_n_nom_modem_samples(dv_); @@ -78,54 +101,26 @@ std::shared_ptr FreeDVTransmitStep::execute(std::shared_ptr inputS if (codec2_fifo_used(inputSampleFifo_) >= samplesUsedForFifo) { - short* codecInput = new short[samplesUsedForFifo]; - assert(codecInput != nullptr); - - short* tmpOutput = new short[nfreedv]; - assert(tmpOutput != nullptr); - - codec2_fifo_read(inputSampleFifo_, codecInput, samplesUsedForFifo); + codec2_fifo_read(inputSampleFifo_, codecInput_, samplesUsedForFifo); if (mode == FREEDV_MODE_800XA) { /* 800XA doesn't support complex output just yet */ - freedv_tx(dv_, tmpOutput, codecInput); + freedv_tx(dv_, tmpOutput_, codecInput_); } else - { - COMP* tx_fdm = new COMP[nfreedv]; - assert(tx_fdm != nullptr); - - COMP* tx_fdm_offset = new COMP[nfreedv]; - assert(tx_fdm_offset != nullptr); + { + freedv_comptx(dv_, txFdm_, codecInput_); - freedv_comptx(dv_, tx_fdm, codecInput); - - 
freq_shift_coh(tx_fdm_offset, tx_fdm, getFreqOffsetFn_(), getOutputSampleRate(), &txFreqOffsetPhaseRectObj_, nfreedv); + freq_shift_coh(txFdmOffset_, txFdm_, getFreqOffsetFn_(), getOutputSampleRate(), &txFreqOffsetPhaseRectObj_, nfreedv); for(int i = 0; i(outputSamples, std::default_delete()); + return outputSamples_; } diff --git a/src/pipeline/FreeDVTransmitStep.h b/src/pipeline/FreeDVTransmitStep.h index 37b70068..3c53fdc1 100644 --- a/src/pipeline/FreeDVTransmitStep.h +++ b/src/pipeline/FreeDVTransmitStep.h @@ -50,6 +50,12 @@ private: std::function getFreqOffsetFn_; struct FIFO* inputSampleFifo_; COMP txFreqOffsetPhaseRectObj_; + + COMP* txFdm_; + COMP* txFdmOffset_; + short* codecInput_; + short* tmpOutput_; + std::shared_ptr outputSamples_; }; #endif // AUDIO_PIPELINE__FREEDV_TRANSMIT_STEP_H diff --git a/src/pipeline/LevelAdjustStep.cpp b/src/pipeline/LevelAdjustStep.cpp index 06b4be56..d3a17036 100644 --- a/src/pipeline/LevelAdjustStep.cpp +++ b/src/pipeline/LevelAdjustStep.cpp @@ -28,7 +28,12 @@ LevelAdjustStep::LevelAdjustStep(int sampleRate, std::function scaleFa : scaleFactorFn_(scaleFactorFn) , sampleRate_(sampleRate) { - // empty + // Pre-allocate buffers so we don't have to do so during real-time operation. 
+ auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); } LevelAdjustStep::~LevelAdjustStep() @@ -48,14 +53,13 @@ int LevelAdjustStep::getOutputSampleRate() const std::shared_ptr LevelAdjustStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { - short* outputSamples = new short[numInputSamples]; double scaleFactor = scaleFactorFn_(); for (int index = 0; index < numInputSamples; index++) { - outputSamples[index] = inputSamples.get()[index] * scaleFactor; + outputSamples_.get()[index] = inputSamples.get()[index] * scaleFactor; } *numOutputSamples = numInputSamples; - return std::shared_ptr(outputSamples, std::default_delete()); + return outputSamples_; } diff --git a/src/pipeline/LevelAdjustStep.h b/src/pipeline/LevelAdjustStep.h index 2fb01198..9c96d59e 100644 --- a/src/pipeline/LevelAdjustStep.h +++ b/src/pipeline/LevelAdjustStep.h @@ -40,6 +40,7 @@ public: private: std::function scaleFactorFn_; int sampleRate_; + std::shared_ptr outputSamples_; }; #endif // AUDIO_PIPELINE__LEVEL_ADJUST_STEP_H \ No newline at end of file diff --git a/src/pipeline/LinkStep.cpp b/src/pipeline/LinkStep.cpp index 561e7748..da096cea 100644 --- a/src/pipeline/LinkStep.cpp +++ b/src/pipeline/LinkStep.cpp @@ -33,24 +33,25 @@ LinkStep::LinkStep(int outputSampleRate, size_t numSamples) // Create pipeline steps inputPipelineStep_ = std::make_shared(this); outputPipelineStep_ = std::make_shared(this); + + tmpBuffer_ = new short[numSamples]; + assert(tmpBuffer_ != nullptr); } LinkStep::~LinkStep() { codec2_fifo_destroy(fifo_); fifo_ = nullptr; + + delete[] tmpBuffer_; } void LinkStep::clearFifo() { int numUsed = codec2_fifo_used(fifo_); - short* tmp = new short[numUsed]; - assert(tmp != nullptr); // Read data and then promptly throw it out. 
- codec2_fifo_read(fifo_, tmp, numUsed); - - delete[] tmp; + codec2_fifo_read(fifo_, tmpBuffer_, numUsed); } std::shared_ptr LinkStep::InputStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) @@ -74,11 +75,8 @@ std::shared_ptr LinkStep::OutputStep::execute(std::shared_ptr inpu if (*numOutputSamples > 0) { - short* outputSamples = new short[*numOutputSamples]; - assert(outputSamples != nullptr); - - codec2_fifo_read(fifo, outputSamples, *numOutputSamples); - return std::shared_ptr(outputSamples, std::default_delete()); + codec2_fifo_read(fifo, outputSamples_.get(), *numOutputSamples); + return outputSamples_; } else { diff --git a/src/pipeline/LinkStep.h b/src/pipeline/LinkStep.h index b55abf3b..c0bb7d0e 100644 --- a/src/pipeline/LinkStep.h +++ b/src/pipeline/LinkStep.h @@ -84,7 +84,12 @@ private: OutputStep(LinkStep* parent) : parent_(parent) { - // empty + // Pre-allocate buffers so we don't have to do so during real-time operation. + auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); } virtual ~OutputStep() = default; @@ -105,12 +110,15 @@ private: private: LinkStep* parent_; + std::shared_ptr outputSamples_; }; int sampleRate_; std::shared_ptr inputPipelineStep_; std::shared_ptr outputPipelineStep_; - FIFO* fifo_; + FIFO* fifo_; + + short* tmpBuffer_; }; #endif // AUDIO_PIPELINE__LINK_STEP_H \ No newline at end of file diff --git a/src/pipeline/MuteStep.cpp b/src/pipeline/MuteStep.cpp index de7cf3fc..51442744 100644 --- a/src/pipeline/MuteStep.cpp +++ b/src/pipeline/MuteStep.cpp @@ -20,6 +20,7 @@ // //========================================================================= +#include #include #include #include "MuteStep.h" @@ -27,7 +28,14 @@ MuteStep::MuteStep(int outputSampleRate) : sampleRate_(outputSampleRate) { - // empty + // Pre-allocate buffers so we don't have to do so during 
real-time operation. + auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); + + memset(outputSamples_.get(), 0, sizeof(short) * maxSamples); } // Executes pipeline step. @@ -42,12 +50,7 @@ std::shared_ptr MuteStep::execute(std::shared_ptr inputSamples, in if (*numOutputSamples > 0) { - short* outputSamples = new short[*numOutputSamples]; - assert(outputSamples != nullptr); - - memset(outputSamples, 0, sizeof(short) * (*numOutputSamples)); - - return std::shared_ptr(outputSamples, std::default_delete()); + return outputSamples_; } else { diff --git a/src/pipeline/MuteStep.h b/src/pipeline/MuteStep.h index fce13c18..9f086507 100644 --- a/src/pipeline/MuteStep.h +++ b/src/pipeline/MuteStep.h @@ -47,6 +47,7 @@ public: private: int sampleRate_; + std::shared_ptr outputSamples_; }; #endif // AUDIO_PIPELINE__MUTE_STEP_H \ No newline at end of file diff --git a/src/pipeline/ParallelStep.cpp b/src/pipeline/ParallelStep.cpp index 6bbb635a..df76c544 100644 --- a/src/pipeline/ParallelStep.cpp +++ b/src/pipeline/ParallelStep.cpp @@ -32,13 +32,15 @@ ParallelStep::ParallelStep( std::function inputRouteFn, std::function outputRouteFn, std::vector parallelSteps, - std::shared_ptr state) + std::shared_ptr state, + std::shared_ptr realtimeHelper) : inputSampleRate_(inputSampleRate) , outputSampleRate_(outputSampleRate) , runMultiThreaded_(runMultiThreaded) , inputRouteFn_(inputRouteFn) , outputRouteFn_(outputRouteFn) , state_(state) + , realtimeHelper_(realtimeHelper) { for (auto& step : parallelSteps) { @@ -51,7 +53,17 @@ ParallelStep::ParallelStep( state->exitingThread = false; state->thread = std::thread([this](ThreadInfo* threadState) { + if (realtimeHelper_) + { + realtimeHelper_->setHelperRealTime(); + } + executeRunnerThread_(threadState); + + if (realtimeHelper_) + { + realtimeHelper_->clearHelperRealTime(); + } }, state); 
threads_.push_back(state); diff --git a/src/pipeline/ParallelStep.h b/src/pipeline/ParallelStep.h index a3c121d3..9890aa0e 100644 --- a/src/pipeline/ParallelStep.h +++ b/src/pipeline/ParallelStep.h @@ -32,6 +32,8 @@ #include #include +#include "../util/IRealtimeHelper.h" + class ParallelStep : public IPipelineStep { public: @@ -41,7 +43,8 @@ public: std::function inputRouteFn, std::function outputRouteFn, std::vector parallelSteps, - std::shared_ptr state); + std::shared_ptr state, + std::shared_ptr realtimeHelper); virtual ~ParallelStep(); virtual int getInputSampleRate() const; @@ -81,6 +84,7 @@ private: std::map, std::shared_ptr> resamplers_; std::vector threads_; std::shared_ptr state_; + std::shared_ptr realtimeHelper_; void executeRunnerThread_(ThreadInfo* threadState); std::future enqueueTask_(ThreadInfo* taskQueueThread, IPipelineStep* step, std::shared_ptr inputSamples, int numInputSamples); diff --git a/src/pipeline/PlaybackStep.cpp b/src/pipeline/PlaybackStep.cpp index cf8491e1..052e26f0 100644 --- a/src/pipeline/PlaybackStep.cpp +++ b/src/pipeline/PlaybackStep.cpp @@ -33,7 +33,12 @@ PlaybackStep::PlaybackStep( , getSndFileFn_(getSndFileFn) , fileCompleteFn_(fileCompleteFn) { - // empty + // Pre-allocate buffers so we don't have to do so during real-time operation. 
+ auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); } PlaybackStep::~PlaybackStep() @@ -57,19 +62,15 @@ std::shared_ptr PlaybackStep::execute(std::shared_ptr inputSamples assert(playFile != nullptr); unsigned int nsf = numInputSamples * getOutputSampleRate()/getInputSampleRate(); - short* outputSamples = nullptr; *numOutputSamples = 0; if (nsf > 0) { - outputSamples = new short[nsf]; - assert(outputSamples != nullptr); - - *numOutputSamples = sf_read_short(playFile, outputSamples, nsf); + *numOutputSamples = sf_read_short(playFile, outputSamples_.get(), nsf); if ((unsigned)*numOutputSamples < nsf) { fileCompleteFn_(); } } - return std::shared_ptr(outputSamples, std::default_delete()); + return outputSamples_; } diff --git a/src/pipeline/PlaybackStep.h b/src/pipeline/PlaybackStep.h index 38af159b..1f7af277 100644 --- a/src/pipeline/PlaybackStep.h +++ b/src/pipeline/PlaybackStep.h @@ -44,6 +44,7 @@ private: std::function fileSampleRateFn_; std::function getSndFileFn_; std::function fileCompleteFn_; + std::shared_ptr outputSamples_; }; #endif // AUDIO_PIPELINE__PLAYBACK_STEP_H \ No newline at end of file diff --git a/src/pipeline/RADEReceiveStep.cpp b/src/pipeline/RADEReceiveStep.cpp index 7eee603c..0f71fbd2 100644 --- a/src/pipeline/RADEReceiveStep.cpp +++ b/src/pipeline/RADEReceiveStep.cpp @@ -48,10 +48,36 @@ RADEReceiveStep::RADEReceiveStep(struct rade* dv, FARGANState* fargan, rade_text featuresFile_ = fopen((const char*)utRxFeatureFile.ToUTF8(), "wb"); assert(featuresFile_ != nullptr); } + + // Pre-allocate buffers so we don't have to do so during real-time operation. 
+ auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); + + inputBufCplx_ = new RADE_COMP[rade_nin_max(dv_)]; + assert(inputBufCplx_ != nullptr); + + inputBuf_ = new short[rade_nin_max(dv_)]; + assert(inputBuf_ != nullptr); + + featuresOut_ = new float[rade_n_features_in_out(dv_)]; + assert(featuresOut_ != nullptr); + + eooOut_ = new float[rade_n_eoo_bits(dv_)]; + assert(eooOut_ != nullptr); + + pendingFeatures_.reserve(rade_n_features_in_out(dv_)); } RADEReceiveStep::~RADEReceiveStep() { + delete[] inputBuf_; + delete[] inputBufCplx_; + delete[] featuresOut_; + delete[] eooOut_; + if (featuresFile_ != nullptr) { fclose(featuresFile_); @@ -81,17 +107,7 @@ int RADEReceiveStep::getOutputSampleRate() const std::shared_ptr RADEReceiveStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { *numOutputSamples = 0; - short* outputSamples = nullptr; - RADE_COMP* input_buf_cplx = new RADE_COMP[rade_nin_max(dv_)]; - assert(input_buf_cplx != nullptr); - - short* input_buf = new short[rade_nin_max(dv_)]; - assert(input_buf != nullptr); - - float* features_out = new float[rade_n_features_in_out(dv_)]; - assert(features_out != nullptr); - short* inputPtr = inputSamples.get(); while (numInputSamples > 0 && inputPtr != nullptr) { @@ -100,38 +116,36 @@ std::shared_ptr RADEReceiveStep::execute(std::shared_ptr inputSamp int nin = rade_nin(dv_); int nout = 0; - while (codec2_fifo_read(inputSampleFifo_, input_buf, nin) == 0) + while (codec2_fifo_read(inputSampleFifo_, inputBuf_, nin) == 0) { assert(nin <= rade_nin_max(dv_)); // demod per frame processing for(int i=0; ifeatures). 
int hasEooOut = 0; - float* eooOut = new float[rade_n_eoo_bits(dv_)]; - assert(eooOut != nullptr); - nout = rade_rx(dv_, features_out, &hasEooOut, eooOut, input_buf_cplx); + nout = rade_rx(dv_, featuresOut_, &hasEooOut, eooOut_, inputBufCplx_); if (hasEooOut && textPtr_ != nullptr) { // Handle RX of bits from EOO. - rade_text_rx(textPtr_, eooOut, rade_n_eoo_bits(dv_) / 2); + rade_text_rx(textPtr_, eooOut_, rade_n_eoo_bits(dv_) / 2); } else if (!hasEooOut) { if (featuresFile_) { - fwrite(features_out, sizeof(float), nout, featuresFile_); + fwrite(featuresOut_, sizeof(float), nout, featuresFile_); } for (int i = 0; i < nout; i++) { - pendingFeatures_.push_back(features_out[i]); + pendingFeatures_.push_back(featuresOut_[i]); } // FARGAN processing (features->analog audio) @@ -163,21 +177,14 @@ std::shared_ptr RADEReceiveStep::execute(std::shared_ptr inputSamp } } - delete[] eooOut; nin = rade_nin(dv_); } } if (*numOutputSamples > 0) { - outputSamples = new short[*numOutputSamples]; - assert(outputSamples != nullptr); - codec2_fifo_read(outputSampleFifo_, outputSamples, *numOutputSamples); + codec2_fifo_read(outputSampleFifo_, outputSamples_.get(), *numOutputSamples); } - delete[] input_buf_cplx; - delete[] input_buf; - delete[] features_out; - - return std::shared_ptr(outputSamples, std::default_delete()); + return outputSamples_; } diff --git a/src/pipeline/RADEReceiveStep.h b/src/pipeline/RADEReceiveStep.h index c6730642..3d87392a 100644 --- a/src/pipeline/RADEReceiveStep.h +++ b/src/pipeline/RADEReceiveStep.h @@ -57,6 +57,12 @@ private: std::vector pendingFeatures_; FILE* featuresFile_; rade_text_t textPtr_; + + RADE_COMP* inputBufCplx_; + short* inputBuf_; + float* featuresOut_; + float* eooOut_; + std::shared_ptr outputSamples_; }; #endif // AUDIO_PIPELINE__RADE_RECEIVE_STEP_H diff --git a/src/pipeline/RADETransmitStep.cpp b/src/pipeline/RADETransmitStep.cpp index a35e0368..48e2198f 100644 --- a/src/pipeline/RADETransmitStep.cpp +++ 
b/src/pipeline/RADETransmitStep.cpp @@ -48,10 +48,41 @@ RADETransmitStep::RADETransmitStep(struct rade* dv, LPCNetEncState* encState) featuresFile_ = fopen((const char*)utTxFeatureFile.ToUTF8(), "wb"); assert(featuresFile_ != nullptr); } + + // Pre-allocate buffers so we don't have to do so during real-time operation. + auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); + + int numOutputSamples = rade_n_tx_out(dv_); + + radeOut_ = new RADE_COMP[numOutputSamples]; + assert(radeOut_ != nullptr); + + radeOutShort_ = new short[numOutputSamples]; + assert(radeOutShort_ != nullptr); + + const int NUM_SAMPLES_SILENCE = 60 * getOutputSampleRate() / 1000; + int numEOOSamples = rade_n_tx_eoo_out(dv_); + + eooOut_ = new RADE_COMP[numEOOSamples]; + assert(eooOut_ != nullptr); + + eooOutShort_ = new short[numEOOSamples + NUM_SAMPLES_SILENCE]; + assert(eooOutShort_ != nullptr); + + featureList_.reserve(rade_n_features_in_out(dv_)); } RADETransmitStep::~RADETransmitStep() { + delete[] radeOut_; + delete[] radeOutShort_; + delete[] eooOut_; + delete[] eooOutShort_; + if (featuresFile_ != nullptr) { fclose(featuresFile_); @@ -80,7 +111,6 @@ int RADETransmitStep::getOutputSampleRate() const std::shared_ptr RADETransmitStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { - short* outputSamples = nullptr; *numOutputSamples = 0; if (numInputSamples == 0) @@ -89,15 +119,12 @@ std::shared_ptr RADETransmitStep::execute(std::shared_ptr inputSam *numOutputSamples = std::min(codec2_fifo_used(outputSampleFifo_), (int)(RADE_MODEM_SAMPLE_RATE * .02)); if (*numOutputSamples > 0) { - outputSamples = new short[*numOutputSamples]; - assert(outputSamples != nullptr); - - codec2_fifo_read(outputSampleFifo_, outputSamples, *numOutputSamples); + codec2_fifo_read(outputSampleFifo_, outputSamples_.get(), *numOutputSamples); 
log_info("Returning %d EOO samples (remaining in FIFO: %d)", *numOutputSamples, codec2_fifo_used(outputSampleFifo_)); } - return std::shared_ptr(outputSamples, std::default_delete());; + return outputSamples_; } short* inputPtr = inputSamples.get(); @@ -113,12 +140,6 @@ std::shared_ptr RADETransmitStep::execute(std::shared_ptr inputSam short pcm[LPCNET_FRAME_SIZE]; float features[NB_TOTAL_FEATURES]; - RADE_COMP* radeOut = new RADE_COMP[numOutputSamples]; - assert(radeOut != nullptr); - - short* radeOutShort = new short[numOutputSamples]; - assert(radeOutShort != nullptr); - int arch = opus_select_arch(); // Feature extraction @@ -138,7 +159,7 @@ std::shared_ptr RADETransmitStep::execute(std::shared_ptr inputSam // RADE TX handling while (featureList_.size() >= numRequiredFeaturesForRADE) { - rade_tx(dv_, radeOut, &featureList_[0]); + rade_tx(dv_, radeOut_, &featureList_[0]); for (unsigned int index = 0; index < numRequiredFeaturesForRADE; index++) { featureList_.erase(featureList_.begin()); @@ -146,26 +167,20 @@ std::shared_ptr RADETransmitStep::execute(std::shared_ptr inputSam for (int index = 0; index < numOutputSamples; index++) { // We only need the real component for TX. 
- radeOutShort[index] = radeOut[index].real * RADE_SCALING_FACTOR; + radeOutShort_[index] = radeOut_[index].real * RADE_SCALING_FACTOR; } - codec2_fifo_write(outputSampleFifo_, radeOutShort, numOutputSamples); + codec2_fifo_write(outputSampleFifo_, radeOutShort_, numOutputSamples); } - - delete[] radeOutShort; - delete[] radeOut; } } *numOutputSamples = codec2_fifo_used(outputSampleFifo_); if (*numOutputSamples > 0) { - outputSamples = new short[*numOutputSamples]; - assert(outputSamples != nullptr); - - codec2_fifo_read(outputSampleFifo_, outputSamples, *numOutputSamples); + codec2_fifo_read(outputSampleFifo_, outputSamples_.get(), *numOutputSamples); } - return std::shared_ptr(outputSamples, std::default_delete()); + return outputSamples_; } void RADETransmitStep::restartVocoder() @@ -174,26 +189,17 @@ void RADETransmitStep::restartVocoder() const int NUM_SAMPLES_SILENCE = 60 * getOutputSampleRate() / 1000; int numEOOSamples = rade_n_tx_eoo_out(dv_); - RADE_COMP* eooOut = new RADE_COMP[numEOOSamples]; - assert(eooOut != nullptr); + rade_tx_eoo(dv_, eooOut_); - short* eooOutShort = new short[numEOOSamples + NUM_SAMPLES_SILENCE]; - assert(eooOutShort != nullptr); - - rade_tx_eoo(dv_, eooOut); - - memset(eooOutShort, 0, sizeof(short) * (numEOOSamples + NUM_SAMPLES_SILENCE)); + memset(eooOutShort_, 0, sizeof(short) * (numEOOSamples + NUM_SAMPLES_SILENCE)); for (int index = 0; index < numEOOSamples; index++) { - eooOutShort[index] = eooOut[index].real * RADE_SCALING_FACTOR; + eooOutShort_[index] = eooOut_[index].real * RADE_SCALING_FACTOR; } log_info("Queueing %d EOO samples to output FIFO", numEOOSamples + NUM_SAMPLES_SILENCE); - if (codec2_fifo_write(outputSampleFifo_, eooOutShort, numEOOSamples + NUM_SAMPLES_SILENCE) != 0) + if (codec2_fifo_write(outputSampleFifo_, eooOutShort_, numEOOSamples + NUM_SAMPLES_SILENCE) != 0) { log_warn("Could not queue EOO samples (remaining space in FIFO = %d)", codec2_fifo_free(outputSampleFifo_)); } - - delete[] eooOutShort; - 
delete[] eooOut; } diff --git a/src/pipeline/RADETransmitStep.h b/src/pipeline/RADETransmitStep.h index 941f41a7..9b71f831 100644 --- a/src/pipeline/RADETransmitStep.h +++ b/src/pipeline/RADETransmitStep.h @@ -51,6 +51,12 @@ private: std::vector featureList_; FILE* featuresFile_; + + std::shared_ptr outputSamples_; + RADE_COMP* radeOut_; + short* radeOutShort_; + RADE_COMP* eooOut_; + short* eooOutShort_; }; #endif // AUDIO_PIPELINE__RADE_TRANSMIT_STEP_H diff --git a/src/pipeline/ResampleStep.cpp b/src/pipeline/ResampleStep.cpp index c51b617a..47dc8c2d 100644 --- a/src/pipeline/ResampleStep.cpp +++ b/src/pipeline/ResampleStep.cpp @@ -36,24 +36,20 @@ static int resample_step(SRC_STATE *src, int output_sample_rate, int input_sample_rate, int length_output_short, // maximum output array length in samples - int length_input_short + int length_input_short, + float *tmpInput, + float *tmpOutput ) { SRC_DATA src_data; - float* input = new float[length_input_short]; - assert(input != nullptr); - - float* output = new float[length_output_short]; - assert(output != nullptr); - int ret; assert(src != NULL); - src_short_to_float_array(input_short, input, length_input_short); + src_short_to_float_array(input_short, tmpInput, length_input_short); - src_data.data_in = input; - src_data.data_out = output; + src_data.data_in = tmpInput; + src_data.data_out = tmpOutput; src_data.input_frames = length_input_short; src_data.output_frames = length_output_short; src_data.end_of_input = 0; @@ -67,10 +63,7 @@ static int resample_step(SRC_STATE *src, assert(ret == 0); assert(src_data.output_frames_gen <= length_output_short); - src_float_to_short_array(output, output_short, src_data.output_frames_gen); - - delete[] input; - delete[] output; + src_float_to_short_array(tmpOutput, output_short, src_data.output_frames_gen); return src_data.output_frames_gen; } @@ -82,11 +75,27 @@ ResampleStep::ResampleStep(int inputSampleRate, int outputSampleRate) int src_error; resampleState_ = 
src_new(SRC_SINC_MEDIUM_QUALITY, 1, &src_error); assert(resampleState_ != nullptr); + + // Pre-allocate buffers so we don't have to do so during real-time operation. + auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); + + tempInput_ = new float[std::max(inputSampleRate, outputSampleRate)]; + assert(tempInput_ != nullptr); + + tempOutput_ = new float[std::max(inputSampleRate, outputSampleRate)]; + assert(tempOutput_ != nullptr); } ResampleStep::~ResampleStep() { src_delete(resampleState_); + + delete[] tempInput_; + delete[] tempOutput_; } int ResampleStep::getInputSampleRate() const @@ -101,24 +110,20 @@ int ResampleStep::getOutputSampleRate() const std::shared_ptr ResampleStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { - short* outputSamples = nullptr; if (numInputSamples > 0) { double scaleFactor = ((double)outputSampleRate_)/((double)inputSampleRate_); int outputArraySize = std::max(numInputSamples, (int)(scaleFactor*numInputSamples)); assert(outputArraySize > 0); - outputSamples = new short[outputArraySize]; - assert(outputSamples != nullptr); - *numOutputSamples = resample_step( - resampleState_, outputSamples, inputSamples.get(), outputSampleRate_, - inputSampleRate_, outputArraySize, numInputSamples); + resampleState_, outputSamples_.get(), inputSamples.get(), outputSampleRate_, + inputSampleRate_, outputArraySize, numInputSamples, tempInput_, tempOutput_); } else { *numOutputSamples = 0; } - return std::shared_ptr(outputSamples, std::default_delete()); + return outputSamples_; } diff --git a/src/pipeline/ResampleStep.h b/src/pipeline/ResampleStep.h index c8ebe93b..a0dde6f3 100644 --- a/src/pipeline/ResampleStep.h +++ b/src/pipeline/ResampleStep.h @@ -41,6 +41,10 @@ private: int inputSampleRate_; int outputSampleRate_; SRC_STATE* resampleState_; + + float* tempInput_; + 
float* tempOutput_; + std::shared_ptr outputSamples_; }; #endif // AUDIO_PIPELINE__RESAMPLE_STEP_H \ No newline at end of file diff --git a/src/pipeline/SpeexStep.cpp b/src/pipeline/SpeexStep.cpp index 761524d9..6fd99671 100644 --- a/src/pipeline/SpeexStep.cpp +++ b/src/pipeline/SpeexStep.cpp @@ -41,6 +41,13 @@ SpeexStep::SpeexStep(int sampleRate) // Set FIFO to be 2x the number of samples per run so we don't lose anything. inputSampleFifo_ = codec2_fifo_create(numSamplesPerSpeexRun_ * 2); assert(inputSampleFifo_ != nullptr); + + // Pre-allocate buffers so we don't have to do so during real-time operation. + auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); } SpeexStep::~SpeexStep() @@ -61,17 +68,14 @@ int SpeexStep::getOutputSampleRate() const std::shared_ptr SpeexStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { - short* outputSamples = nullptr; *numOutputSamples = 0; int numSpeexRuns = (codec2_fifo_used(inputSampleFifo_) + numInputSamples) / numSamplesPerSpeexRun_; if (numSpeexRuns > 0) { *numOutputSamples = numSpeexRuns * numSamplesPerSpeexRun_; - outputSamples = new short[*numOutputSamples]; - assert(outputSamples != nullptr); - short* tmpOutput = outputSamples; + short* tmpOutput = outputSamples_.get(); short* tmpInput = inputSamples.get(); while (numInputSamples > 0 && tmpInput != nullptr) @@ -92,5 +96,5 @@ std::shared_ptr SpeexStep::execute(std::shared_ptr inputSamples, i codec2_fifo_write(inputSampleFifo_, inputSamples.get(), numInputSamples); } - return std::shared_ptr(outputSamples, std::default_delete()); + return outputSamples_; } diff --git a/src/pipeline/SpeexStep.h b/src/pipeline/SpeexStep.h index f2621aee..db2c17c4 100644 --- a/src/pipeline/SpeexStep.h +++ b/src/pipeline/SpeexStep.h @@ -50,6 +50,8 @@ private: SpeexPreprocessState* speexStateObj_; int 
numSamplesPerSpeexRun_; struct FIFO* inputSampleFifo_; + + std::shared_ptr outputSamples_; }; diff --git a/src/pipeline/TapStep.cpp b/src/pipeline/TapStep.cpp index 1981d387..53756f24 100644 --- a/src/pipeline/TapStep.cpp +++ b/src/pipeline/TapStep.cpp @@ -24,9 +24,10 @@ #include -TapStep::TapStep(int sampleRate, IPipelineStep* tapStep) +TapStep::TapStep(int sampleRate, IPipelineStep* tapStep, bool operateBackground) : tapStep_(tapStep) , sampleRate_(sampleRate) + , operateBackground_(operateBackground) { // empty } @@ -48,9 +49,23 @@ int TapStep::getOutputSampleRate() const std::shared_ptr TapStep::execute(std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { - int temp = 0; assert(tapStep_->getInputSampleRate() == sampleRate_); - tapStep_->execute(inputSamples, numInputSamples, &temp); + + if (operateBackground_) + { + // 5 millisecond timeout to queue to background thread. + // Since this is likely for the UI, it's fine if the step + // doesn't execute. + enqueue_([&, inputSamples, numInputSamples]() { + int temp = 0; + tapStep_->execute(inputSamples, numInputSamples, &temp); + }, 5); + } + else + { + int temp = 0; + tapStep_->execute(inputSamples, numInputSamples, &temp); + } *numOutputSamples = numInputSamples; return inputSamples; diff --git a/src/pipeline/TapStep.h b/src/pipeline/TapStep.h index 931aa362..fd0408e0 100644 --- a/src/pipeline/TapStep.h +++ b/src/pipeline/TapStep.h @@ -26,11 +26,12 @@ #include #include "IPipelineStep.h" +#include "util/ThreadedObject.h" -class TapStep : public IPipelineStep +class TapStep : public IPipelineStep, public ThreadedObject { public: - TapStep(int inputSampleRate, IPipelineStep* tapStep); + TapStep(int inputSampleRate, IPipelineStep* tapStep, bool operateBackground); virtual ~TapStep(); virtual int getInputSampleRate() const; @@ -40,6 +41,7 @@ public: private: std::shared_ptr tapStep_; int sampleRate_; + bool operateBackground_; }; #endif // AUDIO_PIPELINE__TAP_STEP_H \ No newline at end of file 
diff --git a/src/pipeline/ToneInterfererStep.cpp b/src/pipeline/ToneInterfererStep.cpp index 03459a9b..1fb9b477 100644 --- a/src/pipeline/ToneInterfererStep.cpp +++ b/src/pipeline/ToneInterfererStep.cpp @@ -38,7 +38,12 @@ ToneInterfererStep::ToneInterfererStep( , toneAmplitudeFn_(toneAmplitudeFn) , tonePhaseFn_(tonePhaseFn) { - // empty + // Pre-allocate buffers so we don't have to do so during real-time operation. + auto maxSamples = std::max(getInputSampleRate(), getOutputSampleRate()); + outputSamples_ = std::shared_ptr( + new short[maxSamples], + std::default_delete()); + assert(outputSamples_ != nullptr); } ToneInterfererStep::~ToneInterfererStep() @@ -59,11 +64,9 @@ int ToneInterfererStep::getOutputSampleRate() const std::shared_ptr ToneInterfererStep::execute( std::shared_ptr inputSamples, int numInputSamples, int* numOutputSamples) { - short* outputSamples = new short[numInputSamples]; - assert(outputSamples != nullptr); *numOutputSamples = numInputSamples; - memcpy(outputSamples, inputSamples.get(), numInputSamples * sizeof(short)); + memcpy(outputSamples_.get(), inputSamples.get(), numInputSamples * sizeof(short)); auto toneFrequency = toneFrequencyFn_(); auto toneAmplitude = toneAmplitudeFn_(); @@ -72,10 +75,10 @@ std::shared_ptr ToneInterfererStep::execute( float w = 2.0 * M_PI * toneFrequency / sampleRate_; for(int i = 0; i < numInputSamples; i++) { float s = (float)toneAmplitude * cos(*tonePhase); - outputSamples[i] += (int)s; + outputSamples_.get()[i] += (int)s; *tonePhase += w; } *tonePhase -= 2.0 * M_PI * floor(*tonePhase / (2.0 * M_PI)); - return std::shared_ptr(outputSamples, std::default_delete()); + return outputSamples_; } \ No newline at end of file diff --git a/src/pipeline/ToneInterfererStep.h b/src/pipeline/ToneInterfererStep.h index 6112ca8e..14cf520e 100644 --- a/src/pipeline/ToneInterfererStep.h +++ b/src/pipeline/ToneInterfererStep.h @@ -45,6 +45,7 @@ private: std::function toneFrequencyFn_; std::function toneAmplitudeFn_; 
std::function tonePhaseFn_; + std::shared_ptr outputSamples_; }; #endif // AUDIO_PIPELINE__TONE_INTERFERER_STEP_H \ No newline at end of file diff --git a/src/pipeline/TxRxThread.cpp b/src/pipeline/TxRxThread.cpp index 2d61380a..702d5246 100644 --- a/src/pipeline/TxRxThread.cpp +++ b/src/pipeline/TxRxThread.cpp @@ -48,6 +48,7 @@ using namespace std::chrono_literals; #include "LinkStep.h" #include "util/logging/ulog.h" +#include "os/os_interface.h" #include @@ -125,7 +126,8 @@ void TxRxThread::initializePipeline_() { // Function definitions shared across both pipelines. auto callbackLockFn = []() { - g_mutexProtectingCallbackData.Lock(); + // Prevent priority inversions by bounding the time we can wait for a lock. + g_mutexProtectingCallbackData.LockTimeout(5); }; auto callbackUnlockFn = []() { @@ -148,7 +150,7 @@ void TxRxThread::initializePipeline_() auto recordMicPipeline = new AudioPipeline(inputSampleRate_, inputSampleRate_); recordMicPipeline->appendPipelineStep(std::shared_ptr(recordMicStep)); - auto recordMicTap = new TapStep(inputSampleRate_, recordMicPipeline); + auto recordMicTap = new TapStep(inputSampleRate_, recordMicPipeline, false); auto bypassRecordMic = new AudioPipeline(inputSampleRate_, inputSampleRate_); auto eitherOrRecordMic = new EitherOrStep( @@ -215,7 +217,7 @@ void TxRxThread::initializePipeline_() auto micAudioPipeline = new AudioPipeline(inputSampleRate_, equalizedMicAudioLink_->getSampleRate()); micAudioPipeline->appendPipelineStep(equalizedMicAudioLink_->getInputPipelineStep()); - auto micAudioTap = std::make_shared(inputSampleRate_, micAudioPipeline); + auto micAudioTap = std::make_shared(inputSampleRate_, micAudioPipeline, false); pipeline_->appendPipelineStep(micAudioTap); } @@ -224,7 +226,7 @@ void TxRxThread::initializePipeline_() auto resampleForPlotPipeline = new AudioPipeline(inputSampleRate_, resampleForPlotStep->getOutputSampleRate()); resampleForPlotPipeline->appendPipelineStep(std::shared_ptr(resampleForPlotStep)); - auto 
resampleForPlotTap = new TapStep(inputSampleRate_, resampleForPlotPipeline); + auto resampleForPlotTap = new TapStep(inputSampleRate_, resampleForPlotPipeline, true); pipeline_->appendPipelineStep(std::shared_ptr(resampleForPlotTap)); // FreeDV TX step (analog leg) @@ -252,7 +254,7 @@ void TxRxThread::initializePipeline_() auto recordModulatedPipeline = new AudioPipeline(outputSampleRate_, recordModulatedStep->getOutputSampleRate()); recordModulatedPipeline->appendPipelineStep(std::shared_ptr(recordModulatedStep)); - auto recordModulatedTap = new TapStep(outputSampleRate_, recordModulatedPipeline); + auto recordModulatedTap = new TapStep(outputSampleRate_, recordModulatedPipeline, false); auto recordModulatedTapPipeline = new AudioPipeline(outputSampleRate_, outputSampleRate_); recordModulatedTapPipeline->appendPipelineStep(std::shared_ptr(recordModulatedTap)); @@ -293,7 +295,7 @@ void TxRxThread::initializePipeline_() auto recordRadioPipeline = new AudioPipeline(inputSampleRate_, inputSampleRate_); recordRadioPipeline->appendPipelineStep(std::shared_ptr(recordRadioStep)); - auto recordRadioTap = new TapStep(inputSampleRate_, recordRadioPipeline); + auto recordRadioTap = new TapStep(inputSampleRate_, recordRadioPipeline, false); auto bypassRecordRadio = new AudioPipeline(inputSampleRate_, inputSampleRate_); auto eitherOrRecordRadio = new EitherOrStep( @@ -324,7 +326,8 @@ void TxRxThread::initializePipeline_() auto eitherOrPlayRadioStep = new EitherOrStep( []() { - g_mutexProtectingCallbackData.Lock(); + // Prevent priority inversions by bounding the time we can wait for a lock. 
+ g_mutexProtectingCallbackData.LockTimeout(5); auto result = g_playFileFromRadio && (g_sfPlayFileFromRadio != NULL); g_mutexProtectingCallbackData.Unlock(); return result; @@ -339,7 +342,7 @@ void TxRxThread::initializePipeline_() auto resampleForPlotPipeline = new AudioPipeline(inputSampleRate_, resampleForPlotStep->getOutputSampleRate()); resampleForPlotPipeline->appendPipelineStep(std::shared_ptr(resampleForPlotStep)); - auto resampleForPlotTap = new TapStep(inputSampleRate_, resampleForPlotPipeline); + auto resampleForPlotTap = new TapStep(inputSampleRate_, resampleForPlotPipeline, true); pipeline_->appendPipelineStep(std::shared_ptr(resampleForPlotTap)); // Tone interferer step (optional) @@ -366,7 +369,7 @@ void TxRxThread::initializePipeline_() inputSampleRate_, computeRfSpectrumStep->getOutputSampleRate()); computeRfSpectrumPipeline->appendPipelineStep(std::shared_ptr(computeRfSpectrumStep)); - auto computeRfSpectrumTap = new TapStep(inputSampleRate_, computeRfSpectrumPipeline); + auto computeRfSpectrumTap = new TapStep(inputSampleRate_, computeRfSpectrumPipeline, true); pipeline_->appendPipelineStep(std::shared_ptr(computeRfSpectrumTap)); // RX demodulation step @@ -378,7 +381,8 @@ void TxRxThread::initializePipeline_() []() { return g_channel_noise; }, []() { return wxGetApp().appConfiguration.noiseSNR; }, []() { return g_RxFreqOffsetHz; }, - []() { return &g_sig_pwr_av; } + []() { return &g_sig_pwr_av; }, + helper_ ); rfDemodulationPipeline->appendPipelineStep(std::shared_ptr(rfDemodulationStep)); @@ -455,7 +459,7 @@ void TxRxThread::initializePipeline_() auto resampleForPlotOutPipeline = new AudioPipeline(outputSampleRate_, resampleForPlotOutStep->getOutputSampleRate()); resampleForPlotOutPipeline->appendPipelineStep(std::shared_ptr(resampleForPlotOutStep)); - auto resampleForPlotOutTap = new TapStep(outputSampleRate_, resampleForPlotOutPipeline); + auto resampleForPlotOutTap = new TapStep(outputSampleRate_, resampleForPlotOutPipeline, true); 
pipeline_->appendPipelineStep(std::shared_ptr(resampleForPlotOutTap)); // Clear anything in the FIFO before resuming decode. @@ -467,6 +471,9 @@ void* TxRxThread::Entry() { initializePipeline_(); + // Request real-time scheduling from the operating system. + helper_->setHelperRealTime(); + while (m_run) { #if defined(__linux__) @@ -486,18 +493,23 @@ void* TxRxThread::Entry() } #endif - auto currentTime = std::chrono::steady_clock::now(); - if (!m_run) break; + + //log_info("thread woken up: m_tx=%d", (int)m_tx); + helper_->startRealTimeWork(); + if (m_tx) txProcessing_(); else rxProcessing_(); - - std::this_thread::sleep_until(currentTime + 10ms); + + helper_->stopRealTimeWork(); } // Force pipeline to delete itself when we're done with the thread. pipeline_ = nullptr; + // Return to normal scheduling + helper_->clearHelperRealTime(); + return NULL; } @@ -534,19 +546,13 @@ void TxRxThread::clearFifos_() auto used = codec2_fifo_used(cbData->outfifo1); if (used > 0) { - short* temp = new short[used]; - assert(temp != nullptr); - codec2_fifo_read(cbData->outfifo1, temp, used); - delete[] temp; + codec2_fifo_read(cbData->outfifo1, inputSamples_.get(), used); } used = codec2_fifo_used(cbData->infifo2); if (used > 0) { - short* temp = new short[used]; - assert(temp != nullptr); - codec2_fifo_read(cbData->infifo2, temp, used); - delete[] temp; + codec2_fifo_read(cbData->infifo2, inputSamples_.get(), used); } } else @@ -554,20 +560,14 @@ void TxRxThread::clearFifos_() auto used = codec2_fifo_used(cbData->infifo1); if (used > 0) { - short* temp = new short[used]; - assert(temp != nullptr); - codec2_fifo_read(cbData->infifo1, temp, used); - delete[] temp; + codec2_fifo_read(cbData->infifo1, inputSamples_.get(), used); } auto outFifo = (g_nSoundCards == 1) ? 
cbData->outfifo1 : cbData->outfifo2; used = codec2_fifo_used(outFifo); if (used > 0) { - short* temp = new short[used]; - assert(temp != nullptr); - codec2_fifo_read(outFifo, temp, used); - delete[] temp; + codec2_fifo_read(outFifo, inputSamples_.get(), used); } } } @@ -621,9 +621,6 @@ void TxRxThread::txProcessing_() int nsam_in_48 = freedvInterface.getTxNumSpeechSamples() * ((float)inputSampleRate_ / (float)freedvInterface.getTxSpeechSampleRate()); assert(nsam_in_48 > 0); - short* insound_card = new short[nsam_in_48]; - assert(insound_card != nullptr); - int nout; @@ -638,11 +635,11 @@ void TxRxThread::txProcessing_() // again in the decoded audio at the other end. // zero speech input just in case infifo2 underflows - memset(insound_card, 0, nsam_in_48*sizeof(short)); + memset(inputSamples_.get(), 0, nsam_in_48*sizeof(short)); // There may be recorded audio left to encode while ending TX. To handle this, // we keep reading from the FIFO until we have less than nsam_in_48 samples available. 
- int nread = codec2_fifo_read(cbData->infifo2, insound_card, nsam_in_48); + int nread = codec2_fifo_read(cbData->infifo2, inputSamples_.get(), nsam_in_48); if (nread != 0 && endingTx) { if (freedvInterface.getCurrentMode() >= FREEDV_MODE_RADE) @@ -656,9 +653,7 @@ void TxRxThread::txProcessing_() hasEooBeenSent_ = true; } - short* inputSamples = new short[1]; - auto inputSamplesPtr = std::shared_ptr(inputSamples, std::default_delete()); - auto outputSamples = pipeline_->execute(inputSamplesPtr, 0, &nout); + auto outputSamples = pipeline_->execute(inputSamples_, 0, &nout); if (nout > 0 && outputSamples.get() != nullptr) { log_debug("Injecting %d samples of resampled EOO into TX stream", nout); @@ -680,11 +675,7 @@ void TxRxThread::txProcessing_() hasEooBeenSent_ = false; } - short* inputSamples = new short[nsam_in_48]; - memcpy(inputSamples, insound_card, nsam_in_48 * sizeof(short)); - - auto inputSamplesPtr = std::shared_ptr(inputSamples, std::default_delete()); - auto outputSamples = pipeline_->execute(inputSamplesPtr, nsam_in_48, &nout); + auto outputSamples = pipeline_->execute(inputSamples_, nsam_in_48, &nout); if (g_dump_fifo_state) { log_info(" nout: %d", nout); @@ -696,7 +687,6 @@ void TxRxThread::txProcessing_() } } - delete[] insound_card; txModeChangeMutex.Unlock(); } else @@ -741,9 +731,6 @@ void TxRxThread::rxProcessing_() int nsam = (int)(inputSampleRate_ * FRAME_DURATION); assert(nsam > 0); - short* insound_card = new short[nsam]; - assert(insound_card != nullptr); - int nout; @@ -757,16 +744,12 @@ void TxRxThread::rxProcessing_() } // while we have enough input samples available ... 
- while (codec2_fifo_read(cbData->infifo1, insound_card, nsam) == 0 && processInputFifo) { + while (codec2_fifo_read(cbData->infifo1, inputSamples_.get(), nsam) == 0 && processInputFifo) { // send latest squelch level to FreeDV API, as it handles squelch internally freedvInterface.setSquelch(g_SquelchActive, g_SquelchLevel); - short* inputSamples = new short[nsam]; - memcpy(inputSamples, insound_card, nsam * sizeof(short)); - - auto inputSamplesPtr = std::shared_ptr(inputSamples, std::default_delete()); - auto outputSamples = pipeline_->execute(inputSamplesPtr, nsam, &nout); + auto outputSamples = pipeline_->execute(inputSamples_, nsam, &nout); auto outFifo = (g_nSoundCards == 1) ? cbData->outfifo1 : cbData->outfifo2; if (nout > 0 && outputSamples.get() != nullptr) @@ -779,6 +762,4 @@ void TxRxThread::rxProcessing_() (g_tx && wxGetApp().appConfiguration.monitorTxAudio) || (!g_voice_keyer_tx && ((g_half_duplex && !g_tx) || !g_half_duplex)); } - - delete[] insound_card; } diff --git a/src/pipeline/TxRxThread.h b/src/pipeline/TxRxThread.h index 7ebe47c9..2593a89f 100644 --- a/src/pipeline/TxRxThread.h +++ b/src/pipeline/TxRxThread.h @@ -29,6 +29,7 @@ #include #include "AudioPipeline.h" +#include "util/IRealtimeHelper.h" // Forward declarations class LinkStep; @@ -39,7 +40,7 @@ class LinkStep; class TxRxThread : public wxThread { public: - TxRxThread(bool tx, int inputSampleRate, int outputSampleRate, LinkStep* micAudioLink) + TxRxThread(bool tx, int inputSampleRate, int outputSampleRate, LinkStep* micAudioLink, std::shared_ptr helper) : wxThread(wxTHREAD_JOINABLE) , m_tx(tx) , m_run(1) @@ -48,9 +49,14 @@ public: , outputSampleRate_(outputSampleRate) , equalizedMicAudioLink_(micAudioLink) , hasEooBeenSent_(false) + , helper_(helper) { assert(inputSampleRate_ > 0); assert(outputSampleRate_ > 0); + + inputSamples_ = std::shared_ptr( + new short[std::max(inputSampleRate_, outputSampleRate_)], + std::default_delete()); } // thread execution starts here @@ -74,6 +80,8 @@ 
private: int outputSampleRate_; LinkStep* equalizedMicAudioLink_; bool hasEooBeenSent_; + std::shared_ptr helper_; + std::shared_ptr inputSamples_; void initializePipeline_(); void txProcessing_(); diff --git a/src/pipeline/test/TapTest.cpp b/src/pipeline/test/TapTest.cpp index 32939d90..20ad895f 100644 --- a/src/pipeline/test/TapTest.cpp +++ b/src/pipeline/test/TapTest.cpp @@ -19,7 +19,7 @@ public: bool tapDataEqual() { PassThroughStep* step = new PassThroughStep; - TapStep tapStep(8000, step); + TapStep tapStep(8000, step, false); int outputSamples = 0; short* pData = new short[1]; diff --git a/src/util/IRealtimeHelper.h b/src/util/IRealtimeHelper.h new file mode 100644 index 00000000..c7bd3b24 --- /dev/null +++ b/src/util/IRealtimeHelper.h @@ -0,0 +1,46 @@ +//========================================================================= +// Name: IRealtimeHelper.h +// Purpose: Defines the interface to a helper for real-time operation. +// +// Authors: Mooneer Salem +// License: +// +// All rights reserved. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License version 2.1, +// as published by the Free Software Foundation. This program is +// distributed in the hope that it will be useful, but WITHOUT ANY +// WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public +// License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, see . +// +//========================================================================= + + +#ifndef I_REALTIME_HELPER_H +#define I_REALTIME_HELPER_H + +class IRealtimeHelper +{ +public: + // Configures current thread for real-time priority. This should be + // called from the thread that will be operating on received audio. 
+ virtual void setHelperRealTime() = 0; + + // Lets audio system know that we're beginning to do work with the + // received audio. + virtual void startRealTimeWork() = 0; + + // Lets audio system know that we're done with the work on the received + // audio. + virtual void stopRealTimeWork() = 0; + + // Reverts real-time priority for current thread. + virtual void clearHelperRealTime() = 0; +}; + +#endif \ No newline at end of file diff --git a/src/util/ThreadedObject.cpp b/src/util/ThreadedObject.cpp index f5d8cb87..6d862d06 100644 --- a/src/util/ThreadedObject.cpp +++ b/src/util/ThreadedObject.cpp @@ -20,8 +20,11 @@ // //========================================================================= +#include #include "ThreadedObject.h" +using namespace std::chrono_literals; + ThreadedObject::ThreadedObject() : isDestroying_(false) { @@ -37,10 +40,41 @@ ThreadedObject::~ThreadedObject() objectThread_.join(); } -void ThreadedObject::enqueue_(std::function fn) +void ThreadedObject::enqueue_(std::function fn, int timeoutMilliseconds) { - std::unique_lock lk(eventQueueMutex_); + std::unique_lock lk(eventQueueMutex_, std::defer_lock_t()); + + if (timeoutMilliseconds == 0) + { + lk.lock(); + } + else + { + auto beginTime = std::chrono::high_resolution_clock::now(); + auto endTime = std::chrono::high_resolution_clock::now(); + bool locked = false; + + do + { + if (lk.try_lock()) + { + locked = true; + break; + } + std::this_thread::sleep_for(1ms); + endTime = std::chrono::high_resolution_clock::now(); + } while ((endTime - beginTime) < std::chrono::milliseconds(timeoutMilliseconds)); + + if (!locked) + { + // could not lock, so we're not bothering to enqueue. 
+ return; + } + } + eventQueue_.push_back(fn); + lk.unlock(); + eventQueueCV_.notify_one(); } diff --git a/src/util/ThreadedObject.h b/src/util/ThreadedObject.h index 3f2b5572..d0abc3ab 100644 --- a/src/util/ThreadedObject.h +++ b/src/util/ThreadedObject.h @@ -37,7 +37,9 @@ public: protected: ThreadedObject(); - void enqueue_(std::function fn); + // Enqueues some code to run on a different thread. + // @param timeoutMilliseconds Timeout to wait for lock. Note: if we can't get a lock within the timeout, the function doesn't run! + void enqueue_(std::function fn, int timeoutMilliseconds = 0); private: bool isDestroying_;