Developing specialized gears
fluxrig is designed as a modular orchestration engine where physical I/O and business logic are encapsulated as Gears.
Before dropping into Go code, remember the Orchestration Spectrum:
- Declarative Logic: The majority of business logic patterns, including normalization, mapping, and alerting, are most efficiently implemented with the Bento Gear. This declarative approach significantly reduces development overhead and long-term maintenance complexity.
- Specialized Protocols: For high-performance protocol drivers (ISO 8583, Modbus), binary packers, or customized network stacks, you develop Native Go Gears.
Prerequisites: The gear contract
A Gear is a modular plugin that satisfies the sdk.NativeGear interface. Unlike generic plugins, native Gears have direct access to the high-fidelity services of the hosting Rack, including structured logging, deterministic ID generation, and the OpenTelemetry signal path.
The lifecycle of a Gear follows a strict state machine to ensure zero-loss operations:
- Init: Configuration mapping and validation.
- Start: Lifecycle ignition (spawning listeners or dialers).
- Process: Sequential request/response transformation.
- Drain: Graceful input suppression in preparation for shutdown.
- Stop: Full resource reclamation.
To guarantee technical fidelity, the following tutorial uses production-grade code derived from the Simple TCP Gear (pkg/gears/native/io_tcp).
Step 1: The architecture contract
Production Gears in fluxrig separate concerns across multiple files: config.go for settings, gear.go for the interface, and implementation files (e.g., server.go).
Every native component must satisfy the sdk.NativeGear interface:
```go
// pkg/gears/native/io_tcp/gear.go
package io_tcp

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/jaab-tech/fluxrig/pkg/fluxMsg"
	"github.com/jaab-tech/fluxrig/pkg/sdk"
)

// Gear is the outer wrapper that conforms to sdk.NativeGear.
type Gear struct {
	config *Config
	log    *slog.Logger
	ctx    sdk.GearContext
	emit   func(*fluxMsg.fluxMsg)
	impl   ModeImpl // Delegates to server or client logic
}

// Ensure interface compliance at compile time.
var _ sdk.NativeGear = (*Gear)(nil)
```
TIP
Compilation Safety: The var _ sdk.NativeGear = (*Gear)(nil) declaration forces the compiler to verify implementation completeness during development.
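The idiom works for any interface, not just `sdk.NativeGear`. A standalone illustration with a toy interface (names here are invented for the example):

```go
package main

import "fmt"

// Greeter is a toy interface standing in for sdk.NativeGear.
type Greeter interface{ Greet() string }

type EnglishGreeter struct{}

func (EnglishGreeter) Greet() string { return "hello" }

// The blank assignment costs nothing at runtime, but the build fails
// the moment EnglishGreeter stops satisfying Greeter.
var _ Greeter = (*EnglishGreeter)(nil)

func main() {
	fmt.Println(EnglishGreeter{}.Greet()) // hello
}
```

If a method is removed or its signature drifts, the error surfaces at the declaration site during compilation instead of at plugin load time.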
Step 2: Strategic initialization
The Init method is executed during the Rack's boot sequence. This is where raw configuration is transformed into a validated technical state.
```go
// Init loads configuration and prepares the gear.
func (g *Gear) Init(ctx sdk.GearContext) error {
	g.ctx = ctx
	g.log = ctx.Logger()

	// Manual mapping from generic YAML/TOML
	cfg, err := ParseConfig(ctx.Config())
	if err != nil {
		return fmt.Errorf("io_tcp: invalid config: %w", err)
	}

	// Hardened validation
	if err := cfg.Validate(); err != nil {
		return fmt.Errorf("io_tcp: validation failed: %w", err)
	}

	g.config = cfg
	g.log.Info("initialized", "mode", cfg.Mode, "bind", cfg.Bind)
	return nil
}
```
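`ParseConfig` and `Validate` are left to the gear author. A plausible sketch over a generic decoded map, using only the `mode` and `bind` fields the tutorial mentions (the real gear's config surface is larger):

```go
package main

import (
	"errors"
	"fmt"
)

// Config mirrors the two fields used in the tutorial.
type Config struct {
	Mode string
	Bind string
}

// ParseConfig maps a generically decoded YAML/TOML table into a
// typed struct. A real gear might use a decoding library instead.
func ParseConfig(raw map[string]any) (*Config, error) {
	cfg := &Config{}
	if v, ok := raw["mode"].(string); ok {
		cfg.Mode = v
	}
	if v, ok := raw["bind"].(string); ok {
		cfg.Bind = v
	}
	return cfg, nil
}

// Validate rejects states that would otherwise fail later, at Start time.
func (c *Config) Validate() error {
	switch c.Mode {
	case "server", "client":
	default:
		return fmt.Errorf("mode must be server or client, got %q", c.Mode)
	}
	if c.Mode == "server" && c.Bind == "" {
		return errors.New("server mode requires bind")
	}
	return nil
}

func main() {
	cfg, _ := ParseConfig(map[string]any{"mode": "server", "bind": ":8000"})
	fmt.Println(cfg.Validate()) // <nil>
}
```

Failing fast in Init keeps bad configuration out of the running Rack: the boot sequence aborts before any socket is opened.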
Step 3: Active lifecycle ignition
The Start method begins the Gear's operational life. It provides the emit callback, allowing the Gear to inject signals into the high-performance signal path.
```go
func (g *Gear) Start(ctx context.Context, emit func(*fluxMsg.fluxMsg)) error {
	g.log.Info("starting gear")
	g.emit = emit

	switch g.config.Mode {
	case "server":
		g.impl = NewServer(g.config, g.log, g.emit, g.ctx.IDGen())
	case "client":
		g.impl = NewClient(g.config, g.log, g.emit, g.ctx.IDGen())
	default:
		return fmt.Errorf("io_tcp: unknown mode %q", g.config.Mode)
	}

	return g.impl.Start(ctx)
}
```
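The `ModeImpl` delegation can be modeled as a small interface plus a factory switch. This is a simplified sketch of the pattern; the real constructors also take the logger, the emit callback, and the ID generator, as shown above:

```go
package main

import (
	"context"
	"fmt"
)

// ModeImpl is the behavior shared by the server and client variants.
type ModeImpl interface {
	Start(ctx context.Context) error
	Name() string
}

type server struct{}

func (server) Start(ctx context.Context) error { return nil }
func (server) Name() string                    { return "server" }

type client struct{}

func (client) Start(ctx context.Context) error { return nil }
func (client) Name() string                    { return "client" }

// newImpl mirrors the switch in Start: one gear type, two runtime modes.
func newImpl(mode string) (ModeImpl, error) {
	switch mode {
	case "server":
		return server{}, nil
	case "client":
		return client{}, nil
	default:
		return nil, fmt.Errorf("unknown mode %q", mode)
	}
}

func main() {
	impl, _ := newImpl("server")
	fmt.Println(impl.Name()) // server
}
```

Keeping the mode behind an interface means the Gear's lifecycle code never branches again after Start: Drain and Stop simply forward to `g.impl`.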
Step 4: Networking resilience (The Accept Loop)
Robust networking at the edge requires protection against resource exhaustion. Below is the production implementation of the TCP acceptLoop, featuring exponential backoff to prevent spin-loops during OS failure states.
```go
// pkg/gears/native/io_tcp/server.go
func (s *Server) acceptLoop() {
	backoff := 5 * time.Millisecond
	for {
		conn, err := s.listener.Accept()
		if err != nil {
			if !isActive(s.done) {
				return // Listener closed during shutdown; stop accepting
			}
			s.log.Error("accept failed", "error", err)
			// Exponential backoff on Accept failure
			time.Sleep(backoff)
			if backoff < 1*time.Second {
				backoff *= 2
			}
			continue
		}
		backoff = 5 * time.Millisecond // Reset on success

		// Connection limits (DoS protection)
		if s.config.MaxConnections > 0 && s.activeConns.Load() >= int64(s.config.MaxConnections) {
			s.log.Warn("max connections reached, rejecting")
			_ = conn.Close()
			continue
		}

		s.activeConns.Add(1)
		go s.handleConn(conn)
	}
}
```
Step 5: Signal ownership and metadata
In a high-concurrency runtime, memory safety is paramount. When reading from the socket, we must explicitly copy the scanner's buffer and attach traceable metadata.
```go
func (s *Server) handleConn(conn net.Conn) {
	// ... (setup and ID generation)
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		data := scanner.Bytes()

		// Copy the payload: Scanner reuses its internal buffer on the
		// next Scan, so the Gear must take explicit ownership.
		payload := make([]byte, len(data))
		copy(payload, data)

		msg := fluxMsg.New()
		msg.fluxID, _ = s.idGen.NextFluxID()
		msg.RawPayload = payload

		// Contextual signal metadata
		msg.Metadata["conn.id"] = connID
		msg.Metadata["peer.ip"], msg.Metadata["peer.port"], _ = net.SplitHostPort(conn.RemoteAddr().String())

		s.emit(msg) // Signal injection
	}
}
```
Step 6: Replying to the source
The Process method handles asynchronous signals returning from the pipeline. We use the Signal Metadata to route the response back to the correct physical socket.
```go
func (s *Server) Process(ctx context.Context, msg *fluxMsg.fluxMsg) (*fluxMsg.fluxMsg, error) {
	// Resolve the session from metadata
	connID, ok := msg.Metadata["conn.id"]
	if !ok {
		return nil, fmt.Errorf("missing conn.id")
	}

	val, ok := s.conns.Load(connID)
	if !ok {
		return nil, fmt.Errorf("session %s lost", connID)
	}
	conn := val.(*Connection)

	// High-fidelity signal egress
	if len(msg.RawPayload) > 0 {
		if _, err := conn.conn.Write(msg.RawPayload); err != nil {
			return nil, fmt.Errorf("write error: %w", err)
		}
	}

	return msg, nil
}
```
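The session registry behind `s.conns` can be modeled with a `sync.Map` keyed by connection ID. This is a sketch of the pattern only; the real `Connection` type additionally holds the `net.Conn` and per-socket bookkeeping:

```go
package main

import (
	"fmt"
	"sync"
)

// Connection is an illustrative stand-in for the gear's per-socket state.
type Connection struct{ ID string }

// Registry maps conn.id metadata values back to live sessions.
type Registry struct{ conns sync.Map }

// Register runs on the accept path, Resolve inside Process, and
// Unregister when the socket closes.
func (r *Registry) Register(c *Connection) { r.conns.Store(c.ID, c) }

func (r *Registry) Unregister(id string) { r.conns.Delete(id) }

func (r *Registry) Resolve(id string) (*Connection, error) {
	val, ok := r.conns.Load(id)
	if !ok {
		return nil, fmt.Errorf("session %s lost", id)
	}
	return val.(*Connection), nil
}

func main() {
	r := &Registry{}
	r.Register(&Connection{ID: "c-1"})
	c, err := r.Resolve("c-1")
	fmt.Println(c.ID, err) // c-1 <nil>
}
```

`sync.Map` suits this shape well: writes happen once per connection while reads happen once per reply, and the "session lost" error path covers sockets that closed while a signal was still in the pipeline.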
Step 7: Technical verification
Every specialized Gear must undergo High-Fidelity Unit Verification to ensure the logic remains resilient across releases.
```go
// pkg/gears/native/my_gear/gear_test.go
func TestMyGear_Process(t *testing.T) {
	g := &MyGear{}

	// Mock the hosting Rack's GearContext
	ctx := sdk.NewMockGearContext(map[string]any{"threshold": 100})
	_ = g.Init(ctx)

	msg := fluxMsg.New()
	msg.RawPayload = []byte("SIGNAL_DATA")

	out, err := g.Process(context.Background(), msg)
	assert.NoError(t, err)
	assert.NotNil(t, out)
}
```
IMPORTANT
Zero-Loss Shutdown: Always implement the Drain and Stop hooks. Drain signals the Gear to stop accepting new signals while finishing work in progress, ensuring a graceful handover.
Integration: Deployment to the rack
Once registered in the Factory, deploy your Gear by defining it in fluxrig.toml:
```toml
[gears.ingress_gateway]
spec = "io_tcp"
config = { mode = "server", bind = "0.0.0.0:8000" }

[bus.routes.ingress_gateway]
targets = ["logic_processor"]
```
If you see your signal pulses flowing through the Observability Dashboard, you have successfully deployed a production-grade protocol driver to the Sovereign Edge.