fuse-device-plugin/src/server.go

216 lines
5.2 KiB
Go

package main
import (
"fmt"
"log"
"net"
"os"
"path"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
const (
resourceName = "github.com/fuse"
serverSock = pluginapi.DevicePluginPath + "fuse.sock"
)
// FuseDevicePlugin implements the Kubernetes device plugin API
type FuseDevicePlugin struct {
devs []*pluginapi.Device
socket string
stop chan interface{}
health chan *pluginapi.Device
server *grpc.Server
}
func NewFuseDevicePlugin(number int) *FuseDevicePlugin {
return &FuseDevicePlugin{
devs: getDevices(number),
socket: serverSock,
stop: make(chan interface{}),
health: make(chan *pluginapi.Device),
}
}
func (m *FuseDevicePlugin) GetDevicePluginOptions(context.Context, *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
return &pluginapi.DevicePluginOptions{}, nil
}
func (m *FuseDevicePlugin) GetPreferredAllocation(context.Context, *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
return &pluginapi.PreferredAllocationResponse{}, nil
}
func (m *FuseDevicePlugin) PreStartContainer(context.Context, *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
return &pluginapi.PreStartContainerResponse{}, nil
}
// dial establishes the gRPC communication with the registered device plugin.
func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) {
c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithTimeout(timeout),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
if err != nil {
return nil, err
}
return c, nil
}
// Start starts the gRPC server of the device plugin
func (m *FuseDevicePlugin) Start() error {
err := m.cleanup()
if err != nil {
return err
}
sock, err := net.Listen("unix", m.socket)
if err != nil {
return err
}
m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterDevicePluginServer(m.server, m)
go m.server.Serve(sock)
// Wait for server to start by launching a blocking connexion
conn, err := dial(m.socket, 5*time.Second)
if err != nil {
return err
}
conn.Close()
go m.healthcheck()
return nil
}
// Stop stops the gRPC server
func (m *FuseDevicePlugin) Stop() error {
if m.server == nil {
return nil
}
m.server.Stop()
m.server = nil
close(m.stop)
return m.cleanup()
}
// Register registers the device plugin for the given resourceName with Kubelet.
func (m *FuseDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
conn, err := dial(kubeletEndpoint, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
client := pluginapi.NewRegistrationClient(conn)
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
}
_, err = client.Register(context.Background(), reqt)
if err != nil {
return err
}
return nil
}
// ListAndWatch lists devices and update that list according to the health status
func (m *FuseDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
log.Printf("Failed to send message when start to list and watch: %+v", err)
}
for {
select {
case <-m.stop:
return nil
case d := <-m.health:
d.Health = pluginapi.Unhealthy
if err := s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs}); err != nil {
log.Printf("Failed to send message when plugin is to be health: %+v", err)
}
}
}
}
func (m *FuseDevicePlugin) unhealthy(dev *pluginapi.Device) {
m.health <- dev
}
// Allocate which return list of devices.
func (m *FuseDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
devs := m.devs
var responses pluginapi.AllocateResponse
for _, req := range reqs.ContainerRequests {
for _, id := range req.DevicesIDs {
if !deviceExists(devs, id) {
return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
}
response := new(pluginapi.ContainerAllocateResponse)
response.Devices = []*pluginapi.DeviceSpec{
{
ContainerPath: "/dev/fuse",
HostPath: "/dev/fuse",
Permissions: "rwm",
},
}
responses.ContainerResponses = append(responses.ContainerResponses, response)
}
}
return &responses, nil
}
func (m *FuseDevicePlugin) cleanup() error {
if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}
func (m *FuseDevicePlugin) healthcheck() {
for range m.stop {
return
}
}
// Serve starts the gRPC server and register the device plugin to Kubelet
func (m *FuseDevicePlugin) Serve() error {
err := m.Start()
if err != nil {
log.Printf("Could not start device plugin: %s", err)
return err
}
log.Println("Starting to serve on", m.socket)
err = m.Register(pluginapi.KubeletSocket, resourceName)
if err != nil {
log.Printf("Could not register device plugin: %s", err)
m.Stop()
return err
}
log.Println("Registered device plugin with Kubelet")
return nil
}