update checks for wrapped errors (#3117)

* errors.Is()
* extract function isBrokenConnection()
This commit is contained in:
mmetc 2024-11-04 11:21:48 +01:00 committed by GitHub
parent 1616991398
commit 57521114bd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 32 additions and 21 deletions

View file

@ -348,10 +348,6 @@ issues:
- errorlint
text: "type switch on error will fail on wrapped errors. Use errors.As to check for specific errors"
- linters:
- errorlint
text: "comparing with .* will fail on wrapped errors. Use errors.Is to check for a specific error"
- linters:
- nosprintfhostport
text: "host:port in url should be constructed with net.JoinHostPort and not directly with fmt.Sprintf"

View file

@ -85,6 +85,7 @@ func (ac *AuthCache) Get(apiKey string) (time.Time, bool) {
ac.mu.RLock()
expiration, exists := ac.APIKeys[apiKey]
ac.mu.RUnlock()
return expiration, exists
}
@ -128,6 +129,7 @@ func (w *AppsecSource) UnmarshalConfig(yamlConfig []byte) error {
if w.config.ListenSocket != "" && w.config.ListenAddr == "" {
w.config.Name = w.config.ListenSocket
}
if w.config.ListenSocket == "" {
w.config.Name = fmt.Sprintf("%s%s", w.config.ListenAddr, w.config.Path)
}
@ -153,6 +155,7 @@ func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLe
if err != nil {
return fmt.Errorf("unable to parse appsec configuration: %w", err)
}
w.logger = logger
w.metricsLevel = MetricsLevel
w.logger.Tracef("Appsec configuration: %+v", w.config)
@ -211,10 +214,12 @@ func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLe
AppsecRuntime: &wrt,
Labels: w.config.Labels,
}
err := runner.Init(appsecCfg.GetDataDir())
if err != nil {
return fmt.Errorf("unable to initialize runner: %w", err)
}
w.AppsecRunners[nbRoutine] = runner
}
@ -222,6 +227,7 @@ func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLe
// We don´t use the wrapper provided by coraza because we want to fully control what happens when a rule match to send the information in crowdsec
w.mux.HandleFunc(w.config.Path, w.appsecHandler)
return nil
}
@ -243,10 +249,12 @@ func (w *AppsecSource) OneShotAcquisition(_ context.Context, _ chan types.Event,
func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
w.outChan = out
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/appsec/live")
w.logger.Infof("%d appsec runner to start", len(w.AppsecRunners))
for _, runner := range w.AppsecRunners {
runner.outChan = out
t.Go(func() error {
@ -254,6 +262,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
return runner.Run(t)
})
}
t.Go(func() error {
if w.config.ListenSocket != "" {
w.logger.Infof("creating unix socket %s", w.config.ListenSocket)
@ -268,10 +277,11 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
} else {
err = w.server.Serve(listener)
}
if err != nil && err != http.ErrServerClosed {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("appsec server failed: %w", err)
}
}
return nil
})
t.Go(func() error {
@ -288,6 +298,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
return fmt.Errorf("appsec server failed: %w", err)
}
}
return nil
})
<-t.Dying()
@ -297,6 +308,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
w.server.Shutdown(ctx)
return nil
})
return nil
}
@ -391,6 +403,7 @@ func (w *AppsecSource) appsecHandler(rw http.ResponseWriter, r *http.Request) {
logger.Debugf("Response: %+v", appsecResponse)
rw.WriteHeader(statusCode)
body, err := json.Marshal(appsecResponse)
if err != nil {
logger.Errorf("unable to serialize response: %s", err)

View file

@ -94,7 +94,7 @@ func (w *WinEventLogSource) getXMLEvents(config *winlog.SubscribeConfig, publish
2000, // Timeout in milliseconds to wait.
0, // Reserved. Must be zero.
&returned) // The number of handles in the array that are set by the API.
if err == windows.ERROR_NO_MORE_ITEMS {
if errors.Is(err, windows.ERROR_NO_MORE_ITEMS) {
return nil, err
} else if err != nil {
return nil, fmt.Errorf("wevtapi.EvtNext failed: %v", err)
@ -188,7 +188,7 @@ func (w *WinEventLogSource) getEvents(out chan types.Event, t *tomb.Tomb) error
}
if status == syscall.WAIT_OBJECT_0 {
renderedEvents, err := w.getXMLEvents(w.evtConfig, publisherCache, subscription, 500)
if err == windows.ERROR_NO_MORE_ITEMS {
if errors.Is(err, windows.ERROR_NO_MORE_ITEMS) {
windows.ResetEvent(w.evtConfig.SignalEvent)
} else if err != nil {
w.logger.Errorf("getXMLEvents failed: %v", err)
@ -411,7 +411,7 @@ OUTER_LOOP:
return nil
default:
evts, err := w.getXMLEvents(w.evtConfig, publisherCache, handle, 500)
if err == windows.ERROR_NO_MORE_ITEMS {
if errors.Is(err, windows.ERROR_NO_MORE_ITEMS) {
log.Info("No more items")
break OUTER_LOOP
} else if err != nil {

View file

@ -46,20 +46,11 @@ type APIServer struct {
consoleConfig *csconfig.ConsoleConfig
}
func recoverFromPanic(c *gin.Context) {
err := recover()
if err == nil {
return
}
// Check for a broken connection, as it is not really a
// condition that warrants a panic stack trace.
brokenPipe := false
func isBrokenConnection(err any) bool {
if ne, ok := err.(*net.OpError); ok {
if se, ok := ne.Err.(*os.SyscallError); ok {
if strings.Contains(strings.ToLower(se.Error()), "broken pipe") || strings.Contains(strings.ToLower(se.Error()), "connection reset by peer") {
brokenPipe = true
return true
}
}
}
@ -79,11 +70,22 @@ func recoverFromPanic(c *gin.Context) {
errors.Is(strErr, errClosedBody) ||
errors.Is(strErr, errHandlerComplete) ||
errors.Is(strErr, errStreamClosed) {
brokenPipe = true
return true
}
}
if brokenPipe {
return false
}
func recoverFromPanic(c *gin.Context) {
err := recover()
if err == nil {
return
}
// Check for a broken connection, as it is not really a
// condition that warrants a panic stack trace.
if isBrokenConnection(err) {
log.Warningf("client %s disconnected: %s", c.ClientIP(), err)
c.Abort()
} else {