
//----------------------------------------------------------------------------------------------------------------------
// SaneCppAsyncStreams.h - Sane C++ AsyncStreams Library (single file build)
//----------------------------------------------------------------------------------------------------------------------
// Dependencies: SaneCppAsync.h, SaneCppFile.h, SaneCppFileSystem.h, SaneCppFoundation.h, SaneCppSocket.h, SaneCppThreading.h
// Version: release/2025/11 (cf7313e5)
// LOC header: 502 (code) + 339 (comments)
// LOC implementation: 1645 (code) + 283 (comments)
// Documentation: https://pagghiu.github.io/SaneCppLibraries
// Source Code: https://github.com/pagghiu/SaneCppLibraries
//----------------------------------------------------------------------------------------------------------------------
// All copyrights and SPDX information for this library (each amalgamated section has its own copyright attributions):
// Copyright (c) Stefano Cristiano
// SPDX-License-Identifier: MIT
//----------------------------------------------------------------------------------------------------------------------
#include "SaneCppAsync.h"
#include "SaneCppFile.h"
#include "SaneCppFileSystem.h"
#include "SaneCppFoundation.h"
#include "SaneCppSocket.h"
#include "SaneCppThreading.h"

#if !defined(SANE_CPP_ASYNCSTREAMS_HEADER)
#define SANE_CPP_ASYNCSTREAMS_HEADER 1

//----------------------------------------------------------------------------------------------------------------------
// AsyncStreams/AsyncRequestStreams.h
//----------------------------------------------------------------------------------------------------------------------
// Copyright (c) Stefano Cristiano
// SPDX-License-Identifier: MIT
//----------------------------------------------------------------------------------------------------------------------
// AsyncStreams/AsyncStreams.h
//----------------------------------------------------------------------------------------------------------------------
// Copyright (c) Stefano Cristiano
// SPDX-License-Identifier: MIT
//----------------------------------------------------------------------------------------------------------------------
// AsyncStreams/Internal/CircularQueue.h
//----------------------------------------------------------------------------------------------------------------------
// Copyright (c) Stefano Cristiano
// SPDX-License-Identifier: MIT
namespace SC
{
/// @brief A fixed size circular queue (also known as ring buffer)
/// @note Uses only up to N-1 element slots of the passed in Span
template <typename T>
struct CircularQueue
{
    CircularQueue() = default;
    CircularQueue(Span<T> buffer) : buffer(buffer) {}

    [[nodiscard]] bool isEmpty() const { return readIndex == writeIndex; }

    [[nodiscard]] bool pushBack(T&& request) { return pushBack(request); }

    [[nodiscard]] bool pushBack(T& request)
    {
        const uint32_t nextWriteIndex = (writeIndex + 1) % buffer.sizeInElements();
        if (nextWriteIndex == readIndex)
            SC_LANGUAGE_UNLIKELY
            {
                return false; // Ring is full
            }
        buffer[writeIndex] = request;
        writeIndex         = nextWriteIndex;
        return true;
    }

    [[nodiscard]] bool popFront(T& request)
    {
        if (isEmpty())
            SC_LANGUAGE_UNLIKELY
            {
                return false; // Ring is empty
            }
        request   = move(buffer[readIndex]);
        readIndex = (readIndex + 1) % buffer.sizeInElements();
        return true;
    }

    [[nodiscard]] bool pushFront(T& request)
    {
        const uint32_t nextReadIndex = readIndex == 0 ? static_cast<uint32_t>(buffer.sizeInElements()) - 1 : readIndex - 1;
        if (nextReadIndex == writeIndex)
            SC_LANGUAGE_UNLIKELY
            {
                return false; // Ring is full
            }
        buffer[nextReadIndex] = move(request);
        readIndex             = nextReadIndex;
        return true;
    }

  private:
    Span<T>  buffer;
    uint32_t readIndex  = 0;
    uint32_t writeIndex = 0;
};
} // namespace SC
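// Example (illustrative sketch, kept as a comment so it is not compiled as part of this header):
// driving CircularQueue with caller-provided storage. It assumes Span<T> can be constructed from a
// pointer plus element count, as done elsewhere in this file; the function name is hypothetical.
//
//   void circularQueueExample()
//   {
//       int storage[4];                                // Backing memory owned by the caller
//       SC::CircularQueue<int> queue({storage, 4});    // Only N-1 == 3 slots are usable
//       int value = 1;
//       while (queue.pushBack(value))                  // Fill until the ring reports "full"
//           value++;
//       int out = 0;
//       while (queue.popFront(out))                    // Drain in FIFO order: 1, 2, 3
//           (void)out;
//   }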
//----------------------------------------------------------------------------------------------------------------------
// AsyncStreams/Internal/Event.h
//----------------------------------------------------------------------------------------------------------------------
// Copyright (c) Stefano Cristiano
// SPDX-License-Identifier: MIT
namespace SC
{
//! @addtogroup group_foundation_utility
//! @{

/// @brief Tracks multiple listeners that must be notified for an event that happened.
/// Listeners can be removed with the integer written by Event::addListener in a parameter pointer.
/// @note The ordering of listeners will __NOT__ be preserved under multiple add / remove
template <int MaxListeners, typename... T>
struct Event
{
    /// @brief Emits the event, calling all registered listeners with the given parameters
    template <typename... U>
    void emit(U&&... t)
    {
        const auto numListenersCopy = numListeners;
        const auto listenersCopy    = listeners;
        for (int idx = 0; idx < numListenersCopy; ++idx)
        {
            listenersCopy[idx](t...);
        }
    }

    /// @brief Adds a listener to this event, optionally saving the index to use for its removal
    /// @see Event::removeListener
    template <typename Class, void (Class::*MemberFunction)(T...)>
    [[nodiscard]] bool addListener(Class& pself, int* idx = nullptr)
    {
        if (numListeners + 1 > MaxListeners)
        {
            return false;
        }
        else
        {
            Function<void(T...)> func;
            func.template bind<Class, MemberFunction>(pself);
            if (idx)
            {
                *idx = numListeners;
            }
            listeners[numListeners++] = move(func);
            return true;
        }
    }

    /// @brief Adds a listener to this event, optionally saving the index to use for its removal
    /// @see Event::removeListener
    template <typename Func>
    [[nodiscard]] bool addListener(Func&& func, int* idx = nullptr)
    {
        if (numListeners + 1 > MaxListeners)
        {
            return false;
        }
        else
        {
            if (idx)
            {
                *idx = numListeners;
            }
            listeners[numListeners++] = move(func);
            return true;
        }
    }

    template <typename Class, void (Class::*MemberFunction)(T...)>
    [[nodiscard]] bool removeListener(Class& pself)
    {
        Function<void(T...)> func;
        func.template bind<Class, MemberFunction>(pself);
        for (int idx = 0; idx < numListeners; ++idx)
        {
            if (listeners[idx] == func)
            {
                return removeListenerAt(idx);
            }
        }
        return false;
    }

    template <typename Class>
    [[nodiscard]] bool removeAllListenersBoundTo(Class& pself)
    {
        bool someRemoved = false;
        for (int idx = 0; idx < numListeners; ++idx)
        {
            if (listeners[idx].isBoundToClassInstance(&pself))
            {
                someRemoved |= removeListenerAt(idx);
            }
        }
        return someRemoved;
    }

    /// @brief Removes a listener where operator == evaluates to true for the passed in func
    template <typename Func>
    [[nodiscard]] bool removeListener(Func& func)
    {
        for (int idx = 0; idx < numListeners; ++idx)
        {
            if (listeners[idx] == func)
            {
                return removeListenerAt(idx);
            }
        }
        return false;
    }

    /// @brief Removes a listener at a given index
    /// @see Event::addListener
    [[nodiscard]] bool removeListenerAt(int idx)
    {
        if (idx < 0 or idx >= numListeners)
            return false;
        listeners[idx] = {};
        if (idx + 1 != numListeners)
        {
            listeners[idx] = move(listeners[numListeners - 1]);
        }
        numListeners--;
        return true;
    }

  private:
    // Avoid pulling Array<T, N> to reduce inter-dependencies
    Function<void(T...)> listeners[MaxListeners];

    int numListeners = 0;
};
//! @}
} // namespace SC
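// Example (illustrative sketch, kept as a comment so it is not compiled as part of this header):
// registering and removing listeners on an Event. The Observer class and its member function are
// hypothetical; the addListener<Class, &Class::method> pattern mirrors how events are wired up in
// the implementation section further below.
//
//   struct Observer
//   {
//       void onValue(int value) { lastValue = value; }
//       int  lastValue = 0;
//   };
//
//   void eventExample()
//   {
//       SC::Event<8, int> event;        // Up to 8 listeners, each receiving an int
//       Observer observer;
//       int listenerIndex = -1;
//       // Member-function listener (index saved for later removal)
//       (void)event.addListener<Observer, &Observer::onValue>(observer, &listenerIndex);
//       event.emit(42);                 // Calls Observer::onValue(42)
//       (void)event.removeListenerAt(listenerIndex);
//       // Alternatively: remove every listener bound to a given instance
//       (void)event.removeAllListenersBoundTo(observer);
//   }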
//! @defgroup group_async_streams Async Streams
//! Read, transform and write data concurrently from async sources to destinations.
//!
/// Reads, writes and transforms happen in parallel when sources and destinations are asynchronous.
/// This library does not allocate any memory; all buffers are supplied by the caller.
///
/// Async Streams are largely inspired by [node.js Streams](https://nodejs.org/api/stream.html), a very powerful tool to
/// process large amounts of data concurrently.
///
/// The basic idea behind an async stream is to create a Source / Sink abstraction (also called Readable and Writable)
/// and process small buffers of data at a time.
///
/// The state machine that coordinates this interaction handles data buffering and, more importantly, back-pressure,
/// which means:
///
/// - **Pausing** the readable stream when a connected writable stream cannot process data fast enough
/// - **Resuming** the readable stream when a connected writable stream is finally able to receive more data
///
/// By implementing streams on top of async operations it's possible to run many of them concurrently and very efficiently.
/// For example, a properly implemented async pipeline can read from disk, compress the data and write it to a socket,
/// all concurrently.
///
/// The most notable differences from node.js streams are, for now:
///
/// - No allocation (designed to work inside a user-provided list of buffers)
/// - No object mode
/// - Fixed layout to create data pipelines (AsyncPipeline)
/// - onData support only (no readable event)

//! @addtogroup group_async_streams
//! @{
namespace SC
{
/// @brief A Span of byte memory to be read or written by async streams
struct AsyncBufferView
{
    struct ID
    {
        using NumericType = int32_t;

        NumericType identifier;

        static constexpr NumericType InvalidValue = -1;

        constexpr ID() : identifier(InvalidValue) {}
        explicit constexpr ID(int32_t value) : identifier(value) {}

        [[nodiscard]] constexpr bool operator==(ID other) const { return identifier == other.identifier; }
    };

    enum class Type
    {
        Empty,
        Writable,
        ReadOnly,
        Growable,
    };

    AsyncBufferView() { type = Type::Empty; }
    AsyncBufferView(Span<char> data) : writableData(data) { type = Type::Writable; }
    AsyncBufferView(Span<const char> data) : readonlyData(data) { type = Type::ReadOnly; }

    /// @brief Saves a copy (or a moved instance) of a String / Buffer (or anything that works with GrowableBuffer)
    /// inside an AsyncBufferView in order to access its data later, as long as its size fits inside the inline storage.
    /// Destroying the AsyncBufferView will also destroy the copied / moved instance.
template AsyncBufferView(T&& t) // universal reference, it can capture both lvalue and rvalue { type = Type::Growable; // Here we're type-erasing T in our own inline storage provided by a slightly oversized Function<> // that it will be able to construct (and destruct) the right GrowableBuffer from just a piece of storage // and return a pointer to the corresponding IGrowableBuffer* interface getGrowableBuffer = [t = forward(t)](GrowableStorage& storage, bool construct) mutable -> IGrowableBuffer* { using Type = typename TypeTraits::RemoveReference::type; if (construct) { placementNew(storage.reinterpret_as<growablebuffer>(), t); return &storage.reinterpret_as<growablebuffer>(); } else { dtor(storage.reinterpret_as<growablebuffer>()); return nullptr; } }; } template AsyncBufferView(const char (&literal)[N]) { readonlyData = {literal, N - 1}; type = Type::ReadOnly; } Type getType() const { return type; } private: static constexpr int TypeErasedCaptureSize = sizeof(void*) * 3; // This is enough to hold String / Buffer by copy static constexpr int TypeErasedGrowableSize = sizeof(void*) * 6; using GrowableStorage = AlignedStorage; Function<igrowablebuffer*(growablestorage&, bool),="" typeerasedcapturesize=""> getGrowableBuffer; union { Span writableData; Span readonlyData; }; union { Span originalWritableData; Span originalReadonlyData; }; friend struct AsyncBuffersPool; int32_t refs = 0; // Counts AsyncReadable (single) or AsyncWritable (multiple) using it Type type; }; /// @brief Holds a Span of AsyncBufferView (allocated by user) holding available memory for the streams /// @note User must fill the AsyncBuffersPool::buffers with a `Span` of AsyncBufferView struct AsyncBuffersPool { /// @brief Span of buffers to be filled in by the user Span buffers; /// @brief Increments a buffer reference count void refBuffer(AsyncBufferView::ID bufferID); /// @brief Decrements a buffer reference count. /// When reference count becomes zero the buffer will be re-used void unrefBuffer(AsyncBufferView::ID bufferID); /// @brief Access data span owned by the buffer Result getReadableData(AsyncBufferView::ID bufferID, Span& data); /// @brief Access data span owned by the buffer Result getWritableData(AsyncBufferView::ID bufferID, Span& data); /// @brief Access the raw AsyncBufferView (if any) at a given bufferID (or nullptr if invalid) AsyncBufferView* getBuffer(AsyncBufferView::ID bufferID); /// @brief Requests a new available buffer that is at least minimumSizeInBytes, incrementing its refcount Result requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID& bufferID, Span& data); /// @brief Sets the new size in bytes for the buffer void setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes); /// @brief Adds a buffer to the pool in any empty slot (found by scanning from start to end) Result pushBuffer(AsyncBufferView&& buffer, AsyncBufferView::ID& bufferID); }; /// @brief Async source abstraction emitting data events in caller provided byte buffers. /// After AsyncReadableStream::start it will start emitting AsyncReadableStream::eventData with buffers. /// User must provide a custom async red implementation in AsyncReadableStream::asyncRead. /// The stream must be paused when the AsyncBuffersPool is full (use AsyncReadableStream::getBufferOrPause). /// Once the stream is ended, it will emit AsyncReadableStream::eventEnd and it cannot be used further. /// AsyncReadableStream::eventError will be emitted when an error occurs in any phase. 
struct AsyncReadableStream { struct Request { AsyncBufferView::ID bufferID; }; /// @brief Function that every stream must define to implement its custom read operation Function<result()> asyncRead; static constexpr int MaxListeners = 8; Event<maxlisteners, result=""> eventError; /// Emitted when an error occurs Event<maxlisteners, asyncbufferview::id=""> eventData; /// Emitted when a new buffer has been read Event eventEnd; /// Emitted when there is no more data Event eventClose; /// Emitted when the underlying resource has been closed /// @brief Inits the readable stream with an AsyncBuffersPool instance that will provide memory for it /// @param buffersPool An instance of AsyncBuffersPool providing read buffers /// @param requests User owned memory to hold a circular buffer for read requests Result init(AsyncBuffersPool& buffersPool, Span requests); /// @brief Starts the readable stream, that will emit eventData Result start(); /// @brief Pauses the readable stream (that can be later resumed) void pause(); /// @brief Resumes the readable stream paused by AsyncReadableStream::pause void resumeReading(); /// @brief Forcefully destroys the readable stream before it's end event releasing all resources void destroy(); /// @brief Returns true if the stream is ended (AsyncReadableStream::end has been called) [[nodiscard]] bool isEnded() const { return state == State::Ended; } /// @brief Obtains the AsyncBuffersPool to request more buffers AsyncBuffersPool& getBuffersPool(); /// @brief Use push from inside AsyncReadableStream::asyncRead function to queue received data /// @return `true` if the caller can continue pushing [[nodiscard]] bool push(AsyncBufferView::ID bufferID, size_t newSize); /// @brief Use pushEnd from inside AsyncReadableStream::asyncRead to signal production end void pushEnd(); /// @brief Use reactivate(true) from inside AsyncReadableStream::asyncRead function to ask the /// state machine to invoke asyncRead again. void reactivate(bool doReactivate); /// @brief Signals an async error received void emitError(Result error); /// @brief Returns an unused buffer from pool or pauses the stream if none is available [[nodiscard]] bool getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span& data); private: void emitOnData(); void executeRead(); enum class State { Stopped, // Stream must be inited CanRead, // Stream is ready to issue a read ( AsyncReadableStream::start / AsyncReadableStream::resume) Reading, // A read is being issued (may be sync or async) SyncPushing, // One or multiple AsyncReadableStream::push have been received (sync) SyncReadMore, // SyncPushing + AsyncReadableStream::reactivate(true) AsyncReading, // An async read is in flight AsyncPushing, // AsyncReading + AsyncReadableStream::push Pausing, // Pause requested while read in flight Paused, // Actually paused with no read in flight Ended, // Emitted all data, no more data will be emitted Destroyed, // Readable has been destroyed before emitting all data Errored, // Error occurred }; State state = State::Stopped; AsyncBuffersPool* buffers = nullptr; CircularQueue readQueue; }; /// @brief Async destination abstraction where bytes can be written to. /// When buffers are pushed faster than the stream can handle, they will get queued. /// Queuing process happens with a linked list stored in the AsyncBufferView itself. 
/// As AsyncBufferView contains a fixed (at init) number of buffers, the queue is bounded /// by the fact that user will be unable to allocate buffers to write until at least one /// will be made available again (i.e. a write finishes). /// User can listen to AsyncWritableStream::eventWritten to know when a buffer is written /// (and its refcount decreased) or AsyncWritableStream::eventDrain when the queue is empty. struct AsyncWritableStream { /// @brief Function that every stream must define to implement its custom write operation Function<result(asyncbufferview::id, function<void(asyncbufferview::id)="">)> asyncWrite; struct Request { AsyncBufferView::ID bufferID; Function<void(asyncbufferview::id)> cb; }; static constexpr int MaxListeners = 8; Event<maxlisteners, result=""> eventError; /// Emitted when an error occurs Event eventDrain; /// Emitted when write queue is empty Event eventFinish; /// Emitted when no more data can be written /// @brief Inits the writable stream /// @param buffersPool An instance of AsyncBuffersPool providing write buffers /// @param requests User owned memory to hold a circular buffer for write requests Result init(AsyncBuffersPool& buffersPool, Span requests); /// @brief Writes a buffer (that must be allocated by the AsyncBuffersPool passed in AsyncWritableStream) /// When the buffer it will be actually written, AsyncWritableStream::eventWritten will be raised and /// its reference count will be decreased. /// @param bufferID Buffer allocated from the associated AsyncBuffersPool (AsyncWritableStream::getBuffersPool) /// @param cb Callback that will be invoked when the write is finished /// @return Invalid Result if write queue is full Result write(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)> cb = {}); /// @brief Push a new buffer view to the queue, registering it with the allocator /// @return Invalid Result if write queue is full or if there are no available empty buffers slots in the pool Result write(AsyncBufferView&& bufferView, Function<void(asyncbufferview::id)> cb = {}); /// @brief Ends the writable stream, waiting for all in-flight and queued writes to finish. /// After this happens, AsyncWritableStream::eventFinished will be raised void end(); /// @brief Obtains the buffers pool to access its data AsyncBuffersPool& getBuffersPool(); /// @brief Signals that the given buffer (previously queued by write) has been fully written void finishedWriting(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)>&& cb, Result res); /// @brief Resumes writing queued requests for this stream void resumeWriting(); /// @brief Puts back a buffer at the top of the write queue Result unshift(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)>&& cb); /// @brief Signals an async error received void emitError(Result error); /// @brief Allows keeping a writable in ENDING state until it has finished flushing all pending data. /// If a writable stream redefines this function it should return true to allow transitioning to ENDED /// state and return false to keep staying in ENDING state. 
Function<bool()> canEndWritable; /// @brief Will emit error if the passed in Result is false void tryAsync(Result potentialError); void stop() { state = State::Stopped; } private: enum class State { Stopped, Writing, Ending, Ended }; State state = State::Stopped; AsyncBuffersPool* buffers = nullptr; CircularQueue writeQueue; }; /// @brief A stream that can both produce and consume buffers struct AsyncDuplexStream : public AsyncReadableStream, public AsyncWritableStream { AsyncDuplexStream(); Result init(AsyncBuffersPool& buffersPool, Span readableRequests, Span writableRequests); }; /// @brief A duplex stream that produces new buffers transforming received buffers struct AsyncTransformStream : public AsyncDuplexStream { AsyncTransformStream(); void afterProcess(Span inputAfter, Span outputAfter); void afterFinalize(Span outputAfter, bool streamEnded); Function<result(span, Span)> onProcess; Function<result(span)> onFinalize; private: Function<void(asyncbufferview::id)> inputCallback; Span inputData; Span outputData; AsyncBufferView::ID inputBufferID; AsyncBufferView::ID outputBufferID; Result transform(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)> cb); Result prepare(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)> cb); bool canEndTransform(); void tryFinalize(); enum class State { None, Paused, Processing, Finalizing, Finalized, }; State state = State::None; }; /// @brief Pipes read data from SC::AsyncReadableStream, forwarding them to SC::AsyncWritableStream. /// When the source provides data at a faster rate than what the sink (writable) is able to process, /// or when running out of buffers to read data into, AsyncPipeline will AsyncReadableStream::pause /// the source. This is called "back-pressure" handling in the Async Streams terminology. /// When a writable has finished writing, AsyncReadableStream::resume will be called to try un-pausing. /// Caller needs to set AsyncPipeline::source field and AsyncPipeline::sinks with valid streams. 
/// @note It's crucial to use the same AsyncBuffersPool for the AsyncReadableStream and all AsyncWritableStream struct AsyncPipeline { static constexpr int MaxListeners = 8; static constexpr int MaxTransforms = 8; static constexpr int MaxSinks = 8; AsyncPipeline() = default; AsyncPipeline(const AsyncPipeline&) = delete; AsyncPipeline(AsyncPipeline&&) = delete; AsyncPipeline& operator=(const AsyncPipeline&) = delete; AsyncPipeline& operator=(AsyncPipeline&&) = delete; ~AsyncPipeline(); AsyncReadableStream* source = nullptr; /// Provided source (must be != nullptr) AsyncDuplexStream* transforms[MaxTransforms] = {nullptr}; /// Provided transforms (optional, can be all nullptrs) AsyncWritableStream* sinks[MaxSinks] = {nullptr}; /// Provided sinks (at least one must be != nullptr) Event<maxlisteners, result=""> eventError = {}; /// Reports errors by source, transforms or sinks /// @brief Pipes source, transforms and sinks together /// @note Caller must have already setup source and sinks (and optionally transforms) Result pipe(); /// @brief Unregisters all events from source, transforms and sinks [[nodiscard]] bool unpipe(); /// @brief Starts the pipeline /// @note Both source and sinks must have been already setup by the caller Result start(); // TODO: Add a pause and cancel/step private: void emitError(Result res); Result checkBuffersPool(); Result chainTransforms(AsyncReadableStream*& readable); Result validate(); void asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncWritableStream& writable); void dispatchToPipes(AsyncBufferView::ID bufferID); void endPipes(); void afterWrite(AsyncBufferView::ID bufferID); bool listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen); }; } // namespace SC //! @} //! @addtogroup group_async_streams //! 
@{ namespace SC { template struct AsyncRequestReadableStream : public AsyncReadableStream { AsyncRequestReadableStream(); /// @brief Automatically closes descriptor during read stream close event void setAutoCloseDescriptor(bool value) { autoCloseDescriptor = value; } AsyncRequestType request; /// AsyncFileRead / AsyncFileWrite / AsyncSocketReceive / AsyncSocketSend protected: struct Internal; AsyncEventLoop* eventLoop = nullptr; bool autoCloseDescriptor = false; bool justUnrefBuffer = false; Result read(); void afterRead(typename AsyncRequestType::Result& result, AsyncBufferView::ID bufferID); void onCloseStopRequest(); }; template struct AsyncRequestWritableStream : public AsyncWritableStream { AsyncRequestWritableStream(); template Result init(AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& eventLoop, const DescriptorType& descriptor); /// @brief Automatically closes descriptor during write stream finish event void setAutoCloseDescriptor(bool value) { autoCloseDescriptor = value; } AsyncRequestType request; /// AsyncFileRead / AsyncFileWrite / AsyncSocketReceive / AsyncSocketSend protected: struct Internal; AsyncEventLoop* eventLoop = nullptr; bool autoCloseDescriptor = false; bool justUnrefBuffer = false; Function<void(asyncbufferview::id)> callback; Result write(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)> cb); void onFinishStopRequest(); }; /// @brief Uses an SC::AsyncFileRead to stream data from a file struct ReadableFileStream : public AsyncRequestReadableStream{ Result init(AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& eventLoop, const FileDescriptor& descriptor); }; /// @brief Uses an SC::AsyncFileWrite to stream data to a file struct WritableFileStream : public AsyncRequestWritableStream{ Result init(AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& eventLoop, const FileDescriptor& descriptor); }; /// @brief Uses an SC::AsyncFileWrite to stream data from a socket struct ReadableSocketStream : public AsyncRequestReadableStream{ Result init(AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor); }; /// @brief Uses an SC::AsyncFileWrite to stream data to a socket struct WritableSocketStream : public AsyncRequestWritableStream{ Result init(AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& eventLoop, const SocketDescriptor& descriptor); }; } // namespace SC //! @} //---------------------------------------------------------------------------------------------------------------------- // AsyncStreams/ZLibTransformStreams.h //---------------------------------------------------------------------------------------------------------------------- // Copyright (c) Stefano Cristiano // SPDX-License-Identifier: MIT //---------------------------------------------------------------------------------------------------------------------- // AsyncStreams/Internal/ZLibStream.h //---------------------------------------------------------------------------------------------------------------------- // Copyright (c) Stefano Cristiano // SPDX-License-Identifier: MIT namespace SC { //! @addtogroup group_async_streams //! @{ /// @brief Compresses or decompresses byte streams using gzip, zlib or deflate. /// @n /// Data can be added until needed with SC::ZLibStream::process call. /// SC::ZLibStream::finalize will compute any end-of-stream data if needed. 
struct ZLibStream { enum Algorithm { CompressZLib, ///< Use ZLIB algorithm to compress DecompressZLib, ///< Use ZLIB algorithm to decompress CompressGZip, ///< Use GZIP algorithm to compress DecompressGZip, ///< Use GZIP algorithm to decompress CompressDeflate, ///< Use DEFLATE algorithm to compress DecompressDeflate ///< Use DEFLATE algorithm to decompress }; /// @brief Initializes a ZLibStream struct ZLibStream(); /// @brief Destroys an ZLibStream struct ~ZLibStream(); ZLibStream(const ZLibStream&) = delete; ZLibStream(ZLibStream&&) = delete; ZLibStream& operator=(const ZLibStream&) = delete; ZLibStream& operator=(ZLibStream&&) = delete; /// @brief Inits the compressor / decompressor with the required algorithm /// @param wantedAlgorithm Wanted algorithm (ZLIB, GZIP or DEFLATE with compression or decompression) /// @return Valid Result if the algorithm has been inited successfully Result init(Algorithm wantedAlgorithm); /// @brief Add data to be processed. Can be called multiple times before ZLibStream::finalize. /// @param input Span containing data to be processed, that will be modified pointing to data not (yet) /// processed due to insufficient output space. /// @param output Writable memory receiving processed data. It will then point to unused memory. /// @return Valid Result if data has been processed successfully Result process(Span& input, Span& output); /// @brief Finalize stream by computing CRC or similar footers if needed (depending on the choosen Algorithm) /// @param output Writable memory receiving processed data. It will then point to unused memory. /// @param streamEnded Will be set to `true` if the stream has ended. /// @return Valid Result if no error has happened during finalization Result finalize(Span& output, bool& streamEnded); private: struct Internal; AlignedStorage<112> buffer; enum class State { Constructed, Inited, }; State state = State::Constructed; Algorithm algorithm = Algorithm::CompressZLib; }; //! 
@} } // namespace SC namespace SC { struct SyncZLibTransformStream : public AsyncDuplexStream { SyncZLibTransformStream(); ZLibStream stream; private: size_t consumedInputBytes = 0; Result transform(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)> cb); bool canEndTransform(); }; struct AsyncZLibTransformStream : public AsyncTransformStream { AsyncZLibTransformStream(); ZLibStream stream; AsyncLoopWork asyncWork; void setEventLoop(AsyncEventLoop& loop) { eventLoop = &loop; } private: AsyncEventLoop* eventLoop = nullptr; Result compressExecute(Span input, Span output); Result compressFinalize(Span output); void afterWork(AsyncLoopWork::Result& result); Result work(); bool finalizing = false; bool streamEnded = false; Span savedInput; Span savedOutput; }; } // namespace SC #endif // SANE_CPP_ASYNCSTREAMS_HEADER #if defined(SANE_CPP_IMPLEMENTATION) && !defined(SANE_CPP_ASYNCSTREAMS_IMPLEMENTATION) #define SANE_CPP_ASYNCSTREAMS_IMPLEMENTATION 1 //---------------------------------------------------------------------------------------------------------------------- // AsyncStreams/AsyncRequestStreams.cpp //---------------------------------------------------------------------------------------------------------------------- // Copyright (c) Stefano Cristiano // SPDX-License-Identifier: MIT //------------------------------------------------------------------------------------------------------- // AsyncRequestReadableStream //------------------------------------------------------------------------------------------------------- template struct SC::AsyncRequestReadableStream::Internal { static bool isEnded(AsyncFileRead::Result& result) { return result.completionData.endOfFile; } static bool isEnded(AsyncSocketReceive::Result& result) { return result.completionData.disconnected; } template static auto& getDescriptor(T& async) { return async.handle; } static Result closeDescriptor(AsyncFileRead& async) { return detail::FileDescriptorDefinition::releaseHandle(async.handle); } static Result closeDescriptor(AsyncSocketReceive& async) { return detail::SocketDescriptorDefinition::releaseHandle(async.handle); } template static Result init(AsyncRequestReadableStream& self, AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& eventLoop, const DescriptorType& descriptor) { self.eventLoop = &eventLoop; SC_TRY(descriptor.get(Internal::getDescriptor(self.request), Result::Error("Missing descriptor"))); return self.AsyncReadableStream::init(buffersPool, requests); } }; template SC::AsyncRequestReadableStream::AsyncRequestReadableStream() { AsyncReadableStream::asyncRead.bind<asyncrequestreadablestream, &asyncrequestreadablestream::read="">(*this); (void)AsyncReadableStream::eventClose .addListener<asyncrequestreadablestream, &asyncrequestreadablestream::onclosestoprequest="">(*this); } template void SC::AsyncRequestReadableStream::onCloseStopRequest() { if (not request.isFree()) { SC::Result res = SC::Result::Error("Fake error to unref bufferID"); typename AsyncReadRequest::Result result(*eventLoop, request, res, nullptr); justUnrefBuffer = true; request.callback(result); // will free bufferID request.stop(*eventLoop); justUnrefBuffer = false; } if (autoCloseDescriptor) { Result res = Internal::closeDescriptor(request); if (not res) { emitError(res); } } } template SC::Result SC::AsyncRequestReadableStream::read() { SC_ASSERT_RELEASE(request.isFree()); AsyncBufferView::ID bufferID; if (getBufferOrPause(0, bufferID, request.buffer)) { request.callback = [this, bufferID](typename 
AsyncReadRequest::Result& result) { afterRead(result, bufferID); }; SC_TRY_MSG(eventLoop != nullptr, "AsyncRequestReadableStream eventLoop == nullptr"); const Result startResult = request.start(*eventLoop); if (not startResult) { getBuffersPool().unrefBuffer(bufferID); return startResult; // Error occurred during request start } } return Result(true); } template void SC::AsyncRequestReadableStream::afterRead(typename AsyncReadRequest::Result& result, AsyncBufferView::ID bufferID) { Span data; if (result.get(data)) { SC_ASSERT_RELEASE(request.isFree()); if (Internal::isEnded(result)) { getBuffersPool().unrefBuffer(bufferID); AsyncReadableStream::pushEnd(); } else { const bool continuePushing = AsyncReadableStream::push(bufferID, data.sizeInBytes()); SC_ASSERT_RELEASE(result.getAsync().isFree()); getBuffersPool().unrefBuffer(bufferID); // Check if we're still pushing (so not, paused, destroyed or errored etc.) if (continuePushing) { if (getBufferOrPause(0, bufferID, result.getAsync().buffer)) { request.callback = [this, bufferID](typename AsyncReadRequest::Result& result) { afterRead(result, bufferID); }; result.reactivateRequest(true); // Stream is in AsyncPushing mode and SC::AsyncResult::reactivateRequest(true) will cause more // data to be delivered here, so it's not necessary calling AsyncReadableStream::reactivate(true). } } } } else { getBuffersPool().unrefBuffer(bufferID); if (justUnrefBuffer) return; AsyncReadableStream::emitError(result.isValid()); } } //------------------------------------------------------------------------------------------------------- // AsyncRequestWritableStream //------------------------------------------------------------------------------------------------------- template struct SC::AsyncRequestWritableStream::Internal { template static auto& getDescriptor(T& async) { return async.handle; } static Result closeDescriptor(AsyncFileWrite& async) { return detail::FileDescriptorDefinition::releaseHandle(async.handle); } static Result closeDescriptor(AsyncSocketSend& async) { return detail::SocketDescriptorDefinition::releaseHandle(async.handle); } template static Result init(AsyncRequestWritableStream& self, AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& eventLoop, const DescriptorType& descriptor) { self.eventLoop = &eventLoop; SC_TRY(descriptor.get(Internal::getDescriptor(self.request), Result::Error("Missing descriptor"))); return self.AsyncWritableStream::init(buffersPool, requests); } }; template SC::AsyncRequestWritableStream::AsyncRequestWritableStream() { AsyncWritableStream::asyncWrite.bind<asyncrequestwritablestream, &asyncrequestwritablestream::write="">(*this); (void)AsyncWritableStream::eventFinish .addListener<asyncrequestwritablestream, &asyncrequestwritablestream::onfinishstoprequest="">(*this); } template void SC::AsyncRequestWritableStream::onFinishStopRequest() { if (not request.isFree()) { SC::Result res = SC::Result::Error("Fake error to unref bufferID"); typename AsyncWriteRequest::Result result(*eventLoop, request, res, nullptr); justUnrefBuffer = true; request.callback(result); // will free bufferID request.stop(*eventLoop); justUnrefBuffer = false; } if (autoCloseDescriptor) { Result res = Internal::closeDescriptor(request); if (not res) { emitError(res); } } } template SC::Result SC::AsyncRequestWritableStream::write(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)> cb) { SC_ASSERT_RELEASE(not callback.isValid()); callback = move(cb); SC_TRY(getBuffersPool().getReadableData(bufferID, request.buffer)); 
request.callback = [this, bufferID](typename AsyncWriteRequest::Result& result) { getBuffersPool().unrefBuffer(bufferID); if (justUnrefBuffer) return; auto callbackCopy = move(callback); callback = {}; AsyncWritableStream::finishedWriting(bufferID, move(callbackCopy), result.isValid()); }; SC_TRY_MSG(eventLoop != nullptr, "AsyncRequestReadableStream eventLoop == nullptr"); const Result res = request.start(*eventLoop); if (res) { getBuffersPool().refBuffer(bufferID); } return res; } namespace SC { SC_COMPILER_EXTERN template struct AsyncRequestReadableStream; SC_COMPILER_EXTERN template struct AsyncRequestReadableStream; SC_COMPILER_EXTERN template struct AsyncRequestWritableStream; SC_COMPILER_EXTERN template struct AsyncRequestWritableStream; Result ReadableSocketStream::init(AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& loop, const SocketDescriptor& descriptor) { return Internal::init(*this, buffersPool, requests, loop, descriptor); } Result WritableSocketStream::init(AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& loop, const SocketDescriptor& descriptor) { return Internal::init(*this, buffersPool, requests, loop, descriptor); } Result ReadableFileStream::init(AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& loop, const FileDescriptor& descriptor) { return Internal::init(*this, buffersPool, requests, loop, descriptor); } Result WritableFileStream::init(AsyncBuffersPool& buffersPool, Span requests, AsyncEventLoop& loop, const FileDescriptor& descriptor) { return Internal::init(*this, buffersPool, requests, loop, descriptor); } } // namespace SC //---------------------------------------------------------------------------------------------------------------------- // AsyncStreams/AsyncStreams.cpp //---------------------------------------------------------------------------------------------------------------------- // Copyright (c) Stefano Cristiano // SPDX-License-Identifier: MIT namespace SC { //------------------------------------------------------------------------------------------------------- // AsyncBufferView //------------------------------------------------------------------------------------------------------- void AsyncBuffersPool::refBuffer(AsyncBufferView::ID bufferID) { AsyncBufferView* buffer = getBuffer(bufferID); SC_ASSERT_RELEASE(buffer); buffer->refs++; } void AsyncBuffersPool::unrefBuffer(AsyncBufferView::ID bufferID) { AsyncBufferView* buffer = getBuffer(bufferID); SC_ASSERT_RELEASE(buffer); SC_ASSERT_RELEASE(buffer->refs != 0); buffer->refs--; if (buffer->refs == 0) { switch (buffer->type) { case AsyncBufferView::Type::Writable: buffer->writableData = buffer->originalWritableData; break; case AsyncBufferView::Type::ReadOnly: buffer->type = AsyncBufferView::Type::Empty; break; case AsyncBufferView::Type::Growable: buffer->type = AsyncBufferView::Type::Empty; break; case AsyncBufferView::Type::Empty: Assert::unreachable(); break; } } } Result AsyncBuffersPool::getReadableData(AsyncBufferView::ID bufferID, Span& data) { AsyncBufferView* buffer = getBuffer(bufferID); SC_TRY_MSG(buffer != nullptr, "AsyncBuffersPool::getData - Invalid bufferID"); switch (buffer->type) { case AsyncBufferView::Type::Writable: data = buffer->writableData; break; case AsyncBufferView::Type::ReadOnly: data = buffer->readonlyData; break; case AsyncBufferView::Type::Growable: { AsyncBufferView::GrowableStorage storage; auto da = buffer->getGrowableBuffer(storage, true)->getDirectAccess(); data = {static_cast<char*>(da.data), da.sizeInBytes}; 
(void)buffer->getGrowableBuffer(storage, false); // destruct break; } case AsyncBufferView::Type::Empty: Assert::unreachable(); break; } return Result(true); } Result AsyncBuffersPool::getWritableData(AsyncBufferView::ID bufferID, Span& data) { AsyncBufferView* buffer = getBuffer(bufferID); SC_TRY_MSG(buffer != nullptr, "AsyncBuffersPool::getWritableData - Invalid bufferID"); SC_TRY_MSG(buffer->type == AsyncBufferView::Type::Writable, "AsyncBuffersPool::getWritableData - Readonly buffer"); data = buffer->writableData; return Result(true); } AsyncBufferView* AsyncBuffersPool::getBuffer(AsyncBufferView::ID bufferID) { if (bufferID.identifier >= 0 and buffers.sizeInElements() > unsigned(bufferID.identifier)) { AsyncBufferView* buffer = &buffers[unsigned(bufferID.identifier)]; return buffer->type != AsyncBufferView::Type::Empty ? buffer : nullptr; } return nullptr; } Result AsyncBuffersPool::requestNewBuffer(size_t minimumSizeInBytes, AsyncBufferView::ID& bufferID, Span& data) { for (AsyncBufferView& buffer : buffers) { if (buffer.type == AsyncBufferView::Type::Empty) continue; if (buffer.refs == 0 and buffer.writableData.sizeInBytes() >= minimumSizeInBytes) { buffer.refs = 1; switch (buffer.type) { case AsyncBufferView::Type::Writable: buffer.originalWritableData = buffer.writableData; break; case AsyncBufferView::Type::ReadOnly: buffer.originalReadonlyData = buffer.readonlyData; break; case AsyncBufferView::Type::Growable: { AsyncBufferView::GrowableStorage storage; auto da = buffer.getGrowableBuffer(storage, true)->getDirectAccess(); buffer.writableData = {static_cast<char*>(da.data), da.sizeInBytes}; buffer.originalWritableData = {static_cast<char*>(da.data), da.sizeInBytes}; (void)buffer.getGrowableBuffer(storage, false); // destruct break; } case AsyncBufferView::Type::Empty: SC_ASSERT_RELEASE(false); break; } bufferID = AsyncBufferView::ID(static_cast(&buffer - buffers.begin())); return getWritableData(bufferID, data); } } return Result::Error("AsyncBuffersPool::requestNewBuffer failed"); } void AsyncBuffersPool::setNewBufferSize(AsyncBufferView::ID bufferID, size_t newSizeInBytes) { AsyncBufferView* buffer = getBuffer(bufferID); if (buffer != nullptr) { switch (buffer->type) { case AsyncBufferView::Type::Writable: if ((newSizeInBytes < buffer->originalWritableData.sizeInBytes())) { buffer->writableData = {buffer->writableData.data(), newSizeInBytes}; } break; case AsyncBufferView::Type::ReadOnly: if ((newSizeInBytes < buffer->originalReadonlyData.sizeInBytes())) { buffer->readonlyData = {buffer->readonlyData.data(), newSizeInBytes}; } break; case AsyncBufferView::Type::Growable: { AsyncBufferView::GrowableStorage storage; IGrowableBuffer* growable = buffer->getGrowableBuffer(storage, true); if (growable->resizeWithoutInitializing(newSizeInBytes)) { auto da = growable->getDirectAccess(); buffer->writableData = {static_cast<char*>(da.data), da.sizeInBytes}; buffer->originalWritableData = {static_cast<char*>(da.data), da.sizeInBytes}; } (void)buffer->getGrowableBuffer(storage, false); // destruct break; } break; case AsyncBufferView::Type::Empty: SC_ASSERT_RELEASE(false); break; } } } Result AsyncBuffersPool::pushBuffer(AsyncBufferView&& buffer, AsyncBufferView::ID& bufferID) { for (size_t idx = 0; idx < buffers.sizeInElements(); ++idx) { if (buffers[idx].getType() == AsyncBufferView::Type::Empty) { buffer.refs = 0; buffers[idx] = buffer; bufferID = AsyncBufferView::ID(static_cast(idx)); return Result(true); } } return Result::Error("pushBuffer failed"); } 
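// Example (illustrative sketch, kept as a comment; it is not part of the library): a caller-owned
// AsyncBuffersPool with two writable buffers. Buffer names and sizes are arbitrary, Span construction
// from a pointer plus size is assumed, and error handling is elided.
//
//   char chunkA[1024];
//   char chunkB[1024];
//   SC::AsyncBufferView  views[2] = {SC::Span<char>{chunkA, sizeof(chunkA)}, SC::Span<char>{chunkB, sizeof(chunkB)}};
//   SC::AsyncBuffersPool pool;
//   pool.buffers = {views, 2};
//
//   SC::AsyncBufferView::ID bufferID;
//   SC::Span<char>          data;
//   if (pool.requestNewBuffer(16, bufferID, data))   // Reference count of the chosen buffer becomes 1
//   {
//       // ... fill data, hand bufferID to a stream ...
//       pool.setNewBufferSize(bufferID, 16);         // Shrink the visible size to what was actually written
//       pool.unrefBuffer(bufferID);                  // Back to 0 references: the slot becomes reusable
//   }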
//------------------------------------------------------------------------------------------------------- // AsyncReadableStream //------------------------------------------------------------------------------------------------------- Result AsyncReadableStream::init(AsyncBuffersPool& buffersPool, Span requests) { SC_TRY_MSG(state == State::Stopped or state == State::Destroyed or state == State::Ended, "AsyncReadableStream::init - Can be called only in Stopped / Destroyed / Ended state") buffers = &buffersPool; readQueue = requests; state = State::CanRead; return Result(true); } Result AsyncReadableStream::start() { SC_TRY_MSG(state == State::CanRead, "Can start only in CanRead state") executeRead(); return Result(true); } void AsyncReadableStream::emitOnData() { Request request; while (readQueue.popFront(request)) { eventData.emit(request.bufferID); buffers->unrefBuffer(request.bufferID); // 1b. refBuffer in push } } bool AsyncReadableStream::push(AsyncBufferView::ID bufferID, size_t newSize) { if (newSize == 0) { emitError(Result::Error("AsyncReadableStream::push zero sized buffer is not allowed")); return false; } // Push buffer to the queue buffers->setNewBufferSize(bufferID, newSize); Request request; request.bufferID = bufferID; if (not readQueue.pushBack(request)) { state = State::Errored; emitError(Result::Error("AsyncReadableStream::push dropping buffer")); return false; } buffers->refBuffer(bufferID); // 1a. unrefBuffer in emitOnData() switch (state) { case State::SyncPushing: case State::Reading: { emitOnData(); state = State::SyncPushing; } break; case State::AsyncPushing: case State::AsyncReading: { emitOnData(); if (state == State::AsyncReading) { state = State::AsyncPushing; } } break; case State::Pausing: { // Process buffers received while Pausing is being propagated upstream emitOnData(); } break; default: { emitError(Result::Error("AsyncReadableStream::push - called in wrong state")); } break; } return state == State::AsyncPushing or state == State::SyncPushing; } void AsyncReadableStream::reactivate(bool doReactivate) { switch (state) { case State::SyncPushing: { if (doReactivate) { state = State::SyncReadMore; } else { state = State::CanRead; } } break; case State::AsyncPushing: { if (doReactivate) { executeRead(); // -> State::Reading } else { state = State::CanRead; } } break; default: { emitError(Result::Error("AsyncReadableStream::reactivate - called in wrong state")); } } } void AsyncReadableStream::pause() { switch (state) { case State::Reading: case State::AsyncReading: case State::SyncPushing: case State::AsyncPushing: { state = State::Pausing; } break; default: { emitError(Result::Error("AsyncReadableStream::pause - called in wrong state")); } } } void AsyncReadableStream::resumeReading() { switch (state) { case State::Pausing: case State::Paused: { executeRead(); // -> State::Reading emitOnData(); } break; case State::CanRead: { executeRead(); // -> State::Reading } break; case State::Stopped: case State::Errored: { emitError(Result::Error("AsyncReadableStream::resume - called in wrong state")); } break; case State::Ended: break; default: break; // Ignore resume requests while reading } } void AsyncReadableStream::destroy() { switch (state) { case State::CanRead: case State::SyncPushing: case State::SyncReadMore: case State::Paused: case State::Pausing: case State::Reading: case State::AsyncPushing: case State::AsyncReading: state = State::Destroyed; eventClose.emit(); break; case State::Destroyed: emitError(Result::Error("AsyncReadableStream::destroy - 
already destroyed")); break; case State::Ended: emitError(Result::Error("AsyncReadableStream::destroy - already ended")); break; case State::Stopped: emitError(Result::Error("AsyncReadableStream::destroy - already stopped")); break; case State::Errored: emitError(Result::Error("AsyncReadableStream::destroy - already in error state")); break; } } void AsyncReadableStream::executeRead() { state = State::Reading; while (true) { const Result res = asyncRead(); if (res) { switch (state) { case State::SyncReadMore: // push + reactivate(true) have been called synchronously (inside this method) state = State::Reading; continue; // loop calling one more asyncRead case State::Reading: // push + reactivate(...) have not been called so this becomes an async call state = State::AsyncReading; break; case State::SyncPushing: state = State::Errored; emitError(Result::Error("Forgot to call reactivate({true || false}) from asyncRead")); break; default: break; } } else { state = State::Errored; emitError(res); } break; } } void AsyncReadableStream::pushEnd() { switch (state) { case State::CanRead: case State::Reading: case State::SyncPushing: case State::SyncReadMore: case State::Paused: case State::AsyncPushing: case State::AsyncReading: case State::Pausing: // In all these state we can just end directly state = State::Ended; eventEnd.emit(); eventClose.emit(); break; case State::Destroyed: emitError(Result::Error("AsyncReadableStream::pushEnd - stream is destroyed")); break; case State::Ended: emitError(Result::Error("AsyncReadableStream::pushEnd - stream already ended")); break; case State::Stopped: emitError(Result::Error("AsyncReadableStream::pushEnd - stream is not even inited")); break; case State::Errored: emitError(Result::Error("AsyncReadableStream::pushEnd - stream is in error state")); break; } } AsyncBuffersPool& AsyncReadableStream::getBuffersPool() { return *buffers; } void AsyncReadableStream::emitError(Result error) { eventError.emit(error); } bool AsyncReadableStream::getBufferOrPause(size_t minumumSizeInBytes, AsyncBufferView::ID& bufferID, Span& data) { if (getBuffersPool().requestNewBuffer(minumumSizeInBytes, bufferID, data)) { return true; } else { pause(); return false; } } //------------------------------------------------------------------------------------------------------- // AsyncWritableStream //------------------------------------------------------------------------------------------------------- Result AsyncWritableStream::init(AsyncBuffersPool& buffersPool, Span requests) { SC_TRY_MSG(state == State::Stopped or state == State::Ended, "AsyncWritableStream::init - Can be called only in Stopped or Ended states"); buffers = &buffersPool; writeQueue = requests; if (state == State::Ended) { state = State::Stopped; } return Result(true); } Result AsyncWritableStream::write(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)> cb) { if (state == State::Ended or state == State::Ending) { return Result::Error("AsyncWritableStream::write - failed (ending or ended state)"); } Request request; request.bufferID = bufferID; request.cb = move(cb); if (not writeQueue.pushBack(request)) { return Result::Error("AsyncWritableStream::write - queue is full"); } buffers->refBuffer(bufferID); // 2a. 
unrefBuffer below or in finishedWriting resumeWriting(); return Result(true); } Result AsyncWritableStream::write(AsyncBufferView&& bufferView, Function<void(asyncbufferview::id)> cb) { AsyncBufferView::ID bufferID; SC_TRY(buffers->pushBuffer(move(bufferView), bufferID)); return write(bufferID, cb); } void AsyncWritableStream::resumeWriting() { switch (state) { case State::Stopped: { Request request; if (writeQueue.popFront(request)) { state = State::Writing; tryAsync(asyncWrite(request.bufferID, request.cb)); buffers->unrefBuffer(request.bufferID); // 2b. refBuffer above } } break; case State::Writing: { // This is fine, it has already been queued } break; case State::Ending: if (not canEndWritable.isValid() or canEndWritable()) { eventFinish.emit(); state = State::Ended; } break; case State::Ended: break; } } Result AsyncWritableStream::unshift(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)>&& cb) { Request request; request.cb = move(cb); request.bufferID = bufferID; buffers->refBuffer(bufferID); // Let's push this request in front instead of to the back SC_TRY_MSG(writeQueue.pushFront(request), "unshift failed"); return Result(true); } void AsyncWritableStream::tryAsync(Result potentialError) { if (not potentialError) { eventError.emit(potentialError); } } void AsyncWritableStream::finishedWriting(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)>&& callback, Result res) { SC_ASSERT_RELEASE(state == State::Writing or state == State::Ending); if (not res) { eventError.emit(res); } bool emitDrain = false; Request request; if (writeQueue.popFront(request)) { tryAsync(asyncWrite(request.bufferID, request.cb)); buffers->unrefBuffer(request.bufferID); // 2c. refbuffer in AsyncWritable::write } else { // Queue is empty if (state == State::Ending) { if (not canEndWritable.isValid() or canEndWritable()) { state = State::Ended; } } else { state = State::Stopped; emitDrain = true; } } if (callback.isValid()) { callback(bufferID); } if (state == State::Ended) { eventFinish.emit(); } else if (emitDrain) { eventDrain.emit(); } } void AsyncWritableStream::end() { switch (state) { case State::Stopped: if (canEndWritable.isValid()) { if (canEndWritable()) { state = State::Ended; eventFinish.emit(); } else { state = State::Ending; } } else { // Can just jump to ended state state = State::Ended; eventFinish.emit(); } break; case State::Writing: // We need to wait for current in-flight write to end state = State::Ending; break; case State::Ending: case State::Ended: { // Invalid state, already ended or already ending eventError.emit(Result::Error("AsyncWritableStream::end - already called")); } break; } } AsyncBuffersPool& AsyncWritableStream::getBuffersPool() { return *buffers; } void AsyncWritableStream::emitError(Result error) { eventError.emit(error); } //------------------------------------------------------------------------------------------------------- // AsyncDuplexStream //------------------------------------------------------------------------------------------------------- AsyncDuplexStream::AsyncDuplexStream() { asyncRead.bind([] { return Result(true); }); } Result AsyncDuplexStream::init(AsyncBuffersPool& buffersPool, Span readableRequests, Span writableRequests) { SC_TRY(AsyncReadableStream::init(buffersPool, readableRequests)); SC_TRY(AsyncWritableStream::init(buffersPool, writableRequests)); return Result(true); } //------------------------------------------------------------------------------------------------------- // AsyncTransformStream 
//------------------------------------------------------------------------------------------------------- AsyncTransformStream::AsyncTransformStream() { using Self = AsyncTransformStream; AsyncWritableStream::asyncWrite.bind<self, &self::transform="">(*this); AsyncWritableStream::canEndWritable.bind<self, &self::canendtransform="">(*this); } Result AsyncTransformStream::transform(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)> cb) { switch (state) { case State::None: { SC_TRY(AsyncReadableStream::getBuffersPool().getReadableData(bufferID, inputData)); return prepare(bufferID, cb); } case State::Paused: { SC_TRY_MSG(bufferID == inputBufferID, "Logical Error") return prepare(bufferID, cb); } case State::Finalized: { return Result::Error("Transform cannot be called during Finalized State"); } case State::Processing: { return Result::Error("Transform cannot be called during Processing State"); } case State::Finalizing: { return Result::Error("Transform cannot be called during Finalizing State"); } } return Result(true); } Result AsyncTransformStream::prepare(AsyncBufferView::ID bufferID, Function<void(asyncbufferview::id)> cb) { inputCallback = move(cb); inputBufferID = bufferID; if (getBufferOrPause(0, outputBufferID, outputData)) { state = State::Processing; return onProcess(inputData, outputData); } else { state = State::Paused; SC_TRY(AsyncWritableStream::unshift(inputBufferID, move(inputCallback))); AsyncWritableStream::stop(); return Result(true); } } void AsyncTransformStream::afterProcess(Span inputAfter, Span outputAfter) { const size_t consumedOutput = outputData.sizeInBytes() - outputAfter.sizeInBytes(); if (consumedOutput > 0) { // Ignore whatever push returns because later on the stream is either finalizing or pausing either way (void)AsyncReadableStream::push(outputBufferID, consumedOutput); } AsyncReadableStream::getBuffersPool().unrefBuffer(outputBufferID); if (inputAfter.empty()) { auto cb = move(inputCallback); auto bufferID = inputBufferID; inputCallback = {}; inputBufferID = {}; inputData = {}; outputBufferID = {}; outputData = {}; state = State::None; AsyncWritableStream::finishedWriting(bufferID, move(cb), Result(true)); } else { inputData = inputAfter; state = State::Paused; tryAsync(transform(inputBufferID, inputCallback)); } } void AsyncTransformStream::afterFinalize(Span outputAfter, bool streamEnded) { const size_t consumedOutput = outputData.sizeInBytes() - outputAfter.sizeInBytes(); if (consumedOutput > 0) { // Ignore whatever push returns because later on the stream is either finalizing or pausing either way (void)AsyncReadableStream::push(outputBufferID, consumedOutput); } AsyncReadableStream::getBuffersPool().unrefBuffer(outputBufferID); if (streamEnded) { AsyncReadableStream::pushEnd(); state = State::Finalized; // --> Transition to ENDED (all data written) } else { state = State::Paused; tryFinalize(); } } void AsyncTransformStream::tryFinalize() { if (getBufferOrPause(0, outputBufferID, outputData)) { state = State::Finalizing; // Retry later when we can get some memory Result res = onFinalize(outputData); if (not res) { AsyncWritableStream::emitError(Result::Error("AsyncTransformStream::onFinalize error")); state = State::None; // --> Transition to ENDED (unrecoverable error) } } else { state = State::Paused; // Retry later when we can get some memory } } bool AsyncTransformStream::canEndTransform() { switch (state) { case State::None: case State::Paused: { tryFinalize(); return state == State::Finalized; } case State::Finalized: { return 
true; } case State::Finalizing: case State::Processing: { return false; // Still processing stuff } } Assert::unreachable(); } //------------------------------------------------------------------------------------------------------- // AsyncPipeline //------------------------------------------------------------------------------------------------------- AsyncPipeline::~AsyncPipeline() { SC_ASSERT_DEBUG(unpipe()); } Result AsyncPipeline::validate() { int validSinks = 0; for (size_t idx = 0; idx < MaxSinks; ++idx) { if (sinks[idx] != nullptr) validSinks++; } SC_TRY_MSG(source != nullptr, "AsyncPipeline::validate() invalid source"); SC_TRY_MSG(validSinks > 0, "AsyncPipeline::validate() invalid 0 sized list of sinks"); return Result(true); } Result AsyncPipeline::pipe() { SC_TRY(validate()); SC_TRY(checkBuffersPool()); AsyncReadableStream* readable = source; SC_TRY(chainTransforms(readable)); bool res; res = readable->eventData.addListener<asyncpipeline, &asyncpipeline::dispatchtopipes="">(*this); SC_TRY_MSG(res, "AsyncPipeline::pipe() run out of eventData"); res = readable->eventEnd.addListener<asyncpipeline, &asyncpipeline::endpipes="">(*this); SC_TRY_MSG(res, "AsyncPipeline::pipe() run out of eventEnd"); res = readable->eventError.addListener<asyncpipeline, &asyncpipeline::emiterror="">(*this); SC_TRY_MSG(res, "AsyncPipeline::pipe() run out of eventError"); for (AsyncWritableStream* sink : sinks) { if (sink == nullptr) break; res = sink->eventError.addListener<asyncpipeline, &asyncpipeline::emiterror="">(*this); SC_TRY_MSG(res, "AsyncPipeline::pipe() pipe run out of eventError"); } return Result(true); } bool AsyncPipeline::unpipe() { bool res = true; // Deregister all source events if (source) { int validTransforms = 0; for (size_t idx = 0; idx < MaxTransforms; ++idx) { if (transforms[idx] != nullptr) validTransforms++; } if (validTransforms == 0) { res = source->eventData.removeAllListenersBoundTo(*this); SC_TRY(res); res = source->eventEnd.removeAllListenersBoundTo(*this); SC_TRY(res); } else { AsyncWritableStream& writable = *transforms[0]; res = listenToEventData(*source, *transforms[0], false); SC_TRY(res); res = source->eventEnd.removeAllListenersBoundTo(writable); SC_TRY(res); } res = source->eventError.removeAllListenersBoundTo(*this); SC_TRY(res); source = nullptr; } // Deregister all transforms events for (size_t idx = 0; idx < MaxTransforms; ++idx) { AsyncDuplexStream* transform = transforms[idx]; if (transform == nullptr) break; if ((idx + 1 == MaxTransforms) or transforms[idx + 1] == nullptr) { res = transform->eventData.removeAllListenersBoundTo(*this); SC_TRY(res); res = transform->eventEnd.removeAllListenersBoundTo(*this); SC_TRY(res); break; } else { AsyncDuplexStream& nextTransform = *transforms[idx + 1]; AsyncWritableStream& nextWritable = nextTransform; res = listenToEventData(*transform, nextTransform, false); SC_TRY(res); res = source->eventEnd.removeAllListenersBoundTo(nextWritable); SC_TRY(res); } res = transform->AsyncReadableStream::eventError.removeAllListenersBoundTo(*this); SC_TRY(res); res = transform->AsyncWritableStream::eventError.removeAllListenersBoundTo(*this); SC_TRY(res); } for (size_t idx = 0; idx < MaxTransforms; ++idx) transforms[idx] = nullptr; // Deregister all sinks events for (AsyncWritableStream* sink : sinks) { if (sink == nullptr) break; res = sink->eventError.removeListener<asyncpipeline, &asyncpipeline::emiterror="">(*this); SC_TRY(res); } for (size_t idx = 0; idx < MaxSinks; ++idx) sinks[idx] = nullptr; return true; } Result 
Result AsyncPipeline::start()
{
    SC_TRY(validate());
    for (auto transform : transforms)
    {
        if (transform == nullptr)
            break;
        SC_TRY(transform->start());
    }
    SC_TRY(source->start());
    return Result(true);
}

void AsyncPipeline::emitError(Result res) { eventError.emit(res); }

Result AsyncPipeline::checkBuffersPool()
{
    AsyncBuffersPool& buffers = source->getBuffersPool();
    for (AsyncWritableStream* sink : sinks)
    {
        if (sink == nullptr)
            break;
        if (&sink->getBuffersPool() != &buffers)
        {
            return Result::Error("AsyncPipeline::start - all streams must use the same AsyncBuffersPool");
        }
    }
    for (AsyncDuplexStream* transform : transforms)
    {
        if (transform == nullptr)
            break;
        if ((&transform->AsyncReadableStream::getBuffersPool() != &buffers) and
            (&transform->AsyncWritableStream::getBuffersPool() != &buffers))
        {
            return Result::Error("AsyncPipeline::start - all streams must use the same AsyncBuffersPool");
        }
    }
    return Result(true);
}

bool AsyncPipeline::listenToEventData(AsyncReadableStream& readable, AsyncDuplexStream& transform, bool listen)
{
    AsyncDuplexStream* pTransform = &transform;

    auto lambda = [this, pTransform](AsyncBufferView::ID bufferID)
    {
        // Write readable to transform
        asyncWriteWritable(bufferID, *pTransform);
    };
    if (listen)
    {
        return readable.eventData.addListener(lambda);
    }
    else
    {
        return readable.eventData.removeListener(lambda);
    }
}

Result AsyncPipeline::chainTransforms(AsyncReadableStream*& readable)
{
    for (AsyncDuplexStream* transform : transforms)
    {
        if (transform == nullptr)
        {
            break;
        }
        bool res;
        res = listenToEventData(*readable, *transform, true);
        SC_TRY_MSG(res, "AsyncPipeline::chainTransforms run out of eventData");
        AsyncWritableStream& writable = *transform;
        res = readable->eventEnd.addListener<AsyncWritableStream, &AsyncWritableStream::end>(writable);
        SC_TRY_MSG(res, "AsyncPipeline::chainTransforms run out of eventEnd");
        res = readable->eventError.addListener<AsyncPipeline, &AsyncPipeline::emitError>(*this);
        SC_TRY_MSG(res, "AsyncPipeline::chainTransforms run out of eventError");
        readable = transform;
        res = transform->AsyncReadableStream::eventError.addListener<AsyncPipeline, &AsyncPipeline::emitError>(*this);
        SC_TRY_MSG(res, "AsyncPipeline::chainTransforms run out of eventError");
        res = transform->AsyncWritableStream::eventError.addListener<AsyncPipeline, &AsyncPipeline::emitError>(*this);
        SC_TRY_MSG(res, "AsyncPipeline::chainTransforms run out of eventError");
    }
    return Result(true);
}

void AsyncPipeline::asyncWriteWritable(AsyncBufferView::ID bufferID, AsyncWritableStream& writable)
{
    source->getBuffersPool().refBuffer(bufferID);

    Function<void(AsyncBufferView::ID)> func;
    func.template bind<AsyncPipeline, &AsyncPipeline::afterWrite>(*this);
    // TODO: We should probably block when closing for in-flight writes
    Result res = writable.write(bufferID, func);
    if (not res)
    {
        eventError.emit(res);
    }
}

void AsyncPipeline::afterWrite(AsyncBufferView::ID bufferID)
{
    source->getBuffersPool().unrefBuffer(bufferID);
    // Try resume in reverse
    for (size_t idx = 0; idx < MaxTransforms; ++idx)
    {
        AsyncDuplexStream* transform = transforms[MaxTransforms - 1 - idx];
        if (transform)
        {
            transform->resumeWriting();
            transform->resumeReading();
        }
    }
    source->resumeReading();
}

void AsyncPipeline::dispatchToPipes(AsyncBufferView::ID bufferID)
{
    for (AsyncWritableStream* sink : sinks)
    {
        if (sink == nullptr)
            break;
        asyncWriteWritable(bufferID, *sink);
    }
}

void AsyncPipeline::endPipes()
{
    for (AsyncWritableStream* sink : sinks)
    {
        if (sink == nullptr)
            break;
        sink->end();
    }
}
} // namespace SC
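
// Example (sketch, not compiled in): wiring an AsyncPipeline from a readable source, through an
// optional transform, into a writable sink. It only uses the members referenced by the
// implementation above (source, transforms, sinks, pipe, start) and assumes they are publicly
// assignable and that all streams share the same AsyncBuffersPool; the pipeline and the streams
// must stay alive until the transfer has finished.
#if 0
inline SC::Result wirePipeline(SC::AsyncPipeline& pipeline, SC::AsyncReadableStream& readable,
                               SC::AsyncDuplexStream& transform, SC::AsyncWritableStream& writable)
{
    pipeline.source        = &readable;  // data producer
    pipeline.transforms[0] = &transform; // e.g. an AsyncZLibTransformStream
    pipeline.sinks[0]      = &writable;  // data consumer

    SC_TRY(pipeline.pipe());  // registers eventData / eventEnd / eventError listeners
    SC_TRY(pipeline.start()); // starts transforms first, then the source
    return SC::Result(true);
}
#endif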
//----------------------------------------------------------------------------------------------------------------------
// AsyncStreams/ZLibTransformStreams.cpp
//----------------------------------------------------------------------------------------------------------------------
// Copyright (c) Stefano Cristiano
// SPDX-License-Identifier: MIT
//----------------------------------------------------------------------------------------------------------------------
// AsyncStreams/Internal/ZLibStream.inl
//----------------------------------------------------------------------------------------------------------------------
// Copyright (c) Stefano Cristiano
// SPDX-License-Identifier: MIT
//----------------------------------------------------------------------------------------------------------------------
// AsyncStreams/Internal/ZLibAPI.h
//----------------------------------------------------------------------------------------------------------------------
// Copyright (c) Stefano Cristiano
// SPDX-License-Identifier: MIT
namespace SC
{
struct ZLibAPI;
} // namespace SC

struct SC::ZLibAPI
{
    struct Stream
    {
        using alloc_func = void* (*)(void* opaque, unsigned int items, unsigned int size);
        using free_func  = void (*)(void* opaque, void* address);

        const unsigned char* next_in;  // Next input byte
        unsigned int         avail_in; // Number of bytes available at next_in
        unsigned long        total_in; // Total number of input bytes read so far

        unsigned char* next_out;  // Next output byte
        unsigned int   avail_out; // Number of bytes available at next_out
        unsigned long  total_out; // Total number of output bytes written so far

        const char* msg;   // Last error message, NULL if no error
        void*       state; // Internal state, not used directly by applications

        alloc_func* zalloc; // Custom memory allocation function (or NULL)
        free_func*  zfree;  // Custom memory deallocation function (or NULL)
        void*       opaque; // Opaque data passed to zalloc and zfree

        int           data_type; // Type of data (e.g., binary, text)
        unsigned long adler;     // Adler-32 checksum of the uncompressed data
        unsigned long reserved;  // Reserved for future use
    };

    static constexpr int MaxBits = 15;

    static constexpr const char* Version = "1.2.12";

    enum Flush : int
    {
        NO_FLUSH      = 0,
        PARTIAL_FLUSH = 1,
        SYNC_FLUSH    = 2,
        FULL_FLUSH    = 3,
        FINISH        = 4,
        BLOCK         = 5,
        TREES         = 6,
    };

    enum Error : int
    {
        OK            = 0,
        STREAM_END    = 1,
        NEED_DICT     = 2,
        ERRNO         = -1,
        STREAM_ERROR  = -2,
        DATA_ERROR    = -3,
        MEM_ERROR     = -4,
        BUF_ERROR     = -5,
        VERSION_ERROR = -6,
    };

    enum Compression : int
    {
        NO_COMPRESSION      = 0,
        BEST_SPEED          = 1,
        BEST_COMPRESSION    = 9,
        DEFAULT_COMPRESSION = -1,
    };

    enum Strategy : int
    {
        FILTERED         = 1,
        HUFFMAN_ONLY     = 2,
        RLE              = 3,
        FIXED            = 4,
        DEFAULT_STRATEGY = 0,
    };

    enum Method : int
    {
        DEFLATED = 8,
    };

    Result load(const char* libPath = nullptr);
    void   unload();

    Error deflate(Stream& strm, Flush flag) { return pDeflate(&strm, flag); }
    Error inflate(Stream& strm, Flush flag) { return pInflate(&strm, flag); }

    Error inflateEnd(Stream& strm) { return pInflateEnd(&strm); }
    Error deflateEnd(Stream& strm) { return pDeflateEnd(&strm); }

    Error deflateInit2(Stream& strm, Compression level, Method method, int windowBits, int memLevel, Strategy strategy)
    {
        return pDeflateInit2(&strm, level, method, windowBits, memLevel, strategy, Version,
                             static_cast<int>(sizeof(Stream)));
    }

    Error inflateInit2(Stream& strm, int windowBits)
    {
        return pInflateInit2(&strm, windowBits, Version, static_cast<int>(sizeof(Stream)));
    }

  private:
#if SC_PLATFORM_WINDOWS
#define SC_ZLIB_API_CC __stdcall
#else
#define SC_ZLIB_API_CC
#endif
    // Function pointers for zlib functions
    Error(SC_ZLIB_API_CC* pDeflate)(void* strm, Flush flush) = nullptr;
    Error(SC_ZLIB_API_CC* pDeflateEnd)(void* strm)           = nullptr;
    Error(SC_ZLIB_API_CC* pInflate)(void* strm, Flush flush) = nullptr;
    Error(SC_ZLIB_API_CC* pInflateEnd)(void* strm)           = nullptr;
    Error(SC_ZLIB_API_CC* pDeflateInit2)(void* strm, Compression level, Method method, int windowBits, int memLevel,
                                         Strategy strategy, const char* version, int stream_size)          = nullptr;
    Error(SC_ZLIB_API_CC* pInflateInit2)(void* strm, int windowBits, const char* version, int stream_size) = nullptr;
#undef SC_ZLIB_API_CC

    // Handle for dynamic library
    void* library  = nullptr;
    int   refCount = 0;

    struct Internal;
};
//----------------------------------------------------------------------------------------------------------------------
// AsyncStreams/Internal/ZLibAPI.inl
//----------------------------------------------------------------------------------------------------------------------
// Copyright (c) Stefano Cristiano
// SPDX-License-Identifier: MIT
#ifdef _WIN32
#include <Windows.h>
#include <string.h>
#else
#include <dlfcn.h>
#endif

struct SC::ZLibAPI::Internal
{
    template <typename Func>
    static Result loadSymbol(ZLibAPI& zlib, Func& sym, const char* name)
    {
#ifdef _WIN32
        HMODULE hmodule;
        memcpy(&hmodule, &zlib.library, sizeof(HMODULE));
        auto func = ::GetProcAddress(hmodule, name);
        memcpy(&sym, &func, sizeof(func));
        static_assert(sizeof(Func) == sizeof(void*), "");
#else
        sym = reinterpret_cast<Func>(::dlsym(zlib.library, name));
#endif
        if (!sym)
        {
            return Result::Error("Failed to load zlib symbol");
        }
        return Result(true);
    }

#if _WIN32
    static Result GetClrCompressionPath(char* pathBuffer, size_t bufferSize)
    {
        const char* subKey    = "SOFTWARE\\Microsoft\\NET Framework Setup\\NDP\\v4\\Full";
        const char* valueName = "InstallPath";

        HKEY  hKey;
        DWORD valueType;
        DWORD valueSize = static_cast<DWORD>(bufferSize);

        // Open the registry key
        if (::RegOpenKeyExA(HKEY_LOCAL_MACHINE, subKey, 0, KEY_READ, &hKey) != ERROR_SUCCESS)
        {
            return Result::Error("GetClrCompressionPath: Failed to open registry key.");
        }

        // Query the InstallPath value
        if (::RegQueryValueExA(hKey, valueName, NULL, &valueType, (LPBYTE)pathBuffer, &valueSize) != ERROR_SUCCESS)
        {
            ::RegCloseKey(hKey);
            return Result::Error("GetClrCompressionPath: Failed to read registry value");
        }
        ::RegCloseKey(hKey);

        // Check the type of the registry value
        if (valueType != REG_SZ)
        {
            return Result::Error("GetClrCompressionPath: Unexpected registry value type.");
        }

        // Construct the path to clrcompression.dll
        if (::strcat_s(pathBuffer, bufferSize, "clrcompression.dll") != 0)
        {
            return Result::Error("GetClrCompressionPath: Not enough space");
        }
        return Result(true);
    }
#endif
};

SC::Result SC::ZLibAPI::load(const char* libPath)
{
    if (library != nullptr)
    {
        refCount++;
        return Result(true);
    }
#if _WIN32
    const char* zlib_library_path = "zlib1.dll";
#elif __APPLE__
    const char* zlib_library_path = "libz.dylib";
#else
    const char* zlib_library_path = "libz.so.1";
#endif
    if (libPath == nullptr)
    {
        libPath = zlib_library_path;
    }
#ifdef _WIN32
    library = ::LoadLibraryA(libPath);
    if (library == nullptr)
    {
        char clr_zlib_path[MAX_PATH] = {0};
        SC_TRY(Internal::GetClrCompressionPath(clr_zlib_path, sizeof(clr_zlib_path)));
        library = ::LoadLibraryA(clr_zlib_path);
    }
#else
    library = ::dlopen(libPath, RTLD_NOW);
#endif
    if (!library)
    {
        return Result::Error("Failed to load zlib library");
    }

    // Load functions
    SC_TRY(Internal::loadSymbol(*this, pDeflate, "deflate"));
    SC_TRY(Internal::loadSymbol(*this, pDeflateEnd, "deflateEnd"));
    SC_TRY(Internal::loadSymbol(*this, pInflate, "inflate"));
    SC_TRY(Internal::loadSymbol(*this, pInflateEnd, "inflateEnd"));
    SC_TRY(Internal::loadSymbol(*this, pDeflateInit2, "deflateInit2_"));
    SC_TRY(Internal::loadSymbol(*this, pInflateInit2, "inflateInit2_"));
    refCount = 1;
    return Result(true);
}

void SC::ZLibAPI::unload()
{
    --refCount;
    if (refCount > 0)
    {
        return;
    }
    if (library)
    {
#ifdef _WIN32
        HMODULE hmodule;
        memcpy(&hmodule, &library, sizeof(HMODULE));
        ::FreeLibrary(hmodule);
#else
        ::dlclose(library);
#endif
        library = nullptr;
    }
}

static SC::ZLibAPI zlib;

struct SC::ZLibStream::Internal
{
    static Result compress(ZLibAPI::Stream& stream, Span<const char>& input, Span<char>& output)
    {
        stream.next_in   = reinterpret_cast<const uint8_t*>(input.data());
        stream.avail_in  = static_cast<unsigned int>(input.sizeInBytes());
        stream.next_out  = reinterpret_cast<uint8_t*>(output.data());
        stream.avail_out = static_cast<unsigned int>(output.sizeInBytes());

        const auto result = zlib.deflate(stream, ZLibAPI::Flush::NO_FLUSH);

        const auto offsetOut = output.sizeInBytes() - stream.avail_out;
        const auto offsetIn  = input.sizeInBytes() - stream.avail_in;

        const bool outSliceOk = output.sliceStart(offsetOut, output);
        const bool inSliceOk  = input.sliceStart(offsetIn, input);
        SC_TRY_MSG(inSliceOk and outSliceOk, "compress sliceStart");
        switch (result)
        {
        case ZLibAPI::OK: // All good
            return Result(true);
        case ZLibAPI::BUF_ERROR: return Result::Error("BUF_ERROR");
        case ZLibAPI::STREAM_END: return Result::Error("STREAM_END");
        case ZLibAPI::NEED_DICT: return Result::Error("NEED_DICT");
        case ZLibAPI::ERRNO: return Result::Error("ERRNO");
        case ZLibAPI::STREAM_ERROR: return Result::Error("STREAM_ERROR");
        case ZLibAPI::DATA_ERROR: return Result::Error("DATA_ERROR");
        case ZLibAPI::MEM_ERROR: return Result::Error("MEM_ERROR");
        case ZLibAPI::VERSION_ERROR: return Result::Error("VERSION_ERROR");
        }
        return Result::Error("UNKNOWN");
    }

    static Result compressFinalize(ZLibAPI::Stream& stream, Span<char>& output, bool& streamEnded)
    {
        stream.next_in  = nullptr;
        stream.avail_in = 0;

        stream.next_out  = reinterpret_cast<uint8_t*>(output.data());
        stream.avail_out = static_cast<unsigned int>(output.sizeInBytes());

        const auto result = zlib.deflate(stream, ZLibAPI::Flush::FINISH);

        const auto offsetOut = output.sizeInBytes() - stream.avail_out;
        const bool slicesOk  = output.sliceStart(offsetOut, output);
        SC_TRY_MSG(slicesOk, "compressFinalize sliceStart");
        streamEnded = result == ZLibAPI::Error::STREAM_END;
        switch (result)
        {
        case ZLibAPI::OK:         // All good
        case ZLibAPI::BUF_ERROR:  // Returned when output space is insufficient
        case ZLibAPI::STREAM_END: // Stream Ended
            return Result(true);
        case ZLibAPI::NEED_DICT: return Result::Error("NEED_DICT");
        case ZLibAPI::ERRNO: return Result::Error("ERRNO");
        case ZLibAPI::STREAM_ERROR: return Result::Error("STREAM_ERROR");
        case ZLibAPI::DATA_ERROR: return Result::Error("DATA_ERROR");
        case ZLibAPI::MEM_ERROR: return Result::Error("MEM_ERROR");
        case ZLibAPI::VERSION_ERROR: return Result::Error("VERSION_ERROR");
        }
        return Result::Error("UNKNOWN");
    }

    static Result decompress(ZLibAPI::Stream& stream, Span<const char>& input, Span<char>& output)
    {
        stream.next_in   = reinterpret_cast<const uint8_t*>(input.data());
        stream.avail_in  = static_cast<unsigned int>(input.sizeInBytes());
        stream.next_out  = reinterpret_cast<uint8_t*>(output.data());
        stream.avail_out = static_cast<unsigned int>(output.sizeInBytes());

        const auto result = zlib.inflate(stream, ZLibAPI::Flush::NO_FLUSH);

        const auto offsetOut = output.sizeInBytes() - stream.avail_out;
        const auto offsetIn  = input.sizeInBytes() - stream.avail_in;

        const bool outSliceOk = output.sliceStart(offsetOut, output);
        const bool inSliceOk  = input.sliceStart(offsetIn, input);
        SC_TRY_MSG(inSliceOk and outSliceOk, "decompress sliceStart");
        switch (result)
        {
        case ZLibAPI::OK:         // All good
        case ZLibAPI::STREAM_END: // Stream ended
            return Result(true);
        case ZLibAPI::BUF_ERROR: return Result::Error("BUF_ERROR");
        case ZLibAPI::NEED_DICT: return Result::Error("NEED_DICT");
        case ZLibAPI::ERRNO: return Result::Error("ERRNO");
        case ZLibAPI::STREAM_ERROR: return Result::Error("STREAM_ERROR");
        case ZLibAPI::DATA_ERROR: return Result::Error("DATA_ERROR");
        case ZLibAPI::MEM_ERROR: return Result::Error("MEM_ERROR");
        case ZLibAPI::VERSION_ERROR: return Result::Error("VERSION_ERROR");
        }
        return Result::Error("UNKNOWN");
    }

    static Result decompressFinalize(ZLibAPI::Stream& stream, Span<char>& output, bool& streamEnded)
    {
        // Intentionally not resetting next_in / avail_in, that can contain leftover data to process
        stream.next_out  = reinterpret_cast<uint8_t*>(output.data());
        stream.avail_out = static_cast<unsigned int>(output.sizeInBytes());

        const auto result = zlib.inflate(stream, ZLibAPI::Flush::FINISH);

        const auto offsetOut = output.sizeInBytes() - stream.avail_out;
        const bool slicesOk  = output.sliceStart(offsetOut, output);
        SC_TRY_MSG(slicesOk, "decompressFinalize sliceStart");
        streamEnded = result == ZLibAPI::Error::STREAM_END;
        switch (result)
        {
        case ZLibAPI::OK:         // All good
        case ZLibAPI::BUF_ERROR:  // Returned when output space is insufficient
        case ZLibAPI::STREAM_END: // Stream Ended
            return Result(true);
        case ZLibAPI::NEED_DICT: return Result::Error("NEED_DICT");
        case ZLibAPI::ERRNO: return Result::Error("ERRNO");
        case ZLibAPI::STREAM_ERROR: return Result::Error("STREAM_ERROR");
        case ZLibAPI::DATA_ERROR: return Result::Error("DATA_ERROR");
        case ZLibAPI::MEM_ERROR: return Result::Error("MEM_ERROR");
        case ZLibAPI::VERSION_ERROR: return Result::Error("VERSION_ERROR");
        }
        return Result::Error("UNKNOWN");
    }
};

SC::ZLibStream::ZLibStream() {}

SC::ZLibStream::~ZLibStream()
{
    ZLibAPI::Stream& stream = buffer.reinterpret_as<ZLibAPI::Stream>();
    switch (algorithm)
    {
    case Algorithm::CompressZLib:
    case Algorithm::CompressGZip:
    case Algorithm::CompressDeflate:
        // Cleanup compression data
        zlib.deflateEnd(stream);
        break;
    case Algorithm::DecompressZLib:
    case Algorithm::DecompressGZip:
    case Algorithm::DecompressDeflate:
        // Cleanup decompression data
        zlib.inflateEnd(stream);
        break;
    }
    zlib.unload();
}

SC::Result SC::ZLibStream::init(Algorithm wantedAlgorithm)
{
    ZLibAPI::Stream& stream = buffer.reinterpret_as<ZLibAPI::Stream>();
    SC_TRY_MSG(state == State::Constructed, "Init can be called only in State::Constructed");
    SC_TRY(zlib.load());
    algorithm = wantedAlgorithm;

    // Initialize the stream
    stream.zalloc = nullptr;
    stream.zfree  = nullptr;
    stream.opaque = nullptr;

    ZLibAPI::Error ret = ZLibAPI::Error::STREAM_ERROR;
    switch (wantedAlgorithm)
    {
    case Algorithm::CompressZLib:
        // Zlib requires deflateInit2 and ZLibAPI::MaxBits
        ret = zlib.deflateInit2(stream, ZLibAPI::DEFAULT_COMPRESSION, ZLibAPI::DEFLATED, ZLibAPI::MaxBits, 8,
                                ZLibAPI::DEFAULT_STRATEGY);
        break;
    case Algorithm::CompressGZip:
        // GZip requires deflateInit2 and 16 + ZLibAPI::MaxBits
        ret = zlib.deflateInit2(stream, ZLibAPI::DEFAULT_COMPRESSION, ZLibAPI::DEFLATED, 16 + ZLibAPI::MaxBits, 8,
                                ZLibAPI::DEFAULT_STRATEGY);
        break;
    case Algorithm::CompressDeflate:
        // Raw deflate requires deflateInit2 and -ZLibAPI::MaxBits
        ret = zlib.deflateInit2(stream, ZLibAPI::DEFAULT_COMPRESSION, ZLibAPI::DEFLATED, -ZLibAPI::MaxBits, 8,
                                ZLibAPI::DEFAULT_STRATEGY);
        break;
    case Algorithm::DecompressZLib:
        // Zlib requires inflateInit2 and ZLibAPI::MaxBits
        ret = zlib.inflateInit2(stream, ZLibAPI::MaxBits);
        break;
    case Algorithm::DecompressGZip:
        // GZip requires inflateInit2 and 16 + ZLibAPI::MaxBits
        ret = zlib.inflateInit2(stream, 16 + ZLibAPI::MaxBits);
        break;
    case Algorithm::DecompressDeflate:
        // Raw deflate requires inflateInit2 and -ZLibAPI::MaxBits
        ret = zlib.inflateInit2(stream, -ZLibAPI::MaxBits);
        break;
    }
    if (ret == ZLibAPI::Error::OK)
    {
        state = State::Inited;
        return Result(true);
    }
    return Result::Error("ZLibStream::Init failed");
}

SC::Result SC::ZLibStream::process(Span<const char>& input, Span<char>& output)
{
    SC_TRY_MSG(not output.empty(), "ZLibStream::process empty output is not allowed");
    ZLibAPI::Stream& stream = buffer.reinterpret_as<ZLibAPI::Stream>();
    switch (algorithm)
    {
    case Algorithm::CompressZLib:
    case Algorithm::CompressGZip:
    case Algorithm::CompressDeflate:
        // Compression
        return Internal::compress(stream, input, output);
    case Algorithm::DecompressZLib:
    case Algorithm::DecompressGZip:
    case Algorithm::DecompressDeflate:
        // Decompression
        return Internal::decompress(stream, input, output);
    }
    Assert::unreachable();
}

SC::Result SC::ZLibStream::finalize(Span<char>& output, bool& streamEnded)
{
    ZLibAPI::Stream& stream = buffer.reinterpret_as<ZLibAPI::Stream>();
    switch (algorithm)
    {
    case Algorithm::CompressZLib:
    case Algorithm::CompressGZip:
    case Algorithm::CompressDeflate:
        // Compression finalization
        return Internal::compressFinalize(stream, output, streamEnded);
    case Algorithm::DecompressZLib:
    case Algorithm::DecompressGZip:
    case Algorithm::DecompressDeflate:
        // Decompression finalization
        return Internal::decompressFinalize(stream, output, streamEnded);
    }
    Assert::unreachable();
}

//-------------------------------------------------------------------------------------------------------
// SyncZLibTransformStream
//-------------------------------------------------------------------------------------------------------
SC::SyncZLibTransformStream::SyncZLibTransformStream()
{
    using Self = SyncZLibTransformStream;
    AsyncWritableStream::asyncWrite.bind<Self, &Self::transform>(*this);
    AsyncWritableStream::canEndWritable.bind<Self, &Self::canEndTransform>(*this);
}

SC::Result SC::SyncZLibTransformStream::transform(AsyncBufferView::ID bufferID, Function<void(AsyncBufferView::ID)> cb)
{
    // This function will either process the bufferID fully OR it will unshift the buffer, i.e. place it
    // back on top of the AsyncWritableStream write queue
    Span<const char> sourceData;
    SC_TRY(AsyncReadableStream::getBuffersPool().getReadableData(bufferID, sourceData));
    Span<const char> inputData;
    SC_TRY(sourceData.sliceStart(consumedInputBytes, inputData));
    while (not inputData.empty())
    {
        AsyncBufferView::ID outputBufferID;

        Span<char> outputData;

        bool continuePushing = true;
        if (getBufferOrPause(0, outputBufferID, outputData) and continuePushing)
        {
            const size_t outputBefore = outputData.sizeInBytes();
            const size_t inputBefore  = inputData.sizeInBytes();

            const Result result = stream.process(inputData, outputData);
            if (not result)
            {
                AsyncReadableStream::getBuffersPool().unrefBuffer(outputBufferID);
                return result;
            }
            const size_t consumedInput  = inputBefore - inputData.sizeInBytes();
            const size_t consumedOutput = outputBefore - outputData.sizeInBytes();

            consumedInputBytes += consumedInput;

            if (consumedOutput > 0)
            {
                continuePushing = AsyncReadableStream::push(outputBufferID, consumedOutput);
            }
            AsyncReadableStream::getBuffersPool().unrefBuffer(outputBufferID);
        }
        else
        {
            SC_TRY(AsyncWritableStream::unshift(bufferID, move(cb)));
            AsyncWritableStream::stop();
            return Result(true);
        }
    }
    auto callbackCopy = move(cb);

    cb = {};
    AsyncWritableStream::finishedWriting(bufferID, move(callbackCopy), Result(true));
    consumedInputBytes = 0;
    return Result(true);
}

bool SC::SyncZLibTransformStream::canEndTransform()
{
    // Loop to get buffers in order to finish finalizing the stream.
    // If no buffer is available, return false to signal AsyncWritableStream
    // that we need to hold the "Ending" state of the state machine, until we
    // finish writing this last trail of transformed data.
    AsyncBufferView::ID outputBufferID;

    Span<char> outputBefore;

    bool continuePushing = true;
    while (getBufferOrPause(0, outputBufferID, outputBefore) and continuePushing)
    {
        Span<char> outputData = outputBefore;

        bool streamEnded = false;
        if (not stream.finalize(outputData, streamEnded))
        {
            AsyncReadableStream::getBuffersPool().unrefBuffer(outputBufferID);
            AsyncWritableStream::emitError(Result::Error("SyncZLibTransformStream::canEndTransform error"));
            return true; // --> Transition to ENDED (unrecoverable error)
        }
        const size_t outputBytes = outputBefore.sizeInBytes() - outputData.sizeInBytes();
        if (outputBytes > 0)
        {
            continuePushing = AsyncReadableStream::push(outputBufferID, outputBytes);
        }
        AsyncReadableStream::getBuffersPool().unrefBuffer(outputBufferID);
        if (streamEnded)
        {
            AsyncReadableStream::pushEnd();
            return true; // --> Transition to ENDED (all data written)
        }
    }
    return false; // == Keep in ENDING state
}

//-------------------------------------------------------------------------------------------------------
// AsyncZLibTransformStream
//-------------------------------------------------------------------------------------------------------
SC::AsyncZLibTransformStream::AsyncZLibTransformStream()
{
    AsyncTransformStream::onProcess.bind<AsyncZLibTransformStream, &AsyncZLibTransformStream::compressExecute>(*this);
    AsyncTransformStream::onFinalize.bind<AsyncZLibTransformStream, &AsyncZLibTransformStream::compressFinalize>(*this);

    asyncWork.work.bind<AsyncZLibTransformStream, &AsyncZLibTransformStream::work>(*this);
    asyncWork.callback.bind<AsyncZLibTransformStream, &AsyncZLibTransformStream::afterWork>(*this);
}

SC::Result SC::AsyncZLibTransformStream::compressExecute(Span<const char> input, Span<char> output)
{
    SC_ASSERT_RELEASE(not finalizing);
    savedInput  = input;
    savedOutput = output;
    finalizing  = false;
    SC_TRY_MSG(eventLoop != nullptr, "AsyncZLibTransformStream::setEventLoop not called");
    return asyncWork.start(*eventLoop);
}

SC::Result SC::AsyncZLibTransformStream::compressFinalize(Span<char> output)
{
    // Intentionally not resetting savedInput, that can contain leftover data to process
    savedOutput = output;
    finalizing  = true;
    SC_TRY_MSG(eventLoop != nullptr, "AsyncZLibTransformStream::setEventLoop not called");
    return asyncWork.start(*eventLoop);
}

SC::Result SC::AsyncZLibTransformStream::work()
{
    if (finalizing)
    {
        return stream.finalize(savedOutput, streamEnded);
    }
    else
    {
        return stream.process(savedInput, savedOutput);
    }
}

void SC::AsyncZLibTransformStream::afterWork(AsyncLoopWork::Result&)
{
    if (finalizing)
    {
        AsyncTransformStream::afterFinalize(savedOutput, streamEnded);
    }
    else
    {
        AsyncTransformStream::afterProcess(savedInput, savedOutput);
    }
}

#endif // SANE_CPP_ASYNCSTREAMS_IMPLEMENTATION
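
// Example (sketch, not compiled in): driving SC::ZLibStream synchronously, outside of any async
// stream. Element types mirror the process() / finalize() signatures used above. The sketch
// assumes `output` is large enough to hold the entire compressed stream; otherwise process()
// would stop making progress once the output span is exhausted.
#if 0
inline SC::Result gzipCompressOnce(SC::Span<const char> input, SC::Span<char> output, size_t& written)
{
    SC::ZLibStream stream;
    SC_TRY(stream.init(SC::ZLibStream::Algorithm::CompressGZip));
    const size_t outputCapacity = output.sizeInBytes();
    // process() consumes bytes from `input` and advances `output` past the bytes it produced
    while (not input.empty())
    {
        SC_TRY(stream.process(input, output));
    }
    // finalize() flushes zlib's internal state until streamEnded becomes true
    bool streamEnded = false;
    while (not streamEnded)
    {
        SC_TRY(stream.finalize(output, streamEnded));
    }
    written = outputCapacity - output.sizeInBytes();
    return SC::Result(true);
}
#endif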