Skip to content

Commit 4dbaead

Browse files
committed
feat(pipeline): per-pipeline packet_length with global fallback
Add optional packet_length config to each pipeline. When set, it overrides the global server.config.packet_length for that pipeline's sessions in both RPC chunking (upload/download/pipe) and parser-level packet size validation. Zero value falls back to global default. Includes unit tests for Session.GetPacketLength() and MaleficParser.maxPacketLen() fallback logic.
1 parent f2ea4e1 commit 4dbaead

13 files changed

Lines changed: 235 additions & 70 deletions

File tree

helper/implanttypes/pipeline.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,11 @@ type PipelineParams struct {
231231
Tls *TlsConfig `json:"tls,omitempty"`
232232
Secure *SecureConfig `json:"secure,omitempty"`
233233
// HTTP pipeline specific params
234-
Headers map[string][]string `json:"headers,omitempty"`
235-
ErrorPage string `json:"error_page,omitempty" gorm:"-"`
236-
BodyPrefix string `json:"body_prefix,omitempty"`
237-
BodySuffix string `json:"body_suffix,omitempty"`
234+
Headers map[string][]string `json:"headers,omitempty"`
235+
ErrorPage string `json:"error_page,omitempty" gorm:"-"`
236+
BodyPrefix string `json:"body_prefix,omitempty"`
237+
BodySuffix string `json:"body_suffix,omitempty"`
238+
PacketLength int `json:"packet_length,omitempty" yaml:"packet_length,omitempty"`
238239
}
239240

240241
func (params *PipelineParams) String() string {

server/internal/configs/listener.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type TcpPipelineConfig struct {
3838
TlsConfig *TlsConfig `config:"tls" yaml:"tls"`
3939
EncryptionConfig implanttypes.EncryptionsConfig `config:"encryption" yaml:"encryption"`
4040
SecureConfig *implanttypes.SecureConfig `config:"secure" yaml:"secure"` // Age 密码学安全配置
41+
PacketLength int `config:"packet_length" yaml:"packet_length"`
4142
}
4243

4344
type AutoBuildConfig struct {
@@ -58,6 +59,7 @@ func (tcp *TcpPipelineConfig) ToProtobuf(lisId string) (*clientpb.Pipeline, erro
5859
Enable: tcp.Enable,
5960
Parser: tcp.Parser,
6061
Type: consts.TCPPipeline,
62+
PacketLength: uint32(tcp.PacketLength),
6163
Body: &clientpb.Pipeline_Tcp{
6264
Tcp: &clientpb.TCPPipeline{
6365
Host: tcp.Host,
@@ -75,6 +77,7 @@ type BindPipelineConfig struct {
7577
Name string `config:"name" default:"bind" yaml:"name"`
7678
TlsConfig *TlsConfig `config:"tls" yaml:"tls"`
7779
EncryptionConfig implanttypes.EncryptionsConfig `config:"encryption" yaml:"encryption"`
80+
PacketLength int `config:"packet_length" yaml:"packet_length"`
7881
}
7982

8083
func (pipeline *BindPipelineConfig) ToProtobuf(lisId string) (*clientpb.Pipeline, error) {
@@ -87,6 +90,7 @@ func (pipeline *BindPipelineConfig) ToProtobuf(lisId string) (*clientpb.Pipeline
8790
Enable: pipeline.Enable,
8891
ListenerId: lisId,
8992
Parser: consts.ImplantMalefic,
93+
PacketLength: uint32(pipeline.PacketLength),
9094
Body: &clientpb.Pipeline_Bind{
9195
Bind: &clientpb.BindPipeline{},
9296
},
@@ -108,6 +112,7 @@ type HttpPipelineConfig struct {
108112
ErrorPage string `config:"error_page" yaml:"error_page"`
109113
BodyPrefix string `config:"body_prefix" yaml:"body_prefix"`
110114
BodySuffix string `config:"body_suffix" yaml:"body_suffix"`
115+
PacketLength int `config:"packet_length" yaml:"packet_length"`
111116
}
112117

113118
func (http *HttpPipelineConfig) ToProtobuf(lisId string) (*clientpb.Pipeline, error) {
@@ -139,6 +144,7 @@ func (http *HttpPipelineConfig) ToProtobuf(lisId string) (*clientpb.Pipeline, er
139144
Enable: http.Enable,
140145
Parser: http.Parser,
141146
Type: consts.HTTPPipeline,
147+
PacketLength: uint32(http.PacketLength),
142148
Body: &clientpb.Pipeline_Http{
143149
Http: &clientpb.HTTPPipeline{
144150
Host: http.Host,

server/internal/core/pipeline.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func FromPipeline(pipeline *clientpb.Pipeline) *PipelineConfig {
6464
TLSConfig: implanttypes.FromTls(pipeline.Tls),
6565
Encryption: implanttypes.FromEncryptions(pipeline.GetEncryption()),
6666
SecureConfig: implanttypes.FromSecure(pipeline.Secure),
67+
PacketLength: int(pipeline.PacketLength),
6768
}
6869
}
6970

@@ -73,6 +74,7 @@ type PipelineConfig struct {
7374
TLSConfig *implanttypes.TlsConfig
7475
Encryption implanttypes.EncryptionsConfig
7576
SecureConfig *implanttypes.SecureConfig
77+
PacketLength int
7678
}
7779

7880
func (p *PipelineConfig) WrapConn(conn io.ReadWriteCloser) (*cryptostream.Conn, error) {
@@ -83,7 +85,7 @@ func (p *PipelineConfig) WrapConn(conn io.ReadWriteCloser) (*cryptostream.Conn,
8385
if err != nil {
8486
return nil, err
8587
}
86-
return cryptostream.WrapPeekConn(conn, crys, p.Parser)
88+
return cryptostream.WrapPeekConn(conn, crys, p.Parser, uint32(p.PacketLength))
8789
}
8890

8991
// WrapBindConn wraps a connection for bind mode without pre-reading
@@ -96,7 +98,7 @@ func (p *PipelineConfig) WrapBindConn(conn io.ReadWriteCloser) (*cryptostream.Co
9698
if err != nil {
9799
return nil, err
98100
}
99-
return cryptostream.WrapBindConn(conn, crys)
101+
return cryptostream.WrapBindConn(conn, crys, uint32(p.PacketLength))
100102
}
101103

102104
//

server/internal/core/session.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,15 @@ func (s *Session) GetPipelineEncryptionKey() string {
625625
return encryptions[0].Key
626626
}
627627

628+
// GetPacketLength returns the per-pipeline packet length or falls back to global config.
629+
func (s *Session) GetPacketLength() int {
630+
pipeline, ok := Listeners.Find(s.PipelineID)
631+
if ok && pipeline != nil && pipeline.PacketLength > 0 {
632+
return int(pipeline.PacketLength)
633+
}
634+
return config.Int(consts.ConfigMaxPacketLength)
635+
}
636+
628637
func (s *Session) Update(req *clientpb.RegisterSession) {
629638
s.Name = req.RegisterData.Name
630639
s.PipelineID = req.PipelineId
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package core
2+
3+
import (
4+
"testing"
5+
6+
"github.com/chainreactors/IoM-go/consts"
7+
"github.com/chainreactors/IoM-go/proto/client/clientpb"
8+
"github.com/gookit/config/v2"
9+
)
10+
11+
func TestGetPacketLengthWithPipelineConfig(t *testing.T) {
12+
withIsolatedListeners(t)
13+
withIsolatedBroker(t)
14+
15+
config.Set(consts.ConfigMaxPacketLength, 10485760)
16+
t.Cleanup(func() { config.Set(consts.ConfigMaxPacketLength, 0) })
17+
18+
listener := NewListener("test-lis", "10.0.0.1")
19+
Listeners.Add(listener)
20+
listener.AddPipeline(&clientpb.Pipeline{
21+
Name: "pipe-custom",
22+
ListenerId: "test-lis",
23+
PacketLength: 2048,
24+
})
25+
26+
sess := newTestSession("pkt-test")
27+
sess.PipelineID = "pipe-custom"
28+
29+
got := sess.GetPacketLength()
30+
if got != 2048 {
31+
t.Fatalf("GetPacketLength() = %d, want 2048", got)
32+
}
33+
}
34+
35+
func TestGetPacketLengthFallsBackToGlobal(t *testing.T) {
36+
withIsolatedListeners(t)
37+
withIsolatedBroker(t)
38+
39+
config.Set(consts.ConfigMaxPacketLength, 10485760)
40+
t.Cleanup(func() { config.Set(consts.ConfigMaxPacketLength, 0) })
41+
42+
listener := NewListener("test-lis2", "10.0.0.1")
43+
Listeners.Add(listener)
44+
listener.AddPipeline(&clientpb.Pipeline{
45+
Name: "pipe-default",
46+
ListenerId: "test-lis2",
47+
// PacketLength is 0 (unset)
48+
})
49+
50+
sess := newTestSession("pkt-fallback")
51+
sess.PipelineID = "pipe-default"
52+
53+
got := sess.GetPacketLength()
54+
if got != 10485760 {
55+
t.Fatalf("GetPacketLength() = %d, want 10485760 (global default)", got)
56+
}
57+
}
58+
59+
func TestGetPacketLengthNoPipeline(t *testing.T) {
60+
withIsolatedListeners(t)
61+
withIsolatedBroker(t)
62+
63+
config.Set(consts.ConfigMaxPacketLength, 10485760)
64+
t.Cleanup(func() { config.Set(consts.ConfigMaxPacketLength, 0) })
65+
66+
sess := newTestSession("pkt-nopipe")
67+
sess.PipelineID = "nonexistent"
68+
69+
got := sess.GetPacketLength()
70+
if got != 10485760 {
71+
t.Fatalf("GetPacketLength() = %d, want 10485760 (global default)", got)
72+
}
73+
}

server/internal/db/models/pipeline.go

Lines changed: 57 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ type Pipeline struct {
3131

3232
func pipelineParamsFromProto(pipeline *clientpb.Pipeline) *implanttypes.PipelineParams {
3333
return &implanttypes.PipelineParams{
34-
Parser: pipeline.Parser,
35-
Tls: implanttypes.FromTls(pipeline.Tls),
36-
Encryption: implanttypes.FromEncryptions(pipeline.Encryption),
37-
Secure: implanttypes.FromSecure(pipeline.Secure),
34+
Parser: pipeline.Parser,
35+
Tls: implanttypes.FromTls(pipeline.Tls),
36+
Encryption: implanttypes.FromEncryptions(pipeline.Encryption),
37+
Secure: implanttypes.FromSecure(pipeline.Secure),
38+
PacketLength: int(pipeline.PacketLength),
3839
}
3940
}
4041

@@ -80,16 +81,21 @@ func (pipeline *Pipeline) ToProtobuf() *clientpb.Pipeline {
8081
if pipeline == nil {
8182
return nil
8283
}
84+
var packetLength uint32
85+
if pipeline.PipelineParams != nil {
86+
packetLength = uint32(pipeline.PipelineParams.PacketLength)
87+
}
8388
switch pipeline.Type {
8489
case consts.TCPPipeline:
8590
return &clientpb.Pipeline{
86-
Name: pipeline.Name,
87-
ListenerId: pipeline.ListenerId,
88-
Enable: pipeline.Enable,
89-
Parser: pipeline.Parser,
90-
Ip: pipeline.IP,
91-
Type: consts.TCPPipeline,
92-
CertName: pipeline.CertName,
91+
Name: pipeline.Name,
92+
ListenerId: pipeline.ListenerId,
93+
Enable: pipeline.Enable,
94+
Parser: pipeline.Parser,
95+
Ip: pipeline.IP,
96+
Type: consts.TCPPipeline,
97+
CertName: pipeline.CertName,
98+
PacketLength: packetLength,
9399
Body: &clientpb.Pipeline_Tcp{
94100
Tcp: &clientpb.TCPPipeline{
95101
Name: pipeline.Name,
@@ -104,13 +110,14 @@ func (pipeline *Pipeline) ToProtobuf() *clientpb.Pipeline {
104110
}
105111
case consts.HTTPPipeline:
106112
return &clientpb.Pipeline{
107-
Name: pipeline.Name,
108-
ListenerId: pipeline.ListenerId,
109-
Enable: pipeline.Enable,
110-
Parser: pipeline.Parser,
111-
Ip: pipeline.IP,
112-
Type: consts.HTTPPipeline,
113-
CertName: pipeline.CertName,
113+
Name: pipeline.Name,
114+
ListenerId: pipeline.ListenerId,
115+
Enable: pipeline.Enable,
116+
Parser: pipeline.Parser,
117+
Ip: pipeline.IP,
118+
Type: consts.HTTPPipeline,
119+
CertName: pipeline.CertName,
120+
PacketLength: packetLength,
114121
Body: &clientpb.Pipeline_Http{
115122
Http: &clientpb.HTTPPipeline{
116123
Name: pipeline.Name,
@@ -126,13 +133,14 @@ func (pipeline *Pipeline) ToProtobuf() *clientpb.Pipeline {
126133
}
127134
case consts.BindPipeline:
128135
return &clientpb.Pipeline{
129-
Name: pipeline.Name,
130-
ListenerId: pipeline.ListenerId,
131-
Enable: pipeline.Enable,
132-
Parser: pipeline.Parser,
133-
Ip: pipeline.IP,
134-
CertName: pipeline.CertName,
135-
Type: consts.BindPipeline,
136+
Name: pipeline.Name,
137+
ListenerId: pipeline.ListenerId,
138+
Enable: pipeline.Enable,
139+
Parser: pipeline.Parser,
140+
Ip: pipeline.IP,
141+
CertName: pipeline.CertName,
142+
Type: consts.BindPipeline,
143+
PacketLength: packetLength,
136144
Body: &clientpb.Pipeline_Bind{
137145
Bind: &clientpb.BindPipeline{
138146
Name: pipeline.Name,
@@ -145,13 +153,14 @@ func (pipeline *Pipeline) ToProtobuf() *clientpb.Pipeline {
145153
}
146154
case consts.WebsitePipeline:
147155
return &clientpb.Pipeline{
148-
Name: pipeline.Name,
149-
ListenerId: pipeline.ListenerId,
150-
Ip: pipeline.IP,
151-
Enable: pipeline.Enable,
152-
Parser: pipeline.Parser,
153-
CertName: pipeline.CertName,
154-
Type: consts.WebsitePipeline,
156+
Name: pipeline.Name,
157+
ListenerId: pipeline.ListenerId,
158+
Ip: pipeline.IP,
159+
Enable: pipeline.Enable,
160+
Parser: pipeline.Parser,
161+
CertName: pipeline.CertName,
162+
Type: consts.WebsitePipeline,
163+
PacketLength: packetLength,
155164
Body: &clientpb.Pipeline_Web{
156165
Web: &clientpb.Website{
157166
Name: pipeline.Name,
@@ -167,13 +176,14 @@ func (pipeline *Pipeline) ToProtobuf() *clientpb.Pipeline {
167176
}
168177
case consts.RemPipeline:
169178
return &clientpb.Pipeline{
170-
Name: pipeline.Name,
171-
ListenerId: pipeline.ListenerId,
172-
Enable: pipeline.Enable,
173-
Parser: pipeline.Parser,
174-
Type: consts.RemPipeline,
175-
Ip: pipeline.IP,
176-
CertName: pipeline.CertName,
179+
Name: pipeline.Name,
180+
ListenerId: pipeline.ListenerId,
181+
Enable: pipeline.Enable,
182+
Parser: pipeline.Parser,
183+
Type: consts.RemPipeline,
184+
Ip: pipeline.IP,
185+
CertName: pipeline.CertName,
186+
PacketLength: packetLength,
177187
Body: &clientpb.Pipeline_Rem{
178188
Rem: &clientpb.REM{
179189
Name: pipeline.Name,
@@ -197,13 +207,14 @@ func (pipeline *Pipeline) ToProtobuf() *clientpb.Pipeline {
197207
params = string(data)
198208
}
199209
return &clientpb.Pipeline{
200-
Name: pipeline.Name,
201-
ListenerId: pipeline.ListenerId,
202-
Enable: pipeline.Enable,
203-
Parser: pipeline.Parser,
204-
Ip: pipeline.IP,
205-
Type: pipeline.Type,
206-
CertName: pipeline.CertName,
210+
Name: pipeline.Name,
211+
ListenerId: pipeline.ListenerId,
212+
Enable: pipeline.Enable,
213+
Parser: pipeline.Parser,
214+
Ip: pipeline.IP,
215+
Type: pipeline.Type,
216+
CertName: pipeline.CertName,
217+
PacketLength: packetLength,
207218
Body: &clientpb.Pipeline_Custom{
208219
Custom: &clientpb.CustomPipeline{
209220
Name: pipeline.Name,

server/internal/parser/malefic/parser.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,19 @@ func NewMaleficParser() *MaleficParser {
3434
}
3535

3636
type MaleficParser struct {
37-
StartDelimiter byte
38-
EndDelimiter byte
39-
keyPair *clientpb.KeyPair // Age 密钥对,用于加解密
40-
privateKeys []string
37+
StartDelimiter byte
38+
EndDelimiter byte
39+
MaxPacketLength uint32
40+
keyPair *clientpb.KeyPair // Age 密钥对,用于加解密
41+
privateKeys []string
42+
}
43+
44+
// maxPacketLen returns the per-pipeline limit or falls back to global config.
45+
func (parser *MaleficParser) maxPacketLen() uint32 {
46+
if parser.MaxPacketLength > 0 {
47+
return parser.MaxPacketLength
48+
}
49+
return uint32(config.Uint(consts.ConfigMaxPacketLength))
4150
}
4251

4352
// WithSecure 设置 Age 密钥对用于加解密,返回新的 parser 实例
@@ -92,8 +101,8 @@ func (parser *MaleficParser) readHeader(conn io.ReadWriteCloser) (uint32, uint32
92101
}
93102
sessionId := ParseSid(header)
94103
length := binary.LittleEndian.Uint32(header[MsgSessionEnd:])
95-
if length > uint32(config.Uint(consts.ConfigMaxPacketLength))+consts.KB*16 {
96-
return 0, 0, fmt.Errorf("%w,expect: %d, recv: %d", types.ErrPacketTooLarge, config.Int(consts.ConfigMaxPacketLength), length)
104+
if length > parser.maxPacketLen()+consts.KB*16 {
105+
return 0, 0, fmt.Errorf("%w,expect: %d, recv: %d", types.ErrPacketTooLarge, parser.maxPacketLen(), length)
97106
}
98107

99108
return sessionId, length + 1, nil
@@ -105,8 +114,8 @@ func (parser *MaleficParser) ReadHeader(conn io.ReadWriteCloser) (uint32, uint32
105114
return 0, 0, err
106115
}
107116
//logs.Log.Debugf("%v read packet from %s , %d bytes", sid, conn.RemoteAddr(), length)
108-
if length > uint32(config.Uint(consts.ConfigMaxPacketLength))+consts.KB*16+1 {
109-
return 0, 0, fmt.Errorf("%w,expect: %d, recv: %d", types.ErrPacketTooLarge, config.Int(consts.ConfigMaxPacketLength), length)
117+
if length > parser.maxPacketLen()+consts.KB*16+1 {
118+
return 0, 0, fmt.Errorf("%w,expect: %d, recv: %d", types.ErrPacketTooLarge, parser.maxPacketLen(), length)
110119
}
111120
return sid, length, nil
112121
}

0 commit comments

Comments
 (0)