74#include <unordered_set>
154 using SplitCallback = std::function<void(
int split_index,
const std::string& split_filename)>;
170 using SystemClock = std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds>;
185 [[nodiscard]]
static std::shared_ptr<BagWriter>
create(
const std::string& path,
const Config& config = {});
200 [[nodiscard]]
static std::shared_ptr<BagWriter>
filter_get(
const std::string& path);
274 virtual int64_t
push(
const std::string& url,
const std::string& ser_type,
SchemaType schema_type,
275 ActionType action_type,
const Bytes& data, int64_t* microseconds_timestamp =
nullptr,
276 bool immediate =
false) = 0;
315 void get_url_meta(
const std::string& url,
const std::string& ser,
int& url_index,
int& ser_index)
const;
317 void get_url_meta(
int url_index,
int ser_index, std::string& url, std::string& ser)
const;
332 std::unique_ptr<struct BagWriterImpl> impl_;
Versatile byte buffer with small-buffer optimisation, ownership semantics and compression.
static std::string_view convert_action(ActionType type)
static const std::string & get_default_app_name()
BagWriter(const std::string &path, const Config &config={})
Constructs a BagWriter for path with the given config.
virtual int64_t push(const std::string &url, const std::string &ser_type, SchemaType schema_type, ActionType action_type, const Bytes &data, int64_t *microseconds_timestamp=nullptr, bool immediate=false)=0
Records one message to the bag.
void get_url_meta(int url_index, int ser_index, std::string &url, std::string &ser) const
virtual int get_split_index() const =0
Returns the zero-based index of the current split file.
std::function< void(int split_index, const std::string &split_filename)> SplitCallback
Callback fired when a split occurs.
Definition bag_writer.h:154
static std::shared_ptr< BagWriter > filter_get(const std::string &path)
Returns an existing writer for path, or creates and starts a new one.
virtual ~BagWriter()
Destructor – stops the recording loop and flushes pending writes.
CompressType
Compression algorithm applied to each recorded payload.
Definition bag_writer.h:107
@ kCompressAuto
Automatic algorithm selection.
Definition bag_writer.h:109
@ kCompressZstd
Zstandard compression.
Definition bag_writer.h:110
@ kCompressNone
No compression.
Definition bag_writer.h:108
@ kCompressLzav
LZAV built-in compression.
Definition bag_writer.h:112
@ kCompressLz4
LZ4 compression.
Definition bag_writer.h:111
virtual bool is_split_mode() const =0
Returns true if the writer is in split-file mode.
virtual void set_url_loss(const std::string &url, double loss)=0
Sets the expected message loss ratio for a given URL.
std::function< SchemaData(const std::string &ser_type, SchemaType schema_type)> SchemaCallback
Callback that resolves a serialisation type string to a SchemaData.
Definition bag_writer.h:165
static int32_t get_default_timezone_diff()
static std::string get_format_date(SystemClock *current=nullptr, bool file_format=false)
static BagWriter * global_get()
Returns the process-global BagWriter activated by the VLINK_BAG_PATH environment variable.
void get_url_meta(const std::string &url, const std::string &ser, int &url_index, int &ser_index) const
static SchemaPluginInterface * get_schema_interface()
static const std::string & get_default_tag_name()
virtual void register_schema_callback(SchemaCallback &&callback)=0
Registers a callback that resolves serialisation type strings to SchemaData.
virtual bool push_schema(const SchemaData &schema_data, bool immediate=false)=0
Embeds a SchemaData into the bag for later offline introspection.
std::chrono::time_point< std::chrono::system_clock, std::chrono::milliseconds > SystemClock
System clock type used for file-name timestamp generation.
Definition bag_writer.h:170
virtual void register_split_callback(SplitCallback &&callback, bool before)=0
Registers a callback invoked when a file split occurs.
static std::shared_ptr< BagWriter > create(const std::string &path, const Config &config={})
Creates a concrete BagWriter instance for path.
virtual bool is_dumping() const =0
Returns true if the writer is actively recording to disk.
Versatile 128-byte byte buffer with SBO, five ownership modes and compression helpers.
Definition bytes.h:113
MessageLoop()
Constructs a MessageLoop with kNormalType queue.
Abstract interface for runtime schema lookup and dynamic object creation.
Definition schema_plugin_interface.h:67
Platform-independent macro definitions for the VLink library.
#define VLINK_EXPORT
Definition macros.h:85
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
Definition macros.h:184
Single-threaded event loop with three queue types, timer management and task scheduling.
SchemaType
Coarse runtime schema family used by discovery, bag metadata, and proxy routing.
Definition types.h:184
ActionType
Identifies the type of message action for recording purposes.
Definition types.h:162
Configuration for recording behaviour, splitting, compression, and limits.
Definition bag_writer.h:122
int64_t start_timestamp
Override the bag start timestamp (ms since epoch).
Definition bag_writer.h:140
int64_t max_memory_size
Max in-memory cache size (bytes).
Definition bag_writer.h:139
Config()
Definition bag_writer.h:143
int64_t max_task_depth
Max pending write tasks in the queue.
Definition bag_writer.h:138
std::unordered_set< std::string > ignore_compress_urls
URLs whose payloads are never compressed.
Definition bag_writer.h:141
std::string tag_name
Optional tag name stored in the bag header.
Definition bag_writer.h:123
int64_t max_bytes_size
Max file bytes before splitting (if enable_limit).
Definition bag_writer.h:131
int64_t split_by_time
Split file every N milliseconds. 0 = disabled.
Definition bag_writer.h:133
int64_t compress_level
Compression level (codec-specific).
Definition bag_writer.h:137
CompressType compress
Compression algorithm.
Definition bag_writer.h:124
int64_t begin_time
Recording start timestamp (ms). 0 = now.
Definition bag_writer.h:134
bool optimize_on_exit
Run VACUUM/optimise on file close.
Definition bag_writer.h:129
int64_t cache_size
SQLite page cache size (bytes).
Definition bag_writer.h:135
int64_t split_by_size
Split file when it reaches this size (bytes).
Definition bag_writer.h:132
bool split_name_by_time
Append timestamp to split file names.
Definition bag_writer.h:127
bool sync_mode
Enable synchronous writes to disk.
Definition bag_writer.h:128
int64_t max_row_count
Max rows before splitting (if enable_limit).
Definition bag_writer.h:130
bool wal_mode
Enable SQLite WAL mode for crash resilience.
Definition bag_writer.h:125
int64_t compress_start_size
Minimum payload size (bytes) to compress.
Definition bag_writer.h:136
bool enable_limit
Enable max_row_count / max_bytes_size limits.
Definition bag_writer.h:126
Carries one serialized schema blob for runtime registration or embedding.
Definition types.h:246
Core type definitions shared across all VLink node implementations.