diff --git a/Dockerfile b/Dockerfile index c009ffb..f743afd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,12 +1,12 @@ -ARG DISTRO_VERSION="alpine" +FROM golang:1.20-alpine as builder -FROM golang:${DISTRO_VERSION} as builder +COPY src /fuse-device-plugin +WORKDIR /fuse-device-plugin -RUN apk add --update git -RUN git clone https://github.com/kuberenetes-learning-group/fuse-device-plugin.git --depth 1 -WORKDIR fuse-device-plugin -RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -ldflags "-s -w" . +RUN pwd && ls -laR + +RUN CGO_ENABLED=0 go build -ldflags "-s -w" . FROM scratch -COPY --from=builder /go/fuse-device-plugin/fuse-device-plugin /fuse-device-plugin -ENTRYPOINT ["/fuse-device-plugin", "--mounts_allowed", "5000"] +COPY --from=builder /fuse-device-plugin/fuse-device-plugin /fuse-device-plugin +ENTRYPOINT ["/fuse-device-plugin", "--mounts_allowed", "40"] diff --git a/README.md b/README.md index b0dff4a..20a9726 100644 --- a/README.md +++ b/README.md @@ -4,4 +4,5 @@ Public ECR repo: https://gallery.ecr.aws/zero-downtime/fuse-device-plugin # Resources - https://github.com/kuberenetes-learning-group/fuse-device-plugin +- https://github.com/cox96de/fuse-device-plugin - https://flavio.castelli.me/2020/09/16/build-multi-architecture-container-images-using-kubernetes/ diff --git a/fuse-device-plugin.yml b/fuse-device-plugin.yml new file mode 100644 index 0000000..7ac0d84 --- /dev/null +++ b/fuse-device-plugin.yml @@ -0,0 +1,28 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: fuse-device-plugin + namespace: kube-system +spec: + selector: + matchLabels: + name: fuse-device-plugin + template: + metadata: + labels: + name: fuse-device-plugin + spec: + containers: + - image: public.ecr.aws/zero-downtime/fuse-device-plugin:latest + name: fuse-device-plugin + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: ["ALL"] + volumeMounts: + - name: device-plugin + mountPath: /var/lib/kubelet/device-plugins + volumes: + - name: device-plugin + hostPath: + path: /var/lib/kubelet/device-plugins diff --git a/src/fuse.go b/src/fuse.go new file mode 100644 index 0000000..ba99779 --- /dev/null +++ b/src/fuse.go @@ -0,0 +1,29 @@ +package main + +import ( + "fmt" + "os" + + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +func getDevices(number int) []*pluginapi.Device { + hostname, _ := os.Hostname() + var devs []*pluginapi.Device + for i := 0; i < number; i++ { + devs = append(devs, &pluginapi.Device{ + ID: fmt.Sprintf("fuse-%s-%d", hostname, i), + Health: pluginapi.Healthy, + }) + } + return devs +} + +func deviceExists(devs []*pluginapi.Device, id string) bool { + for _, d := range devs { + if d.ID == id { + return true + } + } + return false +} diff --git a/src/go.mod b/src/go.mod new file mode 100644 index 0000000..d3e5802 --- /dev/null +++ b/src/go.mod @@ -0,0 +1,19 @@ +module fuse-device-plugin + +go 1.20 + +require ( + github.com/fsnotify/fsnotify v1.6.0 + golang.org/x/net v0.13.0 + google.golang.org/grpc v1.54.0 + k8s.io/kubelet v0.28.1 +) + +require ( + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + golang.org/x/sys v0.10.0 // indirect + golang.org/x/text v0.11.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect + google.golang.org/protobuf v1.30.0 // indirect +) diff --git a/src/go.sum b/src/go.sum new file mode 100644 index 0000000..b87ae19 --- /dev/null +++ b/src/go.sum @@ -0,0 +1,55 @@ +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.13.0 h1:Nvo8UFsZ8X3BhAC9699Z1j7XQ3rsZnUUm7jfBEk1ueY= +golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= +google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +k8s.io/kubelet v0.28.1 h1:QRfx+jrzNgkLnMSw/nxGkAN7cjHPO446MDbjPITxLkk= +k8s.io/kubelet v0.28.1/go.mod h1:xYBbbJ0e2Rtb/hv+QFie448lFF81J990ImIptce2AHk= diff --git a/src/main.go b/src/main.go new file mode 100644 index 0000000..3ea517b --- /dev/null +++ b/src/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "flag" + "log" + "os" + "syscall" + + "github.com/fsnotify/fsnotify" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +func main() { + var ( + mountsAllowed = 5 + ) + flag.IntVar(&mountsAllowed, "mounts_allowed", 5, "maximum times the fuse device can be mounted") + flag.Parse() + + log.Println("Starting") + defer func() { log.Println("Stopped:") }() + + log.Println("Starting FS watcher.") + watcher, err := newFSWatcher(pluginapi.DevicePluginPath) + if err != nil { + log.Println("Failed to created FS watcher.") + os.Exit(1) + } + defer watcher.Close() + + log.Println("Starting OS watcher.") + sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + + restart := true + var devicePlugin *FuseDevicePlugin + +L: + for { + if restart { + if devicePlugin != nil { + if err := devicePlugin.Stop(); err != nil { + log.Printf("Failed to stop plugin: %+v", err) + } + } + + log.Printf("Max device count: %d", mountsAllowed) + devicePlugin = NewFuseDevicePlugin(mountsAllowed) + if err := devicePlugin.Serve(); err != nil { + log.Println("Could not contact Kubelet, retrying. Did you enable the device plugin feature gate?") + } else { + restart = false + } + } + + select { + case event := <-watcher.Events: + if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create { + log.Printf("Inotify: %s created, restarting.", pluginapi.KubeletSocket) + restart = true + } + + case err := <-watcher.Errors: + log.Printf("Inotify: %s", err) + + case s := <-sigs: + switch s { + case syscall.SIGHUP: + log.Println("Received SIGHUP, restarting.") + restart = true + default: + log.Printf("Received signal \"%v\", shutting down.", s) + if err := devicePlugin.Stop(); err != nil { + log.Printf("Failed to stop plugin: %+v", err) + } + break L + } + } + } +} diff --git a/src/server.go b/src/server.go new file mode 100644 index 0000000..719d40f --- /dev/null +++ b/src/server.go @@ -0,0 +1,215 @@ +package main + +import ( + "fmt" + "log" + "net" + "os" + "path" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" +) + +const ( + resourceName = "github.com/fuse" + serverSock = pluginapi.DevicePluginPath + "fuse.sock" +) + +// FuseDevicePlugin implements the Kubernetes device plugin API +type FuseDevicePlugin struct { + devs []*pluginapi.Device + socket string + + stop chan interface{} + health chan *pluginapi.Device + + server *grpc.Server +} + +func NewFuseDevicePlugin(number int) *FuseDevicePlugin { + return &FuseDevicePlugin{ + devs: getDevices(number), + socket: serverSock, + + stop: make(chan interface{}), + health: make(chan *pluginapi.Device), + } +} + +func (m *FuseDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { + return &pluginapi.DevicePluginOptions{}, nil +} + +func (m *FuseDevicePlugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { + return &pluginapi.PreferredAllocationResponse{}, nil +} + +func (m *FuseDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { + return &pluginapi.PreStartContainerResponse{}, nil +} + +// dial establishes the gRPC communication with the registered device plugin. +func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) { + c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(), + grpc.WithTimeout(timeout), + grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout("unix", addr, timeout) + }), + ) + + if err != nil { + return nil, err + } + + return c, nil +} + +// Start starts the gRPC server of the device plugin +func (m *FuseDevicePlugin) Start() error { + err := m.cleanup() + if err != nil { + return err + } + + sock, err := net.Listen("unix", m.socket) + if err != nil { + return err + } + + m.server = grpc.NewServer([]grpc.ServerOption{}...) + pluginapi.RegisterDevicePluginServer(m.server, m) + + go m.server.Serve(sock) + + // Wait for server to start by launching a blocking connexion + conn, err := dial(m.socket, 5*time.Second) + if err != nil { + return err + } + conn.Close() + + go m.healthcheck() + + return nil +} + +// Stop stops the gRPC server +func (m *FuseDevicePlugin) Stop() error { + if m.server == nil { + return nil + } + + m.server.Stop() + m.server = nil + close(m.stop) + + return m.cleanup() +} + +// Register registers the device plugin for the given resourceName with Kubelet. +func (m *FuseDevicePlugin) Register(kubeletEndpoint, resourceName string) error { + conn, err := dial(kubeletEndpoint, 5*time.Second) + if err != nil { + return err + } + defer conn.Close() + + client := pluginapi.NewRegistrationClient(conn) + reqt := &pluginapi.RegisterRequest{ + Version: pluginapi.Version, + Endpoint: path.Base(m.socket), + ResourceName: resourceName, + } + + _, err = client.Register(context.Background(), reqt) + if err != nil { + return err + } + return nil +} + +// ListAndWatch lists devices and update that list according to the health status +func (m *FuseDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { + if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil { + log.Printf("Failed to send message when start to list and watch: %+v", err) + } + + for { + select { + case <-m.stop: + return nil + case d := <-m.health: + d.Health = pluginapi.Unhealthy + if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil { + log.Printf("Failed to send message when plugin is to be health: %+v", err) + } + } + } +} + +func (m *FuseDevicePlugin) unhealthy(dev *pluginapi.Device) { + m.health <- dev +} + +// Allocate which return list of devices. +func (m *FuseDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { + devs := m.devs + var responses pluginapi.AllocateResponse + + for _, req := range reqs.ContainerRequests { + for _, id := range req.DevicesIDs { + if !deviceExists(devs, id) { + return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id) + } + + response := new(pluginapi.ContainerAllocateResponse) + response.Devices = []*pluginapi.DeviceSpec{ + { + ContainerPath: "/dev/fuse", + HostPath: "/dev/fuse", + Permissions: "rwm", + }, + } + + responses.ContainerResponses = append(responses.ContainerResponses, response) + } + } + + return &responses, nil +} + +func (m *FuseDevicePlugin) cleanup() error { + if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) { + return err + } + + return nil +} + +func (m *FuseDevicePlugin) healthcheck() { + for range m.stop { + return + } +} + +// Serve starts the gRPC server and register the device plugin to Kubelet +func (m *FuseDevicePlugin) Serve() error { + err := m.Start() + if err != nil { + log.Printf("Could not start device plugin: %s", err) + return err + } + log.Println("Starting to serve on", m.socket) + + err = m.Register(pluginapi.KubeletSocket, resourceName) + if err != nil { + log.Printf("Could not register device plugin: %s", err) + m.Stop() + return err + } + log.Println("Registered device plugin with Kubelet") + return nil +} diff --git a/src/watchers.go b/src/watchers.go new file mode 100644 index 0000000..a46f86a --- /dev/null +++ b/src/watchers.go @@ -0,0 +1,35 @@ +package main + +import ( + "log" + "os" + "os/signal" + + "github.com/fsnotify/fsnotify" +) + +func newFSWatcher(files ...string) (*fsnotify.Watcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Printf("new watcher meet error:%s\n", err.Error()) + return nil, err + } + + for _, f := range files { + err = watcher.Add(f) + if err != nil { + log.Printf("add watcher meet error:%s\n", err.Error()) + watcher.Close() + return nil, err + } + } + + return watcher, nil +} + +func newOSWatcher(sigs ...os.Signal) chan os.Signal { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, sigs...) + + return sigChan +} diff --git a/test-pod.yaml b/test-pod.yaml index cbd7854..818b662 100644 --- a/test-pod.yaml +++ b/test-pod.yaml @@ -16,9 +16,6 @@ spec: metadata: labels: app: podman - # can be removed once the podman upstream AppArmor profile is fixed / allows mount syscall - annotations: - container.apparmor.security.beta.kubernetes.io/podman: unconfined spec: containers: - name: podman