How to use protobuf to shift-left your entire schema management for multiple disciplines

Reco // Nir Barak & Shay Nehmad
Gophercon Israel February 2023

Who are we?

We at Reco committed from day one to using Protobuf and schema-first design. This led to a simple yet strict development process with a shift-left mentality. It's worth your time to learn how to do the same in your own projects and companies.

What are we going to talk about?

  1. Protobuf in a nutshell, and why we like it
  2. "A tale of two companies"
  3. APIs and µServices
  4. Data pipelines

Protobuf in a nutshell

  • Language-neutral data interchange format
  • Declarative - structured & strongly typed
  • Maintainable - forwards/backwards compatible
  • DevEx - inheritance, imports, commenting
  • GitOps - documentation, validations, linters
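
The forwards/backwards compatibility bullet falls out of the wire format: every field is written as a tag (field number plus wire type) followed by its value, so a reader can skip numbers it does not know and leave missing fields at their zero value. Here is a minimal sketch of that idea using only the Go standard library. It hand-encodes varint fields and is not the real protobuf runtime; `OldMessage`, `decodeOld` and `appendVarintField` are names made up for this example.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// wireVarint is the protobuf wire type used for integer fields.
const wireVarint = 0

// appendVarintField appends one varint field to buf.
// The tag is the field number shifted left by 3, OR-ed with the wire type.
func appendVarintField(buf []byte, fieldNum int, value uint64) []byte {
	buf = binary.AppendUvarint(buf, uint64(fieldNum)<<3|wireVarint)
	return binary.AppendUvarint(buf, value)
}

// OldMessage is what a reader built against the old schema knows about.
type OldMessage struct {
	Count uint64 // field 1
}

// decodeOld reads field 1 and skips every other varint field it meets.
func decodeOld(data []byte) (OldMessage, error) {
	var msg OldMessage
	for len(data) > 0 {
		tag, n := binary.Uvarint(data)
		if n <= 0 {
			return msg, errors.New("bad tag")
		}
		data = data[n:]
		if tag&7 != wireVarint {
			return msg, errors.New("sketch only handles varint fields")
		}
		value, n := binary.Uvarint(data)
		if n <= 0 {
			return msg, errors.New("bad value")
		}
		data = data[n:]
		if tag>>3 == 1 {
			msg.Count = value
		}
		// Any other field number is unknown to this reader and is ignored.
	}
	return msg, nil
}

func main() {
	// A newer writer sends field 1 plus a field 2 the old reader never heard of.
	newer := appendVarintField(nil, 1, 42)
	newer = appendVarintField(newer, 2, 7)
	fmt.Println(decodeOld(newer))

	// A writer that sends nothing leaves the reader with the zero value.
	fmt.Println(decodeOld(nil))
}
```

The real runtime does the same thing for every wire type and also keeps the skipped bytes around (the `unknownFields` member you will see in generated Go structs).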

TL;DR: just look at it

              syntax = "proto3";

              package data;

              import "google/protobuf/timestamp.proto";
              import "validate/validate.proto";

              enum Status {
                 UNSPECIFIED = 0;
                 IN_PROCESS = 1;
                 DONE = 2;
              }

              /* Terrific Proto message. The greatest protobuf message in the
                 history of protobuf messages, maybe ever. */
              message Reco {
                 string id = 1;
                 repeated int32 count = 2;
                 map<string, bool> flags = 3;
                 optional bytes raw_data = 4;
                 oneof dto {
                     Status status = 5; // look, Ma! I'm referencing an enum!
                     string error = 6; // unless I’ve failed horribly and I’m an error
                 }
                 google.protobuf.Timestamp created_at = 7
                    [(validate.rules).timestamp.within.seconds = 86400]; // 1 day
              }



Go (protoc-gen-go)

            // Terrific Proto message. The best protobuf message in the history of protobuf messages, maybe ever.
            type Reco struct {
              state         protoimpl.MessageState
              sizeCache     protoimpl.SizeCache
              unknownFields protoimpl.UnknownFields

              Id      string          `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
              Count   []int32         `protobuf:"varint,2,rep,packed,name=count,proto3" json:"count,omitempty"`
              Flags   map[string]bool `protobuf:"bytes,3,rep,name=flags,proto3" json:"flags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
              RawData []byte          `protobuf:"bytes,5,opt,name=raw_data,json=rawData,proto3,oneof" json:"raw_data,omitempty"`
              // Types that are assignable to Dto:
              // *Reco_Status
              // *Reco_Error
              Dto       isReco_Dto             `protobuf_oneof:"dto"`
              CreatedAt *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` // 1 day
            }
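
The `Dto isReco_Dto` field is how a oneof shows up in Go: an interface that only the generated wrapper types can satisfy, which you branch on with a type switch. The sketch below re-creates that pattern by hand so it runs without the protobuf runtime. All type names here (`isRecoDto`, `RecoStatus`, `RecoError`) are stand-ins for the generated ones, and `describe` is a hypothetical helper.

```go
package main

import "fmt"

// Status stands in for the generated enum.
type Status int32

const (
	StatusUnspecified Status = 0
	StatusInProcess   Status = 1
	StatusDone        Status = 2
)

// isRecoDto plays the role of the generated oneof interface. Its only
// method is unexported, so the set of possible cases is closed.
type isRecoDto interface{ isRecoDto() }

type RecoStatus struct{ Status Status }
type RecoError struct{ Error string }

func (*RecoStatus) isRecoDto() {}
func (*RecoError) isRecoDto()  {}

type Reco struct {
	Id  string
	Dto isRecoDto // exactly one wrapper, or nil when unset
}

// describe branches on whichever oneof case is set.
func describe(r *Reco) string {
	switch dto := r.Dto.(type) {
	case *RecoStatus:
		return fmt.Sprintf("%s: status %d", r.Id, dto.Status)
	case *RecoError:
		return fmt.Sprintf("%s: failed with %q", r.Id, dto.Error)
	default:
		return r.Id + ": dto not set"
	}
}

func main() {
	fmt.Println(describe(&Reco{Id: "a", Dto: &RecoStatus{Status: StatusDone}}))
	fmt.Println(describe(&Reco{Id: "b", Dto: &RecoError{Error: "boom"}}))
	fmt.Println(describe(&Reco{Id: "c"}))
}
```

The `default` branch matters: an unset oneof is a legal message, so consumers should decide what it means for them.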


Python (betterproto)

          @dataclass(eq=False, repr=False)
          class Reco(betterproto.Message):
             """
             Terrific Proto message. The best protobuf message in the history of
             protobuf messages, maybe ever.
             """

             id: str = betterproto.string_field(1)
             count: List[int] = betterproto.int32_field(2)
             flags: Dict[str, bool] = betterproto.map_field(3, betterproto.TYPE_STRING, betterproto.TYPE_BOOL)
             raw_data: bytes = betterproto.bytes_field(5, group="_raw_data")
             status: "Status" = betterproto.enum_field(6, group="dto")
             error: str = betterproto.string_field(7, group="dto")
             created_at: datetime = betterproto.message_field(8)


Scala (ScalaPB)

            /** Terrific Proto message. The best protobuf message in the history of protobuf messages, maybe ever.
             * @param createdAt 1 day
             */
            final case class Reco(
               id: _root_.scala.Predef.String = "",
               count: _root_.scala.Seq[_root_.scala.Int] = _root_.scala.Seq.empty,
               flags: _root_.scala.collection.immutable.Map[_root_.scala.Predef.String, _root_.scala.Boolean] = _root_.scala.collection.immutable.Map.empty,
               rawData: _root_.scala.Option[_root_.com.google.protobuf.ByteString] = _root_.scala.None,
               dto: data.models.reco.Reco.Dto = data.models.reco.Reco.Dto.Empty,
               createdAt: _root_.scala.Option[com.google.protobuf.timestamp.Timestamp] = _root_.scala.None,
               unknownFields: _root_.scalapb.UnknownFieldSet = _root_.scalapb.UnknownFieldSet.empty
               ) extends scalapb.GeneratedMessage with scalapb.lenses.Updatable[Reco]

Required gratuitous benchmark

Source: How Uber Engineering Evaluated JSON Encoding and Compression Algorithms to Put the Squeeze on Trip Data

Why we really, really (really) like Protobuf

  • Supported Everywhere (Go, Python, Scala… Rust!)
  • Native Kafka support - no more Schema Registry!
  • Forwards and backwards compatible
  • Validations, linters, zero-values (oh my!)
  • Imports! Optionals! Inheritance! (well, inheritance-ish. We’ll get to that)

What's in it for me?

Have Kafka? Use Proto
  • Backwards compatible - ignore new fields
  • Forwards compatible - zero values (a la Go)
  • Multi-schema topics - oneof to simulate inheritance
  • Topic definitions - single source of truth
Have a DB? Use Proto
  • Schema enforcement, strongly typed
  • Simple mapping Proto <=> Go (we use Bun)
  • Runtime validations
Serving UI? Use... REST
  • State of gRPC in the browser...
  • Still, build it in gRPC!
  • Use protoc-gen-grpc-gateway
  • Generate OpenAPI specs
  • Generate client libraries from OpenAPI specs

A tale of two companies...

Day 0...

Day 1: "Getting Started"

reco 👼
Reco started setting up protobuf and gRPC. They paid the price in the beginning.
ocer 😈
Ocer just used JSON: "nothing will block us!" Their first service was online a week sooner.

Day 10: "First service"

reco 👼
Reco's first service is online, and looks exactly the same as Ocer's. Everybody knows proto at this point.
ocer 😈
First service has been up for a week. New requirements are being added and the team is trying to answer each one.

Day 30: "Multiple services"

reco 👼
Reco added a new service, and it's already connected to the old one via gRPC API. When a change is introduced to the API, it's propagated to the new services.
Services up: 2
ocer 😈
Ocer added a new service, and connecting the first time was a simple json.Parse(). When a change was introduced, they just added a flag.
Services up: 2

Day 100: "Polyglot"

reco 👼
Reco needed to add Python to the stack for an existing service that was originally Go. They did it in a few hours.
Services up: 6
ocer 😈
Ocer needed to add Python to the stack for an existing service that was originally Go. It took 4 weeks.
Services up: 4

Day 200: "API changes"

reco 👼
An API change was introduced, and it propagated to all services, clients, data pipelines, etc. automatically. The team is out for a beer.
Services up: 16
ocer 😈
An API change was introduced. The company's customer support budget was increased by 200%.
Services up: 6.5

Day 365: "Data science"

reco 👼
The data scientists are using gRPC with services and proto schemas to collect data from DBs and Topics.
Features working in prod:
ocer 😈
Ocer's data science team exports CSVs to collect data. Their analysts spent 2 weeks proving that a string is NaN.
Features working in prod:

APIs and µServices

Working with APIs (and building µServices) with Protobuf and gRPC leads to multiple benefits, but the main one is schema-first design. As a team, we focus on what and why, before we focus on how.

Establishing a single source of truth through the use of schema-first design can help align software development. Identifying a common goal, and establishing a source for teams and processes to align themselves with is incredibly important.
Source: Nordic APIs, "Using A Schema-First Design As Your Single Source of Truth"

This strong design philosophy comes with added benefits:

  • Division of labor
  • Backwards-or-forwards compatible, if you want it
  • Single source of truth, checked into git: shift-left. The compiler/transpiler/type checker validates you.
  • Testability and traceability - we have best practices for all! Don't reinvent wheels.

Now let's take a look at HOW we build this pipeline.

graph LR
  g(gRPC file)

📄 services/v1/user_service.proto

              service UserService {
                  rpc AddUser(AddUserRequest) returns (AddUserResponse) {
                      // ...
                  }
              }

📄 services/v1/user_service.proto

              message AddUserRequest {
                string email_address = 1 [(validate.rules).string.email = true];
                string name = 2 [(validate.rules).string.min_len = 1];
                string profile_photo_url = 3;
              }

              message AddUserResponse {
                User user = 1;
              }

Which components of the stack use the definitions?

Consumers 🍴

Providers 👨‍🍳

Gateways 📬

We need to generate enums, messages, clients and servers from the service definition to answer all of the needs we have discussed previously. This means multiple standards (gRPC and REST) and multiple languages (OpenAPI, TS, Go, Python).

A combination of Earthly 🌍 and buf is used to manage this in a way that works well.

Why Earthly? Containerized builds are a great way to ensure that all devs generate the same code from the same schema.

Why buf? Dealing with proto code generation manually sucks. protoc was built for Google's monorepo, and Google's monorepo only. Buf is declarative and super simple.

📄 buf.gen.yaml

version: v1
plugins:
  - name: go
    out: gen/go
    opt: paths=source_relative
  - name: go-grpc
    out: gen/go
    opt: paths=source_relative
  - name: validate
    out: gen/go
    opt:
      - paths=source_relative
      - lang=go
  - name: python_betterproto
    out: gen/python
    opt: paths=source_relative
  - name: js
    out: gen/js
    opt: import_style=commonjs
  - name: grpc-gateway
    out: gen/go
    opt: paths=source_relative
  - name: openapiv2
    out: third_party/OpenAPI

📄 buf.work.yaml

              version: v1
              directories:
                - buf-exports/googleapis
                - buf-exports/grpc-gateway
                - buf-exports/protoc-gen-validate
                - proto

The buf.work.yaml file is used to define a workspace, where one or more modules can coexist and interoperate within a common directory.

graph LR
  g(gRPC file)
  g-->|buf: go|go(go structs)
  g-->|buf: go-grpc|grpc(go gRPC clients and servers)
  g-->|buf: validate|validation(go validation middleware)
  g-->|buf: grpc-gateway|gateway(gRPC gateway layers)
  g-->|buf: openapiv2|openapi(OpenAPI .json files)
  g-->|buf: python_betterproto|python(Python clients, servers, types)

📄 gen/go/reco/services/v1/user_service.pb.go

            type AddUserRequest struct {
                state         protoimpl.MessageState
                sizeCache     protoimpl.SizeCache
                unknownFields protoimpl.UnknownFields

                EmailAddress    string `protobuf:"bytes,1,opt,name=email_address,json=emailAddress,proto3" json:"email_address,omitempty"`
                Name            string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
                ProfilePhotoUrl string `protobuf:"bytes,3,opt,name=profile_photo_url,json=profilePhotoUrl,proto3" json:"profile_photo_url,omitempty"`
            }

📄 gen/go/reco/services/v1/user_service.pb.validate.go

            func (m *AddUserRequest) validate(all bool) error {
               if m == nil {
                   return nil
               }

               var errors []error

               // [EDITED FOR SLIDE] Omitted email check

               if utf8.RuneCountInString(m.GetName()) < 1 {
                   err := AddUserRequestValidationError{
                       field:  "Name",
                       reason: "value length must be at least 1 runes",
                   }
                   if !all {
                       return err
                   }
                   errors = append(errors, err)
               }

               if len(errors) > 0 {
                   return AddUserRequestMultiError(errors)
               }

               return nil
            }
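
The `all` flag above is what separates "stop at the first violation" from "collect everything". Here is a hand-written sketch of that behavior, runnable with the standard library alone. It is not the generated code: `MultiError` is a simplified stand-in for the generated multi-error type, and the email check simply uses `net/mail.ParseAddress`, which is our assumption since the slide omits that part.

```go
package main

import (
	"errors"
	"fmt"
	"net/mail"
	"strings"
	"unicode/utf8"
)

type AddUserRequest struct {
	EmailAddress string
	Name         string
}

// MultiError bundles every violation found in one pass.
type MultiError []error

func (m MultiError) Error() string {
	msgs := make([]string, 0, len(m))
	for _, err := range m {
		msgs = append(msgs, err.Error())
	}
	return strings.Join(msgs, "; ")
}

// validate returns the first violation when all is false,
// or a MultiError holding every violation when all is true.
func (r *AddUserRequest) validate(all bool) error {
	var errs []error

	if _, err := mail.ParseAddress(r.EmailAddress); err != nil {
		e := errors.New("EmailAddress: value must be a valid email address")
		if !all {
			return e
		}
		errs = append(errs, e)
	}

	if utf8.RuneCountInString(r.Name) < 1 {
		e := errors.New("Name: value length must be at least 1 runes")
		if !all {
			return e
		}
		errs = append(errs, e)
	}

	if len(errs) > 0 {
		return MultiError(errs)
	}
	return nil
}

func (r *AddUserRequest) Validate() error    { return r.validate(false) }
func (r *AddUserRequest) ValidateAll() error { return r.validate(true) }

func main() {
	bad := &AddUserRequest{EmailAddress: "not-an-email", Name: ""}
	fmt.Println("first:", bad.Validate())
	fmt.Println("all:  ", bad.ValidateAll())

	good := &AddUserRequest{EmailAddress: "gopher@example.com", Name: "Gopher"}
	fmt.Println("good: ", good.Validate())
}
```

Fail-fast suits hot paths such as a server interceptor, while the collect-all form gives a UI everything it needs to highlight in one round trip.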

📄 gen/go/reco/services/v1/user_service_grpc.pb.go

              // UserServiceServer is the server API for UserService service.
              // All implementations must embed UnimplementedUserServiceServer
              // for forward compatibility
              type UserServiceServer interface {
                  AddUser(context.Context, *AddUserRequest) (*AddUserResponse, error)
                  // ...
              }

              // UnimplementedUserServiceServer must be embedded to have forward compatible implementations.
              type UnimplementedUserServiceServer struct {
              }

              func (UnimplementedUserServiceServer) AddUser(context.Context, *AddUserRequest) (*AddUserResponse, error) {
                  return nil, status.Errorf(codes.Unimplemented, "method AddUser not implemented")
              }

📄 gen/go/reco/services/v1/user_service_grpc.pb.go

// UserServiceClient is the client API for UserService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type UserServiceClient interface {
    AddUser(ctx context.Context, in *AddUserRequest, opts ...grpc.CallOption) (*AddUserResponse, error)
}

type userServiceClient struct {
    cc grpc.ClientConnInterface
}

func NewUserServiceClient(cc grpc.ClientConnInterface) UserServiceClient {
    return &userServiceClient{cc}
}

func (c *userServiceClient) AddUser(ctx context.Context, in *AddUserRequest, opts ...grpc.CallOption) (*AddUserResponse, error) {
    out := new(AddUserResponse)
    err := c.cc.Invoke(ctx, "/", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

📄 gen/go/reco/services/v1/

// RegisterUserServiceHandlerServer registers the http handlers for service UserService to "mux".
func RegisterUserServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux, server UserServiceServer) error {
    mux.Handle("POST", pattern_UserService_AddUser_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
        ctx, cancel := context.WithCancel(req.Context())
        defer cancel()
        var stream runtime.ServerTransportStream
        ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
        inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
        rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/", runtime.WithHTTPPathPattern("/api/v1/users"))
        if err != nil {
            runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
            return
        }
        resp, md, err := local_request_UserService_AddUser_0(rctx, inboundMarshaler, server, req, pathParams)
        md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
        ctx = runtime.NewServerMetadataContext(ctx, md)
        if err != nil {
            runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
            return
        }

        forward_UserService_AddUser_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
    })

    return nil
}


📄 third_party/OpenAPI/reco/services/v1/user_service.swagger.json

"/api/v1/users": {
  "post": {
    "description": "Add a user to the server.",
    "operationId": "UserService_AddUser",
    "parameters": [
      {
        "in": "body",
        "name": "body",
        "required": true,
        "schema": {
          "$ref": "#/definitions/v1AddUserRequest"
        }
      }
    ],
    "responses": {
      "200": {
        "description": "A successful response.",
        "schema": {
          "$ref": "#/definitions/v1AddUserResponse"
        }
      },
      "default": {
        "description": "An unexpected error response.",
        "schema": {
          "$ref": "#/definitions/googlerpcStatus"
        }
      }
    },
    "summary": "Add a user",
    "tags": [...]
  }
},
"v1AddUserRequest": {
  "properties": {
    "emailAddress": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "profilePhotoUrl": {
      "type": "string"
    }
  },
  "type": "object"
}

graph LR
  g(gRPC file)
  g-->|buf: go|go(go structs)
  g-->|buf: go-grpc|grpc(go gRPC clients and servers)
  g-->|buf: validate|validation(go validation middleware)
  g-->|buf: grpc-gateway|gateway(gRPC gateway layers)
  grpc-->|microgen|micro(µServices skeleton)
  gateway-->|go mod vendor|gatewayimpl(HTTP gateway implementation)
  g-->|buf: openapiv2|openapi(OpenAPI .json files)
  openapi-->|openapi-generator|ts(TypeScript client + server libraries)
  g-->|buf: python_betterproto|python(Python clients, servers, types)

📄 gen/typescript/swagger_clients/user_service/swagger_client/models/V1AddUserRequest.ts

export interface V1AddUserRequest {
    emailAddress?: string;
    name?: string;
    profilePhotoUrl?: string;
}

export function V1AddUserRequestFromJSON(json: any): V1AddUserRequest {
  return V1AddUserRequestFromJSONTyped(json, false);
}

export function V1AddUserRequestFromJSONTyped(json: any, ignoreDiscriminator: boolean): V1AddUserRequest {
  if ((json === undefined) || (json === null)) {
    return json;
  }
  return {
    'emailAddress': !exists(json, 'emailAddress') ? undefined : json['emailAddress'],
    'name': !exists(json, 'name') ? undefined : json['name'],
    'profilePhotoUrl': !exists(json, 'profilePhotoUrl') ? undefined : json['profilePhotoUrl'],
  };
}

export function V1AddUserRequestToJSON(value?: V1AddUserRequest | null): any {
  if (value === undefined) {
    return undefined;
  }
  if (value === null) {
    return null;
  }
  return {
    'emailAddress': value.emailAddress,
    'name': value.name,
    'profilePhotoUrl': value.profilePhotoUrl,
  };
}

📄 gen/typescript/swagger_clients/user_service/swagger_client/apis/UsersApi.ts

export class UsersApi extends runtime.BaseAPI {
    /**
     * Add a user to the server.
     * Add a user
     */
    async userServiceAddUserRaw(requestParameters: UserServiceAddUserRequest, initOverrides?: RequestInit): Promise<runtime.ApiResponse<V1AddUserResponse>> {
        if (requestParameters.body === null || requestParameters.body === undefined) {
            throw new runtime.RequiredError('body','Required parameter requestParameters.body was null or undefined when calling userServiceAddUser.');
        }

        const queryParameters: any = {};

        const headerParameters: runtime.HTTPHeaders = {};

        headerParameters['Content-Type'] = 'application/json';

        if (this.configuration && this.configuration.apiKey) {
            headerParameters["Authorization"] = this.configuration.apiKey("Authorization"); // bearer authentication
        }

        const response = await this.request({
            path: `/api/v1/users`,
            method: 'POST',
            headers: headerParameters,
            query: queryParameters,
            body: V1AddUserRequestToJSON(requestParameters.body),
        }, initOverrides);

        return new runtime.JSONApiResponse(response, (jsonValue) => V1AddUserResponseFromJSON(jsonValue));
    }
}

So we looked at APIs. What about data?

Data Pipeline @ Reco


Multi-schema topics

(Trust me, I'm a data engineer)

But... Why?

  • Because we can - thanks to the magic of oneof!
  • Enforces ordering (e.g. nodes before edges, changelogs, etc)
  • Simple scaling and balancing across partitions
  • Code-only migrations (new event type? Add it to the oneof!)

              message Entity {
                  string id = 1;
                  google.protobuf.Timestamp created_at = 2;
                  data.api.Source source = 3;
                  oneof dto {
                      data.models.Node node = 4;
                      data.models.Edge edge = 5;
                  }
              }

              /** Events of the user.
                  Note that the order of each event
                  is important! **/
              message Changelog {
                  string id = 1;
                  data.api.EventType eventType = 3;
                  oneof event {
                      data.models.LoginEvent loginEvent = 5;
                      data.models.Request request = 6;
                      // ... more event types
                  }
              }
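
To make the "nodes before edges" point concrete, here is a small Go sketch of a consumer that applies entities in topic order and rejects an edge whose endpoints have not been seen yet. The structs are hand-written stand-ins for the generated types; the fields of `Node` and `Edge` are assumptions, and `Graph` is a hypothetical in-memory sink.

```go
package main

import "fmt"

type Node struct{ ID string }
type Edge struct{ From, To string }

// Entity mirrors the oneof: at most one of Node or Edge is set.
type Entity struct {
	ID   string
	Node *Node
	Edge *Edge
}

// Graph applies entities in the order they arrive on the topic.
type Graph struct {
	nodes map[string]bool
	edges []Edge
}

func NewGraph() *Graph { return &Graph{nodes: map[string]bool{}} }

// Apply relies on topic ordering: an edge is only accepted if both of
// its endpoints were already seen as nodes.
func (g *Graph) Apply(e Entity) error {
	switch {
	case e.Node != nil:
		g.nodes[e.Node.ID] = true
		return nil
	case e.Edge != nil:
		if !g.nodes[e.Edge.From] || !g.nodes[e.Edge.To] {
			return fmt.Errorf("entity %s: edge %s->%s arrived before its nodes",
				e.ID, e.Edge.From, e.Edge.To)
		}
		g.edges = append(g.edges, *e.Edge)
		return nil
	default:
		return fmt.Errorf("entity %s: no dto set", e.ID)
	}
}

func main() {
	g := NewGraph()
	topic := []Entity{
		{ID: "1", Node: &Node{ID: "a"}},
		{ID: "2", Node: &Node{ID: "b"}},
		{ID: "3", Edge: &Edge{From: "a", To: "b"}},
		{ID: "4", Edge: &Edge{From: "a", To: "z"}},
	}
	for _, e := range topic {
		if err := g.Apply(e); err != nil {
			fmt.Println("rejected:", err)
		}
	}
	fmt.Println("edges:", len(g.edges))
}
```

With nodes and edges on separate topics, the consumer would have to buffer or retry edges instead. On a single topic the ordering only holds within a partition, so related records need to share a key.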

But Also... Why Not?

  • Observability - harder to know how much of which type
  • Replayability - need to filter by type (can be done quickly at the key level)
  • Naming - if your topic does all the things, it should be named as such!
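
Both the observability and the replayability pain can be softened if the record key carries the type, because then nobody has to decode the payload to count or filter. A sketch of that idea, assuming a made-up key convention of `<type>:<id>`; `Record` is a stand-in for a Kafka record, not a real client type.

```go
package main

import (
	"fmt"
	"strings"
)

// Record is a stand-in for a Kafka record: a key plus an opaque payload.
type Record struct {
	Key   string
	Value []byte
}

// typeOf extracts the event type from a key shaped like "<type>:<id>".
func typeOf(key string) string {
	t, _, found := strings.Cut(key, ":")
	if !found {
		return "unknown"
	}
	return t
}

// countByType gives the per-type volume without decoding any payload.
func countByType(records []Record) map[string]int {
	counts := map[string]int{}
	for _, r := range records {
		counts[typeOf(r.Key)]++
	}
	return counts
}

// replayOnly keeps the records of one type, again looking only at keys.
func replayOnly(records []Record, want string) []Record {
	var out []Record
	for _, r := range records {
		if typeOf(r.Key) == want {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	topic := []Record{{Key: "node:a"}, {Key: "node:b"}, {Key: "edge:a-b"}, {Key: "oops"}}
	fmt.Println(countByType(topic))
	fmt.Println(len(replayOnly(topic, "edge")))
}
```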

Why not JSON? CSV? Avro? XML?

  • JSON - a terrible serialisation tool
    • Hard to read, no commenting, no static types
  • CSV - only marginally less horrible
    • Inferred types? Schema enforcement? Over-the-wire?
  • Avro - like JSON, but typed
    • Missing parts - IDL, Schema Registry
  • XML - well, it’s XML
Comparison of data-serialization formats

Call To Action

1. Proto-fy your schemas
2. Invest in the build pipeline
3. Give us a call, we can help

Thank you! 🙏

You can reach out and get the slides @Reco's R&D blog

Nir Barak
Shay Nehmad