Skip to content

Commit

Permalink
[WIP] Support multiple clusters with a file-based cluster registry (i…
Browse files Browse the repository at this point in the history
…stio#2880)

Support multiple clusters with a file-based cluster registry
  • Loading branch information
baodongli authored and costinm committed Feb 6, 2018
1 parent 1d1bc4a commit f444f24
Show file tree
Hide file tree
Showing 6 changed files with 820 additions and 22 deletions.
12 changes: 10 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pilot/cmd/pilot-discovery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func init() {
bootstrap.KubernetesRegistry, bootstrap.ConsulRegistry, bootstrap.EurekaRegistry, bootstrap.CloudFoundryRegistry, bootstrap.MockRegistry))
discoveryCmd.PersistentFlags().StringVar(&serverArgs.Config.CFConfig, "cfConfig", "",
"Cloud Foundry config file")
discoveryCmd.PersistentFlags().StringVar(&serverArgs.Config.ClusterRegistriesDir, "clusterRegistriesDir", "",
"Directory for a file-based cluster config store")
discoveryCmd.PersistentFlags().StringVar(&serverArgs.Config.KubeConfig, "kubeconfig", "",
"Use a Kubernetes configuration file instead of in-cluster configuration")
discoveryCmd.PersistentFlags().StringVar(&serverArgs.Mesh.ConfigFile, "meshConfig", "/etc/istio/config/mesh",
Expand Down
95 changes: 81 additions & 14 deletions pilot/pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net"
"os"
"path"
"time"

"code.cloudfoundry.org/copilot"
Expand All @@ -31,6 +32,7 @@ import (
meshconfig "istio.io/api/mesh/v1alpha1"
"istio.io/istio/pilot/cmd"
configaggregate "istio.io/istio/pilot/pkg/config/aggregate"
"istio.io/istio/pilot/pkg/config/clusterregistry"
"istio.io/istio/pilot/pkg/config/kube/crd"
"istio.io/istio/pilot/pkg/config/kube/crd/file"
"istio.io/istio/pilot/pkg/config/kube/ingress"
Expand Down Expand Up @@ -92,10 +94,11 @@ type MeshArgs struct {
// be monitored for CRD yaml files and will update the controller as those files change (This is used for testing
// purposes). Otherwise, a CRD client is created based on the configuration.
type ConfigArgs struct {
KubeConfig string
CFConfig string
ControllerOptions kube.ControllerOptions
FileDir string
ClusterRegistriesDir string
KubeConfig string
CFConfig string
ControllerOptions kube.ControllerOptions
FileDir string
}

// ConsulArgs provides configuration for the Consul service registry.
Expand Down Expand Up @@ -168,6 +171,7 @@ type Server struct {
kubeClient kubernetes.Interface
startFuncs []startFunc
listeningAddr net.Addr
clusterStore *clusterregistry.ClusterStore
}

// NewServer creates a new Server instance based on the provided arguments.
Expand All @@ -186,6 +190,9 @@ func NewServer(args PilotArgs) (*Server, error) {
if err := s.initMesh(&args); err != nil {
return nil, err
}
if err := s.initClusterRegistries(&args); err != nil {
return nil, err
}
if err := s.initKubeClient(&args); err != nil {
return nil, err
}
Expand Down Expand Up @@ -243,6 +250,16 @@ func (s *Server) initMonitor(args *PilotArgs) error {
return nil
}

func (s *Server) initClusterRegistries(args *PilotArgs) (err error) {
if args.Config.ClusterRegistriesDir != "" {
s.clusterStore, err = clusterregistry.ReadClusters(args.Config.ClusterRegistriesDir)
if s.clusterStore != nil {
log.Infof("clusters configuration %s", spew.Sdump(s.clusterStore))
}
}
return err
}

// initMesh creates the mesh in the pilotConfig from the input arguments.
func (s *Server) initMesh(args *PilotArgs) error {
// If a config file was specified, use it.
Expand Down Expand Up @@ -289,6 +306,19 @@ func (s *Server) initMixerSan(args *PilotArgs) error {
return nil
}

func (s *Server) getKubeCfgFile(args *PilotArgs) (kubeCfgFile string) {
// If the cluster store is configured, get pilot's kubeconfig from there
if s.clusterStore != nil {
if kubeCfgFile = s.clusterStore.GetPilotAccessConfig(); kubeCfgFile != "" {
kubeCfgFile = path.Join(args.Config.ClusterRegistriesDir, kubeCfgFile)
}
}
if kubeCfgFile == "" {
kubeCfgFile = args.Config.KubeConfig
}
return
}

// initKubeClient creates the k8s client if running in an k8s environment.
func (s *Server) initKubeClient(args *PilotArgs) error {
needToCreateClient := false
Expand All @@ -304,7 +334,11 @@ func (s *Server) initKubeClient(args *PilotArgs) error {
}

if needToCreateClient {
_, client, kuberr := kube.CreateInterface(args.Config.KubeConfig)
var client kubernetes.Interface
var kuberr error

kubeCfgFile := s.getKubeCfgFile(args)
_, client, kuberr = kube.CreateInterface(kubeCfgFile)
if kuberr != nil {
return multierror.Prefix(kuberr, "failed to connect to Kubernetes API.")
}
Expand Down Expand Up @@ -339,7 +373,8 @@ func (s *Server) initConfigController(args *PilotArgs) error {
return nil
})
} else {
configClient, err := crd.NewClient(args.Config.KubeConfig, configDescriptor,
kubeCfgFile := s.getKubeCfgFile(args)
configClient, err := crd.NewClient(kubeCfgFile, configDescriptor,
args.Config.ControllerOptions.DomainSuffix)
if err != nil {
return multierror.Prefix(err, "failed to open a config client.")
Expand All @@ -361,6 +396,43 @@ func (s *Server) initConfigController(args *PilotArgs) error {
return nil
}

// createK8sServiceControllers creates all the k8s service controllers under this pilot
func (s *Server) createK8sServiceControllers(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) {
kubectl := kube.NewController(s.kubeClient, args.Config.ControllerOptions)
serviceControllers.AddRegistry(
aggregate.Registry{
Name: serviceregistry.ServiceRegistry(KubernetesRegistry),
ServiceDiscovery: kubectl,
ServiceAccounts: kubectl,
Controller: kubectl,
})

// Add clusters under the same pilot
if s.clusterStore != nil {
clusters := s.clusterStore.GetPilotClusters()
for _, cluster := range clusters {
kubeconfig := clusterregistry.GetClusterAccessConfig(cluster)
kubeCfgFile := path.Join(args.Config.ClusterRegistriesDir, kubeconfig)
log.Infof("Cluster name: %s, AccessConfigFile: %s", clusterregistry.GetClusterName(cluster), kubeCfgFile)
_, client, kuberr := kube.CreateInterface(kubeCfgFile)
if kuberr != nil {
err = multierror.Append(err, multierror.Prefix(kuberr, fmt.Sprintf("failed to connect to Access API with accessconfig: %s", kubeCfgFile)))
}

kubectl := kube.NewController(client, args.Config.ControllerOptions)
serviceControllers.AddRegistry(
aggregate.Registry{
Name: serviceregistry.ServiceRegistry(KubernetesRegistry),
ClusterName: clusterregistry.GetClusterName(cluster),
ServiceDiscovery: kubectl,
ServiceAccounts: kubectl,
Controller: kubectl,
})
}
}
return
}

// initServiceControllers creates and initializes the service controllers
func (s *Server) initServiceControllers(args *PilotArgs) error {
serviceControllers := aggregate.NewController()
Expand Down Expand Up @@ -401,14 +473,9 @@ func (s *Server) initServiceControllers(args *PilotArgs) error {
serviceControllers.AddRegistry(registry1)
serviceControllers.AddRegistry(registry2)
case KubernetesRegistry:
kubectl := kube.NewController(s.kubeClient, args.Config.ControllerOptions)
serviceControllers.AddRegistry(
aggregate.Registry{
Name: serviceregistry.ServiceRegistry(serviceRegistry),
ServiceDiscovery: kubectl,
ServiceAccounts: kubectl,
Controller: kubectl,
})
if err := s.createK8sServiceControllers(serviceControllers, args); err != nil {
return err
}
if s.mesh.IngressControllerMode != meshconfig.MeshConfig_OFF {
// Wrap the config controller with a cache.
configController, err := configaggregate.MakeCache([]model.ConfigStoreCache{
Expand Down
Loading

0 comments on commit f444f24

Please sign in to comment.