A glimpse at Kannika Armory's upcoming plugin system
Kannika Armory’s next release (v0.9) will be the first to include a plugin system. Unlike traditional implementations, our approach requires no SDK and is not bound to any particular programming language, yet it remains very easy to use.
Why we need a plugin system
At Kannika, we develop a Backup & Restore solution – called Armory – for Kafka-like message brokers. This product already comes with a set of configurable filters you can enable when backing up or restoring your data. There are timestamp filters, repartitioning filters, etc., but we can never hope to address all of our users' needs with a static set of transformation options.
One perfectly valid use case that comes up fairly often is data anonymization.
Say you are running a continuous backup of your production environment and you wish to make a copy of it (i.e., restore it) to your dev environment while stripping away identifying fields about your customers. This implies that Kannika Armory needs to be able to make sense of a record's payload and apply an arbitrary transformation to it. The payload could be anything: JSON, Protobuf, a custom binary format... It's impossible to foresee every case. The only way to handle all situations is to support loading and running arbitrary code the user provides. A plugin system is needed.
Ways to implement a plugin system
Kannika Armory is distributed as a native binary, and there are only a handful of ways to load and execute user-supplied code at runtime, each with its advantages and drawbacks.
One of them is embedding an interpreter, such as Lua or Python, in your application. Users would write a transform(record) -> record function and the interpreter would run that code. This is a simple and battle-tested solution that has been implemented in many products (nginx, Blender, ...), but embedding every runtime into the software is not practicable. A choice has to be made, restricting the languages that can be used to implement plugins to the runtimes that are embedded. This option also involves a lot of explicit serialization/deserialization on both the host and the guest side to pass the record back and forth.
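Armory's host is a native binary rather than Python, but the shape of the idea can be sketched in pure Python; the entry-point convention and the record-as-dict representation below are ours, for illustration only:

# Deliberately naive sketch of the embedded-interpreter approach.
# The user's plugin is plain source code defining transform(record) -> record.
user_script = """
def transform(record):
    record["payload"] = record["payload"].upper()
    return record
"""

plugin_env = {}
exec(user_script, plugin_env)        # load the user's code
transform = plugin_env["transform"]  # look up the agreed-upon entry point

# The host must convert its native record into something the interpreter
# understands (and back): the serialization cost mentioned above.
record = {"key": b"k", "payload": b"hello"}
print(transform(record))             # {'key': b'k', 'payload': b'HELLO'}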
Another solution is to accept plugins in the form of a native shared library. In this case, the C ABI (Application Binary Interface) becomes the common denominator. Interoperability with C is well supported in many languages, but as anyone who has ever written bindings to a C library knows, producing such a library is not for the faint-hearted, and a good serialization strategy to pass data across the language boundary is still necessary. Moreover, you might face incompatibilities between platforms (cross-compilation might be required), and the plugin lives in the same memory space as the host application, so a misbehaving plugin can crash the whole software. That's how plugin systems in native applications are commonly implemented.
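To get a feel for the host side of that approach, here is a sketch using Python's ctypes as a stand-in for a native host; the plugin.so library and the transform signature are invented for the example:

import ctypes

# Hypothetical shared-library plugin exposing a C ABI entry point:
#   int32_t transform(const uint8_t *payload, size_t len,
#                     uint8_t *out, size_t out_cap);
lib = ctypes.CDLL("./plugin.so")
lib.transform.argtypes = [
    ctypes.c_char_p, ctypes.c_size_t,
    ctypes.c_char_p, ctypes.c_size_t,
]
lib.transform.restype = ctypes.c_int32

payload = b'{"firstName": "John"}'
out = ctypes.create_string_buffer(1024)  # host-managed output buffer
n = lib.transform(payload, len(payload), out, len(out))
if n < 0:
    raise RuntimeError("plugin failed")  # error handling is entirely ad hoc
new_payload = out.raw[:n]

Note how the serialization strategy (raw bytes plus lengths), buffer ownership, and error conventions all have to be designed by hand.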
A third way, which addresses the main shortcomings of the two previous ideas, uses a recent development in the WebAssembly (Wasm) space: the WebAssembly Component Model.
Why the WebAssembly Component Model?
WebAssembly is a fairly recent technology that defines a portable binary format for code that can be run in a web browser at near-native speed. As support for Wasm grew, developers were no longer constrained to JavaScript to implement complex logic on web pages. Instead, they could implement it as a library in their preferred language, (pre-)compile it to Wasm, and invoke it in the client's browser through only a thin JavaScript layer.
The properties of WebAssembly, however, make it very interesting beyond the web, and stand-alone WebAssembly runtimes (Wasmtime, Wasmer, Wazero, ...) have emerged that let people run Wasm code outside of the browser. If you embed such a runtime into your software, you have the means to load and run arbitrary code, in a sandbox, to customize the software's behavior: in other words, a plugin system.
One possible way to implement such a thing is to establish an API for the plugins your application accepts and to settle on a serialization format for complex types, so that data can be passed back and forth between the host system and the guest plugin. WebAssembly, as the name suggests, is fairly low-level: you can only work with primitive types, and memory management has to be done by hand.
In short, an ABI needs to be created, and that's a lot of low-level, meticulous work. Typically, you'd write an SDK (which you'd have to maintain) for a set of popular languages (thus restricting your users' choice of language) to handle the gritty details and ease the development of plugins for your software. That's the approach Redpanda uses.
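To make that concrete, here is roughly what driving a raw Wasm export looks like with the wasmtime Python bindings; the plugin.wasm module and its add export are invented for the example:

from wasmtime import Engine, Store, Module, Instance

engine = Engine()
store = Store(engine)

# Hypothetical module exporting a plain `add(i32, i32) -> i32` function.
module = Module.from_file(engine, "plugin.wasm")
instance = Instance(store, module, [])

add = instance.exports(store)["add"]
print(add(store, 1, 2))  # 3

# Passing a Kafka record through this interface would mean copying its
# bytes into the guest's linear memory and exchanging pointer/length
# pairs by hand: exactly the ABI work described above.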
Wouldn't it be nice if we had a standard way to define that API and express complex types without thinking about serialization or maintaining SDKs? Wouldn't it be even better if all you had to do was download a specification file so that the code for your plugin could be generated, leaving you to fill in the blanks? This is one of the reasons the WebAssembly Component Model was designed, and it's what we chose to use for Kannika Armory's plugin system.
How does it look in Kannika Armory?
Let's circle back to our original issue and see how to implement that anonymization plugin in Python.
The WebAssembly Component Model defines an Interface Description Language (IDL) called WIT (Wasm Interface Type) that lets us formally define the contract between Armory's core engine and the plugin.
A slightly simplified version of the contract that will be shipped in Armory's upcoming release looks like this:
package kannika:wasi-plugins@0.1.0;

world wasi-restore-plugin {
    // Result of the plugin's transformation
    variant outcome {
        // The record was accepted by the filter.
        accepted(kafka-record),
        // The record has been rejected by the plugin's logic.
        rejected(kafka-record),
        // The record could not be processed for some reason.
        error(string)
    }

    // Elements of a Kafka record
    record kafka-record {
        key: option<list<u8>>,
        headers: list<kafka-record-header>,
        partition: option<s32>,
        offset: option<s64>,
        timestamp: option<kafka-record-timestamp>,
        payload: option<list<u8>>,
    }

    record kafka-record-header {
        key: string,
        val: option<list<u8>>,
    }

    record kafka-record-timestamp {
        kind: timestamp-kind,
        time: s64,
    }

    enum timestamp-kind {
        none,
        create-time,
        log-append-time,
    }

    // The context in which the plugin is used.
    // * `source-topic` is the original Kafka topic name;
    // * `target-topic` is the Kafka topic we're restoring the record to.
    record context {
        source-topic: string,
        target-topic: string,
    }

    // An initialization function.
    //
    // `cfg` is an optional YAML document provided to the host application
    // at launch that can be used to tune the plugin's behavior.
    //
    // This is called once before any call to `transform`.
    export init: func(ctx: context, cfg: option<string>) -> result<_, string>;

    // The filtering/transformation applied by the plugin.
    export transform: func(rec: kafka-record) -> outcome;
}
Let's take a minute to appreciate how high-level and self-descriptive that is. This establishes two requirements for a plugin: it must export an init function and a transform function. It also defines a few complex types with optional fields, byte arrays, enums, etc.
From that WIT file alone, we can generate the skeleton for our plugin; we'll only have to fill in the blanks:
pip install componentize-py
componentize-py -d wit -w wasi-restore-plugin bindings plugin
Here's the code that was generated:
# File: plugin/wasi_restore_plugin/__init__.py
from typing import TypeVar, Generic, Union, Optional, Protocol, Tuple, List, Any, Self
from enum import Flag, Enum, auto
from dataclasses import dataclass
from abc import abstractmethod
import weakref

from .types import Result, Ok, Err, Some


@dataclass
class KafkaRecordHeader:
    key: str
    val: Optional[bytes]


class TimestampKind(Enum):
    NONE = 0
    CREATE_TIME = 1
    LOG_APPEND_TIME = 2


@dataclass
class KafkaRecordTimestamp:
    kind: TimestampKind
    time: int


@dataclass
class KafkaRecord:
    """
    Elements of a Kafka record
    """
    key: Optional[bytes]
    headers: List[KafkaRecordHeader]
    partition: Optional[int]
    offset: Optional[int]
    timestamp: Optional[KafkaRecordTimestamp]
    payload: Optional[bytes]


@dataclass
class Outcome_Accepted:
    value: KafkaRecord


@dataclass
class Outcome_Rejected:
    value: KafkaRecord


@dataclass
class Outcome_Error:
    value: str


Outcome = Union[Outcome_Accepted, Outcome_Rejected, Outcome_Error]
"""
Result of the plugin's transformation
"""


@dataclass
class Context:
    """
    The context in which the plugin is used.
    * `source-topic` is the original Kafka topic name;
    * `target-topic` is the Kafka topic we're restoring the record to.
    """
    source_topic: str
    target_topic: str


class WasiRestorePlugin(Protocol):

    @abstractmethod
    def init(self, ctx: Context, cfg: Optional[str]) -> None:
        """
        An initialization function.

        `cfg` is an optional YAML document provided to the host application
        at launch that can be used to tune the plugin's behavior.

        This is called once before any call to `transform`.

        Raises: `wasi_restore_plugin.types.Err(wasi_restore_plugin.imports.str)`
        """
        raise NotImplementedError

    @abstractmethod
    def transform(self, rec: KafkaRecord) -> Outcome:
        """
        The filtering/transformation applied by the plugin.
        """
        raise NotImplementedError
Pretty neat. All we have to do is implement the WasiRestorePlugin class:
from typing import Optional

import wasi_restore_plugin
from wasi_restore_plugin import Context, KafkaRecord, Outcome, Outcome_Accepted
from wasi_restore_plugin.types import *


class WasiRestorePlugin(wasi_restore_plugin.WasiRestorePlugin):
    def init(self, ctx: Context, cfg: Optional[str]) -> None:
        return None

    def transform(self, rec: KafkaRecord) -> Outcome:
        return Outcome_Accepted(rec)
That's the minimal plugin implementation:
- the init function just returns without error;
- the transform function doesn't do anything but accept the record.
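Since the skeleton is ordinary Python, you can sanity-check it with a plain interpreter before componentizing anything. This little smoke test is our own addition, not generated code:

# Quick local smoke test (plain Python, no Wasm involved).
plugin = WasiRestorePlugin()
plugin.init(Context(source_topic="a", target_topic="b"), None)

rec = KafkaRecord(key=None, headers=[], partition=None, offset=None,
                  timestamp=None, payload=b"hello")
assert isinstance(plugin.transform(rec), Outcome_Accepted)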
Here's the command to build the plugin:
componentize-py --wit-path restore.wit --world wasi-restore-plugin componentize plugin -o plugin.wasm
In Kannika Armory, you would make the produced plugin.wasm file accessible in Kubernetes through a persistent volume claim, and then use it in a Restore like this:
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: anonymizing-restore
spec:
  enabled: true
  source: "my-gcs-bucket"
  sink: "my-kafka-cluster"
  config:
    topics:
      - target: "target-topic"
        source: "source-topic"
        plugins:
          - wasiPluginRef:
              claimName: plugins-store-pvc
              path: plugin.wasm
Now it's time to actually implement our plugin's logic. Let's pretend our records' payloads are JSON documents stored in a Redpanda cluster:
$ rpk topic consume source-topic -o 0:1
{
  "topic": "source-topic",
  "value": "{ \"firstName\": \"John\", \"lastName\": \"Doe\", \"itemId\": 82, \"date\": \"2024-05-05\", \"price\": 42, \"qty\": 2 }",
  "timestamp": 1724761999979,
  "partition": 2,
  "offset": 0
}
Our goal is to strip away name information from those records (firstName and lastName), so let's list those fields in our Restore configuration:
apiVersion: kannika.io/v1alpha
kind: Restore
metadata:
  name: anonymizing-restore
spec:
  # <snip>
  config:
    topics:
      - target: "target-topic"
        source: "source-topic"
        plugins:
          - wasiPluginRef:
              claimName: plugins-store-pvc
              path: plugin.wasm
            # An optional YAML configuration to be passed as-is
            # to the plugin at runtime.
            # 👇
            spec:
              fieldsToStrip:
                - firstName
                - lastName
The plugin's implementation could then be something like the following:
import yaml
import json
from typing import Optional

import wasi_restore_plugin
from wasi_restore_plugin import Context, KafkaRecord, Outcome, Outcome_Accepted
from wasi_restore_plugin.types import *

# A global to store the configuration parsed in the init() function.
# We can't store state in `self`, hence why we use a global.
fields_to_strip = []


class WasiRestorePlugin(wasi_restore_plugin.WasiRestorePlugin):
    def init(self, ctx: Context, cfg: Optional[str]) -> None:
        global fields_to_strip
        # If a configuration is provided, attempt to extract a list
        # of fields to remove from the records' payload.
        if cfg is not None:
            cfg = yaml.safe_load(cfg)
            fields_to_strip = cfg.get('fieldsToStrip', [])

    def transform(self, rec: KafkaRecord) -> Outcome:
        if len(fields_to_strip) == 0 or rec.payload is None:
            return Outcome_Accepted(rec)
        # Decode the payload as json
        payload = rec.payload.decode('utf-8')
        payload = json.loads(payload)
        # Strip fields
        for field in fields_to_strip:
            payload.pop(field, None)
        # Re-encode the payload
        payload = json.dumps(payload)
        rec.payload = payload.encode('utf-8')
        # Let the updated record through
        return Outcome_Accepted(rec)
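As with the skeleton, nothing stops you from exercising this logic locally before building the component. In this hypothetical check, the configuration string mirrors the fieldsToStrip block from the Restore manifest above:

# Local check of the stripping logic (plain Python, no Wasm involved).
plugin = WasiRestorePlugin()
plugin.init(Context(source_topic="source-topic", target_topic="target-topic"),
            "fieldsToStrip:\n- firstName\n- lastName\n")

rec = KafkaRecord(key=None, headers=[], partition=None, offset=None,
                  timestamp=None,
                  payload=b'{"firstName": "John", "lastName": "Doe", "itemId": 82}')

out = plugin.transform(rec)
assert json.loads(out.value.payload) == {"itemId": 82}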
Load that into the persistent volume, run your restore, and observe the result:
$ rpk topic consume target-topic -o +1
{
  "topic": "target-topic",
  "value": "{\"itemId\": 82, \"date\": \"2024-05-05\", \"price\": 42, \"qty\": 2}",
  "timestamp": 1724761999979,
  "partition": 2,
  "offset": 1
}
Closing thoughts
The WebAssembly Component Model is still in its early days, but in its current state it has already proven quite adequate for designing a complete plugin system with minimal effort. The approach is not bound to any particular language and leverages WebAssembly's promise of near-native performance. Head over to the documentation website for complete examples in other languages!
We’ve only scratched the surface of what is possible, however. In theory, this technology would even allow polyglot plugins, where components implemented in different languages are linked together into a final Wasm file.
It’ll be very interesting to see how far Kannika Armory users will take this.