db: don't set machine heartbeat until first connection (#3019)

* db: don't set machine heartbeat until first connection

* cscli machines prune: if hearbeat is not set, look at creation date

* lint
This commit is contained in:
mmetc 2024-06-17 10:39:50 +02:00 committed by GitHub
parent 44a2014f62
commit 4521a98ecc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 41 additions and 25 deletions

View file

@ -414,7 +414,7 @@ func (cli *cliMachines) prune(duration time.Duration, notValidOnly bool, force b
}
if !notValidOnly {
if pending, err := cli.db.QueryLastValidatedHeartbeatLT(time.Now().UTC().Add(-duration)); err == nil {
if pending, err := cli.db.QueryMachinesInactiveSince(time.Now().UTC().Add(-duration)); err == nil {
machines = append(machines, pending...)
}
}

View file

@ -87,8 +87,6 @@ var (
UpdateDefaultUpdatedAt func() time.Time
// DefaultLastPush holds the default value on creation for the "last_push" field.
DefaultLastPush func() time.Time
// DefaultLastHeartbeat holds the default value on creation for the "last_heartbeat" field.
DefaultLastHeartbeat func() time.Time
// ScenariosValidator is a validator for the "scenarios" field. It is called by the builders before save.
ScenariosValidator func(string) error
// DefaultIsValidated holds the default value on creation for the "isValidated" field.

View file

@ -227,10 +227,6 @@ func (mc *MachineCreate) defaults() {
v := machine.DefaultLastPush()
mc.mutation.SetLastPush(v)
}
if _, ok := mc.mutation.LastHeartbeat(); !ok {
v := machine.DefaultLastHeartbeat()
mc.mutation.SetLastHeartbeat(v)
}
if _, ok := mc.mutation.IsValidated(); !ok {
v := machine.DefaultIsValidated
mc.mutation.SetIsValidated(v)

View file

@ -142,10 +142,6 @@ func init() {
machineDescLastPush := machineFields[2].Descriptor()
// machine.DefaultLastPush holds the default value on creation for the last_push field.
machine.DefaultLastPush = machineDescLastPush.Default.(func() time.Time)
// machineDescLastHeartbeat is the schema descriptor for last_heartbeat field.
machineDescLastHeartbeat := machineFields[3].Descriptor()
// machine.DefaultLastHeartbeat holds the default value on creation for the last_heartbeat field.
machine.DefaultLastHeartbeat = machineDescLastHeartbeat.Default.(func() time.Time)
// machineDescScenarios is the schema descriptor for scenarios field.
machineDescScenarios := machineFields[7].Descriptor()
// machine.ScenariosValidator is a validator for the "scenarios" field. It is called by the builders before save.

View file

@ -4,6 +4,7 @@ import (
"entgo.io/ent"
"entgo.io/ent/schema/edge"
"entgo.io/ent/schema/field"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
@ -25,7 +26,6 @@ func (Machine) Fields() []ent.Field {
Default(types.UtcNow).
Nillable().Optional(),
field.Time("last_heartbeat").
Default(types.UtcNow).
Nillable().Optional(),
field.String("machineId").
Unique().

View file

@ -13,8 +13,10 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/types"
)
const CapiMachineID = types.CAPIOrigin
const CapiListsMachineID = types.ListOrigin
const (
CapiMachineID = types.CAPIOrigin
CapiListsMachineID = types.ListOrigin
)
func (c *Client) CreateMachine(machineID *string, password *strfmt.Password, ipAddress string, isValidated bool, force bool, authType string) (*ent.Machine, error) {
hashPassword, err := bcrypt.GenerateFromPassword([]byte(*password), bcrypt.DefaultCost)
@ -30,6 +32,7 @@ func (c *Client) CreateMachine(machineID *string, password *strfmt.Password, ipA
if err != nil {
return nil, errors.Wrapf(QueryFail, "machine '%s': %s", *machineID, err)
}
if len(machineExist) > 0 {
if force {
_, err := c.Ent.Machine.Update().Where(machine.MachineIdEQ(*machineID)).SetPassword(string(hashPassword)).Save(c.CTX)
@ -37,12 +40,15 @@ func (c *Client) CreateMachine(machineID *string, password *strfmt.Password, ipA
c.Log.Warningf("CreateMachine : %s", err)
return nil, errors.Wrapf(UpdateFail, "machine '%s'", *machineID)
}
machine, err := c.QueryMachineByID(*machineID)
if err != nil {
return nil, errors.Wrapf(QueryFail, "machine '%s': %s", *machineID, err)
}
return machine, nil
}
return nil, errors.Wrapf(UserExists, "user '%s'", *machineID)
}
@ -54,7 +60,6 @@ func (c *Client) CreateMachine(machineID *string, password *strfmt.Password, ipA
SetIsValidated(isValidated).
SetAuthType(authType).
Save(c.CTX)
if err != nil {
c.Log.Warningf("CreateMachine : %s", err)
return nil, errors.Wrapf(InsertFail, "creating machine '%s'", *machineID)
@ -72,6 +77,7 @@ func (c *Client) QueryMachineByID(machineID string) (*ent.Machine, error) {
c.Log.Warningf("QueryMachineByID : %s", err)
return &ent.Machine{}, errors.Wrapf(UserNotExists, "user '%s'", machineID)
}
return machine, nil
}
@ -80,6 +86,7 @@ func (c *Client) ListMachines() ([]*ent.Machine, error) {
if err != nil {
return nil, errors.Wrapf(QueryFail, "listing machines: %s", err)
}
return machines, nil
}
@ -88,21 +95,21 @@ func (c *Client) ValidateMachine(machineID string) error {
if err != nil {
return errors.Wrapf(UpdateFail, "validating machine: %s", err)
}
if rets == 0 {
return fmt.Errorf("machine not found")
return errors.New("machine not found")
}
return nil
}
func (c *Client) QueryPendingMachine() ([]*ent.Machine, error) {
var machines []*ent.Machine
var err error
machines, err = c.Ent.Machine.Query().Where(machine.IsValidatedEQ(false)).All(c.CTX)
machines, err := c.Ent.Machine.Query().Where(machine.IsValidatedEQ(false)).All(c.CTX)
if err != nil {
c.Log.Warningf("QueryPendingMachine : %s", err)
return nil, errors.Wrapf(QueryFail, "querying pending machines: %s", err)
}
return machines, nil
}
@ -116,7 +123,7 @@ func (c *Client) DeleteWatcher(name string) error {
}
if nbDeleted == 0 {
return fmt.Errorf("machine doesn't exist")
return errors.New("machine doesn't exist")
}
return nil
@ -127,10 +134,12 @@ func (c *Client) BulkDeleteWatchers(machines []*ent.Machine) (int, error) {
for i, b := range machines {
ids[i] = b.ID
}
nbDeleted, err := c.Ent.Machine.Delete().Where(machine.IDIn(ids...)).Exec(c.CTX)
if err != nil {
return nbDeleted, err
}
return nbDeleted, nil
}
@ -139,6 +148,7 @@ func (c *Client) UpdateMachineLastHeartBeat(machineID string) error {
if err != nil {
return errors.Wrapf(UpdateFail, "updating machine last_heartbeat: %s", err)
}
return nil
}
@ -150,6 +160,7 @@ func (c *Client) UpdateMachineScenarios(scenarios string, ID int) error {
if err != nil {
return fmt.Errorf("unable to update machine in database: %s", err)
}
return nil
}
@ -160,6 +171,7 @@ func (c *Client) UpdateMachineIP(ipAddr string, ID int) error {
if err != nil {
return fmt.Errorf("unable to update machine IP in database: %s", err)
}
return nil
}
@ -170,6 +182,7 @@ func (c *Client) UpdateMachineVersion(ipAddr string, ID int) error {
if err != nil {
return fmt.Errorf("unable to update machine version in database: %s", err)
}
return nil
}
@ -178,17 +191,23 @@ func (c *Client) IsMachineRegistered(machineID string) (bool, error) {
if err != nil {
return false, err
}
if len(exist) == 1 {
return true, nil
}
if len(exist) > 1 {
return false, fmt.Errorf("more than one item with the same machineID in database")
return false, errors.New("more than one item with the same machineID in database")
}
return false, nil
}
func (c *Client) QueryLastValidatedHeartbeatLT(t time.Time) ([]*ent.Machine, error) {
return c.Ent.Machine.Query().Where(machine.LastHeartbeatLT(t), machine.IsValidatedEQ(true)).All(c.CTX)
func (c *Client) QueryMachinesInactiveSince(t time.Time) ([]*ent.Machine, error) {
return c.Ent.Machine.Query().Where(
machine.Or(
machine.And(machine.LastHeartbeatLT(t), machine.IsValidatedEQ(true)),
machine.And(machine.LastHeartbeatIsNil(), machine.CreatedAtLT(t)),
),
).All(c.CTX)
}

View file

@ -62,6 +62,13 @@ teardown() {
assert_output 1
}
@test "heartbeat is initially null" {
rune -0 cscli machines add foo --auto --file /dev/null
rune -0 cscli machines list -o json
rune -0 yq '.[] | select(.machineId == "foo") | .last_heartbeat' <(output)
assert_output null
}
@test "register, validate and then remove a machine" {
rune -0 cscli lapi register --machine CiTestMachineRegister -f /dev/null -o human
assert_stderr --partial "Successfully registered to Local API (LAPI)"