Streams: Unified Async I/O
Pulp's streams module provides one byte-oriented interface across files, memory buffers, named pipes, TCP sockets, and HTTP response bodies. It replaces the ad-hoc http_get / Socket::send / NamedPipe::read surfaces with a common contract that layers cleanly:
| Layer | Header | What it adds |
|---|---|---|
| Stream | pulp/runtime/stream.hpp | Synchronous read/write/close/is_open with a typed StreamResult. |
| AsyncStream | pulp/runtime/async_stream.hpp | Background worker + callbacks, bounded write queue with backpressure, cancellation token, optional executor for callback routing. |
| TcpStream, HttpStream | pulp/runtime/network_stream.hpp | Network-backed Stream subclasses that plug into the same AsyncStream wrapper without additional adapter code. |
The layering matters: because AsyncStream only knows about Stream, adding a new transport (WebSocket, IPC channel, TLS sub-protocol) only requires implementing Stream::read and Stream::write. Backpressure, cancellation, and callback dispatch come for free.
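As a rough illustration of that claim, the sketch below adds a toy transport. The `Stream`, `StreamResult`, and `StreamError` declarations are simplified stand-ins written for this example (the real ones live in pulp/runtime/stream.hpp and may differ in detail), and `LoopbackStream` and `echo` are hypothetical names, not part of Pulp.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <deque>

// Stand-ins for pulp/runtime/stream.hpp (assumed shapes, not the real header).
enum class StreamError { Ok, Closed, WouldBlock, IoError, Invalid };

struct StreamResult {
    std::size_t bytes;
    StreamError error;
    bool ok() const { return error == StreamError::Ok; }
};

class Stream {
public:
    virtual ~Stream() = default;
    virtual StreamResult read(std::uint8_t* dst, std::size_t size) = 0;
    virtual StreamResult write(const std::uint8_t* src, std::size_t size) = 0;
    virtual void close() = 0;
    virtual bool is_open() const = 0;
};

// Hypothetical transport: every byte written becomes readable again.
class LoopbackStream final : public Stream {
public:
    StreamResult read(std::uint8_t* dst, std::size_t size) override {
        if (!open_) return {0, StreamError::Closed};
        if (pending_.empty()) return {0, StreamError::WouldBlock};
        const std::size_t n = std::min(size, pending_.size());
        std::copy_n(pending_.begin(), n, dst);
        pending_.erase(pending_.begin(),
                       pending_.begin() + static_cast<std::ptrdiff_t>(n));
        return {n, StreamError::Ok};
    }
    StreamResult write(const std::uint8_t* src, std::size_t size) override {
        if (!open_) return {0, StreamError::Closed};
        pending_.insert(pending_.end(), src, src + size);
        return {size, StreamError::Ok};
    }
    void close() override { open_ = false; pending_.clear(); }
    bool is_open() const override { return open_; }

private:
    bool open_ = true;
    std::deque<std::uint8_t> pending_;
};

// Transport-agnostic helper: it only sees the Stream interface.
std::size_t echo(Stream& s, const std::uint8_t* src, std::size_t size,
                 std::uint8_t* dst) {
    if (!s.write(src, size).ok()) return 0;
    const StreamResult r = s.read(dst, size);
    return r.ok() ? r.bytes : 0;
}
```

Code written against `Stream&`, such as `echo` here, works unchanged with any backend that implements the interface, which is the property AsyncStream relies on.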
StreamResult: why it isn't Result<size_t>
Every read/write returns StreamResult { size_t bytes; StreamError error; }. Callers normally check result.ok(); an Ok result with bytes == 0 means "end of stream" on read or "no room right now" on write.
StreamError is deliberately small:
| Value | Meaning |
|---|---|
| Ok | Operation succeeded. |
| Closed | Stream is not open (EOF / peer hung up / never opened). |
| WouldBlock | No data / no room right now. Non-blocking sockets use this. |
| IoError | OS-level failure. errno/GetLastError semantics at the backend. |
| Invalid | Bad argument or wrong-direction operation (e.g., writing to an HttpStream). |
Because every backend reports the same five cases, consumers need only one error-handling path, and AsyncStream can translate the cases into callbacks uniformly.
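One way a consumer might fold the five cases into a single decision after each read is sketched below. The type definitions are stand-ins that mirror the description above, and `ReadAction` and `after_read` are hypothetical names introduced for this example.

```cpp
#include <cstddef>

// Stand-ins mirroring the documented types (assumed shapes).
enum class StreamError { Ok, Closed, WouldBlock, IoError, Invalid };

struct StreamResult {
    std::size_t bytes;
    StreamError error;
    bool ok() const { return error == StreamError::Ok; }
};

// Hypothetical summary of what the caller should do next.
enum class ReadAction { Consume, EndOfStream, RetryLater, Fail };

ReadAction after_read(const StreamResult& r) {
    switch (r.error) {
        case StreamError::Ok:
            // Ok with zero bytes is the documented end-of-stream signal on read.
            return r.bytes > 0 ? ReadAction::Consume : ReadAction::EndOfStream;
        case StreamError::Closed:
            return ReadAction::EndOfStream;
        case StreamError::WouldBlock:
            return ReadAction::RetryLater;
        case StreamError::IoError:
        case StreamError::Invalid:
            return ReadAction::Fail;
    }
    return ReadAction::Fail;  // not reached for valid enum values
}
```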
Synchronous Stream backends
FileStream
FileStream out("cache.bin", FileStream::Mode::Write);
const std::uint8_t payload[] = "hello"; // 6 bytes: sizeof includes the trailing NUL
out.write(payload, sizeof(payload));
out.flush();
Modes: Read, Write, Append, ReadWrite. The stream owns a std::FILE* for cross-platform consistency.
MemoryStream
An in-memory byte buffer that satisfies the Stream contract. Useful for tests and for round-tripping serialized state without hitting the filesystem.
PipeStream
Wraps NamedPipe, which in turn uses mkfifo on POSIX and CreateNamedPipe on Windows. The pipe is opened by the caller (create_server / connect_client) and handed to PipeStream for the unified interface.
TcpStream
TcpStream tcp;
if (!tcp.connect("127.0.0.1", 8080)) { /* error */ }
tcp.write(payload, sizeof(payload));
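connect() performs a blocking DNS lookup and TCP handshake on the calling thread. Wrapping the stream in AsyncStream moves the subsequent reads and writes onto its worker, but not the connect itself, so call connect() from a worker thread first if the caller must not block:
auto tcp = std::make_unique<TcpStream>();
tcp->connect("host", port); // blocks until connected; run this on a worker thread if needed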
AsyncStream io(std::move(tcp));
io.start();
HttpStream
auto stream = HttpStream::get("https://example.com/data.json");
if (stream->status_code() == 200) {
    std::uint8_t buf[4096]{};
    while (true) {
        auto r = stream->read(buf, sizeof(buf));
        if (!r.ok() || r.bytes == 0) break; // error, or Ok with 0 bytes (end of stream)
        // process buf[0 .. r.bytes)
    }
}
HTTPS is inherited from cpp-httplib, which ships with mbedTLS. HttpStream is currently read-only: calling write() returns StreamError::Invalid. Request-body streaming is Phase 4 of the streams feature plan.
AsyncStream: non-blocking with backpressure
AsyncStream wraps any Stream and runs a background worker that pumps read/write calls. Events surface as callbacks:
AsyncStream::Options opts;
opts.read_chunk = 8 * 1024; // bytes pulled per iteration
opts.write_high_water = 1 << 20; // 1 MiB pending-write limit
opts.executor = [&loop](auto fn) { loop.dispatch(std::move(fn)); };
AsyncStream io(std::make_unique<FileStream>("big.wav"), opts);
io.on_data([](const auto* d, auto n) { /* process chunk */ });
io.on_error([](StreamError e) { /* log and degrade */ });
io.on_close([] { /* flush UI state */ });
io.start();
Backpressure
write_async(buf, size, callback) returns false when the pending byte count would exceed write_high_water. Callers should wait for the on_drain callback (fired when the queue empties) before retrying. This keeps the write queue bounded so producers cannot exhaust memory when the network is slow.
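The contract can be pictured with a small, single-threaded sketch of a bounded queue. `BoundedWriteQueue`, `try_push`, and `pop_front` are hypothetical names for this illustration; the real AsyncStream queue is internal, is shared with a worker thread, and would need synchronization that is omitted here.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for AsyncStream's internal write queue (no locking).
class BoundedWriteQueue {
public:
    explicit BoundedWriteQueue(std::size_t high_water) : high_water_(high_water) {}

    void on_drain(std::function<void()> cb) { on_drain_ = std::move(cb); }

    // Mirrors the documented write_async behaviour: refuse the chunk when
    // accepting it would push the pending byte count past the high-water mark.
    bool try_push(const std::uint8_t* data, std::size_t size) {
        if (pending_bytes_ + size > high_water_) return false;
        chunks_.emplace_back(data, data + size);
        pending_bytes_ += size;
        return true;
    }

    // Called by the (hypothetical) worker once the front chunk is written.
    void pop_front() {
        if (chunks_.empty()) return;
        pending_bytes_ -= chunks_.front().size();
        chunks_.pop_front();
        if (chunks_.empty() && on_drain_) on_drain_();  // queue emptied
    }

    std::size_t pending_bytes() const { return pending_bytes_; }

private:
    std::size_t high_water_;
    std::size_t pending_bytes_ = 0;
    std::deque<std::vector<std::uint8_t>> chunks_;
    std::function<void()> on_drain_;
};
```

A producer that sees `false` keeps its chunk and resubmits it from the drain callback instead of spinning, which is what keeps memory use bounded.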
Cancellation
Every AsyncStream owns a CancellationToken. Calling cancel(), or destroying the stream, sets the token, fails any still-pending writes with StreamError::Closed, stops the worker, and fires on_close exactly once. The token can be shared via cancellation_token() so that user-level work (retries, coalescing writes) observes cancellation at the same moment the worker does.
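A shared token of this kind can be sketched as a copyable handle around an atomic flag. The class below is a stand-in written for this example; the method names `cancel()` and `is_cancelled()` and the helper `retry_until_cancelled` are assumptions, not Pulp's confirmed API.

```cpp
#include <atomic>
#include <memory>

// Stand-in token: copies share one flag, so every holder sees the same state.
class CancellationToken {
public:
    CancellationToken() : flag_(std::make_shared<std::atomic<bool>>(false)) {}
    void cancel() const { flag_->store(true, std::memory_order_release); }
    bool is_cancelled() const { return flag_->load(std::memory_order_acquire); }

private:
    std::shared_ptr<std::atomic<bool>> flag_;
};

// Hypothetical user-level retry loop. It stops when an attempt succeeds,
// when the attempt budget runs out, or when the shared token is set.
// Returns the number of attempts made.
template <typename Attempt>
int retry_until_cancelled(const CancellationToken& token, int max_attempts,
                          Attempt attempt) {
    int made = 0;
    while (made < max_attempts && !token.is_cancelled()) {
        ++made;
        if (attempt(made)) break;
    }
    return made;
}
```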
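Executor routing
AsyncStream intentionally does not know about pulp::events::EventLoop. Keeping runtime free of event-system dependencies avoids a library link cycle. Instead, callers pass an executor closure through Options::executor, as in the opts.executor assignment in the AsyncStream example above, which forwards each callback to loop.dispatch.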
When executor is empty, callbacks fire inline on the worker thread — convenient for short-lived tools and tests.
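The two routing modes can be sketched without any event-loop dependency. `Executor`, `TaskQueue`, and `deliver` below are hypothetical names for this illustration; `TaskQueue` merely plays the role an event loop would play in a real application.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Assumed executor shape: a callable that accepts a unit of work.
using Executor = std::function<void(std::function<void()>)>;

// Hypothetical stand-in for an event loop: work is queued from any thread
// and later run on whichever thread calls run_pending().
class TaskQueue {
public:
    void dispatch(std::function<void()> fn) {
        std::lock_guard<std::mutex> lock(mutex_);
        tasks_.push(std::move(fn));
    }

    // Runs everything queued so far; returns how many tasks ran.
    std::size_t run_pending() {
        std::queue<std::function<void()>> batch;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            std::swap(batch, tasks_);
        }
        std::size_t ran = 0;
        while (!batch.empty()) {
            batch.front()();
            batch.pop();
            ++ran;
        }
        return ran;
    }

private:
    std::mutex mutex_;
    std::queue<std::function<void()>> tasks_;
};

// Route through the executor when one is set; otherwise run inline.
inline void deliver(const Executor& executor, std::function<void()> callback) {
    if (executor) {
        executor(std::move(callback));
    } else {
        callback();
    }
}
```

With an executor set, the callback runs only when the owning thread drains its queue; with an empty executor it runs immediately on the caller, matching the inline behaviour described above.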
Example
The stream-demo example in examples/stream-demo/ exercises all three layers: synchronous file I/O, an AsyncStream over a pre-populated file dispatched onto a pulp::events::EventLoop, and an optional HTTP GET via HttpStream:
./build/examples/stream-demo/pulp-stream-demo
./build/examples/stream-demo/pulp-stream-demo --http https://example.com
Message channels (Phase 4)
Bytewise transports stay in Stream. Structured messages — where one send matches one delivered message — live behind pulp::runtime::MessageChannel:
class MessageChannel {
public:
    virtual bool send(const uint8_t* data, size_t size);
    virtual bool send_text(std::string_view);
    virtual void on_message(MessageCallback);
    virtual void on_closed(ChannelClosedCallback);
    virtual void on_error(ChannelErrorCallback);
    virtual void close();
    virtual bool is_open() const;
};
Four components ship with this phase: three MessageChannel implementations and JsonRpcPeer, a protocol layer that runs over any of them:
| Channel | Header | Transport |
|---|---|---|
| WebSocketChannel | pulp/runtime/websocket_channel.hpp | RFC 6455 over TcpStream; client handshake, server handshake, text/binary frames, ping/pong/close. |
| OscChannel | pulp/osc/osc_channel.hpp | UDP via the existing pulp::osc::Sender / Receiver. Messages are carried as encoded OSC packets. |
| MemoryMessageChannel | pulp/runtime/memory_message_channel.hpp | In-process pair for tests and intra-process bridges. |
| JsonRpcPeer | pulp/runtime/json_rpc.hpp | JSON-RPC 2.0 over any MessageChannel; a symmetric peer that can send/serve requests and notifications. |
WebSocket
auto tcp = std::make_unique<TcpStream>();
tcp->connect("example.com", 80);
auto ws = WebSocketChannel::connect(std::move(tcp), "example.com", "/chat");
ws->on_message([](const Message& m) { /* m.kind is Text or Binary */ });
ws->send_text("hello");
- TLS is not currently handled at the channel layer; for wss://, supply a TLS-wrapped TCP stream (future SecureTcpStream).
- Control frames (ping/pong/close) are handled internally; ping triggers an automatic pong.
- Payloads up to options.max_payload (default 16 MiB) are accepted; larger frames fire on_error and close the channel.
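For readers unfamiliar with the wire format, the sketch below builds one masked client-to-server text frame following RFC 6455 section 5.2. `encode_client_text_frame` is a hypothetical helper and not part of WebSocketChannel; a real client must also pick a fresh, unpredictable masking key for every frame, whereas this sketch takes the key as a parameter so the output is reproducible.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: one unfragmented, masked text frame (RFC 6455 s5.2).
std::vector<std::uint8_t> encode_client_text_frame(
        const std::string& text, const std::array<std::uint8_t, 4>& mask_key) {
    std::vector<std::uint8_t> frame;
    frame.push_back(0x81);  // FIN = 1, opcode = 0x1 (text)

    const std::uint64_t len = text.size();
    if (len <= 125) {
        // 7-bit length with the MASK bit (0x80) set.
        frame.push_back(static_cast<std::uint8_t>(0x80 | len));
    } else if (len <= 0xFFFF) {
        frame.push_back(static_cast<std::uint8_t>(0x80 | 126));
        frame.push_back(static_cast<std::uint8_t>(len >> 8));    // 16-bit length,
        frame.push_back(static_cast<std::uint8_t>(len & 0xFF));  // big-endian
    } else {
        frame.push_back(static_cast<std::uint8_t>(0x80 | 127));
        for (int shift = 56; shift >= 0; shift -= 8)  // 64-bit length, big-endian
            frame.push_back(static_cast<std::uint8_t>((len >> shift) & 0xFF));
    }

    frame.insert(frame.end(), mask_key.begin(), mask_key.end());

    // Payload octet i is XORed with key octet i mod 4.
    for (std::size_t i = 0; i < text.size(); ++i)
        frame.push_back(static_cast<std::uint8_t>(
            static_cast<std::uint8_t>(text[i]) ^ mask_key[i % 4]));
    return frame;
}
```

Calling it with the text "Hello" and the key 37 fa 21 3d reproduces the masked example given in section 5.7 of the RFC.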
OSC
auto osc = OscChannel::open("127.0.0.1", /*remote=*/8000, /*local=*/9000);
osc->on_message([](const Message& m) {
    auto decoded = pulp::osc::decode(m.payload.data(), m.payload.size());
});
osc->send(pulp::osc::Message("/synth/freq").add(440.f));
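To show what an encoded OSC packet looks like on the wire, here is a minimal sketch of the OSC 1.0 encoding for a message carrying one float argument. `encode_osc_float` and `append_padded` are hypothetical helpers, not pulp::osc functions, and the code assumes the platform's `float` is a 32-bit IEEE 754 value.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Append an OSC string: the characters, then 1 to 4 NUL bytes so the total
// length is a multiple of 4 and the string is always terminated.
static void append_padded(std::vector<std::uint8_t>& out, const std::string& s) {
    out.insert(out.end(), s.begin(), s.end());
    const std::size_t pad = 4 - (s.size() % 4);
    out.insert(out.end(), pad, std::uint8_t{0});
}

// Hypothetical helper: address pattern, type tag string ",f", then the float
// as 4 big-endian bytes.
std::vector<std::uint8_t> encode_osc_float(const std::string& address, float value) {
    static_assert(sizeof(float) == sizeof(std::uint32_t), "expects 32-bit float");
    std::vector<std::uint8_t> packet;
    append_padded(packet, address);
    append_padded(packet, ",f");

    std::uint32_t bits = 0;
    std::memcpy(&bits, &value, sizeof(bits));  // reinterpret without aliasing issues
    for (int shift = 24; shift >= 0; shift -= 8)
        packet.push_back(static_cast<std::uint8_t>((bits >> shift) & 0xFF));
    return packet;
}
```

For "/synth/freq" with 440.f this yields 20 bytes: 12 for the padded address, 4 for the type tags, and 4 for the value (43 DC 00 00).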
JSON-RPC
JsonRpcPeer is symmetric — either side can send requests, serve requests, emit notifications, and subscribe to notifications. It works over any MessageChannel:
JsonRpcPeer peer(*ws);
peer.register_method("add", [](std::string_view params) {
    // params is a JSON-encoded array/object; return a JSON-encoded result
    return JsonRpcResult::ok("42");
});
peer.send_request("echo", R"(["hi"])", [](const JsonRpcResult& r) {
    if (r.error) { /* handle */ } else { /* r.result_json */ }
});
peer.notify("progress", R"({"percent":50})");
Errors follow JSON-RPC 2.0 §5: -32601 is reported for unknown methods, -32603 for handler exceptions, and custom codes can be returned via JsonRpcResult::fail.
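For reference, the error response object defined by the JSON-RPC 2.0 specification can be assembled as in the sketch below. `make_error_response` and the two constants are hypothetical names for this illustration; how JsonRpcPeer builds its responses internally is not shown in this document.

```cpp
#include <string>

// Pre-defined codes from the JSON-RPC 2.0 specification.
constexpr int kMethodNotFound = -32601;
constexpr int kInternalError = -32603;

// Hypothetical helper. `id_json` must already be valid JSON (for example 7 or
// "abc" with its quotes), and `message` is assumed to contain no characters
// that need JSON escaping; a real implementation would escape it.
std::string make_error_response(const std::string& id_json, int code,
                                const std::string& message) {
    return "{\"jsonrpc\":\"2.0\",\"error\":{\"code\":" + std::to_string(code) +
           ",\"message\":\"" + message + "\"},\"id\":" + id_json + "}";
}
```

The `id` echoes the request's id so the caller can match the error to its pending request.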
Further reading
- Feature plan: planning/next-features-plan.md § Feature 3
- Ralph automation prompt: planning/ralph-prompt-streams.md
- Related headers: pulp/runtime/socket.hpp, pulp/runtime/http.hpp, pulp/runtime/named_pipe.hpp, pulp/runtime/message_channel.hpp, pulp/osc/osc_channel.hpp