Use updated GO dependencies and fixes from https://github.com/cox96de/fuse-device-plugin.git

This commit is contained in:
Stefan Reimer 2023-09-27 17:56:58 +00:00
parent 20b9025515
commit dc23e0323a
10 changed files with 469 additions and 11 deletions

View File

@ -1,12 +1,12 @@
ARG DISTRO_VERSION="alpine" FROM golang:1.20-alpine as builder
FROM golang:${DISTRO_VERSION} as builder COPY src /fuse-device-plugin
WORKDIR /fuse-device-plugin
RUN apk add --update git RUN pwd && ls -laR
RUN git clone https://github.com/kuberenetes-learning-group/fuse-device-plugin.git --depth 1
WORKDIR fuse-device-plugin RUN CGO_ENABLED=0 go build -ldflags "-s -w" .
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -ldflags "-s -w" .
FROM scratch FROM scratch
COPY --from=builder /go/fuse-device-plugin/fuse-device-plugin /fuse-device-plugin COPY --from=builder /fuse-device-plugin/fuse-device-plugin /fuse-device-plugin
ENTRYPOINT ["/fuse-device-plugin", "--mounts_allowed", "5000"] ENTRYPOINT ["/fuse-device-plugin", "--mounts_allowed", "40"]

View File

@ -4,4 +4,5 @@ Public ECR repo: https://gallery.ecr.aws/zero-downtime/fuse-device-plugin
# Resources # Resources
- https://github.com/kuberenetes-learning-group/fuse-device-plugin - https://github.com/kuberenetes-learning-group/fuse-device-plugin
- https://github.com/cox96de/fuse-device-plugin
- https://flavio.castelli.me/2020/09/16/build-multi-architecture-container-images-using-kubernetes/ - https://flavio.castelli.me/2020/09/16/build-multi-architecture-container-images-using-kubernetes/

28
fuse-device-plugin.yml Normal file
View File

@ -0,0 +1,28 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fuse-device-plugin
namespace: kube-system
spec:
selector:
matchLabels:
name: fuse-device-plugin
template:
metadata:
labels:
name: fuse-device-plugin
spec:
containers:
- image: public.ecr.aws/zero-downtime/fuse-device-plugin:latest
name: fuse-device-plugin
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop: ["ALL"]
volumeMounts:
- name: device-plugin
mountPath: /var/lib/kubelet/device-plugins
volumes:
- name: device-plugin
hostPath:
path: /var/lib/kubelet/device-plugins

29
src/fuse.go Normal file
View File

@ -0,0 +1,29 @@
package main
import (
"fmt"
"os"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
func getDevices(number int) []*pluginapi.Device {
hostname, _ := os.Hostname()
var devs []*pluginapi.Device
for i := 0; i < number; i++ {
devs = append(devs, &pluginapi.Device{
ID: fmt.Sprintf("fuse-%s-%d", hostname, i),
Health: pluginapi.Healthy,
})
}
return devs
}
func deviceExists(devs []*pluginapi.Device, id string) bool {
for _, d := range devs {
if d.ID == id {
return true
}
}
return false
}

19
src/go.mod Normal file
View File

@ -0,0 +1,19 @@
module fuse-device-plugin
go 1.20
require (
github.com/fsnotify/fsnotify v1.6.0
golang.org/x/net v0.13.0
google.golang.org/grpc v1.54.0
k8s.io/kubelet v0.28.1
)
require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

55
src/go.sum Normal file
View File

@ -0,0 +1,55 @@
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.13.0 h1:Nvo8UFsZ8X3BhAC9699Z1j7XQ3rsZnUUm7jfBEk1ueY=
golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
k8s.io/kubelet v0.28.1 h1:QRfx+jrzNgkLnMSw/nxGkAN7cjHPO446MDbjPITxLkk=
k8s.io/kubelet v0.28.1/go.mod h1:xYBbbJ0e2Rtb/hv+QFie448lFF81J990ImIptce2AHk=

79
src/main.go Normal file
View File

@ -0,0 +1,79 @@
package main
import (
"flag"
"log"
"os"
"syscall"
"github.com/fsnotify/fsnotify"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
func main() {
var (
mountsAllowed = 5
)
flag.IntVar(&mountsAllowed, "mounts_allowed", 5, "maximum times the fuse device can be mounted")
flag.Parse()
log.Println("Starting")
defer func() { log.Println("Stopped:") }()
log.Println("Starting FS watcher.")
watcher, err := newFSWatcher(pluginapi.DevicePluginPath)
if err != nil {
log.Println("Failed to created FS watcher.")
os.Exit(1)
}
defer watcher.Close()
log.Println("Starting OS watcher.")
sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
restart := true
var devicePlugin *FuseDevicePlugin
L:
for {
if restart {
if devicePlugin != nil {
if err := devicePlugin.Stop(); err != nil {
log.Printf("Failed to stop plugin: %+v", err)
}
}
log.Printf("Max device count: %d", mountsAllowed)
devicePlugin = NewFuseDevicePlugin(mountsAllowed)
if err := devicePlugin.Serve(); err != nil {
log.Println("Could not contact Kubelet, retrying. Did you enable the device plugin feature gate?")
} else {
restart = false
}
}
select {
case event := <-watcher.Events:
if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
log.Printf("Inotify: %s created, restarting.", pluginapi.KubeletSocket)
restart = true
}
case err := <-watcher.Errors:
log.Printf("Inotify: %s", err)
case s := <-sigs:
switch s {
case syscall.SIGHUP:
log.Println("Received SIGHUP, restarting.")
restart = true
default:
log.Printf("Received signal \"%v\", shutting down.", s)
if err := devicePlugin.Stop(); err != nil {
log.Printf("Failed to stop plugin: %+v", err)
}
break L
}
}
}
}

215
src/server.go Normal file
View File

@ -0,0 +1,215 @@
package main
import (
"fmt"
"log"
"net"
"os"
"path"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
const (
resourceName = "github.com/fuse"
serverSock = pluginapi.DevicePluginPath + "fuse.sock"
)
// FuseDevicePlugin implements the Kubernetes device plugin API
type FuseDevicePlugin struct {
devs []*pluginapi.Device
socket string
stop chan interface{}
health chan *pluginapi.Device
server *grpc.Server
}
func NewFuseDevicePlugin(number int) *FuseDevicePlugin {
return &FuseDevicePlugin{
devs: getDevices(number),
socket: serverSock,
stop: make(chan interface{}),
health: make(chan *pluginapi.Device),
}
}
func (m *FuseDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
return &pluginapi.DevicePluginOptions{}, nil
}
func (m *FuseDevicePlugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
return &pluginapi.PreferredAllocationResponse{}, nil
}
func (m *FuseDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
return &pluginapi.PreStartContainerResponse{}, nil
}
// dial establishes the gRPC communication with the registered device plugin.
func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) {
c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithTimeout(timeout),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
if err != nil {
return nil, err
}
return c, nil
}
// Start starts the gRPC server of the device plugin
func (m *FuseDevicePlugin) Start() error {
err := m.cleanup()
if err != nil {
return err
}
sock, err := net.Listen("unix", m.socket)
if err != nil {
return err
}
m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterDevicePluginServer(m.server, m)
go m.server.Serve(sock)
// Wait for server to start by launching a blocking connexion
conn, err := dial(m.socket, 5*time.Second)
if err != nil {
return err
}
conn.Close()
go m.healthcheck()
return nil
}
// Stop stops the gRPC server
func (m *FuseDevicePlugin) Stop() error {
if m.server == nil {
return nil
}
m.server.Stop()
m.server = nil
close(m.stop)
return m.cleanup()
}
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *FuseDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
conn, err := dial(kubeletEndpoint, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
client := pluginapi.NewRegistrationClient(conn)
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
}
_, err = client.Register(context.Background(), reqt)
if err != nil {
return err
}
return nil
}
// ListAndWatch lists devices and update that list according to the health status
func (m *FuseDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
log.Printf("Failed to send message when start to list and watch: %+v", err)
}
for {
select {
case <-m.stop:
return nil
case d := <-m.health:
d.Health = pluginapi.Unhealthy
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
log.Printf("Failed to send message when plugin is to be health: %+v", err)
}
}
}
}
func (m *FuseDevicePlugin) unhealthy(dev *pluginapi.Device) {
m.health <- dev
}
// Allocate which return list of devices.
func (m *FuseDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
devs := m.devs
var responses pluginapi.AllocateResponse
for _, req := range reqs.ContainerRequests {
for _, id := range req.DevicesIDs {
if !deviceExists(devs, id) {
return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
}
response := new(pluginapi.ContainerAllocateResponse)
response.Devices = []*pluginapi.DeviceSpec{
{
ContainerPath: "/dev/fuse",
HostPath: "/dev/fuse",
Permissions: "rwm",
},
}
responses.ContainerResponses = append(responses.ContainerResponses, response)
}
}
return &responses, nil
}
func (m *FuseDevicePlugin) cleanup() error {
if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
func (m *FuseDevicePlugin) healthcheck() {
for range m.stop {
return
}
}
// Serve starts the gRPC server and register the device plugin to Kubelet
func (m *FuseDevicePlugin) Serve() error {
err := m.Start()
if err != nil {
log.Printf("Could not start device plugin: %s", err)
return err
}
log.Println("Starting to serve on", m.socket)
err = m.Register(pluginapi.KubeletSocket, resourceName)
if err != nil {
log.Printf("Could not register device plugin: %s", err)
m.Stop()
return err
}
log.Println("Registered device plugin with Kubelet")
return nil
}

35
src/watchers.go Normal file
View File

@ -0,0 +1,35 @@
package main
import (
"log"
"os"
"os/signal"
"github.com/fsnotify/fsnotify"
)
func newFSWatcher(files ...string) (*fsnotify.Watcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("new watcher meet error:%s\n", err.Error())
return nil, err
}
for _, f := range files {
err = watcher.Add(f)
if err != nil {
log.Printf("add watcher meet error:%s\n", err.Error())
watcher.Close()
return nil, err
}
}
return watcher, nil
}
func newOSWatcher(sigs ...os.Signal) chan os.Signal {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, sigs...)
return sigChan
}

View File

@ -16,9 +16,6 @@ spec:
metadata: metadata:
labels: labels:
app: podman app: podman
# can be removed once the podman upstream AppArmor profile is fixed / allows mount syscall
annotations:
container.apparmor.security.beta.kubernetes.io/podman: unconfined
spec: spec:
containers: containers:
- name: podman - name: podman