Reco // Nir Barak & Shay Nehmad
Go Israel November 2022 Meetup
We at Reco committed from day one to using Protobuf and schema-first design. This led to a simple yet rigorous development process with a shift-left mentality. It’s worth your time to learn how to do the same in your own projects and companies.
syntax = "proto3";
package data;
import "google/protobuf/timestamp.proto";
import "validate/validate.proto";
enum Status {
  UNSPECIFIED = 0;
  IN_PROCESS = 1;
  DONE = 2;
}
/* Terrific Proto message. The best protobuf message in the
   history of protobuf messages, maybe ever. */
message Reco {
  string id = 1;
  repeated int32 count = 2;
  map<string, bool> flags = 3;
  optional bytes raw_data = 5;
  oneof dto {
    Status status = 6; // look, Ma! I'm referencing an enum!
    string error = 7;  // unless I’ve failed horribly and I’m an error
  }
  google.protobuf.Timestamp created_at = 8
      [(validate.rules).timestamp.within.seconds = 86400]; // 1 day
}
Golang!
// Terrific Proto message. The best protobuf message in the history of protobuf messages, maybe ever.
type Reco struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Count []int32 `protobuf:"varint,2,rep,packed,name=count,proto3" json:"count,omitempty"`
Flags map[string]bool `protobuf:"bytes,3,rep,name=flags,proto3" json:"flags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
RawData []byte `protobuf:"bytes,5,opt,name=raw_data,json=rawData,proto3,oneof" json:"raw_data,omitempty"`
// Types that are assignable to Dto:
// *Reco_Status
// *Reco_Error
Dto isReco_Dto `protobuf_oneof:"dto"`
CreatedAt *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` // 1 day
}
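The `Dto` field is an interface, so callers branch on it with a type switch. A minimal sketch of that pattern, using hand-written stand-ins for the generated types (trimmed to the oneof, so it runs without the protobuf runtime); `describe` is a hypothetical helper, and the real types live in the generated .pb.go file.

```go
package main

import "fmt"

// Hand-written stand-ins for the generated types, trimmed to the oneof.
type Status int32

const (
	Status_UNSPECIFIED Status = 0
	Status_IN_PROCESS  Status = 1
	Status_DONE        Status = 2
)

// isReco_Dto is implemented only by the oneof wrapper types.
type isReco_Dto interface{ isReco_Dto() }

type Reco_Status struct{ Status Status }
type Reco_Error struct{ Error string }

func (*Reco_Status) isReco_Dto() {}
func (*Reco_Error) isReco_Dto()  {}

type Reco struct {
	Id  string
	Dto isReco_Dto
}

// describe branches on whichever oneof case is set.
func describe(r *Reco) string {
	switch dto := r.Dto.(type) {
	case *Reco_Status:
		return fmt.Sprintf("%s: status %d", r.Id, dto.Status)
	case *Reco_Error:
		return fmt.Sprintf("%s: failed: %s", r.Id, dto.Error)
	default: // nil interface: neither case was set
		return fmt.Sprintf("%s: dto not set", r.Id)
	}
}

func main() {
	fmt.Println(describe(&Reco{Id: "a", Dto: &Reco_Status{Status: Status_DONE}}))
	fmt.Println(describe(&Reco{Id: "b", Dto: &Reco_Error{Error: "boom"}}))
	fmt.Println(describe(&Reco{Id: "c"}))
}
```

The compiler will not force you to handle every case, so the `default` branch is worth keeping.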
Python (betterproto)
@dataclass(eq=False, repr=False)
class Reco(betterproto.Message):
    """
    Terrific Proto message. The best protobuf message in the history of
    protobuf messages, maybe ever.
    """
    id: str = betterproto.string_field(1)
    count: List[int] = betterproto.int32_field(2)
    flags: Dict[str, bool] = betterproto.map_field(3, betterproto.TYPE_STRING, betterproto.TYPE_BOOL)
    raw_data: bytes = betterproto.bytes_field(5, group="_raw_data")
    status: "Status" = betterproto.enum_field(6, group="dto")
    error: str = betterproto.string_field(7, group="dto")
    created_at: datetime = betterproto.message_field(8)
Scala (ScalaPB)
/** Terrific Proto message. The best protobuf message in the history of protobuf messages, maybe ever.
* @param createdAt 1 day
*/
@SerialVersionUID(0L)
final case class Reco(
id: _root_.scala.Predef.String = "",
count: _root_.scala.Seq[_root_.scala.Int] = _root_.scala.Seq.empty,
flags: _root_.scala.collection.immutable.Map[_root_.scala.Predef.String, _root_.scala.Boolean] = _root_.scala.collection.immutable.Map.empty,
rawData: _root_.scala.Option[_root_.com.google.protobuf.ByteString] = _root_.scala.None,
dto: data.models.reco.Reco.Dto = data.models.reco.Reco.Dto.Empty,
createdAt: _root_.scala.Option[com.google.protobuf.timestamp.Timestamp] = _root_.scala.None,
unknownFields: _root_.scalapb.UnknownFieldSet = _root_.scalapb.UnknownFieldSet.empty
) extends scalapb.GeneratedMessage with scalapb.lenses.Updatable[Reco]
ocer's. Everybody knows proto at this point.
Services up: 2
json.Parse(). When a change was introduced, they just added a flag. What's a few ifs between friends?
Services up: 2
Services up: 6
Services up: 4
Services up: 16
Services up: 6.5
Features working in prod: 24
CSVs to collect data. Their analysts spent 2 weeks proving that a string is indeed NaN.
Features working in prod: 7
oneof to simulate inheritance
protoc-gen-grpc-gateway
Working with APIs (and building µServices) with Protobuf and gRPC leads to multiple benefits, but the main one is schema-first design. As a team, we focus on what and why, before we focus on how.
Establishing a single source of truth through schema-first design helps align software development: teams and processes get a common goal and one source to align themselves with.
This strong design philosophy comes with added benefits:
Now let's take a look at HOW we build this pipeline.
📄 services/v1/user_service.proto
service UserService {
  rpc AddUser(AddUserRequest) returns (AddUserResponse) {
    // ... (options cut off on the slide)
  }
}
📄 services/v1/user_service.proto
message AddUserRequest {
  string email_address = 1 [(validate.rules).string.email = true];
  string name = 2 [(validate.rules).string.min_len = 1];
  string profile_photo_url = 3;
}
message AddUserResponse {
  User user = 1;
}
Consumers 🍴
Providers 👨‍🍳
Gateways 📬
We need to generate enums, messages, clients and servers from the service definition to answer all of the needs we have discussed previously. This means multiple standards (gRPC, REST and OpenAPI) and multiple languages (TypeScript, Go, Python).
A combination of Earthly 🌍 and buf is used to manage this in a way that works well.
Why Earthly? Containerized builds are a great way to ensure that all devs generate the same code from the same schema.
Why buf? Dealing with proto generation code manually sucks. It's built for Google's monorepo, and Google's monorepo only. Buf is declarative and super simple.
📄 buf.gen.yaml
version: v1
plugins:
  - name: go
    out: gen/go
    opt: paths=source_relative
  - name: go-grpc
    out: gen/go
    opt: paths=source_relative
  - name: validate
    out: gen/go
    opt:
      - paths=source_relative
      - lang=go
  - name: python_betterproto
    out: gen/python
    opt: paths=source_relative
  - name: js
    out: gen/js
    opt: import_style=commonjs
  - name: grpc-gateway
    out: gen/go
    opt: paths=source_relative
  - name: openapiv2
    out: third_party/OpenAPI
Dependencies?
📄 buf.work.yaml
version: v1
directories:
  - buf-exports/googleapis
  - buf-exports/grpc-gateway
  - buf-exports/protoc-gen-validate
  - proto
The buf.work.yaml file is used to define a workspace, where one or more modules can coexist and interoperate within a common directory.
📄 gen/go/reco/services/v1/user_service.pb.go
type AddUserRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
EmailAddress string `protobuf:"bytes,1,opt,name=email_address,json=emailAddress,proto3" json:"email_address,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
ProfilePhotoUrl string `protobuf:"bytes,3,opt,name=profile_photo_url,json=profilePhotoUrl,proto3" json:"profile_photo_url,omitempty"`
}
📄 gen/go/reco/services/v1/user_service.pb.validate.go
func (m *AddUserRequest) validate(all bool) error {
	if m == nil {
		return nil
	}
	var errors []error
	// [EDITED FOR SLIDE] Omitted email check
	if utf8.RuneCountInString(m.GetName()) < 1 {
		err := AddUserRequestValidationError{
			field:  "Name",
			reason: "value length must be at least 1 runes",
		}
		if !all {
			return err
		}
		errors = append(errors, err)
	}
	if len(errors) > 0 {
		return AddUserRequestMultiError(errors)
	}
	return nil
}
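The generated validators only help if something calls them. One common approach is to validate every incoming request in a single place before the handler runs. A minimal sketch of that idea, assuming each generated message exposes a `Validate() error` method; `handle` and the trimmed `AddUserRequest` here are hypothetical stand-ins so the example runs on its own, not our actual middleware.

```go
package main

import (
	"errors"
	"fmt"
)

// validator is satisfied by any message that has a Validate method.
type validator interface{ Validate() error }

// AddUserRequest is a trimmed stand-in for the generated message.
type AddUserRequest struct{ Name string }

// Validate is a hand-written stand-in for the generated min_len check.
func (m *AddUserRequest) Validate() error {
	if len([]rune(m.Name)) < 1 {
		return errors.New("Name: value length must be at least 1 runes")
	}
	return nil
}

// handle validates the request (when it knows how) and only then
// passes it on to the real handler.
func handle(req any, next func(any) (any, error)) (any, error) {
	if v, ok := req.(validator); ok {
		if err := v.Validate(); err != nil {
			return nil, fmt.Errorf("invalid request: %w", err)
		}
	}
	return next(req)
}

func main() {
	next := func(req any) (any, error) { return "user added", nil }

	_, err := handle(&AddUserRequest{}, next)
	fmt.Println("empty name:", err)

	out, err := handle(&AddUserRequest{Name: "Nir"}, next)
	fmt.Println("valid name:", out, err)
}
```

Handlers written this way never see an invalid message, which is the shift-left idea applied at runtime.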
📄 gen/go/reco/services/v1/user_service_grpc.pb.go
// UserServiceServer is the server API for UserService service.
// All implementations must embed UnimplementedUserServiceServer
// for forward compatibility
type UserServiceServer interface {
AddUser(context.Context, *AddUserRequest) (*AddUserResponse, error)
// ...
}
// UnimplementedUserServiceServer must be embedded to have forward compatible implementations.
type UnimplementedUserServiceServer struct {
}
func (UnimplementedUserServiceServer) AddUser(context.Context, *AddUserRequest) (*AddUserResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddUser not implemented")
}
📄 gen/go/reco/services/v1/user_service_grpc.pb.go
// UserServiceClient is the client API for UserService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type UserServiceClient interface {
AddUser(ctx context.Context, in *AddUserRequest, opts ...grpc.CallOption) (*AddUserResponse, error)
}
type userServiceClient struct {
cc grpc.ClientConnInterface
}
func NewUserServiceClient(cc grpc.ClientConnInterface) UserServiceClient {
return &userServiceClient{cc}
}
func (c *userServiceClient) AddUser(ctx context.Context, in *AddUserRequest, opts ...grpc.CallOption) (*AddUserResponse, error) {
out := new(AddUserResponse)
err := c.cc.Invoke(ctx, "/reco.services.v1.UserService/AddUser", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
📄 gen/go/reco/services/v1/user_service.pb.gw.go
// RegisterUserServiceHandlerServer registers the http handlers for service UserService to "mux".
func RegisterUserServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux, server UserServiceServer) error {
mux.Handle("POST", pattern_UserService_AddUser_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/reco.services.v1.UserService/AddUser", runtime.WithHTTPPathPattern("/api/v1/users"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_UserService_AddUser_0(rctx, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
ctx = runtime.NewServerMetadataContext(ctx, md)
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
forward_UserService_AddUser_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
// ... (rest of the registration cut for the slide)
return nil
}
📄 third_party/OpenAPI/reco/services/v1/user_service.swagger.json
"/api/v1/users": {
"post": {
"description": "Add a user to the server.",
"operationId": "UserService_AddUser",
"parameters": [
{
"in": "body",
"name": "body",
"required": true,
"schema": {
"$ref": "#/definitions/v1AddUserRequest"
}
}
],
"responses": {
"200": {
"description": "A successful response.",
"schema": {
"$ref": "#/definitions/v1AddUserResponse"
}
},
"default": {
"description": "An unexpected error response.",
"schema": {
"$ref": "#/definitions/googlerpcStatus"
}
}
},
"summary": "Add a user",
"tags": [
"Users"
]
}
}
"v1AddUserRequest": {
"properties": {
"emailAddress": {
"type": "string"
},
"name": {
"type": "string"
},
"profilePhotoUrl": {
"type": "string"
}
},
"type": "object"
},
📄 gen/typescript/swagger_clients/user_service/swagger_client/models/V1AddUserRequest.ts
export interface V1AddUserRequest {
emailAddress?: string;
name?: string;
profilePhotoUrl?: string;
}
export function V1AddUserRequestFromJSON(json: any): V1AddUserRequest {
return V1AddUserRequestFromJSONTyped(json, false);
}
export function V1AddUserRequestFromJSONTyped(json: any, ignoreDiscriminator: boolean): V1AddUserRequest {
if ((json === undefined) || (json === null)) {
return json;
}
return {
'emailAddress': !exists(json, 'emailAddress') ? undefined : json['emailAddress'],
'name': !exists(json, 'name') ? undefined : json['name'],
'profilePhotoUrl': !exists(json, 'profilePhotoUrl') ? undefined : json['profilePhotoUrl'],
};
}
export function V1AddUserRequestToJSON(value?: V1AddUserRequest | null): any {
if (value === undefined) {
return undefined;
}
if (value === null) {
return null;
}
return {
'emailAddress': value.emailAddress,
'name': value.name,
'profilePhotoUrl': value.profilePhotoUrl,
};
}
📄 gen/typescript/swagger_clients/user_service/swagger_client/apis/UsersApi.ts
export class UsersApi extends runtime.BaseAPI {
    /**
     * Add a user to the server.
     * Add a user
     */
    async userServiceAddUserRaw(requestParameters: UserServiceAddUserRequest, initOverrides?: RequestInit): Promise<runtime.ApiResponse<V1AddUserResponse>> {
        if (requestParameters.body === null || requestParameters.body === undefined) {
            throw new runtime.RequiredError('body','Required parameter requestParameters.body was null or undefined when calling userServiceAddUser.');
        }
        const queryParameters: any = {};
        const headerParameters: runtime.HTTPHeaders = {};
        headerParameters['Content-Type'] = 'application/json';
        if (this.configuration && this.configuration.apiKey) {
            headerParameters["Authorization"] = this.configuration.apiKey("Authorization"); // bearer authentication
        }
        const response = await this.request({
            path: `/api/v1/users`,
            method: 'POST',
            headers: headerParameters,
            query: queryParameters,
            body: V1AddUserRequestToJSON(requestParameters.body),
        }, initOverrides);
        return new runtime.JSONApiResponse(response, (jsonValue) => V1AddUserResponseFromJSON(jsonValue));
    }
    // ...
}
So we looked at APIs. What about data?
oneof! oneof!)
message Entity {
  string id = 1;
  google.protobuf.Timestamp created_at = 2;
  data.api.Source source = 3;
  oneof dto {
    data.models.Node node = 4;
    data.models.Edge edge = 5;
  }
}
/** Events of the user.
    Note that the order of each event
    is important! **/
message Changelog {
  string id = 1;
  data.api.EventType eventType = 3;
  oneof event {
    data.models.LoginEvent loginEvent = 5;
    data.models.Request request = 6;
    // ... more event types
  }
}
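Because the order of events matters, consumers typically walk the changelog front to back and keep state between events. A minimal sketch of such a replay, using hand-written stand-ins for the generated oneof wrappers; the payload fields (`User`, `Path`) and the `anonymousRequests` helper are hypothetical, since the slides do not show the event definitions.

```go
package main

import "fmt"

// Hand-written stand-ins for the generated oneof wrappers.
type isChangelog_Event interface{ isChangelog_Event() }

type Changelog_LoginEvent struct{ User string } // hypothetical payload
type Changelog_Request struct{ Path string }    // hypothetical payload

func (*Changelog_LoginEvent) isChangelog_Event() {}
func (*Changelog_Request) isChangelog_Event()    {}

type Changelog struct {
	Id    string
	Event isChangelog_Event
}

// anonymousRequests walks the log in order and returns the paths of
// requests that arrived before any login event was seen.
func anonymousRequests(log []*Changelog) []string {
	loggedIn := false
	var out []string
	for _, c := range log {
		switch e := c.Event.(type) {
		case *Changelog_LoginEvent:
			loggedIn = true
		case *Changelog_Request:
			if !loggedIn {
				out = append(out, e.Path)
			}
		}
	}
	return out
}

func main() {
	log := []*Changelog{
		{Id: "1", Event: &Changelog_Request{Path: "/health"}},
		{Id: "2", Event: &Changelog_LoginEvent{User: "nir"}},
		{Id: "3", Event: &Changelog_Request{Path: "/api/v1/users"}},
	}
	fmt.Println(anonymousRequests(log))
}
```

Swapping two entries in the log changes the result, which is exactly why the comment on `Changelog` insists on ordering.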
buf
You can reach out (and get the slides) at our R&D blog.