package main import ( "fmt" "log" "net" "os" "path" "time" "golang.org/x/net/context" "google.golang.org/grpc" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" ) const ( resourceName = "github.com/fuse" serverSock = pluginapi.DevicePluginPath + "fuse.sock" ) // FuseDevicePlugin implements the Kubernetes device plugin API type FuseDevicePlugin struct { devs []*pluginapi.Device socket string stop chan interface{} health chan *pluginapi.Device server *grpc.Server } func NewFuseDevicePlugin(number int) *FuseDevicePlugin { return &FuseDevicePlugin{ devs: getDevices(number), socket: serverSock, stop: make(chan interface{}), health: make(chan *pluginapi.Device), } } func (m *FuseDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { return &pluginapi.DevicePluginOptions{}, nil } func (m *FuseDevicePlugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { return &pluginapi.PreferredAllocationResponse{}, nil } func (m *FuseDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { return &pluginapi.PreStartContainerResponse{}, nil } // dial establishes the gRPC communication with the registered device plugin. func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) { c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(timeout), grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout("unix", addr, timeout) }), ) if err != nil { return nil, err } return c, nil } // Start starts the gRPC server of the device plugin func (m *FuseDevicePlugin) Start() error { err := m.cleanup() if err != nil { return err } sock, err := net.Listen("unix", m.socket) if err != nil { return err } m.server = grpc.NewServer([]grpc.ServerOption{}...) pluginapi.RegisterDevicePluginServer(m.server, m) go m.server.Serve(sock) // Wait for server to start by launching a blocking connexion conn, err := dial(m.socket, 5*time.Second) if err != nil { return err } conn.Close() go m.healthcheck() return nil } // Stop stops the gRPC server func (m *FuseDevicePlugin) Stop() error { if m.server == nil { return nil } m.server.Stop() m.server = nil close(m.stop) return m.cleanup() } // Register registers the device plugin for the given resourceName with Kubelet. func (m *FuseDevicePlugin) Register(kubeletEndpoint, resourceName string) error { conn, err := dial(kubeletEndpoint, 5*time.Second) if err != nil { return err } defer conn.Close() client := pluginapi.NewRegistrationClient(conn) reqt := &pluginapi.RegisterRequest{ Version: pluginapi.Version, Endpoint: path.Base(m.socket), ResourceName: resourceName, } _, err = client.Register(context.Background(), reqt) if err != nil { return err } return nil } // ListAndWatch lists devices and update that list according to the health status func (m *FuseDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error { if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil { log.Printf("Failed to send message when start to list and watch: %+v", err) } for { select { case <-m.stop: return nil case d := <-m.health: d.Health = pluginapi.Unhealthy if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil { log.Printf("Failed to send message when plugin is to be health: %+v", err) } } } } func (m *FuseDevicePlugin) unhealthy(dev *pluginapi.Device) { m.health <- dev } // Allocate which return list of devices. func (m *FuseDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { devs := m.devs var responses pluginapi.AllocateResponse for _, req := range reqs.ContainerRequests { for _, id := range req.DevicesIDs { if !deviceExists(devs, id) { return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id) } response := new(pluginapi.ContainerAllocateResponse) response.Devices = []*pluginapi.DeviceSpec{ { ContainerPath: "/dev/fuse", HostPath: "/dev/fuse", Permissions: "rwm", }, } responses.ContainerResponses = append(responses.ContainerResponses, response) } } return &responses, nil } func (m *FuseDevicePlugin) cleanup() error { if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) { return err } return nil } func (m *FuseDevicePlugin) healthcheck() { for range m.stop { return } } // Serve starts the gRPC server and register the device plugin to Kubelet func (m *FuseDevicePlugin) Serve() error { err := m.Start() if err != nil { log.Printf("Could not start device plugin: %s", err) return err } log.Println("Starting to serve on", m.socket) err = m.Register(pluginapi.KubeletSocket, resourceName) if err != nil { log.Printf("Could not register device plugin: %s", err) m.Stop() return err } log.Println("Registered device plugin with Kubelet") return nil }