From 2bb29468d8f2035806c6c8628cd8456b4b5ed0d6 Mon Sep 17 00:00:00 2001 From: n0ctal <4c866w5fn9@privaterelay.appleid.com> Date: Sat, 20 Jun 2026 21:10:18 +0500 Subject: [PATCH] fix(xray): guard log-writer race and bound handler gRPC deadlines (#5442) * perf(xray): compile log/traffic regexps once at package scope GetTraffic recompiled two stats regexps on every traffic tick, and LogWriter.Write recompiled two more on every log line. Hoist all four to package-level vars so they compile once at load instead of per call on hot paths. * fix(xray): guard LogWriter.lastLine against the GetResult reader race Write is driven by the Xray process goroutine while Process.GetResult reads lastLine from the caller's goroutine, so the unsynchronized field is a data race under `go test -race`. Add an RWMutex and route every write through setLastLine; GetResult reads via LastLine(). * fix(xray): bound handler gRPC calls with a deadline AddInbound, DelInbound and the AddUser AlterInbound call used context.Background(), so a hung core connection could block the caller indefinitely (for example while the process restart lock is held). Give them a 10s deadline (handlerRPCTimeout) and a nil-client guard, matching the other handler operations. --- internal/xray/api.go | 26 ++++++++++++++++--- internal/xray/log_writer.go | 27 ++++++++++++++++---- internal/xray/log_writer_race_test.go | 36 +++++++++++++++++++++++++++ internal/xray/process.go | 5 ++-- 4 files changed, 84 insertions(+), 10 deletions(-) create mode 100644 internal/xray/log_writer_race_test.go diff --git a/internal/xray/api.go b/internal/xray/api.go index bbcb63ed1..8bb8d9a2e 100644 --- a/internal/xray/api.go +++ b/internal/xray/api.go @@ -123,8 +123,16 @@ func (x *XrayAPI) Close() { x.isConnected = false } +// handlerRPCTimeout bounds per-call gRPC handler operations (add/remove inbound, +// alter user) so a hung core connection cannot block the caller indefinitely — +// for example while the process restart lock is held. +const handlerRPCTimeout = 10 * time.Second + // AddInbound adds a new inbound configuration to the Xray core via gRPC. func (x *XrayAPI) AddInbound(inbound []byte) error { + if x.HandlerServiceClient == nil { + return common.NewError("xray HandlerServiceClient is not initialized") + } client := *x.HandlerServiceClient conf := new(conf.InboundDetourConfig) @@ -140,15 +148,22 @@ func (x *XrayAPI) AddInbound(inbound []byte) error { } inboundConfig := command.AddInboundRequest{Inbound: config} - _, err = client.AddInbound(context.Background(), &inboundConfig) + ctx, cancel := context.WithTimeout(context.Background(), handlerRPCTimeout) + defer cancel() + _, err = client.AddInbound(ctx, &inboundConfig) return err } // DelInbound removes an inbound configuration from the Xray core by tag. func (x *XrayAPI) DelInbound(tag string) error { + if x.HandlerServiceClient == nil { + return common.NewError("xray HandlerServiceClient is not initialized") + } client := *x.HandlerServiceClient - _, err := client.RemoveInbound(context.Background(), &command.RemoveInboundRequest{ + ctx, cancel := context.WithTimeout(context.Background(), handlerRPCTimeout) + defer cancel() + _, err := client.RemoveInbound(ctx, &command.RemoveInboundRequest{ Tag: tag, }) return err @@ -505,9 +520,14 @@ func (x *XrayAPI) AddUser(Protocol string, inboundTag string, user map[string]an return nil } + if x.HandlerServiceClient == nil { + return common.NewError("xray HandlerServiceClient is not initialized") + } client := *x.HandlerServiceClient - _, err = client.AlterInbound(context.Background(), &command.AlterInboundRequest{ + ctx, cancel := context.WithTimeout(context.Background(), handlerRPCTimeout) + defer cancel() + _, err = client.AlterInbound(ctx, &command.AlterInboundRequest{ Tag: inboundTag, Operation: serial.ToTypedMessage(&command.AddUserOperation{ User: &protocol.User{ diff --git a/internal/xray/log_writer.go b/internal/xray/log_writer.go index 2e8c2d091..edfe83052 100644 --- a/internal/xray/log_writer.go +++ b/internal/xray/log_writer.go @@ -4,6 +4,7 @@ import ( "regexp" "runtime" "strings" + "sync" "github.com/mhsanaei/3x-ui/v3/internal/logger" ) @@ -22,9 +23,25 @@ func NewLogWriter() *LogWriter { // LogWriter processes and filters log output from the Xray process, handling crash detection and message filtering. type LogWriter struct { + mu sync.RWMutex lastLine string } +// LastLine returns the most recently processed Xray log line. It is safe for +// concurrent use: Process.GetResult reads it from a different goroutine than the +// one Xray drives Write from. +func (lw *LogWriter) LastLine() string { + lw.mu.RLock() + defer lw.mu.RUnlock() + return lw.lastLine +} + +func (lw *LogWriter) setLastLine(line string) { + lw.mu.Lock() + lw.lastLine = line + lw.mu.Unlock() +} + // Write processes and filters log output from the Xray process, handling crash detection and message filtering. func (lw *LogWriter) Write(m []byte) (n int, err error) { // Convert the data to a string @@ -39,7 +56,7 @@ func (lw *LogWriter) Write(m []byte) (n int, err error) { // Check if the message contains a crash if crashRegex.MatchString(message) { logger.Debug("Core crash detected:\n", message) - lw.lastLine = message + lw.setLastLine(message) err1 := writeCrashReport(m) if err1 != nil { logger.Error("Unable to write crash report:", err1) @@ -60,7 +77,7 @@ func (lw *LogWriter) Write(m []byte) (n int, err error) { if strings.Contains(msgBodyLower, "tls handshake error") || strings.Contains(msgBodyLower, "connection ends") { logger.Debug("XRAY: " + msgBody) - lw.lastLine = "" + lw.setLastLine("") continue } @@ -80,14 +97,14 @@ func (lw *LogWriter) Write(m []byte) (n int, err error) { logger.Debug("XRAY: " + msg) } } - lw.lastLine = "" + lw.setLastLine("") } else if msg != "" { msgLower := strings.ToLower(msg) if strings.Contains(msgLower, "tls handshake error") || strings.Contains(msgLower, "connection ends") { logger.Debug("XRAY: " + msg) - lw.lastLine = msg + lw.setLastLine(msg) continue } @@ -96,7 +113,7 @@ func (lw *LogWriter) Write(m []byte) (n int, err error) { } else { logger.Debug("XRAY: " + msg) } - lw.lastLine = msg + lw.setLastLine(msg) } } diff --git a/internal/xray/log_writer_race_test.go b/internal/xray/log_writer_race_test.go new file mode 100644 index 000000000..417814946 --- /dev/null +++ b/internal/xray/log_writer_race_test.go @@ -0,0 +1,36 @@ +package xray + +import ( + "sync" + "testing" +) + +// TestLogWriterLastLineConcurrent exercises the LogWriter from multiple +// goroutines: Xray drives Write while another goroutine (Process.GetResult) +// reads the last line. Run under `go test -race` this fails on an unguarded +// lastLine field and passes once the access is serialized. +func TestLogWriterLastLineConcurrent(t *testing.T) { + lw := NewLogWriter() + const writers, readers, iterations = 4, 4, 500 + + var wg sync.WaitGroup + wg.Add(writers + readers) + + for i := 0; i < writers; i++ { + go func() { + defer wg.Done() + for j := 0; j < iterations; j++ { + _, _ = lw.Write([]byte("2024/01/01 00:00:00.000000 [Info] connection accepted")) + } + }() + } + for i := 0; i < readers; i++ { + go func() { + defer wg.Done() + for j := 0; j < iterations; j++ { + _ = lw.LastLine() + } + }() + } + wg.Wait() +} diff --git a/internal/xray/process.go b/internal/xray/process.go index e2b8606c1..560b6ade1 100644 --- a/internal/xray/process.go +++ b/internal/xray/process.go @@ -273,10 +273,11 @@ func (p *process) GetResult() string { p.mu.RLock() exitErr := p.exitErr p.mu.RUnlock() - if len(p.logWriter.lastLine) == 0 && exitErr != nil { + lastLine := p.logWriter.LastLine() + if len(lastLine) == 0 && exitErr != nil { return exitErr.Error() } - return p.logWriter.lastLine + return lastLine } // GetVersion returns the version string of the Xray process.