How to use protobuf to shift-left your entire schema management for multiple disciplines

Reco // Nir Barak & Shay Nehmad
Go Israel November 2022 Meetup

Who are we?

We at Reco committed from day one to using Protobuf and schema-first design. This led to a development process that is simple but rigorous, with a shift-left mentality. It's worth your time to learn how to do the same in your own projects and companies.

What are we going to talk about?

  1. Protobuf in a nutshell
  2. "A tale of two companies"
  3. APIs and µServices
  4. Data pipelines

Protobuf in a nutshell

  • Language-neutral data interchange format
  • Declarative - structured & strongly typed
  • Maintainable - forwards/backwards compatible
  • DevEx - inheritance, imports, commenting
  • GitOps - documentation, validations, linters

Protobuf in a nutshell

TL;DR: just look at it


              syntax="proto3";
              package data;

              import "google/protobuf/timestamp.proto";
              import "validate/validate.proto";

              enum Status {
                 UNSPECIFIED = 0;
                 IN_PROCESS = 1;
                 DONE = 2;
              }

              /* Terrific Proto message. The best protobuf message in the
                 history of protobuf messages, maybe ever. */
              message Reco {
                 string id = 1;
                 repeated int32 count = 2;
                 map<string, bool> flags = 3;
                 optional bytes raw_data = 4;
                 oneof dto {
                     Status status = 5; // look, Ma! I'm referencing an enum!
                     string error = 6; // unless I’ve failed horribly and I’m an error
                 }
                 google.protobuf.Timestamp created_at = 7
                    [(validate.rules).timestamp.within.seconds = 86400]; // 1 day
              }
        

Generated

Golang!


            // Terrific Proto message. The best protobuf message in the history of protobuf messages, maybe ever.
            type Reco struct {
              state         protoimpl.MessageState
              sizeCache     protoimpl.SizeCache
              unknownFields protoimpl.UnknownFields

              Id      string          `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
              Count   []int32         `protobuf:"varint,2,rep,packed,name=count,proto3" json:"count,omitempty"`
              Flags   map[string]bool `protobuf:"bytes,3,rep,name=flags,proto3" json:"flags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
              RawData []byte          `protobuf:"bytes,4,opt,name=raw_data,json=rawData,proto3,oneof" json:"raw_data,omitempty"`
              // Types that are assignable to Dto:
              // *Reco_Status
              // *Reco_Error
              Dto       isReco_Dto             `protobuf_oneof:"dto"`
              CreatedAt *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` // 1 day
            }
        
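The `oneof dto` becomes the `isReco_Dto` interface in Go. To show how that pattern is consumed, here is a hand-written, trimmed mirror of the generated shapes (no protoimpl fields, no getters; the names follow protoc-gen-go's conventions, but this is not generated output): each case is a wrapper type implementing an unexported interface, and callers branch with a type switch.

```go
package main

import "fmt"

// Status mirrors the enum from the .proto file.
type Status int32

const (
	Status_UNSPECIFIED Status = 0
	Status_IN_PROCESS  Status = 1
	Status_DONE        Status = 2
)

// isReco_Dto is an unexported "sealed" interface: only the wrapper types
// below implement it, so a Reco holds at most one of them at a time.
type isReco_Dto interface{ isReco_Dto() }

type Reco_Status struct{ Status Status }
type Reco_Error struct{ Error string }

func (*Reco_Status) isReco_Dto() {}
func (*Reco_Error) isReco_Dto()  {}

// Reco keeps only the oneof field for brevity.
type Reco struct {
	Dto isReco_Dto
}

// describe shows the usual way to consume a oneof: a type switch.
func describe(r *Reco) string {
	switch v := r.Dto.(type) {
	case *Reco_Status:
		return fmt.Sprintf("status=%d", v.Status)
	case *Reco_Error:
		return "error=" + v.Error
	case nil:
		return "dto not set"
	default:
		return "unknown dto"
	}
}

func main() {
	fmt.Println(describe(&Reco{Dto: &Reco_Status{Status: Status_DONE}})) // status=2
	fmt.Println(describe(&Reco{Dto: &Reco_Error{Error: "boom"}}))        // error=boom
	fmt.Println(describe(&Reco{}))                                        // dto not set
}
```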

Generated

Python (betterproto)


          @dataclass(eq=False, repr=False)
          class Reco(betterproto.Message):
             """
             Terrific Proto message. The best protobuf message in the history of
             protobuf messages, maybe ever.
             """

             id: str = betterproto.string_field(1)
             count: List[int] = betterproto.int32_field(2)
             flags: Dict[str, bool] = betterproto.map_field(3, betterproto.TYPE_STRING, betterproto.TYPE_BOOL)
             raw_data: bytes = betterproto.bytes_field(4, group="_raw_data")
             status: "Status" = betterproto.enum_field(5, group="dto")
             error: str = betterproto.string_field(6, group="dto")
             created_at: datetime = betterproto.message_field(7)
        

Generated

Scala (ScalaPB)


            /** Terrific Proto message. The best protobuf message in the history of protobuf messages, maybe ever.
             * @param createdAt 1 day
             */
            @SerialVersionUID(0L)
            final case class Reco(
               id: _root_.scala.Predef.String = "",
               count: _root_.scala.Seq[_root_.scala.Int] = _root_.scala.Seq.empty,
               flags: _root_.scala.collection.immutable.Map[_root_.scala.Predef.String, _root_.scala.Boolean] = _root_.scala.collection.immutable.Map.empty,
               rawData: _root_.scala.Option[_root_.com.google.protobuf.ByteString] = _root_.scala.None,
               dto: data.models.reco.Reco.Dto = data.models.reco.Reco.Dto.Empty,
               createdAt: _root_.scala.Option[com.google.protobuf.timestamp.Timestamp] = _root_.scala.None,
               unknownFields: _root_.scalapb.UnknownFieldSet = _root_.scalapb.UnknownFieldSet.empty
               ) extends scalapb.GeneratedMessage with scalapb.lenses.Updatable[Reco]
        

Required gratuitous benchmark


Source: How Uber Engineering Evaluated JSON Encoding and Compression Algorithms to Put the Squeeze on Trip Data

A tale of two companies...

Day 0...

A tale of two companies...

Day 1: "Getting Started"

reco 👼
Reco started setting up protobuf and gRPC. They paid the price in the beginning.
ocer 😈
Ocer just used JSON: nothing will block us! Their first service was online a week faster.

A tale of two companies...

Day 10: "First service"

reco 👼
Reco's first service is online, and looks exactly the same as Ocer's. Everybody knows proto at this point.
ocer 😈
First service has been up for a week. New requirements are being added and the team is trying to answer each one.

A tale of two companies...

Day 30: "Multiple services"

reco 👼
Reco added a new service, and it's already connected to the old one via gRPC API. When a change is introduced to the API, it's propagated to the new services.
Services up: 2
ocer 😈
Ocer added a new service, and connecting it the first time was a simple json.Unmarshal(). When a change was introduced, they just added a flag. What's a few ifs between friends?
Services up: 2

A tale of two companies...

Day 100: "Polyglot"

reco 👼
Reco needed to add Python to the stack for an existing service that was originally Go. They did it in a few hours.
Services up: 6
ocer 😈
Ocer needed to add Python to the stack for an existing service that was originally Go. It took 4 weeks.
Services up: 4

A tale of two companies...

Day 200: "API changes"

reco 👼
An API change was introduced, and it was propagated to all the services, clients, data pipelines, and documentation automatically. The team is out for a beer.
Services up: 16
ocer 😈
An API change was introduced. Customer support budget in the company was increased 200%.
Services up: 6.5

A tale of two companies...

Day 365: "Data science"

reco 👼
The data science team are using gRPC to work with services and proto schemas to collect data from Databases and Topics. Notebooks work.
Features working in prod: 24
ocer 😈
Ocer's data science team exports CSVs to collect data. Their analysts spent 2 weeks proving that a string is indeed NaN.
Features working in prod: 7

Why we really, really (really) like Protobuf

  • Supported Everywhere (Go, Python, Scala… Rust!)
  • Native Kafka support - no more Schema Registry!
  • Forwards and backwards compatible
  • Validations, linters, zero-values (oh my!)
  • Imports! Optionals! Inheritance! (well, inheritance-ish. We’ll get to that)

What's in it for me?

Have Kafka? Use Proto
  • Backwards compatible - new readers get zero values (a la Go) for fields old writers never set
  • Forwards compatible - old readers ignore fields they don't know
  • Multi-schema topics - oneof to simulate inheritance
  • Topic definitions - single source of truth
Have a DB? Use Proto
  • Schema enforcement, strongly typed
  • Simple mapping Proto <=> Go (we use Bun)
  • Runtime validations
Serving UI? Use... REST
  • State of gRPC in the browser...
  • Still, build it in gRPC!
  • Use protoc-gen-grpc-gateway
  • Generate OpenAPI specs
  • Generate client libraries from OpenAPI specs
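Why old consumers survive new fields: every field on the wire carries its own number and wire type, so a reader can skip anything it does not recognise. A minimal stdlib-only sketch, assuming a hypothetical "old" schema that only knows field 1 (`id`) and handling only wire types 0 (varint) and 2 (length-delimited). The real Go runtime does this for you and keeps the skipped bytes in `unknownFields` rather than discarding them.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// oldUser is all a hypothetical "old" consumer knows: field 1 (id, a varint).
type oldUser struct {
	ID      uint64
	Skipped int // fields this reader did not recognise and skipped
}

func decodeOldUser(b []byte) (oldUser, error) {
	var u oldUser
	for len(b) > 0 {
		tag, n := binary.Uvarint(b)
		if n <= 0 {
			return u, errors.New("malformed tag")
		}
		b = b[n:]
		field, wireType := tag>>3, tag&7
		switch wireType {
		case 0: // varint
			v, m := binary.Uvarint(b)
			if m <= 0 {
				return u, errors.New("malformed varint")
			}
			b = b[m:]
			if field == 1 {
				u.ID = v
			} else {
				u.Skipped++
			}
		case 2: // length-delimited: a varint length, then that many bytes
			length, m := binary.Uvarint(b)
			if m <= 0 || length > uint64(len(b)-m) {
				return u, errors.New("malformed length")
			}
			b = b[m+int(length):]
			u.Skipped++ // this old schema has no length-delimited fields
		default:
			return u, fmt.Errorf("wire type %d is not handled in this sketch", wireType)
		}
	}
	return u, nil
}

func main() {
	// Bytes from a "newer" writer: id=150 (field 1) plus a new string field 2 = "hi".
	newer := []byte{0x08, 0x96, 0x01, 0x12, 0x02, 'h', 'i'}
	u, err := decodeOldUser(newer)
	fmt.Println(u.ID, u.Skipped, err) // 150 1 <nil>
}
```

The other direction needs no code at all: a field the writer never set is simply absent from the bytes, so the reader keeps Go's zero value.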

APIs and µServices

Working with APIs (and building µServices) with Protobuf and gRPC leads to multiple benefits, but the main one is schema-first design. As a team, we focus on what and why, before we focus on how.

APIs and µServices

Establishing a single source of truth through the use of schema-first design can help align software development. Identifying a common goal, and establishing a source for teams and processes to align themselves with, is incredibly important.
Source: Nordic APIs, "Using A Schema-First Design As Your Single Source of Truth"

APIs and µServices

This strong design philosophy comes with added benefits:

  • Division of labor
  • Backwards- and forwards-compatible, if you want it
  • Single source of truth, checked into git: shift-left. The compiler/transpiler/type checker validates you.
  • Testability and traceability - we have best practices for all! Don't reinvent wheels.

APIs and µServices

Now let's take a look at HOW we build this pipeline.

APIs and µServices

Diagram: the gRPC file

APIs and µServices

📄 services/v1/user_service.proto

            
            
              service UserService {
                  rpc AddUser(AddUserRequest) returns (AddUserResponse) {
                      option (google.api.http) = { post: "/api/v1/users" body: "*" };
                  }
              }
        
      

APIs and µServices

📄 services/v1/user_service.proto

            
              message AddUserRequest {
                string email_address = 1 [(validate.rules).string.email = true];
                string name = 2 [(validate.rules).string.min_len = 1];
                string profile_photo_url = 3;
              }
              
              message AddUserResponse {
                User user = 1;
              }
        
      

APIs and µServices

Which components of the stack use the definitions?

Consumers 🍴

Providers 👨‍🍳

Gateways 📬

APIs and µServices

We need to generate enums, messages, clients and servers from the service definition to cover all the needs discussed above. That means multiple standards (gRPC and REST) and multiple targets (Go, Python, TypeScript, and OpenAPI specs).


We use a combination of Earthly 🌍 and buf to manage all of this.

APIs and µServices

Why Earthly? Containerized builds are a great way to ensure that all devs generate the same code from the same schema.

APIs and µServices

Why buf? Driving protoc and its plugins by hand sucks: it was built for Google's monorepo, and Google's monorepo only. Buf is declarative and super simple.

APIs and µServices

📄 buf.gen.yaml

            
version: v1
plugins:
  - name: go
    out: gen/go
    opt: paths=source_relative
  - name: go-grpc
    out: gen/go
    opt: paths=source_relative
  - name: validate
    out: gen/go
    opt:
      - paths=source_relative
      - lang=go
  - name: python_betterproto
    out: gen/python
    opt: paths=source_relative
  - name: js
    out: gen/js
    opt: import_style=commonjs
  - name: grpc-gateway
    out: gen/go
    opt: paths=source_relative
  - name: openapiv2
    out: third_party/OpenAPI
            
          

APIs and µServices

Dependencies?

📄 buf.work.yaml

            
              version: v1
              directories:
                - buf-exports/googleapis
                - buf-exports/grpc-gateway
                - buf-exports/protoc-gen-validate
                - proto              
            
          

The buf.work.yaml file is used to define a workspace, where one or more modules can coexist and interoperate within a common directory.

APIs and µServices

Diagram: the gRPC file goes through buf, one plugin per output:
  • buf: go → Go structs
  • buf: go-grpc → Go gRPC clients and servers
  • buf: validate → Go validation middleware
  • buf: grpc-gateway → gRPC gateway layers
  • buf: openapiv2 → OpenAPI .json files
  • buf: python_betterproto → Python clients, servers, types

APIs and µServices

📄 gen/go/reco/services/v1/user_service.pb.go

            
            type AddUserRequest struct {
                state         protoimpl.MessageState
                sizeCache     protoimpl.SizeCache
                unknownFields protoimpl.UnknownFields
             
                EmailAddress    string `protobuf:"bytes,1,opt,name=email_address,json=emailAddress,proto3" json:"email_address,omitempty"`
                Name            string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
                ProfilePhotoUrl string `protobuf:"bytes,3,opt,name=profile_photo_url,json=profilePhotoUrl,proto3" json:"profile_photo_url,omitempty"`
            }
            
          
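Besides the struct, protoc-gen-go emits a `Get…` accessor per field that returns the zero value when the receiver is nil, which is why the generated validator calls `m.GetName()` rather than `m.Name`. A trimmed, hand-written sketch of that getter shape (the struct here keeps only two fields):

```go
package main

import "fmt"

// AddUserRequest is trimmed to two fields; the real generated struct also
// carries protoimpl state, sizeCache and unknownFields.
type AddUserRequest struct {
	EmailAddress string
	Name         string
}

// GetName follows the getter shape protoc-gen-go generates: safe on a nil receiver.
func (x *AddUserRequest) GetName() string {
	if x != nil {
		return x.Name
	}
	return ""
}

func (x *AddUserRequest) GetEmailAddress() string {
	if x != nil {
		return x.EmailAddress
	}
	return ""
}

func main() {
	var missing *AddUserRequest // e.g. an unset message-typed field
	fmt.Printf("%q\n", missing.GetName()) // "" instead of a nil-pointer panic
	fmt.Println((&AddUserRequest{Name: "Nir"}).GetName())
}
```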

APIs and µServices

📄 gen/go/reco/services/v1/user_service.pb.validate.go

            
            func (m *AddUserRequest) validate(all bool) error {
               if m == nil {
                   return nil
               }
          
               var errors []error
          
               // [EDITED FOR SLIDE] Omitted email check

               if utf8.RuneCountInString(m.GetName()) < 1 {
                   err := AddUserRequestValidationError{
                       field:  "Name",
                       reason: "value length must be at least 1 runes",
                   }
                   if !all {
                       return err
                   }
                   errors = append(errors, err)
               }
               if len(errors) > 0 {
                   return AddUserRequestMultiError(errors)
               }
               return nil
            }
            
          

APIs and µServices

📄 gen/go/reco/services/v1/user_service_grpc.pb.go

            
              // UserServiceServer is the server API for UserService service.
              // All implementations must embed UnimplementedUserServiceServer
              // for forward compatibility
              type UserServiceServer interface {
                  AddUser(context.Context, *AddUserRequest) (*AddUserResponse, error)
                  // ...
              }
              
              // UnimplementedUserServiceServer must be embedded to have forward compatible implementations.
              type UnimplementedUserServiceServer struct {
              }
 
              func (UnimplementedUserServiceServer) AddUser(context.Context, *AddUserRequest) (*AddUserResponse, error) {
                  return nil, status.Errorf(codes.Unimplemented, "method AddUser not implemented")
              }
            
          

APIs and µServices

📄 gen/go/reco/services/v1/user_service_grpc.pb.go

            
// UserServiceClient is the client API for UserService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type UserServiceClient interface {
    AddUser(ctx context.Context, in *AddUserRequest, opts ...grpc.CallOption) (*AddUserResponse, error)
}

type userServiceClient struct {
    cc grpc.ClientConnInterface
}

func NewUserServiceClient(cc grpc.ClientConnInterface) UserServiceClient {
    return &userServiceClient{cc}
}

func (c *userServiceClient) AddUser(ctx context.Context, in *AddUserRequest, opts ...grpc.CallOption) (*AddUserResponse, error) {
    out := new(AddUserResponse)
    err := c.cc.Invoke(ctx, "/reco.services.v1.UserService/AddUser", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}
            
          

APIs and µServices

📄 gen/go/reco/services/v1/user_service.pb.gw.go

            
// RegisterUserServiceHandlerServer registers the http handlers for service UserService to "mux".
func RegisterUserServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux, server UserServiceServer) error {
    mux.Handle("POST", pattern_UserService_AddUser_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
        ctx, cancel := context.WithCancel(req.Context())
        defer cancel()
        var stream runtime.ServerTransportStream
        ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
        inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
        rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/reco.services.v1.UserService/AddUser", runtime.WithHTTPPathPattern("/api/v1/users"))
        if err != nil {
            runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
            return
        }
        resp, md, err := local_request_UserService_AddUser_0(rctx, inboundMarshaler, server, req, pathParams)
        md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
        ctx = runtime.NewServerMetadataContext(ctx, md)
        if err != nil {
            runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
            return
        }

        forward_UserService_AddUser_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)

    })

    // ... handlers for the other UserService methods

    return nil
}
            
          

APIs and µServices

📄 third_party/OpenAPI/reco/services/v1/user_service.swagger.json

            
"/api/v1/users": {
  "post": {
    "description": "Add a user to the server.",
    "operationId": "UserService_AddUser",
    "parameters": [
      {
        "in": "body",
        "name": "body",
        "required": true,
        "schema": {
          "$ref": "#/definitions/v1AddUserRequest"
        }
      }
    ],
    "responses": {
      "200": {
        "description": "A successful response.",
        "schema": {
          "$ref": "#/definitions/v1AddUserResponse"
        }
      },
      "default": {
        "description": "An unexpected error response.",
        "schema": {
          "$ref": "#/definitions/googlerpcStatus"
        }
      }
    },
    "summary": "Add a user",
    "tags": [
      "Users"
    ]
  }
}
"v1AddUserRequest": {
  "properties": {
    "emailAddress": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "profilePhotoUrl": {
      "type": "string"
    }
  },
  "type": "object"
},
            
          
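Note that the OpenAPI properties are lowerCamelCase (`emailAddress`) while the .proto fields are snake_case. By default the gateway and the OpenAPI generator use each field's protobuf `json_name` (the `json=emailAddress` part of the `protobuf:"..."` struct tag), not the `json:"email_address"` tag that plain `encoding/json` would read. A sketch of protoc's default `json_name` rule (the function name is ours):

```go
package main

import (
	"fmt"
	"strings"
)

// jsonName mirrors protoc's default json_name rule: drop each underscore
// and upper-case the ASCII letter that follows it.
func jsonName(field string) string {
	var b strings.Builder
	upperNext := false
	for i := 0; i < len(field); i++ {
		c := field[i]
		switch {
		case c == '_':
			upperNext = true
		case upperNext:
			if 'a' <= c && c <= 'z' {
				c -= 'a' - 'A'
			}
			b.WriteByte(c)
			upperNext = false
		default:
			b.WriteByte(c)
		}
	}
	return b.String()
}

func main() {
	for _, f := range []string{"email_address", "name", "profile_photo_url"} {
		fmt.Println(f, "->", jsonName(f))
	}
}
```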

APIs and µServices

Diagram: the gRPC file goes through buf, and some outputs feed a second step:
  • buf: go → Go structs
  • buf: go-grpc → Go gRPC clients and servers → microgen → µServices skeleton
  • buf: validate → Go validation middleware
  • buf: grpc-gateway → gRPC gateway layers → go mod vendor → HTTP gateway implementation
  • buf: openapiv2 → OpenAPI .json files → openapi-generator → TypeScript client + server libraries
  • buf: python_betterproto → Python clients, servers, types

APIs and µServices

📄 gen/typescript/swagger_clients/user_service/swagger_client/models/V1AddUserRequest.ts

            
export interface V1AddUserRequest {
    emailAddress?: string;
    name?: string;
    profilePhotoUrl?: string;
}

export function V1AddUserRequestFromJSON(json: any): V1AddUserRequest {
  return V1AddUserRequestFromJSONTyped(json, false);
}

export function V1AddUserRequestFromJSONTyped(json: any, ignoreDiscriminator: boolean): V1AddUserRequest {
  if ((json === undefined) || (json === null)) {
    return json;
  }
  return {
    'emailAddress': !exists(json, 'emailAddress') ? undefined : json['emailAddress'],
    'name': !exists(json, 'name') ? undefined : json['name'],
    'profilePhotoUrl': !exists(json, 'profilePhotoUrl') ? undefined : json['profilePhotoUrl'],
  };
}

export function V1AddUserRequestToJSON(value?: V1AddUserRequest | null): any {
  if (value === undefined) {
    return undefined;
  }
  if (value === null) {
    return null;
  }
  return {
    'emailAddress': value.emailAddress,
    'name': value.name,
    'profilePhotoUrl': value.profilePhotoUrl,
  };
}
            
          

APIs and µServices

📄 gen/typescript/swagger_clients/user_service/swagger_client/apis/UsersApi.ts

            
export class UsersApi extends runtime.BaseAPI {
    /**
     * Add a user to the server.
     * Add a user
     */
    async userServiceAddUserRaw(requestParameters: UserServiceAddUserRequest, initOverrides?: RequestInit): Promise<runtime.ApiResponse<V1AddUserResponse>> {
        if (requestParameters.body === null || requestParameters.body === undefined) {
            throw new runtime.RequiredError('body','Required parameter requestParameters.body was null or undefined when calling userServiceAddUser.');
        }

        const queryParameters: any = {};

        const headerParameters: runtime.HTTPHeaders = {};

        headerParameters['Content-Type'] = 'application/json';

        if (this.configuration && this.configuration.apiKey) {
            headerParameters["Authorization"] = this.configuration.apiKey("Authorization"); // bearer authentication
        }

        const response = await this.request({
            path: `/api/v1/users`,
            method: 'POST',
            headers: headerParameters,
            query: queryParameters,
            body: V1AddUserRequestToJSON(requestParameters.body),
        }, initOverrides);

        return new runtime.JSONApiResponse(response, (jsonValue) => V1AddUserResponseFromJSON(jsonValue));
    }

    // ...
}
            
          

So we looked at APIs. What about data?

Data Pipeline @ Reco

Architecture

Multi-schema topics

(Trust me, I'm a data engineer)

But... Why?

  • Because we can - thanks to the magic of oneof!
  • Enforces ordering (e.g. nodes before edges, changelogs, etc.)
  • Simple scaling and balancing across partitions
  • Code-only migrations (new event type? Add it to the oneof!)
            
              message Entity {
                  string id = 1;
                  google.protobuf.Timestamp created_at = 2;
                  data.api.Source source = 3;
                  oneof dto {
                      data.models.Node node = 4;
                      data.models.Edge edge = 5;
                  }
              }
            
          
            
              /** Events of the user.
                  Note that the order of each event
                  is important! **/
              message Changelog {
                  string id = 1;
                  data.api.EventType event_type = 3;
                  oneof event {
                      data.models.LoginEvent login_event = 5;
                      data.models.Request request = 6;
                      // ... more event types
                  }
              }
            
          
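A sketch of why a single multi-schema topic helps with ordering: a consumer reading one partition applies `Entity` records in order and can reject an edge whose nodes it has not seen yet. The Go types are hypothetical hand-written mirrors of `Entity`, `data.models.Node` and `data.models.Edge`, using the same sealed-interface pattern as the generated `isReco_Dto`; Kafka itself is left out.

```go
package main

import "fmt"

// Hypothetical hand-written mirrors of the generated types.
type Node struct{ ID string }
type Edge struct{ From, To string }

type isEntity_Dto interface{ isEntity_Dto() }
type Entity_Node struct{ Node *Node }
type Entity_Edge struct{ Edge *Edge }

func (*Entity_Node) isEntity_Dto() {}
func (*Entity_Edge) isEntity_Dto() {}

type Entity struct {
	ID  string
	Dto isEntity_Dto
}

// graph consumes entities in the order they were read from one partition.
type graph struct {
	nodes map[string]bool
	edges []Edge
}

func (g *graph) apply(e *Entity) error {
	switch v := e.Dto.(type) {
	case *Entity_Node:
		g.nodes[v.Node.ID] = true
	case *Entity_Edge:
		// Relies on topic ordering: both nodes must have arrived earlier.
		if !g.nodes[v.Edge.From] || !g.nodes[v.Edge.To] {
			return fmt.Errorf("entity %s: edge %s->%s references an unseen node", e.ID, v.Edge.From, v.Edge.To)
		}
		g.edges = append(g.edges, *v.Edge)
	default:
		return fmt.Errorf("entity %s: unknown or empty dto", e.ID)
	}
	return nil
}

func main() {
	g := &graph{nodes: map[string]bool{}}
	stream := []*Entity{
		{ID: "1", Dto: &Entity_Node{Node: &Node{ID: "a"}}},
		{ID: "2", Dto: &Entity_Node{Node: &Node{ID: "b"}}},
		{ID: "3", Dto: &Entity_Edge{Edge: &Edge{From: "a", To: "b"}}},
	}
	for _, e := range stream {
		if err := g.apply(e); err != nil {
			fmt.Println(err)
		}
	}
	fmt.Printf("%d nodes, %d edges\n", len(g.nodes), len(g.edges)) // 2 nodes, 1 edges
}
```

A new event type is a new oneof case plus a new `case` in the switch; consumers that have not been updated land in `default`, where they can log or skip the record instead of crashing.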

But Also... Why Not?

  • Observability - harder to know how much of which type
  • Replayability - need to filter by type (can be done quickly at the key level)
  • Naming - if your topic does all the things, it should be named as such!
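One way to get cheap, type-level replay is to put the oneof case into the record key, so tools can filter without deserialising values. A sketch under that assumption; the `<case>:<id>` key format and the `record` type are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// record is a stand-in for a Kafka record; only the key matters here.
type record struct {
	Key   string
	Value []byte // serialized Entity; never decoded by the filter
}

// entityKey builds a hypothetical "<oneof case>:<entity id>" key.
func entityKey(kind, id string) string { return kind + ":" + id }

// filterByKind keeps records of one oneof case by inspecting keys only.
func filterByKind(records []record, kind string) []record {
	var out []record
	prefix := kind + ":"
	for _, r := range records {
		if strings.HasPrefix(r.Key, prefix) {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	topic := []record{
		{Key: entityKey("node", "a")},
		{Key: entityKey("edge", "a-b")},
		{Key: entityKey("node", "b")},
	}
	for _, r := range filterByKind(topic, "node") {
		fmt.Println(r.Key)
	}
}
```

Keep in mind that Kafka's default partitioner hashes the key, so the key also decides which records share a partition, and ordering only holds within a partition. If nodes must precede their edges, a key scheme like this has to be weighed against that guarantee.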

Why not JSON? CSV? Avro? XML?

  • JSON - a terrible serialisation tool
    • Hard to read, no commenting, no static types
  • CSV - only marginally less horrible
    • Inferred types? Schema enforcement? Over-the-wire?
  • Avro - like JSON, but typed
    • Missing parts - IDL, Schema Registry
  • XML - well, it’s XML
Comparison of data-serialization formats
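A concrete case of "no static types": decoding JSON without a schema, Go's `encoding/json` turns every number into a `float64`, so integers above 2^53 silently lose precision, while a typed target keeps them exact. (Protobuf's own JSON mapping encodes 64-bit integers as strings for this reason.) The helper names are ours.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeUntyped decodes without a schema: every JSON number becomes a float64.
func decodeUntyped(raw []byte) (any, error) {
	var m map[string]any
	err := json.Unmarshal(raw, &m)
	return m["id"], err
}

// decodeTyped decodes into a typed struct, so "id" is parsed as an int64.
func decodeTyped(raw []byte) (int64, error) {
	var v struct {
		ID int64 `json:"id"`
	}
	err := json.Unmarshal(raw, &v)
	return v.ID, err
}

func main() {
	raw := []byte(`{"id": 9007199254740993}`) // 2^53 + 1
	u, _ := decodeUntyped(raw)
	t, _ := decodeTyped(raw)
	fmt.Printf("untyped: %T %.0f\n", u, u) // untyped: float64 9007199254740992
	fmt.Println("typed:", t)               // typed: 9007199254740993
}
```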

Call To Action

1. Proto-fy your schemas
2. Auto-generate with buf
3. Give us a call, we can help

Thank you! 🙏

You can reach out (and get the slides) at our R&D blog.

Nir
Shay