Skip to content

Commit fd3260b

Browse files
authored
[DX-3050] add chip fanout router (#2504)
1 parent 451b0b4 commit fd3260b

16 files changed

Lines changed: 1178 additions & 183 deletions

File tree

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Fails when:
2+
# - vulnerabilities are found in the dependency tree with specified severity or grater
3+
#
4+
# Copied from: https://github.com/smartcontractkit/.github/blob/0cc355785130a83a540187b609c5521094baed92/actions/dependency-review/configs/default-vulnerability-check-high.yml
5+
vulnerability_check: true
6+
fail_on_severity: "high" # low, moderate, high, critical
7+
license_check: false
8+
allow-ghsas:
9+
- GHSA-x744-4wpc-v9h2 # we cannot fix until we migrate all Docker deps to Moby
10+
- GHSA-p436-gjf2-799p # we cannot fix until we migrate all Docker deps to Moby

.github/workflows/dependency-review.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@ jobs:
1818
- name: Vulnerability Check
1919
uses: smartcontractkit/.github/actions/dependency-review@0cc355785130a83a540187b609c5521094baed92 # dependency-review@1.0.0
2020
with:
21-
config-preset: default-vulnerability-check-high
21+
config-file: .github/configs/dependency-review.yml

book/src/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
- [Ton](framework/components/blockchains/ton.md)
4848
- [Storage](framework/components/storage.md)
4949
- [S3](framework/components/storage/s3.md)
50+
- [Chip Router](framework/components/chiprouter/chip_router.md)
5051
- [Chip Ingress Set](framework/components/chipingresset/chip_ingress.md)
5152
- [Troubleshooting](framework/components/troubleshooting.md)
5253
- [Mono Repository Tooling](./monorepo-tools.md)
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Chip Router
2+
3+
`chiprouter` is a small CTF component that owns the fixed ChIP ingress port and fans incoming telemetry out to registered downstream subscribers.
4+
5+
It exists to keep the local CRE topology simple:
6+
- Chainlink nodes always publish to a single ingress owner on `50051`
7+
- lightweight test sinks subscribe behind the router
8+
- real ChIP / Beholder subscribes behind the same router
9+
10+
That removes the old split where some tests bound ingress directly while others started real ChIP.
11+
12+
## Ports
13+
14+
The component exposes:
15+
- admin HTTP: `50050`
16+
- ingress gRPC: `50051`
17+
18+
In the local CRE topology, real ChIP / Beholder typically subscribes downstream on `50053`.
19+
20+
## Image Contract
21+
22+
The component runs whatever image is provided in `chip_router.image`.
23+
24+
The expected local CRE convention is:
25+
- env TOMLs use a local alias such as `chip-router:<commit-sha>`
26+
- setup/pull logic is responsible for making that alias exist locally
27+
- remote ECR image names stay in setup/pull config and are retagged locally to the alias
28+
29+
## Runtime Behavior
30+
31+
The router:
32+
- exposes a health endpoint on `/health`
33+
- accepts subscriber registration over its admin API
34+
- forwards published ChIP ingress requests to all registered subscribers
35+
- is best-effort per subscriber, so one failing downstream does not block others
36+
37+
Host-based downstream subscribers should register host-reachable endpoints. In local CRE, host-local sink endpoints are normalized to the Docker host gateway before registration.

framework/.changeset/v0.15.14.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
- Remove default value for compatibility testing's `buildcmd` param
2+
- Add `CHiP router` component to fanout Beholder events

framework/cmd/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,6 @@ Be aware that any TODO requires your attention before your run the final test!
320320
Name: "buildcmd",
321321
Aliases: []string{"b"},
322322
Usage: "Environment build command",
323-
Value: "just cli",
324323
},
325324
&cli.StringFlag{
326325
Name: "envcmd",
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
FROM golang:1.25.3 AS builder
2+
3+
WORKDIR /src
4+
5+
COPY go.mod go.sum ./
6+
RUN go mod download
7+
8+
COPY . .
9+
10+
ARG TARGETOS=linux
11+
ARG TARGETARCH=amd64
12+
ARG CTF_LOG_LEVEL=info
13+
ENV CTF_LOG_LEVEL=${CTF_LOG_LEVEL}
14+
15+
RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -trimpath -ldflags="-s -w" -o /out/chip-router ./cmd/chip-router
16+
17+
FROM gcr.io/distroless/static-debian12:nonroot
18+
19+
COPY --from=builder /out/chip-router /chip-router
20+
21+
EXPOSE 50050 50051
22+
23+
ENTRYPOINT ["/chip-router"]
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package chiprouter
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"net/http"
9+
"strings"
10+
"time"
11+
12+
"github.com/docker/docker/api/types/container"
13+
"github.com/docker/go-connections/nat"
14+
"github.com/smartcontractkit/chainlink-testing-framework/framework"
15+
tc "github.com/testcontainers/testcontainers-go"
16+
tcwait "github.com/testcontainers/testcontainers-go/wait"
17+
)
18+
19+
const (
20+
DefaultGRPCPort = 50051
21+
DefaultAdminPort = 50050
22+
DefaultBeholderGRPCPort = 50053
23+
adminPathHealth = "/health"
24+
)
25+
26+
type Input struct {
27+
Image string `toml:"image" comment:"Chip router Docker image"`
28+
GRPCPort int `toml:"grpc_port" comment:"Chip router gRPC host/container port"`
29+
AdminPort int `toml:"admin_port" comment:"Chip router admin HTTP host/container port"`
30+
ContainerName string `toml:"container_name" comment:"Docker container name"`
31+
PullImage bool `toml:"pull_image" comment:"Whether to pull Chip router image or not"`
32+
LogLevel string `toml:"log_level" comment:"Chip router log level (trace, debug, info, warn, error)"`
33+
Out *Output `toml:"out" comment:"Chip router output"`
34+
}
35+
36+
type Output struct {
37+
UseCache bool `toml:"use_cache" comment:"Whether to reuse cached output"`
38+
ContainerName string `toml:"container_name" comment:"Docker container name"`
39+
ExternalGRPCURL string `toml:"grpc_external_url" comment:"Host-reachable gRPC endpoint"`
40+
InternalGRPCURL string `toml:"grpc_internal_url" comment:"Docker-network gRPC endpoint"`
41+
ExternalAdminURL string `toml:"admin_external_url" comment:"Host-reachable admin endpoint"`
42+
InternalAdminURL string `toml:"admin_internal_url" comment:"Docker-network admin endpoint"`
43+
}
44+
45+
type registerSubscriberRequest struct {
46+
Name string `json:"name"`
47+
Endpoint string `json:"endpoint"`
48+
}
49+
50+
type registerSubscriberResponse struct {
51+
ID string `json:"id"`
52+
}
53+
54+
type HealthResponse struct {
55+
}
56+
57+
func defaults(in *Input) {
58+
if in.GRPCPort == 0 {
59+
in.GRPCPort = DefaultGRPCPort
60+
}
61+
if in.AdminPort == 0 {
62+
in.AdminPort = DefaultAdminPort
63+
}
64+
if in.ContainerName == "" {
65+
in.ContainerName = framework.DefaultTCName("chip-router")
66+
}
67+
}
68+
69+
func New(in *Input) (*Output, error) {
70+
return NewWithContext(context.Background(), in)
71+
}
72+
73+
func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
74+
if in.Out != nil && in.Out.UseCache {
75+
return in.Out, nil
76+
}
77+
78+
if strings.TrimSpace(in.Image) == "" {
79+
return nil, fmt.Errorf("chip router image must be provided")
80+
}
81+
82+
defaults(in)
83+
84+
grpcPort := fmt.Sprintf("%d/tcp", in.GRPCPort)
85+
adminPort := fmt.Sprintf("%d/tcp", in.AdminPort)
86+
87+
req := tc.ContainerRequest{
88+
Name: in.ContainerName,
89+
Image: in.Image,
90+
AlwaysPullImage: in.PullImage,
91+
Labels: framework.DefaultTCLabels(),
92+
Networks: []string{framework.DefaultNetworkName},
93+
NetworkAliases: map[string][]string{
94+
framework.DefaultNetworkName: {in.ContainerName},
95+
},
96+
ExposedPorts: []string{grpcPort, adminPort},
97+
Env: map[string]string{
98+
"CHIP_ROUTER_GRPC_ADDR": fmt.Sprintf("0.0.0.0:%d", in.GRPCPort),
99+
"CHIP_ROUTER_ADMIN_ADDR": fmt.Sprintf("0.0.0.0:%d", in.AdminPort),
100+
"CTF_LOG_LEVEL": in.LogLevel,
101+
},
102+
HostConfigModifier: func(h *container.HostConfig) {
103+
h.PortBindings = framework.MapTheSamePort(grpcPort, adminPort)
104+
h.ExtraHosts = append(h.ExtraHosts, "host.docker.internal:host-gateway")
105+
},
106+
WaitingFor: tcwait.ForAll(
107+
tcwait.ForListeningPort(nat.Port(grpcPort)).WithPollInterval(200*time.Millisecond),
108+
tcwait.ForHTTP(adminPathHealth).
109+
WithPort(nat.Port(adminPort)).
110+
WithStartupTimeout(1*time.Minute).
111+
WithPollInterval(200*time.Millisecond),
112+
),
113+
}
114+
115+
c, err := tc.GenericContainer(ctx, tc.GenericContainerRequest{
116+
ContainerRequest: req,
117+
Started: true,
118+
})
119+
if err != nil {
120+
return nil, err
121+
}
122+
123+
host, err := framework.GetHostWithContext(ctx, c)
124+
if err != nil {
125+
return nil, err
126+
}
127+
128+
out := &Output{
129+
UseCache: true,
130+
ContainerName: in.ContainerName,
131+
ExternalGRPCURL: fmt.Sprintf("%s:%d", host, in.GRPCPort),
132+
InternalGRPCURL: fmt.Sprintf("%s:%d", in.ContainerName, in.GRPCPort),
133+
ExternalAdminURL: fmt.Sprintf("http://%s:%d", host, in.AdminPort),
134+
InternalAdminURL: fmt.Sprintf("http://%s:%d", in.ContainerName, in.AdminPort),
135+
}
136+
in.Out = out
137+
return out, nil
138+
}
139+
140+
func Health(ctx context.Context, adminURL string) (*HealthResponse, error) {
141+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, strings.TrimRight(adminURL, "/")+adminPathHealth, nil)
142+
if err != nil {
143+
return nil, err
144+
}
145+
resp, err := http.DefaultClient.Do(req)
146+
if err != nil {
147+
return nil, err
148+
}
149+
defer resp.Body.Close()
150+
if resp.StatusCode != http.StatusOK {
151+
return nil, fmt.Errorf("chip router health request failed with status %s", resp.Status)
152+
}
153+
var out HealthResponse
154+
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
155+
return nil, err
156+
}
157+
return &out, nil
158+
}
159+
160+
func RegisterSubscriber(ctx context.Context, adminURL, name, endpoint string) (string, error) {
161+
body, err := json.Marshal(registerSubscriberRequest{Name: name, Endpoint: endpoint})
162+
if err != nil {
163+
return "", err
164+
}
165+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimRight(adminURL, "/")+"/subscribers", bytes.NewReader(body))
166+
if err != nil {
167+
return "", err
168+
}
169+
req.Header.Set("Content-Type", "application/json")
170+
resp, err := http.DefaultClient.Do(req)
171+
if err != nil {
172+
return "", err
173+
}
174+
defer resp.Body.Close()
175+
if resp.StatusCode != http.StatusOK {
176+
return "", fmt.Errorf("chip router register request failed with status %s", resp.Status)
177+
}
178+
var out registerSubscriberResponse
179+
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
180+
return "", err
181+
}
182+
if strings.TrimSpace(out.ID) == "" {
183+
return "", fmt.Errorf("chip router register response missing subscriber id")
184+
}
185+
return out.ID, nil
186+
}
187+
188+
func UnregisterSubscriber(ctx context.Context, adminURL, id string) error {
189+
if strings.TrimSpace(id) == "" {
190+
return nil
191+
}
192+
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, strings.TrimRight(adminURL, "/")+"/subscribers/"+id, nil)
193+
if err != nil {
194+
return err
195+
}
196+
resp, err := http.DefaultClient.Do(req)
197+
if err != nil {
198+
return err
199+
}
200+
defer resp.Body.Close()
201+
if resp.StatusCode != http.StatusNoContent {
202+
return fmt.Errorf("chip router unregister request failed with status %s", resp.Status)
203+
}
204+
return nil
205+
}

0 commit comments

Comments
 (0)