Juce named pipe code sanity check,
-
Hello, I would be eternally grateful if someone could help me with a Juce named pipe sanity check. This code will be ported to a third party scriptnode to take in signal from another instance and then I will move it along for processing. Namedpipe.read is a blocking call so I didn’t want to call it directly in processblock. So, I am running it in a new thread and filling up a fifo buffer that will be ingested in process block and written to the block being processed(no signal will be coming in to the scriptnode node) . Here is my juce class for namedpipereader and audioprocessor, the variables used are declared in header files not included here. It’s kind of hairy code and I’m not sure it will work, synchronization, race conditions etc. If you see any glaring or not so glaring issues I could fix I would be very grateful. I am an adequate programmer in general but audio processing is still kind of new to me.
#include "NamedPipeReader.h" NamedPipeReader::NamedPipeReader(const juce::String& pipeName, juce::AbstractFIFO& fifo, bool createNewPipeIfNeeded) : Thread("NamedPipeReaderThread"), namedPipe(), fifo(fifo), fifoBuffer(nullptr), bufferSize(0) { if (createNewPipeIfNeeded) { if (!namedPipe.openExistingPipe(pipeName)) { namedPipe.createNewPipe(pipeName); } } else { namedPipe.openExistingPipe(pipeName); } startThread(); } NamedPipeReader::~NamedPipeReader() { stopThread(5000); delete[] fifoBuffer; } void NamedPipeReader::setBufferSize(int newSize) { if (newSize != bufferSize) { bufferSize = newSize; delete[] fifoBuffer; fifoBuffer = new juce::uint8[bufferSize]; } } void NamedPipeReader::run() { while (!threadShouldExit()) { if (namedPipe.isOpen() && fifoBuffer) { if (dataAvailable.wait(10)) // Wait for data or timeout after 10 ms { juce::MemoryBlock buffer(bufferSize); int bytesRead = namedPipe.read(buffer.getData(), buffer.getSize(), 100); // 100ms timeout if (bytesRead > 0) { // Write data to the FIFO int start1, size1, start2, size2; fifo.prepareToWrite(bytesRead, start1, size1, start2, size2); if (size1 > 0) memcpy(fifoBuffer + start1, buffer.getData(), size1); if (size2 > 0) memcpy(fifoBuffer + start2, buffer.getData() + size1, size2); fifo.finishedWrite(size1 + size2); } } } } } void NamedPipeReader::writeData(const float* data, int numSamples) { if (namedPipe.isOpen()) { namedPipe.write(data, numSamples * sizeof(float)); signalDataAvailable(); // Signal that data is available } } void NamedPipeReader::signalDataAvailable() { dataAvailable.signal(); }
#include "MyAudioProcessor.h" MyAudioProcessor::MyAudioProcessor() : fifo(2048), namedPipeReader("MyNamedPipe", fifo, true), isMaster(false) // Set to true to create a new pipe if it doesn't exist { } void MyAudioProcessor::prepareToPlay(double sampleRate, int samplesPerBlock) { juce::ignoreUnused(sampleRate); setFifoSize(samplesPerBlock * sizeof(float)); // Set initial FIFO size } void MyAudioProcessor::processBlock(juce::dsp::AudioBlock<float>& audioBlock, juce::MidiBuffer&) { const int blockSize = audioBlock.getNumSamples(); const int numChannels = audioBlock.getNumChannels(); // Adjust FIFO size if necessary setFifoSize(blockSize * numChannels * sizeof(float)); if (isMaster) { // Check if the FIFO has enough data before copying if (fifo.getNumReady() >= blockSize * numChannels * sizeof(float)) { for (int channel = 0; channel < numChannels; ++channel) { int start1, size1, start2, size2; fifo.prepareToRead(blockSize * sizeof(float), start1, size1, start2, size2); if (size1 > 0) memcpy(audioBlock.getChannelPointer(channel), fifoBuffer + start1, size1); if (size2 > 0) memcpy(audioBlock.getChannelPointer(channel) + (size1 / sizeof(float)), fifoBuffer + start2, size2); fifo.finishedRead(size1 + size2); } } else { // If the FIFO does not have enough data, clear the audio block to silence audioBlock.clear(); } } else { // Write data to the named pipe if this instance is a slave for (int channel = 0; channel < numChannels; ++channel) { namedPipeReader.writeData(audioBlock.getChannelPointer(channel), blockSize); } } } void MyAudioProcessor::setIsMaster(bool master) { isMaster = master; } void MyAudioProcessor::setFifoSize(int newSize) { if (newSize != fifo.getTotalSize()) { fifo = juce::AbstractFIFO(newSize); namedPipeReader.setBufferSize(newSize); } }
-
Code looks OK on the first glance, although I wouldn‘t care too much about making it lock free, reading in a scratchbuffer while waiting for the pipe and the locking the access just for the short time you copy the buffer to the shared resource would be simpler.
But can you guarantee that the reader and writer are properly synced? What happens if the reader can‘t fill the FIFO fast enough? What happens if you run that in a multithreaded DAW which processes both plugins simultaneously?
-
@Christoph-Hart these are all questions I don’t have the answer to, I’ve been worried about sync. I am pretty new to audio processing but not programming so there are some things I’m trying to still get my head around. If the fifo buffer is not full when master goes to process you can see I’m just skipping that process block, but that creates an issue. Is there a better way I can look to sync the reader and writer? And resolve some of these glaring issues?
-
@jdurnil NamedPipe.read is a blocking call and could be put in the processblock of master and you could get rid of the fifo all together, then master process would not continue until buffer is full, but blocking processblock sounds like a bad idea
-
@Christoph-Hart I don't know if you noticed but I am using a cross thread waitable event to attempt to synchronize read/write. I am also not going to process any incoming buffer here in this node, I am using your sidechain2 snippet example to try and ensure that the pipe data is read in first before sending it over to be processed with any incoming buffer from master. I'll paste a few screenshots to explain, first the waitable, then the script fx. Does any of this help solve sync?
-
never mind about waitable event, I just found out it will not work across instances, back to the drawing board