57#include <unordered_map>
58#include <unordered_set>
// Canonicalises an encoding label: lower-cases it and folds the FlatBuffers
// aliases ("flatbuffer", "flatbuffers", "fbs", "bfbs") into "flatbuffers".
149 static std::string normalize_schema_encoding(std::string_view encoding);
// True when the normalised form of `encoding` equals "flatbuffers".
151 static bool is_flatbuffers_schema_type(std::string_view encoding);
// Stores `schema` in schema_map_ under schema.name, overwriting an existing
// entry of the same schema type or appending a new one.
// NOTE(review): the `_locked` suffix presumably means the caller already holds mtx_ — confirm.
153 bool cache_schema_data_locked(
const SchemaData& schema);
155#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
// Builds a SchemaData named after descriptor.full_name() with encoding "protobuf";
// the payload is produced from a FileDescriptorSet holding the descriptor's file
// and its dependencies. The encoding is cleared on failure paths.
158 static SchemaData build_protobuf_schema_data(
const google::protobuf::Descriptor& descriptor);
161#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
// Caches every schema reported by FlatbuffersRegistry::get().get_all_schemas().
162 void import_all_flatbuffers_schema_data_locked();
// Looks `name` up in FlatbuffersRegistry and caches the result if it is a
// non-empty FlatBuffers schema.
164 bool import_flatbuffers_schema_data_locked(
const std::string& name);
// Finds a FlatBuffers entry for `name` in schema_map_, importing it from the
// registry first when the name is not cached yet.
166 const SchemaData* find_cached_flatbuffers_schema_locked(
const std::string& name)
const;
// Deletes and erases the cached flatbuffers::Parser for `name`, if any.
168 void clear_flatbuffers_parser_cache_locked(
const std::string& name);
171#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
// Factory used to obtain prototype messages for descriptors (see GetPrototype use).
172 google::protobuf::DynamicMessageFactory factory_;
// name -> opaque descriptor pointer, filled lazily by find_protobuf_descriptor_locked().
174 std::unordered_map<std::string, ProtobufDescriptorPtr> protobuf_descriptor_map_;
// name -> cached schema blobs; cache_schema_data_locked() keeps one entry per schema type.
175 std::unordered_map<std::string, std::vector<SchemaData>> schema_map_;
// name -> opaque google::protobuf::Message pointer; entries are deleted in the destructor.
176 std::unordered_map<std::string, ProtobufMessagePtr> protobuf_message_map_;
// name -> opaque flatbuffers::Parser pointer; entries are deleted in the destructor
// and by clear_flatbuffers_parser_cache_locked().
177 std::unordered_map<std::string, FlatbuffersParserPtr> flatbuffers_parser_map_;
188#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
// Member initialiser: the dynamic message factory is constructed over
// protobuf's generated descriptor pool.
189 : factory_(google::protobuf::DescriptorPool::generated_pool())
// Destructor body: releases the objects referenced by the two opaque-pointer
// caches, then empties both maps.
196#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
197 for (
auto& [name, ptr] : protobuf_message_map_) {
// Entries are stored as opaque pointers; cast back to the concrete type to delete.
199 delete reinterpret_cast<google::protobuf::Message*
>(ptr);
203 for (
auto& [name, ptr] : flatbuffers_parser_map_) {
// Only the delete is conditional on FlatBuffers support; the loop itself is not.
205#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
206 delete reinterpret_cast<flatbuffers::Parser*
>(ptr);
// Drop the now-dangling pointers.
210 protobuf_message_map_.clear();
211 flatbuffers_parser_map_.clear();
// Hold mtx_ for the rest of the lookup.
215 std::lock_guard lock(mtx_);
217 auto iter = schema_map_.find(name);
// Name already cached: look for an entry of the requested schema family.
219 if (iter != schema_map_.end()) {
221 for (
const auto& schema : iter->second) {
222 if (schema.schema_type == schema_type) {
// Best-effort caching: the bool result is intentionally discarded.
// NOTE(review): the origin of `schema` at this point is on lines not visible in this excerpt.
235 (void)cache_schema_data_locked(schema);
// Candidates gathered across schema families for `name`.
240 std::vector<SchemaData> matches;
// Record which families already have a cached entry. [[maybe_unused]] because
// the readers of these flags sit inside plugin-specific #ifdef sections.
243 [[maybe_unused]]
bool has_cached_protobuf =
false;
244 [[maybe_unused]]
bool has_cached_flatbuffers =
false;
245 [[maybe_unused]]
bool has_cached_zerocopy =
false;
// Walk the cached entries: flag the family of each one and add it to the candidates.
247 if (iter != schema_map_.end()) {
248 for (
const auto& cached_schema : iter->second) {
249 switch (cached_schema.schema_type) {
251 has_cached_protobuf =
true;
254 has_cached_flatbuffers =
true;
257 has_cached_zerocopy =
true;
263 matches.emplace_back(cached_schema);
267#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
// No cached protobuf entry: try to build one from the descriptor.
268 if (!has_cached_protobuf) {
269 if (
auto* desc_ptr = find_protobuf_descriptor_locked(name); desc_ptr !=
nullptr) {
270 auto* desc =
reinterpret_cast<google::protobuf::Descriptor*
>(desc_ptr);
271 auto proto_schema = build_protobuf_schema_data(*desc);
// build_protobuf_schema_data() clears the encoding on failure, so an empty
// encoding means there is nothing usable to cache.
273 if (!proto_schema.encoding.empty()) {
274 (void)cache_schema_data_locked(proto_schema);
275 matches.emplace_back(std::move(proto_schema));
281#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
// No cached FlatBuffers entry: the helper imports from the registry on a cache miss.
282 if (!has_cached_flatbuffers) {
283 if (
const auto* cached_schema = find_cached_flatbuffers_schema_locked(name)) {
284 matches.emplace_back(*cached_schema);
// Zero-copy candidate: the visible lines set only its name and the "vlink_msg" encoding.
// NOTE(review): the condition guarding this section is not visible in this excerpt.
291 zerocopy_schema.
name = name;
292 zerocopy_schema.
encoding =
"vlink_msg";
294 (void)cache_schema_data_locked(zerocopy_schema);
295 matches.emplace_back(std::move(zerocopy_schema));
// Exactly one candidate is unambiguous and is returned directly.
// NOTE(review): handling of zero or several candidates is on lines not visible here.
298 if (matches.size() == 1) {
299 return matches.front();
305#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
// Protobuf-specific path: resolve the descriptor, build the schema blob, cache it.
307 auto* desc_ptr = find_protobuf_descriptor_locked(name);
309 if (desc_ptr !=
nullptr) {
310 auto* desc =
reinterpret_cast<google::protobuf::Descriptor*
>(desc_ptr);
311 schema = build_protobuf_schema_data(*desc);
314 (void)cache_schema_data_locked(schema);
321#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
// FlatBuffers-specific path: return a copy of the cached (or freshly imported) blob.
323 if (
const auto* cached_schema = find_cached_flatbuffers_schema_locked(name)) {
324 return *cached_schema;
// Hold mtx_ while the cache is refreshed and enumerated.
333 std::lock_guard lock(mtx_);
335#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
// Pull every schema known to FlatbuffersRegistry into schema_map_ before enumerating.
// NOTE(review): any condition guarding this call is not visible in this excerpt.
337 import_all_flatbuffers_schema_data_locked();
// First pass: total the entries so the result vector is reserved once.
341 size_t schema_count = 0;
343 for (
const auto& [name, items] : schema_map_) {
345 schema_count += items.size();
348 std::vector<SchemaData> schemas;
349 schemas.reserve(schema_count);
// Second pass: copy cached entries into the result.
// NOTE(review): filtering by schema_type, if any, happens on lines not visible here.
351 for (
const auto& [name, items] : schema_map_) {
354 for (
const auto& schema : items) {
359 schemas.emplace_back(schema);
367 const std::string& name) {
// Public entry point: takes mtx_, then delegates to the `_locked` helper,
// which consults protobuf_descriptor_map_ and the generated pool.
368 std::lock_guard lock(mtx_);
370#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
371 return find_protobuf_descriptor_locked(name);
// Hold mtx_ while the message cache is consulted and possibly extended.
379 std::lock_guard lock(mtx_);
// Check whether an object for this type name was created earlier.
381 auto iter = protobuf_message_map_.find(name);
383 if (iter != protobuf_message_map_.end()) {
387#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
388 auto* desc_ptr = find_protobuf_descriptor_locked(name);
// Ask the dynamic factory for the prototype message of the resolved descriptor.
// NOTE(review): the null check on desc_ptr and the creation of target_ptr are
// on lines not visible in this excerpt.
394 const auto* prototype = factory_.GetPrototype(
reinterpret_cast<google::protobuf::Descriptor*
>(desc_ptr));
// Remember the pointer under `name`; the destructor deletes every entry of this map.
401 protobuf_message_map_.emplace(name, target_ptr);
410 const std::string& name) {
411 std::lock_guard lock(mtx_);
413#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
// Locate the cached blob for `name` (imported from the registry on a miss).
414 const auto* schema = find_cached_flatbuffers_schema_locked(name);
// Reject buffers that do not verify as a reflection schema.
// NOTE(review): construction of `verifier` is on a line not visible in this excerpt.
422 if VUNLIKELY (!reflection::VerifySchemaBuffer(verifier)) {
// The result is derived from schema->data.data(), i.e. it refers to the cached
// blob's storage rather than a copy; const is cast away to fit the return type.
427 const_cast<reflection::Schema*
>(reflection::GetSchema(schema->data.data())));
435 const std::string& name) {
436 std::lock_guard lock(mtx_);
// Check whether a parser for this root type was created earlier.
438 auto iter = flatbuffers_parser_map_.find(name);
440 if (iter != flatbuffers_parser_map_.end()) {
444#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
445 const auto* schema = find_cached_flatbuffers_schema_locked(name);
// Build a fresh parser from the cached schema blob.
451 auto parser = std::make_unique<flatbuffers::Parser>();
// Step 1: load the serialized schema bytes into the parser.
453 if VUNLIKELY (!parser->Deserialize(
reinterpret_cast<const uint8_t*
>(schema->data.data()), schema->data.size())) {
// Step 2: select `name` as the root type.
457 if VUNLIKELY (!parser->SetRootType(name.c_str())) {
// Cache the parser pointer under `name`.
// NOTE(review): how target_ptr is obtained from `parser` (presumably via release())
// is on a line not visible in this excerpt.
462 flatbuffers_parser_map_.emplace(name, target_ptr);
// Returns a canonical, lower-case form of an encoding label.
// Visible mapping: "flatbuffer", "flatbuffers", "fbs" and "bfbs" all become "flatbuffers".
// NOTE(review): the value returned for "proto" and for unrecognised labels is on
// lines not visible in this excerpt.
470inline std::string SchemaPluginBase::normalize_schema_encoding(std::string_view encoding) {
471 std::string normalized{encoding};
// Lower-case in place. The lambda takes unsigned char so that std::tolower
// never receives a negative value.
472 std::transform(normalized.begin(), normalized.end(), normalized.begin(),
473 [](
unsigned char c) { return static_cast<char>(std::tolower(c)); });
475 if (normalized ==
"proto") {
479 if (normalized ==
"flatbuffer" || normalized ==
"flatbuffers" || normalized ==
"fbs" || normalized ==
"bfbs") {
480 return "flatbuffers";
// Reports whether `encoding`, after normalisation (case-insensitive, aliases
// folded), names the FlatBuffers family.
486inline bool SchemaPluginBase::is_flatbuffers_schema_type(std::string_view encoding) {
487 return normalize_schema_encoding(encoding) ==
"flatbuffers";
// Inserts or replaces `schema` in schema_map_, keyed by schema.name, keeping
// one entry per schema type.
// NOTE(review): the `_locked` suffix presumably means the caller holds mtx_ — confirm.
490inline bool SchemaPluginBase::cache_schema_data_locked(
const SchemaData& schema) {
// A schema without a name or an encoding is not cacheable.
491 if VUNLIKELY (schema.name.empty() || schema.encoding.empty()) {
495#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
// FlatBuffers blobs must verify as a reflection schema before being cached.
496 if (is_flatbuffers_schema_type(schema.encoding)) {
497 flatbuffers::Verifier verifier(schema.data.data(), schema.data.size());
499 if VUNLIKELY (!reflection::VerifySchemaBuffer(verifier)) {
// Discard any parser cached for this name before the schema entry is updated below.
503 clear_flatbuffers_parser_cache_locked(schema.name);
// operator[] creates the per-name list on first use.
513 auto& cached_schemas = schema_map_[schema.name];
// Replace an existing entry of the same type...
// NOTE(review): the derivation of cached_schema_type is on lines not visible here.
515 for (
auto& cached_schema : cached_schemas) {
516 if (cached_schema.schema_type == cached_schema_type) {
517 cached_schema = schema;
// ...otherwise append a new one.
522 cached_schemas.emplace_back(schema);
526#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
528 const std::string& name) {
// Consult the per-name descriptor cache first.
529 auto iter = protobuf_descriptor_map_.find(name);
531 if (iter != protobuf_descriptor_map_.end()) {
// Cache miss: ask protobuf's generated pool for the message type.
535 const auto* desc = google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(name);
// Store the descriptor as an opaque pointer; const is cast away only to fit the
// ProtobufDescriptorPtr alias.
541 auto* target_ptr =
reinterpret_cast<ProtobufDescriptorPtr>(
const_cast<google::protobuf::Descriptor*
>(desc));
542 protobuf_descriptor_map_.emplace(name, target_ptr);
// Builds the schema blob for a protobuf message type: a FileDescriptorSet with
// the descriptor's own .proto file and its dependencies, serialized into
// schema.data. An empty schema.encoding in the result signals failure.
546inline SchemaData SchemaPluginBase::build_protobuf_schema_data(
const google::protobuf::Descriptor& descriptor) {
548 schema.
name = descriptor.full_name();
549 schema.encoding =
"protobuf";
// Failure marker (the guarding condition is on a line not visible in this excerpt).
553 schema.encoding.clear();
// Walk the file dependency graph using a queue, starting from the descriptor's file.
558 google::protobuf::FileDescriptorSet proto_fd_set;
559 std::queue<const google::protobuf::FileDescriptor*> to_add;
561 to_add.push(descriptor.file());
// The key type of the visited set depends on the protobuf version.
// NOTE(review): presumably FileDescriptor::name() returns a string_view from this version on — confirm.
563#if GOOGLE_PROTOBUF_VERSION >= 6030000
564 std::unordered_set<std::string_view> seen_dependencies;
566 std::unordered_set<std::string> seen_dependencies;
569 seen_dependencies.insert(descriptor.file()->name());
571 while (!to_add.empty()) {
572 const auto* next = to_add.front();
// Append this file's descriptor proto to the set.
575 next->CopyTo(proto_fd_set.add_file());
577 for (
int i = 0; i < next->dependency_count(); ++i) {
578 const auto* dep = next->dependency(i);
// Skip missing dependencies and files that were already recorded.
580 if (dep ==
nullptr || seen_dependencies.find(dep->name()) != seen_dependencies.end()) {
584 seen_dependencies.insert(dep->name());
// NOTE(review): SerializeToArray writes into schema.data's existing storage; the
// sizing of schema.data is not visible here — confirm it is resized to the
// serialized byte size before this call.
591 if VUNLIKELY (!proto_fd_set.SerializeToArray(schema.data.data(), schema.data.size())) {
// Serialization failed: clear the encoding so callers treat the result as unusable.
592 schema.encoding.clear();
601#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
// Copies every schema reported by FlatbuffersRegistry into schema_map_.
// Individual caching failures are ignored (the result is cast to void).
602inline void SchemaPluginBase::import_all_flatbuffers_schema_data_locked() {
603 for (
const auto& schema : FlatbuffersRegistry::get().
get_all_schemas()) {
604 (void)cache_schema_data_locked(schema);
// Imports one schema from FlatbuffersRegistry into schema_map_.
// Returns the result of cache_schema_data_locked() when the registry entry is
// usable. NOTE(review): the value returned on the rejection path is not visible here.
608inline bool SchemaPluginBase::import_flatbuffers_schema_data_locked(
const std::string& name) {
609 auto schema = FlatbuffersRegistry::get().search_schema(name);
// Reject entries that are not FlatBuffers-encoded or carry no payload.
611 if (!is_flatbuffers_schema_type(schema.encoding) || schema.data.empty()) {
615 return cache_schema_data_locked(schema);
// Looks for a FlatBuffers schema entry under `name` in schema_map_, importing
// it from the registry when the name is not cached yet.
// NOTE(review): this const member mutates the cache through `self`, presumably a
// const_cast of `this` declared on a line not visible here — confirm.
618inline const SchemaData* SchemaPluginBase::find_cached_flatbuffers_schema_locked(
const std::string& name)
const {
620 auto iter = self->schema_map_.find(name);
// Cache miss: try a lazy import, then look the name up again.
622 if (iter == self->schema_map_.end()) {
623 (void)self->import_flatbuffers_schema_data_locked(name);
624 iter = self->schema_map_.find(name);
// Still absent after the import attempt.
627 if (iter == self->schema_map_.end()) {
// Scan the per-name entries; non-FlatBuffers or empty-payload entries are filtered.
631 for (
const auto& schema : iter->second) {
636 if (!is_flatbuffers_schema_type(schema.encoding) || schema.data.empty()) {
// Removes the cached flatbuffers::Parser for `name`, if one exists.
646inline void SchemaPluginBase::clear_flatbuffers_parser_cache_locked(
const std::string& name) {
647 auto iter = flatbuffers_parser_map_.find(name);
// Nothing cached for this name.
649 if (iter == flatbuffers_parser_map_.end()) {
// The map stores opaque pointers; cast back to the concrete type to delete,
// then erase the entry so no dangling pointer stays in the map.
653 delete reinterpret_cast<flatbuffers::Parser*
>(iter->second);
654 flatbuffers_parser_map_.erase(iter);
static void init_memory_pool() noexcept
Initialises the global thread-safe memory pool for Bytes allocations.
static Bytes create(size_t size, uint8_t offset=0) noexcept
Creates an owned Bytes buffer of the given size.
~SchemaPluginBase() override
Destroys cached runtime objects owned by the plugin.
Definition schema_plugin_base.h:195
FlatbuffersParserPtr create_flatbuffers_parser(const std::string &name) override
Creates a FlatBuffers parser preloaded with the named root type.
Definition schema_plugin_base.h:434
ProtobufMessagePtr create_protobuf_message(const std::string &name) override
Creates a cached Protobuf dynamic message prototype for a type.
Definition schema_plugin_base.h:378
SchemaData search_schema(const std::string &name, SchemaType schema_type=SchemaType::kUnknown) override
Finds one schema blob constrained by schema family.
Definition schema_plugin_base.h:214
FlatbuffersSchemaPtr search_flatbuffers_schema(const std::string &name) override
Finds the BFBS reflection schema for a FlatBuffers root type.
Definition schema_plugin_base.h:409
SchemaPluginBase()
Constructs the base plugin and initialises the VLink memory pool.
Definition schema_plugin_base.h:187
std::vector< SchemaData > get_all_schemas(SchemaType schema_type=SchemaType::kUnknown) override
Returns all cached schemas filtered by schema family.
Definition schema_plugin_base.h:332
ProtobufDescriptorPtr search_protobuf_descriptor(const std::string &name) override
Looks up a Protobuf descriptor by fully-qualified type name.
Definition schema_plugin_base.h:366
void * ProtobufMessagePtr
Opaque pointer type for a google::protobuf::Message instance.
Definition schema_plugin_interface.h:91
void * FlatbuffersSchemaPtr
Opaque pointer type for a FlatBuffers reflection::Schema instance.
Definition schema_plugin_interface.h:100
void * ProtobufDescriptorPtr
Opaque pointer type for a google::protobuf::Descriptor.
Definition schema_plugin_interface.h:83
SchemaPluginInterface()=default
void * FlatbuffersParserPtr
Opaque pointer type for a runtime FlatBuffers Parser instance.
Definition schema_plugin_interface.h:108
Process-local registry for BFBS blobs compiled into the current library.
String, number, hash and formatting utility functions.
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely to be true.
Definition macros.h:302
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
Definition macros.h:184
bool has_startwith(const std::string &str, const char(&target)[SizeT]) noexcept
Compile-time check whether str starts with the literal target.
Definition helpers.h:376
SchemaType
Coarse runtime schema family used by discovery, bag metadata, and proxy routing.
Definition types.h:184
@ kUnknown
Decode category is not known.
Definition types.h:185
@ kZeroCopy
Decode using VLink zero-copy message structs.
Definition types.h:187
@ kProtobuf
Decode using the Protocol Buffers stack.
Definition types.h:188
@ kFlatbuffers
Decode using the FlatBuffers stack.
Definition types.h:189
Protobuf runtime include wrapper for schema-plugin support.
Runtime schema plugin interface for Protobuf and FlatBuffers metadata loading.
Definition serializer-inl.h:139
Carries one serialized schema blob for runtime registration or embedding.
Definition types.h:246
SchemaType schema_type
Coarse runtime schema family derived from encoding.
Definition types.h:249
static bool is_real_type(SchemaType schema_type) noexcept
Returns whether a schema type carries concrete runtime schema metadata.
static bool is_valid_type(SchemaType schema_type) noexcept
Returns whether a schema type enum value is within the supported range.
std::string encoding
Schema encoding identifier (e.g. "protobuf" or "flatbuffers").
Definition types.h:248
std::string name
Schema subject name, typically a fully-qualified message or table type.
Definition types.h:247