11package bes
22
33import (
4+ "context"
45 "fmt"
6+ "io"
7+ "log/slog"
58 "net"
9+
10+ bb_bes "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes"
11+
12+ "google.golang.org/genproto/googleapis/devtools/build/v1"
13+ "google.golang.org/grpc"
14+ "google.golang.org/grpc/codes"
15+ "google.golang.org/grpc/status"
16+ "google.golang.org/protobuf/encoding/protojson"
17+ "google.golang.org/protobuf/types/known/emptypb"
618)
719
820var host = "127.0.0.1"
921var port = 60242 // 0 for OS-allocated
1022
23+ type BuildEventServer struct {
24+ }
25+
1126func Listen () error {
1227 addr := fmt .Sprintf ("%s:%d" , host , port )
1328 listener , err := net .Listen ("tcp" , addr )
@@ -16,7 +31,92 @@ func Listen() error {
1631 }
1732 fmt .Println ("Bazel BES listener: grpc://" + listener .Addr ().String ())
1833
19- // TODO
34+ opts := []grpc.ServerOption {}
35+ grpcServer := grpc .NewServer (opts ... )
36+
37+ build .RegisterPublishBuildEventServer (grpcServer , newServer ())
38+ grpcServer .Serve (listener )
2039
2140 return nil
2241}
42+
43+ func newServer () build.PublishBuildEventServer {
44+ return BuildEventServer {}
45+ }
46+
47+ // PublishLifecycleEvent handles life cycle events.
48+ func (s BuildEventServer ) PublishLifecycleEvent (ctx context.Context , request * build.PublishLifecycleEventRequest ) (* emptypb.Empty , error ) {
49+ slog .DebugContext (ctx , "Received event" , "event" , protojson .Format (request .BuildEvent .GetEvent ()))
50+ return & emptypb.Empty {}, nil
51+ }
52+
53+ // PublishBuildToolEventStream handles a build tool event stream.
54+ // bktec thanks buildbarn/bb-portal for the basis of this :D
55+ func (s BuildEventServer ) PublishBuildToolEventStream (stream build.PublishBuildEvent_PublishBuildToolEventStreamServer ) error {
56+ ctx := stream .Context ()
57+
58+ slog .InfoContext (ctx , "Stream started" )
59+
60+ for {
61+ req , err := stream .Recv ()
62+ if err == io .EOF {
63+ slog .InfoContext (ctx , "Stream finished" )
64+ return nil
65+ } else if err != nil {
66+ slog .ErrorContext (ctx , "Recv failed" , "err" , err )
67+ return err
68+ }
69+
70+ streamID := req .OrderedBuildEvent .GetStreamId ()
71+ seq := req .OrderedBuildEvent .GetSequenceNumber ()
72+
73+ event := req .GetOrderedBuildEvent ().GetEvent ()
74+ slog .DebugContext (ctx , "stream event" , "seq" , seq , "event" , event )
75+
76+ if event .GetBazelEvent () == nil {
77+ slog .DebugContext (ctx , "not a bazel event" , seq , seq )
78+ continue
79+ }
80+
81+ var bazelEvent bb_bes.BuildEvent
82+ if err = event .GetBazelEvent ().UnmarshalTo (& bazelEvent ); err != nil {
83+ //return fmt.Errorf("unmarshaling bazel event: %w", err)
84+ slog .InfoContext (ctx , "error unmarshalling" )
85+ }
86+
87+ // slog.InfoContext(ctx, "unmarshalled bazel event", "event", &bazelEvent)
88+
89+ payload := bazelEvent .GetPayload ()
90+ if testResult , ok := payload .(* bb_bes.BuildEvent_TestResult ); ok {
91+ r := testResult .TestResult
92+ files := []string {}
93+ for _ , x := range r .GetTestActionOutput () {
94+ if x .GetName () == "test.xml" {
95+ files = append (files , x .GetUri ())
96+ }
97+ }
98+ slog .InfoContext (ctx , "TestResult" ,
99+ "status" , r .GetStatus (),
100+ "cached" , r .GetCachedLocally (),
101+ "dur" , r .GetTestAttemptDuration ().AsDuration ().String (),
102+ "files" , files ,
103+ )
104+ }
105+
106+ // ack
107+ ack := & build.PublishBuildToolEventStreamResponse {StreamId : streamID , SequenceNumber : seq }
108+ if err := stream .Send (ack ); err != nil {
109+ grpcErr := status .Convert (err )
110+ if grpcErr .Code () == codes .Unavailable &&
111+ grpcErr .Message () == "transport is closing" {
112+ return nil
113+ }
114+
115+ slog .ErrorContext (ctx , "ack failed" ,
116+ "err" , err ,
117+ "stream" , streamID ,
118+ "seq" , seq ,
119+ )
120+ }
121+ }
122+ }
0 commit comments