// StorageSerializationOptions contains the options for encoding resources. type StorageSerializationOptions struct { StorageVersions string // The default values for StorageVersions. StorageVersions overrides // these; you can change this if you want to change the defaults (e.g., // for testing). This is not actually exposed as a flag.
—storage-versions 参数说明: The per-group version to store resources in. Specified in the format “group1/version1,group2/version2,…”. In the case where objects are moved from one group to the other, you may specify the format “group1=group2/v1beta1,group3/v1beta1,…”. You only need to pass the groups you wish to change from the defaults. It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable. (default “admission.k8s.io/v1beta1,admissionregistration.k8s.io/v1beta1,apps/v1,authentication.k8s.io/v1,authorization.k8s.io/v1,autoscaling/v1,batch/v1,certificates.k8s.io/v1beta1,componentconfig/v1alpha1,events.k8s.io/v1beta1,extensions/v1beta1,imagepolicy.k8s.io/v1alpha1,networking.k8s.io/v1,policy/v1beta1,rbac.authorization.k8s.io/v1,scheduling.k8s.io/v1alpha1,settings.k8s.io/v1alpha1,storage.k8s.io/v1,v1”)
// 构建了存储工厂实例,使用了DefaultStorageFactory类型,归并了缺省资源编码(defaultResourceEncoding)与用户指定的数据 storageFactory, err := kubeapiserver.NewStorageFactory( s.Etcd.StorageConfig, // Etcd的配置 s.Etcd.DefaultStorageMediaType, // 缺省存储媒介类型:application/json legacyscheme.Codecs, // 传统的编解码 serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Registry), storageGroupsToEncodingVersion, // 前面创建的Group-> GroupVersion的映射 // The list includes resources that need to be stored in a different // group version than other resources in the groups. // FIXME (soltysh): this GroupVersionResource override should be configurable // 下面列表列举了与组内其他资源不同,需要存储到不同的group version的资源 []schema.GroupVersionResource{ batch.Resource("cronjobs").WithVersion("v1beta1"), storage.Resource("volumeattachments").WithVersion("v1beta1"), admissionregistration.Resource("initializerconfigurations").WithVersion("v1alpha1"), }, apiResourceConfig) // 描述了启动的API信息,基于启动参数中的APIEnablementOptions生成,存放到了genericapiserver.Config的MergedResourceConfig字段中。 // 该字段合并了DefaultAPIResourceConfigSource()方法定义的资源。
if err != nil { returnnil, fmt.Errorf("error in initializing storage factory: %s", err) }
// 同居资源绑定,约定了同居资源的查找顺序 storageFactory.AddCohabitatingResources(networking.Resource("networkpolicies"), extensions.Resource("networkpolicies")) storageFactory.AddCohabitatingResources(apps.Resource("deployments"), extensions.Resource("deployments")) storageFactory.AddCohabitatingResources(apps.Resource("daemonsets"), extensions.Resource("daemonsets")) storageFactory.AddCohabitatingResources(apps.Resource("replicasets"), extensions.Resource("replicasets")) storageFactory.AddCohabitatingResources(api.Resource("events"), events.Resource("events")) for _, override := range s.Etcd.EtcdServersOverrides { // EtcdServersOverrides的格式:group/resource#servers,可以指定不同资源存储在不同的Etcd服务中。 tokens := strings.Split(override, "#") iflen(tokens) != 2 { glog.Errorf("invalid value of etcd server overrides: %s", override) continue }
// NewStorage returns a RESTStorage object that will work against pods. funcNewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter)PodStorage {
// Creates a cacher based given storageConfig. funcStorageWithCacher(capacity int)generic.StorageDecorator { returnfunc( storageConfig *storagebackend.Config, objectType runtime.Object, resourcePrefix string, keyFunc func(obj runtime.Object)(string, error), newListFunc func()runtime.Object, getAttrsFunc storage.AttrFunc, triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) { // 创建一个裸的ETCD存储接口实例 s, d := generic.NewRawStorage(storageConfig) if capacity == 0 { glog.V(5).Infof("Storage caching is disabled for %T", objectType) return s, d } glog.V(5).Infof("Storage caching is enabled for %T with capacity %v", objectType, capacity)
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside. // Currently it has two layers of same storage interface -- cacher and low level kv. cacherConfig := storage.CacherConfig{ CacheCapacity: capacity, Storage: s, Versioner: etcdstorage.APIObjectVersioner{}, Type: objectType, ResourcePrefix: resourcePrefix, KeyFunc: keyFunc, NewListFunc: newListFunc, GetAttrsFunc: getAttrsFunc, TriggerPublisherFunc: triggerFunc, Codec: storageConfig.Codec, } // 基于裸ETCD存储实例创建带缓存能力的存储 cacher := storage.NewCacherFromConfig(cacherConfig) // destroyFunc是用于释放存储本身资源的。 destroyFunc := func() { cacher.Stop() d() }
// TODO : Remove RegisterStorageCleanup below when PR // https://github.com/kubernetes/kubernetes/pull/50690 // merges as that shuts down storage properly RegisterStorageCleanup(destroyFunc)
funcCreate(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { switchc.Type { case storagebackend.StorageTypeETCD2: return newETCD2Storage(c) case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: // TODO: We have the following features to implement: // - Support secure connection by using key, cert, and CA files. // - Honor "https" scheme to support secure connection in gRPC. // - Support non-quorum read. return newETCD3Storage(c) default: returnnil, nil, fmt.Errorf("unknown storage type: %s", c.Type) } }
// New returns an etcd3 implementation of storage.Interface. funcNew(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface { return newStore(c, true, pagingEnabled, codec, prefix, transformer) }
funcnewStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { versioner := etcd.APIObjectVersioner{} // 实现了版本化 result := &store{ client: c, codec: codec, versioner: versioner, transformer: transformer, pagingEnabled: pagingEnabled, // for compatibility with etcd2 impl. // no-op for default prefix of '/registry'. // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' pathPrefix: path.Join("/", prefix), watcher: newWatcher(c, codec, versioner, transformer), } if !quorumRead { // In case of non-quorum reads, we can set WithSerializable() // options for all Get operations. result.getOps = append(result.getOps, clientv3.WithSerializable()) } return result }
// Creater is an object that can create an instance of a RESTful object. type Creater interface{ // New returns an empty object that can be used with Create after request data has been put into it. // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object) New() runtime.Object
// Create creates a new version of a resource. If includeUninitialized is set, the object may be returned // without completing initialization. Create(ctx genericapirequest.Context, obj runtime.Object, createValidation ValidateObjectFunc, includeUninitialized bool) (runtime.Object, error) }
// Create implements storage.Interface.Create. func(s *store)Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64)error { // version不能被设置 if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { return errors.New("resourceVersion should not be set on objects to be created") }
// NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested // storage and memory versions. funcNewStorageCodec(opts StorageCodecConfig)(runtime.Codec, error) { mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType) // 这里一般为: application/json的话,返回的mediaType 也是application/json if err != nil { returnnil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType) }
// ETCD2 只支持 application/json if opts.Config.Type == storagebackend.StorageTypeETCD2 && mediaType != "application/json" { glog.Warningf(`storage type %q does not support media type %q, using "application/json"`, storagebackend.StorageTypeETCD2, mediaType) mediaType = "application/json" }
// 寻找对应的媒介类型的序列化实例,如果找不到,系统会报错 // 注意:在前面分析DefaultStorageFactory.NewConfig中,知道opts.StorageSerializer = DefaultStorageFactory.DefaultSerializer = legacyscheme.Codecs serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType) if !ok { // 如果找不到,系统会报错 returnnil, fmt.Errorf("unable to find serializer for %q", mediaType) }
// 知道对应的媒介类型的序列化器,具体可以参见一下序列化工厂部分文档 s := serializer.Serializer
// etcd2不支持二进制格式,只支持文本格式。 // make sure the selected encoder supports string data if !serializer.EncodesAsText && opts.Config.Type == storagebackend.StorageTypeETCD2 { returnnil, fmt.Errorf("storage type %q does not support binary media type %q", storagebackend.StorageTypeETCD2, mediaType) }
// serializer实现了Encoder,Decoder接口 // Give callers the opportunity to wrap encoders and decoders. For decoders, each returned decoder will // be passed to the recognizer so that multiple decoders are available. var encoder runtime.Encoder = s if opts.EncoderDecoratorFn != nil { // Encoder封装函数 encoder = opts.EncoderDecoratorFn(encoder) } decoders := []runtime.Decoder{ // selected decoder as the primary s, // universal deserializer as a fallback opts.StorageSerializer.UniversalDeserializer(), // s解析不了的,采用万能解析器 // base64-wrapped universal deserializer as a last resort. // this allows reading base64-encoded protobuf, which should only exist if etcd2+protobuf was used at some point. // data written that way could exist in etcd2, or could have been migrated to etcd3. // TODO: flag this type of data if we encounter it, require migration (read to decode, write to persist using a supported encoder), and remove in 1.8 runtime.NewBase64Serializer(nil, opts.StorageSerializer.UniversalDeserializer()), // 最后尝试Base64解析器 } if opts.DecoderDecoratorFn != nil { // Decoder封装函数 decoders = opts.DecoderDecoratorFn(decoders) }
type StorageSerializer interface { // SupportedMediaTypes are the media types supported for reading and writing objects. SupportedMediaTypes() []SerializerInfo
// UniversalDeserializer returns a Serializer that can read objects in multiple supported formats // by introspecting the data at rest. UniversalDeserializer() Decoder
// 返回一个encoder,该encoder能够保证写入底层序列化器的对象是指定的GV // EncoderForVersion returns an encoder that ensures objects being written to the provided // serializer are in the provided group version. EncoderForVersion(serializer Encoder, gv GroupVersioner) Encoder
// 返回一个decoder,该decoder能够保证被底层序列化器的反序列化的对象是指定的GV // DecoderForVersion returns a decoder that ensures objects being read by the provided // serializer are in the provided group version by default. DecoderToVersion(serializer Decoder, gv GroupVersioner) Decoder }
type APIRegistrationManager struct { // registeredGroupVersions stores all API group versions for which RegisterGroup is called. registeredVersions map[schema.GroupVersion]struct{} // 注册的group version
// enabledVersions represents all enabled API versions. It should be a // subset of registeredVersions. Please call EnableVersions() to add // enabled versions. enabledVersions map[schema.GroupVersion]struct{} // 启用的group version , 是注册GV的子集
// map of group meta for all groups. groupMetaMap map[string]*apimachinery.GroupMeta // 所有组的元数据信息
// envRequestedVersions represents the versions requested via the // KUBE_API_VERSIONS environment variable. The install package of each group // checks this list before add their versions to the latest package and // Scheme. This list is small and order matters, so represent as a slice envRequestedVersions []schema.GroupVersion }
// RegisterAndEnable is provided only to allow this code to get added in multiple steps. // It's really bad that this is called in init() methods, but supporting this // temporarily lets us do the change incrementally. func(gmf *GroupMetaFactory)RegisterAndEnable(registry *registered.APIRegistrationManager, scheme *runtime.Scheme)error { if err := gmf.Register(registry); err != nil { // 注册GV信息,只有注册以后的版本,才能Enable return err } if err := gmf.Enable(registry, scheme); err != nil { // Enable版本,添加资源数据对象到Scheme中 return err }
// Register constructs the finalized prioritized version list and sanity checks // the announced group & versions. Then it calls register. func(gmf *GroupMetaFactory)Register(m *registered.APIRegistrationManager)error { if gmf.GroupArgs == nil { return fmt.Errorf("partially announced groups are not allowed, only got versions: %#v", gmf.VersionArgs) } iflen(gmf.VersionArgs) == 0 { return fmt.Errorf("group %v announced but no versions announced", gmf.GroupArgs.GroupName) }
pvSet := sets.NewString(gmf.GroupArgs.VersionPreferenceOrder...) if pvSet.Len() != len(gmf.GroupArgs.VersionPreferenceOrder) { return fmt.Errorf("preference order for group %v has duplicates: %v", gmf.GroupArgs.GroupName, gmf.GroupArgs.VersionPreferenceOrder) } prioritizedVersions := []schema.GroupVersion{} // 版本信息 for _, v := range gmf.GroupArgs.VersionPreferenceOrder { prioritizedVersions = append( prioritizedVersions, schema.GroupVersion{ Group: gmf.GroupArgs.GroupName, // 组,这里为“” Version: v, // 这里为v1 }, ) }
// Go through versions that weren't explicitly prioritized. unprioritizedVersions := []schema.GroupVersion{} for _, v := range gmf.VersionArgs { if v.GroupName != gmf.GroupArgs.GroupName { return fmt.Errorf("found %v/%v in group %v?", v.GroupName, v.VersionName, gmf.GroupArgs.GroupName) } if pvSet.Has(v.VersionName) { pvSet.Delete(v.VersionName) continue } unprioritizedVersions = append(unprioritizedVersions, schema.GroupVersion{Group: v.GroupName, Version: v.VersionName}) } iflen(unprioritizedVersions) > 1 { glog.Warningf("group %v has multiple unprioritized versions: %#v. They will have an arbitrary preference order!", gmf.GroupArgs.GroupName, unprioritizedVersions) } if pvSet.Len() != 0 { return fmt.Errorf("group %v has versions in the priority list that were never announced: %s", gmf.GroupArgs.GroupName, pvSet) } prioritizedVersions = append(prioritizedVersions, unprioritizedVersions...) m.RegisterVersions(prioritizedVersions) // 注册GV,这里其实就是 {group="", version="v1"} gmf.prioritizedVersionList = prioritizedVersions returnnil }
type GroupMeta struct { // GroupVersion represents the preferred version of the group. GroupVersion schema.GroupVersion // 优先的组、版本信息
// GroupVersions is Group + all versions in that group. GroupVersions []schema.GroupVersion // 本组中的所有的版本信息
// SelfLinker can set or get the SelfLink field of all API types. // TODO: when versioning changes, make this part of each API definition. // TODO(lavalamp): Combine SelfLinker & ResourceVersioner interfaces, force all uses // to go through the InterfacesFor method below. SelfLinker runtime.SelfLinker // SelfLinker这块一直没有研究
// RESTMapper provides the default mapping between REST paths and the objects declared in a Scheme and all known // versions. RESTMapper meta.RESTMapper // Kind与资源的对应关系,譬如Kind为Pod,那么对应的资源有:pod, pods
// InterfacesFor returns the default Codec and ResourceVersioner for a given version // string, or an error if the version is not known. // TODO: make this stop being a func pointer and always use the default // function provided below once every place that populates this field has been changed. InterfacesFor func(version schema.GroupVersion)(*meta.VersionInterfaces, error) // 一般指向DefaultInterfacesFor函数,基于下面的map来返回对应的VersionInterfaces
func(gmf *GroupMetaFactory)newRESTMapper(scheme *runtime.Scheme, externalVersions []schema.GroupVersion, groupMeta *apimachinery.GroupMeta)meta.RESTMapper { // the list of kinds that are scoped at the root of the api hierarchy // if a kind is not enumerated here, it is assumed to have a namespace scope rootScoped := sets.NewString() if gmf.GroupArgs.RootScopedKinds != nil { rootScoped = gmf.GroupArgs.RootScopedKinds } ignoredKinds := sets.NewString() if gmf.GroupArgs.IgnoredKinds != nil { ignoredKinds = gmf.GroupArgs.IgnoredKinds }
// 创建缺省的RESTMapper mapper := meta.NewDefaultRESTMapper(externalVersions, groupMeta.InterfacesFor) for _, gv := range externalVersions { for kind := range scheme.KnownTypes(gv) { // 遍历scheme中的Kind if ignoredKinds.Has(kind) { // 不需要放到资源中的资源数据类型,在Install中定义的GroupMetaFactory continue } scope := meta.RESTScopeNamespace if rootScoped.Has(kind) { // Root资源,没有Namespace的资源,在Install中定义的GroupMetaFactory。 // 一般有四种:Node,Namepsace,PersistentVolume,ComponentsStatus scope = meta.RESTScopeRoot } mapper.Add(gv.WithKind(kind), scope) // Kind 《-》 Resource 映射 } }