VLink 2.0.0
A high-performance communication middleware
Loading...
Searching...
No Matches
schema_plugin_base.h
Go to the documentation of this file.
1/*
2 * Copyright (C) 2026 by Thun Lu. All rights reserved.
3 * Author: Thun Lu <thun.lu@zohomail.cn>
4 * Repo: https://github.com/thun-res/vlink
5 * _ __ __ _ __
6 * | | / / / / (_) ____ / /__
7 * | | / / / / / / / __ \ / //_/
8 * | |/ / / /___ / / / / / / / ,<
9 * |___/ /_____/ /_/ /_/ /_/ /_/|_|
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 */
23
24/**
25 * @file schema_plugin_base.h
26 * @brief Default schema plugin implementation built around linked protobuf metadata and embedded BFBS blobs.
27 *
28 * @details
29 * @c SchemaPluginBase intentionally keeps the protobuf side aligned with the
30 * previous protobuf-only runtime behaviour:
31 * - protobuf descriptors are resolved directly from
32 * @c google::protobuf::DescriptorPool::generated_pool()
33 * - protobuf schema payloads are serialised from the linked @c FileDescriptor graph
34 * - dynamic protobuf messages are created via @c DynamicMessageFactory
35 *
36 * FlatBuffers differs because there is no global descriptor pool. Therefore the
37 * concrete plugin/library must explicitly register compiled-in BFBS blobs into
38 * the process-local @c FlatbuffersRegistry through
39 * @c FlatbuffersRegistry::register_schema() or the convenience macro
40 * @c VLINK_REGISTER_FLATBUFFERS.
41 * A common pattern is to generate headers with
42 * @c flatc --bfbs-gen-embed and register the emitted @c *BinarySchema helpers
43 * at translation-unit scope inside the plugin/library.
44 *
45 * This base class does not read @c VLINK_PROTO_DIR, @c VLINK_FBS_DIR, or any
46 * schema files from the file system.
47 */
48
49#pragma once
50
51#include <algorithm>
52#include <cctype>
53#include <mutex>
54#include <queue>
55#include <string>
56#include <string_view>
57#include <unordered_map>
58#include <unordered_set>
59#include <vector>
60
61#include "../base/helpers.h"
63
64//
66#include "./protobuf_registry.h"
67
68namespace vlink {
69
70/**
71 * @class SchemaPluginBase
72 * @brief Default mixed-schema plugin base class for Protobuf and FlatBuffers.
73 *
74 * @details
75 * The intended usage model is:
76 * 1. Protobuf types are already linked into the same plugin/library, so lookups go
77 * straight to the generated descriptor pool.
78 * 2. FlatBuffers BFBS blobs are generated at build time and explicitly registered
79 * into the static registry owned by the current plugin/library.
80 *
81 * This keeps protobuf behaviour compatible with the previous protobuf-only
82 * runtime implementation while filling the missing FlatBuffers reflection and
83 * parser hooks.
84 */
86 protected:
87 /**
88 * @brief Constructs the base plugin and initialises the VLink memory pool.
89 */
91
92 /**
93 * @brief Destroys cached runtime objects owned by the plugin.
94 */
95 ~SchemaPluginBase() override;
96
97 /**
98 * @brief Finds one schema blob constrained by schema family.
99 *
100 * @param name Serialization type or fully-qualified message name.
101 * @param schema_type Coarse schema family hint, or @c SchemaType::kUnknown for
102 * family-agnostic lookup.
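103 * @return Matching @c SchemaData. A typed lookup that finds nothing returns a schema without encoding (its name is still filled in); a @c SchemaType::kUnknown lookup returns a default-constructed schema when no family, or more than one family, matches.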
104 */
105 [[nodiscard]] SchemaData search_schema(const std::string& name,
106 SchemaType schema_type = SchemaType::kUnknown) override;
107
108 /**
109 * @brief Returns all cached schemas filtered by schema family.
110 *
111 * @param schema_type Schema family to filter on; @c SchemaType::kUnknown returns all.
112 * @return Vector of cached schemas belonging to @p schema_type.
113 */
114 [[nodiscard]] std::vector<SchemaData> get_all_schemas(SchemaType schema_type = SchemaType::kUnknown) override;
115
116 /**
117 * @brief Looks up a Protobuf descriptor by fully-qualified type name.
118 *
119 * @param name Fully-qualified Protobuf message type.
120 * @return Opaque descriptor pointer, or @c nullptr if not found.
121 */
122 [[nodiscard]] ProtobufDescriptorPtr search_protobuf_descriptor(const std::string& name) override;
123
124 /**
125 * @brief Creates a cached Protobuf dynamic message prototype for a type.
126 *
127 * @param name Fully-qualified Protobuf message type.
128 * @return Opaque message pointer, or @c nullptr if not found.
129 */
130 [[nodiscard]] ProtobufMessagePtr create_protobuf_message(const std::string& name) override;
131
132 /**
133 * @brief Finds the BFBS reflection schema for a FlatBuffers root type.
134 *
135 * @param name Fully-qualified FlatBuffers root type.
136 * @return Opaque @c reflection::Schema handle, or @c nullptr if not found.
137 */
138 [[nodiscard]] FlatbuffersSchemaPtr search_flatbuffers_schema(const std::string& name) override;
139
140 /**
141 * @brief Creates a FlatBuffers parser preloaded with the named root type.
142 *
143 * @param name Fully-qualified FlatBuffers root type.
144 * @return Opaque @c flatbuffers::Parser handle, or @c nullptr if not found.
145 */
146 [[nodiscard]] FlatbuffersParserPtr create_flatbuffers_parser(const std::string& name) override;
147
148 private:
149 static std::string normalize_schema_encoding(std::string_view encoding);
150
151 static bool is_flatbuffers_schema_type(std::string_view encoding);
152
153 bool cache_schema_data_locked(const SchemaData& schema);
154
155#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
156 ProtobufDescriptorPtr find_protobuf_descriptor_locked(const std::string& name);
157
158 static SchemaData build_protobuf_schema_data(const google::protobuf::Descriptor& descriptor);
159#endif
160
161#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
162 void import_all_flatbuffers_schema_data_locked();
163
164 bool import_flatbuffers_schema_data_locked(const std::string& name);
165
166 const SchemaData* find_cached_flatbuffers_schema_locked(const std::string& name) const;
167
168 void clear_flatbuffers_parser_cache_locked(const std::string& name);
169#endif
170
171#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
172 google::protobuf::DynamicMessageFactory factory_;
173#endif
174 std::unordered_map<std::string, ProtobufDescriptorPtr> protobuf_descriptor_map_;
175 std::unordered_map<std::string, std::vector<SchemaData>> schema_map_;
176 std::unordered_map<std::string, ProtobufMessagePtr> protobuf_message_map_;
177 std::unordered_map<std::string, FlatbuffersParserPtr> flatbuffers_parser_map_;
178 std::mutex mtx_;
179
181};
182
183////////////////////////////////////////////////////////////////
184/// Details
185////////////////////////////////////////////////////////////////
186
188#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
189 : factory_(google::protobuf::DescriptorPool::generated_pool())
190#endif
191{
193}
194
196#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
197 for (auto& [name, ptr] : protobuf_message_map_) {
198 (void)name;
199 delete reinterpret_cast<google::protobuf::Message*>(ptr);
200 }
201#endif
202
203 for (auto& [name, ptr] : flatbuffers_parser_map_) {
204 (void)name;
205#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
206 delete reinterpret_cast<flatbuffers::Parser*>(ptr);
207#endif
208 }
209
210 protobuf_message_map_.clear();
211 flatbuffers_parser_map_.clear();
212}
213
/**
 * @brief Resolves one schema for @p name, optionally restricted to one schema family.
 *
 * @details
 * @c mtx_ is held for the whole call. A cached entry of the requested family is
 * returned first. Otherwise the lookup falls back to the linked protobuf
 * descriptor pool, to @c find_cached_flatbuffers_schema_locked(), or to a
 * synthesised "vlink_msg" entry for names starting with "vlink::zerocopy::".
 * With @c SchemaType::kUnknown every family is probed and a schema is returned
 * only when exactly one candidate was collected; otherwise a default-constructed
 * @c SchemaData is returned.
 *
 * @param name Schema name used as cache key and for descriptor/registry lookup.
 * @param schema_type Requested schema family, or @c SchemaType::kUnknown.
 * @return The matching schema. A typed lookup that finds nothing returns the
 *         local fallback object, to which no encoding is assigned on that path.
 */
214inline SchemaData SchemaPluginBase::search_schema(const std::string& name, SchemaType schema_type) {
215 std::lock_guard lock(mtx_);
216
217 auto iter = schema_map_.find(name);
218
// Typed request: serve it straight from the cache when that family is already stored.
219 if (iter != schema_map_.end()) {
220 if (schema_type != SchemaType::kUnknown) {
221 for (const auto& schema : iter->second) {
222 if (schema.schema_type == schema_type) {
223 return schema;
224 }
225 }
226 }
227 }
228
// Fallback result; only the name is known at this point.
229 SchemaData schema;
230 schema.name = name;
231
// Zero-copy entries are synthesised purely from the name prefix; no schema blob is attached here.
232 if (schema_type == SchemaType::kZeroCopy && Helpers::has_startwith(name, "vlink::zerocopy::")) {
233 schema.encoding = "vlink_msg";
// NOTE(review): listing line 234 is absent from this view - presumably it sets schema.schema_type; confirm against the original header.
235 (void)cache_schema_data_locked(schema);
236 return schema;
237 }
238
// Family-agnostic lookup: gather at most one candidate per family and accept only an unambiguous result.
239 if (schema_type == SchemaType::kUnknown) {
240 std::vector<SchemaData> matches;
241 matches.reserve(3);
242
243 [[maybe_unused]] bool has_cached_protobuf = false;
244 [[maybe_unused]] bool has_cached_flatbuffers = false;
245 [[maybe_unused]] bool has_cached_zerocopy = false;
246
// Every cached entry counts as a candidate; the flags suppress a second probe of that family below.
247 if (iter != schema_map_.end()) {
248 for (const auto& cached_schema : iter->second) {
// NOTE(review): the case labels (listing lines 250, 253, 256) are absent from this view - presumably one per schema family; confirm.
249 switch (cached_schema.schema_type) {
251 has_cached_protobuf = true;
252 break;
254 has_cached_flatbuffers = true;
255 break;
257 has_cached_zerocopy = true;
258 break;
259 default:
260 break;
261 }
262
263 matches.emplace_back(cached_schema);
264 }
265 }
266
267#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
// Probe the linked protobuf descriptors; a schema whose encoding came back empty is treated as a failed build.
268 if (!has_cached_protobuf) {
269 if (auto* desc_ptr = find_protobuf_descriptor_locked(name); desc_ptr != nullptr) {
270 auto* desc = reinterpret_cast<google::protobuf::Descriptor*>(desc_ptr);
271 auto proto_schema = build_protobuf_schema_data(*desc);
272
273 if (!proto_schema.encoding.empty()) {
274 (void)cache_schema_data_locked(proto_schema);
275 matches.emplace_back(std::move(proto_schema));
276 }
277 }
278 }
279#endif
280
281#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
// The returned pointer refers into schema_map_, so it is copied before any further cache mutation.
282 if (!has_cached_flatbuffers) {
283 if (const auto* cached_schema = find_cached_flatbuffers_schema_locked(name)) {
284 matches.emplace_back(*cached_schema);
285 }
286 }
287#endif
288
289 if (!has_cached_zerocopy && Helpers::has_startwith(name, "vlink::zerocopy::")) {
290 SchemaData zerocopy_schema;
291 zerocopy_schema.name = name;
292 zerocopy_schema.encoding = "vlink_msg";
293 zerocopy_schema.schema_type = SchemaType::kZeroCopy;
294 (void)cache_schema_data_locked(zerocopy_schema);
295 matches.emplace_back(std::move(zerocopy_schema));
296 }
297
// Exactly one candidate means the name is unambiguous; zero or several candidates yield an empty schema.
298 if (matches.size() == 1) {
299 return matches.front();
300 }
301
302 return {};
303 }
304
305#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
// Typed protobuf lookup that missed the cache: build the schema from the linked descriptor and cache it.
306 if (schema_type == SchemaType::kProtobuf) {
307 auto* desc_ptr = find_protobuf_descriptor_locked(name);
308
309 if (desc_ptr != nullptr) {
310 auto* desc = reinterpret_cast<google::protobuf::Descriptor*>(desc_ptr);
311 schema = build_protobuf_schema_data(*desc);
312
313 if (!schema.encoding.empty()) {
314 (void)cache_schema_data_locked(schema);
315 return schema;
316 }
317 }
318 }
319#endif
320
321#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
// Typed FlatBuffers lookup that missed the cache: the helper may import the schema from the registry.
322 if (schema_type == SchemaType::kFlatbuffers) {
323 if (const auto* cached_schema = find_cached_flatbuffers_schema_locked(name)) {
324 return *cached_schema;
325 }
326 }
327#endif
328
// Typed lookup found nothing usable.
329 return schema;
330}
331
332inline std::vector<SchemaData> SchemaPluginBase::get_all_schemas(SchemaType schema_type) {
333 std::lock_guard lock(mtx_);
334
335#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
336 if (schema_type == SchemaType::kUnknown || schema_type == SchemaType::kFlatbuffers) {
337 import_all_flatbuffers_schema_data_locked();
338 }
339#endif
340
341 size_t schema_count = 0;
342
343 for (const auto& [name, items] : schema_map_) {
344 (void)name;
345 schema_count += items.size();
346 }
347
348 std::vector<SchemaData> schemas;
349 schemas.reserve(schema_count);
350
351 for (const auto& [name, items] : schema_map_) {
352 (void)name;
353
354 for (const auto& schema : items) {
355 if (schema_type != SchemaType::kUnknown && schema.schema_type != schema_type) {
356 continue;
357 }
358
359 schemas.emplace_back(schema);
360 }
361 }
362
363 return schemas;
364}
365
367 const std::string& name) {
368 std::lock_guard lock(mtx_);
369
370#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
371 return find_protobuf_descriptor_locked(name);
372#else
373 (void)name;
374 return nullptr;
375#endif
376}
377
379 std::lock_guard lock(mtx_);
380
381 auto iter = protobuf_message_map_.find(name);
382
383 if (iter != protobuf_message_map_.end()) {
384 return iter->second;
385 }
386
387#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
388 auto* desc_ptr = find_protobuf_descriptor_locked(name);
389
390 if VUNLIKELY (!desc_ptr) {
391 return nullptr;
392 }
393
394 const auto* prototype = factory_.GetPrototype(reinterpret_cast<google::protobuf::Descriptor*>(desc_ptr));
395
396 if VUNLIKELY (!prototype) {
397 return nullptr;
398 }
399
400 auto* target_ptr = reinterpret_cast<ProtobufMessagePtr>(prototype->New());
401 protobuf_message_map_.emplace(name, target_ptr);
402 return target_ptr;
403#else
404 (void)name;
405 return nullptr;
406#endif
407}
408
410 const std::string& name) {
411 std::lock_guard lock(mtx_);
412
413#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
414 const auto* schema = find_cached_flatbuffers_schema_locked(name);
415
416 if (!schema) {
417 return nullptr;
418 }
419
420 flatbuffers::Verifier verifier(schema->data.data(), schema->data.size());
421
422 if VUNLIKELY (!reflection::VerifySchemaBuffer(verifier)) {
423 return nullptr;
424 }
425
426 return reinterpret_cast<FlatbuffersSchemaPtr>(
427 const_cast<reflection::Schema*>(reflection::GetSchema(schema->data.data())));
428#else
429 (void)name;
430 return nullptr;
431#endif
432}
433
435 const std::string& name) {
436 std::lock_guard lock(mtx_);
437
438 auto iter = flatbuffers_parser_map_.find(name);
439
440 if (iter != flatbuffers_parser_map_.end()) {
441 return iter->second;
442 }
443
444#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
445 const auto* schema = find_cached_flatbuffers_schema_locked(name);
446
447 if (!schema) {
448 return nullptr;
449 }
450
451 auto parser = std::make_unique<flatbuffers::Parser>();
452
453 if VUNLIKELY (!parser->Deserialize(reinterpret_cast<const uint8_t*>(schema->data.data()), schema->data.size())) {
454 return nullptr;
455 }
456
457 if VUNLIKELY (!parser->SetRootType(name.c_str())) {
458 return nullptr;
459 }
460
461 auto* target_ptr = reinterpret_cast<FlatbuffersParserPtr>(parser.release());
462 flatbuffers_parser_map_.emplace(name, target_ptr);
463 return target_ptr;
464#else
465 (void)name;
466 return nullptr;
467#endif
468}
469
470inline std::string SchemaPluginBase::normalize_schema_encoding(std::string_view encoding) {
471 std::string normalized{encoding};
472 std::transform(normalized.begin(), normalized.end(), normalized.begin(),
473 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
474
475 if (normalized == "proto") {
476 return "protobuf";
477 }
478
479 if (normalized == "flatbuffer" || normalized == "flatbuffers" || normalized == "fbs" || normalized == "bfbs") {
480 return "flatbuffers";
481 }
482
483 return normalized;
484}
485
486inline bool SchemaPluginBase::is_flatbuffers_schema_type(std::string_view encoding) {
487 return normalize_schema_encoding(encoding) == "flatbuffers";
488}
489
490inline bool SchemaPluginBase::cache_schema_data_locked(const SchemaData& schema) {
491 if VUNLIKELY (schema.name.empty() || schema.encoding.empty()) {
492 return false;
493 }
494
495#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
496 if (is_flatbuffers_schema_type(schema.encoding)) {
497 flatbuffers::Verifier verifier(schema.data.data(), schema.data.size());
498
499 if VUNLIKELY (!reflection::VerifySchemaBuffer(verifier)) {
500 return false;
501 }
502
503 clear_flatbuffers_parser_cache_locked(schema.name);
504 }
505#endif
506
507 auto cached_schema_type = SchemaData::is_valid_type(schema.schema_type) ? schema.schema_type : SchemaType::kUnknown;
508
509 if VUNLIKELY (!SchemaData::is_real_type(cached_schema_type)) {
510 return false;
511 }
512
513 auto& cached_schemas = schema_map_[schema.name];
514
515 for (auto& cached_schema : cached_schemas) {
516 if (cached_schema.schema_type == cached_schema_type) {
517 cached_schema = schema;
518 return true;
519 }
520 }
521
522 cached_schemas.emplace_back(schema);
523 return true;
524}
525
526#ifdef VLINK_HAS_SCHEMA_PLUGIN_PROTOBUF
527inline SchemaPluginInterface::ProtobufDescriptorPtr SchemaPluginBase::find_protobuf_descriptor_locked(
528 const std::string& name) {
529 auto iter = protobuf_descriptor_map_.find(name);
530
531 if (iter != protobuf_descriptor_map_.end()) {
532 return iter->second;
533 }
534
535 const auto* desc = google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(name);
536
537 if VUNLIKELY (!desc) {
538 return nullptr;
539 }
540
541 auto* target_ptr = reinterpret_cast<ProtobufDescriptorPtr>(const_cast<google::protobuf::Descriptor*>(desc));
542 protobuf_descriptor_map_.emplace(name, target_ptr);
543 return target_ptr;
544}
545
546inline SchemaData SchemaPluginBase::build_protobuf_schema_data(const google::protobuf::Descriptor& descriptor) {
547 SchemaData schema;
548 schema.name = descriptor.full_name();
549 schema.encoding = "protobuf";
550 schema.schema_type = SchemaType::kProtobuf;
551
552 if VUNLIKELY (!descriptor.file()) {
553 schema.encoding.clear();
554 schema.schema_type = SchemaType::kUnknown;
555 return schema;
556 }
557
558 google::protobuf::FileDescriptorSet proto_fd_set;
559 std::queue<const google::protobuf::FileDescriptor*> to_add;
560
561 to_add.push(descriptor.file());
562
563#if GOOGLE_PROTOBUF_VERSION >= 6030000
564 std::unordered_set<std::string_view> seen_dependencies;
565#else
566 std::unordered_set<std::string> seen_dependencies;
567#endif
568
569 seen_dependencies.insert(descriptor.file()->name());
570
571 while (!to_add.empty()) {
572 const auto* next = to_add.front();
573 to_add.pop();
574
575 next->CopyTo(proto_fd_set.add_file());
576
577 for (int i = 0; i < next->dependency_count(); ++i) {
578 const auto* dep = next->dependency(i);
579
580 if (dep == nullptr || seen_dependencies.find(dep->name()) != seen_dependencies.end()) {
581 continue;
582 }
583
584 seen_dependencies.insert(dep->name());
585 to_add.push(dep);
586 }
587 }
588
589 schema.data = Bytes::create(proto_fd_set.ByteSizeLong());
590
591 if VUNLIKELY (!proto_fd_set.SerializeToArray(schema.data.data(), schema.data.size())) {
592 schema.encoding.clear();
593 schema.schema_type = SchemaType::kUnknown;
594 schema.data.clear();
595 }
596
597 return schema;
598}
599#endif
600
601#ifdef VLINK_HAS_SCHEMA_PLUGIN_FLATBUFFERS
602inline void SchemaPluginBase::import_all_flatbuffers_schema_data_locked() {
603 for (const auto& schema : FlatbuffersRegistry::get().get_all_schemas()) {
604 (void)cache_schema_data_locked(schema);
605 }
606}
607
608inline bool SchemaPluginBase::import_flatbuffers_schema_data_locked(const std::string& name) {
609 auto schema = FlatbuffersRegistry::get().search_schema(name);
610
611 if (!is_flatbuffers_schema_type(schema.encoding) || schema.data.empty()) {
612 return false;
613 }
614
615 return cache_schema_data_locked(schema);
616}
617
618inline const SchemaData* SchemaPluginBase::find_cached_flatbuffers_schema_locked(const std::string& name) const {
619 auto* self = const_cast<SchemaPluginBase*>(this);
620 auto iter = self->schema_map_.find(name);
621
622 if (iter == self->schema_map_.end()) {
623 (void)self->import_flatbuffers_schema_data_locked(name);
624 iter = self->schema_map_.find(name);
625 }
626
627 if (iter == self->schema_map_.end()) {
628 return nullptr;
629 }
630
631 for (const auto& schema : iter->second) {
632 if (schema.schema_type != SchemaType::kFlatbuffers) {
633 continue;
634 }
635
636 if (!is_flatbuffers_schema_type(schema.encoding) || schema.data.empty()) {
637 return nullptr;
638 }
639
640 return &schema;
641 }
642
643 return nullptr;
644}
645
646inline void SchemaPluginBase::clear_flatbuffers_parser_cache_locked(const std::string& name) {
647 auto iter = flatbuffers_parser_map_.find(name);
648
649 if (iter == flatbuffers_parser_map_.end()) {
650 return;
651 }
652
653 delete reinterpret_cast<flatbuffers::Parser*>(iter->second);
654 flatbuffers_parser_map_.erase(iter);
655}
656#endif
657
658} // namespace vlink
Process-local registry for BFBS blobs compiled into the current library.
String, number, hash and formatting utility functions.
#define VUNLIKELY(...)
Shorthand alias for VLINK_UNLIKELY. Hints that the expression is unlikely true.
Definition macros.h:302
#define VLINK_DISALLOW_COPY_AND_ASSIGN(classname)
Deletes the copy constructor and copy-assignment operator of classname.
Definition macros.h:184
Protobuf runtime include wrapper for schema-plugin support.
Runtime schema plugin interface for Protobuf and FlatBuffers metadata loading.
Definition serializer-inl.h:139