同步操作将从 infraboard/go-course 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
设计注意事项:
我们的目标: 可以通过云商账号同步不同区域的资产
项目所有源码: CMDB项目源码
如果以解决问题的思路来做, 一个脚本应该就可以了
下面是伪代码:
func syncHandler() {
// new client
client, err := sdk.NewClientWithAccessKey("REGION_ID", "ACCESS_KEY_ID", "ACCESS_KEY_SECRET")
if err != nil {
// Handle exceptions
panic(err)
}
// query instances
resp, err := client.DescribeInstances(req)
if err != nil {
return nil, err
}
// save hosts
db.save(transfer(resp))
}
func main() {
ht := http.HandlerFunc(syncHandler)
if ht != nil {
http.Handle("/sync", ht)
}
err := http.ListenAndServe(":8090", nil)
if err != nil {
log.Fatal("ListenAndServe: ", err.Error())
}
}
这样写是不是很容易理解?
但是如果后面还有这些需求:
我们需要提前为4个云商准备API Key
为了方便调试,我们的key通过环境变量传人, 然后通过vscode的test 进行测试:
但是这个测试用例是读取不掉对应的key的, 怎么办?
vscode提供的项目配置里面可以配置从文件中读取环境变量: .vscode/settings.json
{
"go.testEnvFile": "${workspaceFolder}/etc/cmdb-api.env"
}
这样我们执行go test的时候, vscode读取改配置文件里面的环境变量
下面是本项目测试用例使用到的环境变量设置: etc/cmdb-api.env
TX_CLOUD_SECRET_ID=xxx
TX_CLOUD_SECRET_KEY=xxx
AL_CLOUD_ACCESS_KEY=xx
AL_CLOUD_ACCESS_SECRET=xxx
HW_CLOUD_ACCESS_KEY=xxx
HW_CLOUD_ACCESS_SECRET=xxx
VS_HOST=xxx
VS_USERNAME=xxxx
VS_PASSWORD=xxx
最后测试下 我们可以写个用例测试下
我们使用阿里云官方SDK, 来操作我们在云上的资源
如何创建一个Client:
package main
import "github.com/aliyun/alibaba-cloud-sdk-go/sdk"
func main() {
client, err := sdk.NewClientWithAccessKey("REGION_ID", "ACCESS_KEY_ID", "ACCESS_KEY_SECRET")
if err != nil {
// Handle exceptions
panic(err)
}
}
然后我们利用client 查询云上资产
resp, err := client.DescribeInstances(req)
if err != nil {
return nil, err
}
func TestClient(t *testing.T) {
var secretID, secretKey string
if secretID = os.Getenv("AL_CLOUD_ACCESS_KEY"); secretID == "" {
t.Fatal("empty AL_CLOUD_ACCESS_KEY")
}
if secretKey = os.Getenv("AL_CLOUD_ACCESS_SECRET"); secretKey == "" {
t.Fatal("empty AL_CLOUD_ACCESS_SECRET")
}
client, err := sdk.NewClientWithAccessKey("cn-hangzhou", secretID, secretKey)
resp, err := client.DescribeInstances(req)
if err != nil {
t.Fatal(err)
}
封装2个模块, 方便后期可以扩展更多的资源操作模块:
func TestClient(t *testing.T) {
var secretID, secretKey string
if secretID = os.Getenv("AL_CLOUD_ACCESS_KEY"); secretID == "" {
t.Fatal("empty AL_CLOUD_ACCESS_KEY")
}
if secretKey = os.Getenv("AL_CLOUD_ACCESS_SECRET"); secretKey == "" {
t.Fatal("empty AL_CLOUD_ACCESS_SECRET")
}
client := connectivity.NewAliCloudClient(secretID, secretKey, "cn-hangzhou")
client.EcsClient()
}
type AliCloudClient struct {
Region string
AccessKey string
AccessSecret string
accountId string
ecsConn *ecs.Client
rdsConn *rds.Client
}
// EcsClient 客户端
func (c *AliCloudClient) EcsClient() (*ecs.Client, error) {
if c.ecsConn != nil {
return c.ecsConn, nil
}
client, err := ecs.NewClientWithAccessKey(c.Region, c.AccessKey, c.AccessSecret)
if err != nil {
return nil, err
}
c.ecsConn = client
return client, nil
}
通过封装一个Operater模块来与云商具体的资源进行交互
var (
operater *op.EcsOperater
)
func TestQuery(t *testing.T) {
resp := operater.Query()
fmt.Println(resp)
}
func init() {
var secretID, secretKey string
if secretID = os.Getenv("AL_CLOUD_ACCESS_KEY"); secretID == "" {
panic("empty AL_CLOUD_ACCESS_KEY")
}
if secretKey = os.Getenv("AL_CLOUD_ACCESS_SECRET"); secretKey == "" {
panic("empty AL_CLOUD_ACCESS_SECRET")
}
client := connectivity.NewAliCloudClient(secretID, secretKey, "cn-hangzhou")
ec, err := client.EcsClient()
if err != nil {
panic(err)
}
operater = op.NewEcsOperater(ec)
}
type EcsOperater struct {
client *ecs.Client
log logger.Logger
}
func (o *EcsOperater) Query(req *ecs.DescribeInstancesRequest) (*host.HostSet, error) {
set := host.NewHostSet()
resp, err := o.client.DescribeInstances(req)
if err != nil {
return nil, err
}
set.Total = int64(resp.TotalCount)
set.Items = o.transferSet(resp.Instances.Instance).Items
return set, nil
}
这里我们需要先了解云商返回的字段的含义, 然后合理转化成我们的Host对象
func (o *EcsOperater) transferSet(items []ecs.Instance) *host.HostSet {
set := host.NewHostSet()
for i := range items {
set.Add(o.transferOne(items[i]))
}
return set
}
func (o *EcsOperater) transferOne(ins ecs.Instance) *host.Host {
h := host.NewDefaultHost()
h.Base.Vendor = resource.VendorAliYun
h.Base.Region = ins.RegionId
h.Base.Zone = ins.ZoneId
h.Base.CreateAt = o.parseTime(ins.CreationTime)
h.Base.InstanceId = ins.InstanceId
h.Information.ExpireAt = o.parseTime(ins.ExpiredTime)
h.Information.Type = ins.InstanceType
h.Information.Name = ins.InstanceName
h.Information.Description = ins.Description
h.Information.Status = ins.Status
h.Information.Tags = o.transferTags(ins.Tags.Tag)
h.Information.PublicIP = ins.PublicIpAddress.IpAddress
h.Information.PrivateIP = ins.InnerIpAddress.IpAddress
h.Information.PayType = ins.InstanceChargeType
h.Describe.CPU = int64(ins.CPU)
h.Describe.Memory = int64(ins.Memory)
h.Describe.GPUAmount = ins.GPUAmount
h.Describe.GPUSpec = ins.GPUSpec
h.Describe.OSType = ins.OsType
h.Describe.OSName = ins.OSName
h.Describe.SerialNumber = ins.SerialNumber
h.Describe.ImageID = ins.ImageId
h.Describe.InternetMaxBandwidthOut = int64(ins.InternetMaxBandwidthOut)
h.Describe.InternetMaxBandwidthIn = int64(ins.InternetMaxBandwidthIn)
h.Describe.KeyPairName = []string{ins.KeyPairName}
h.Describe.SecurityGroups = ins.SecurityGroupIds.SecurityGroupId
return h
}
func (o *EcsOperater) transferTags(tags []ecs.Tag) map[string]string {
return nil
}
func (o *EcsOperater) parseTime(t string) int64 {
ts, err := time.Parse("2006-01-02T15:04Z", t)
if err != nil {
o.log.Errorf("parse time %s error, %s", t, err)
return 0
}
return ts.UnixNano() / 1000000
}
云商的资源, 对于分页的Pagesize都是有限制的, 可以读写我们请求的参数: DescribeInstancesRequest
PageNumber requests.Integer `position:"Query" name:"PageNumber"`
PageSize requests.Integer `position:"Query" name:"PageSize"`
req.PageSize = requests.NewInteger(pageSize)
type PagerResult struct {
Data *HostSet
Err error
HasNext bool
}
// 分页迭代器
type Pager interface {
Next() *PagerResult
}
type pager struct {
size int
number int
total int64
operater *EcsOperater
req *ecs.DescribeInstancesRequest
log logger.Logger
}
func (p *pager) Next() *host.PagerResult {
result := host.NewPagerResult()
resp, err := p.operater.Query(p.nextReq())
if err != nil {
result.Err = err
return result
}
p.total = int64(resp.Total)
result.Data = resp
result.HasNext = p.hasNext()
p.number++
return result
}
func (p *pager) WithLogger(log logger.Logger) {
p.log = log
}
func (p *pager) nextReq() *ecs.DescribeInstancesRequest {
p.log.Debugf("请求第%d页数据", p.number)
p.req.PageNumber = requests.NewInteger(p.number)
return p.req
}
func (p *pager) hasNext() bool {
return int64(p.number*p.size) < p.total
}
func newPager(pageSize int, operater *EcsOperater) *pager {
req := ecs.CreateDescribeInstancesRequest()
req.PageSize = requests.NewInteger(pageSize)
return &pager{
size: pageSize,
number: 1,
operater: operater,
req: req,
log: zap.L().Named("Pagger"),
tb: tokenbucket.NewBucket(time.Duration(1)*time.Second, 3),
}
}
func (o *EcsOperater) PageQuery() host.Pager {
return newPager(20, o)
}
算法描述:
我们使用之前的一个实现: tokenbucket
创建一个令牌桶:
// NewBucket returns a new token bucket that fills at the
// rate of one token every fillInterval, up to the given
// maximum capacity. Both arguments must be
// positive. The bucket is initially full.
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
return NewBucketWithClock(fillInterval, capacity, nil)
}
等待一个可用token的方法:
// Wait takes count tokens from the bucket, waiting until they are
// available.
func (tb *Bucket) Wait(count int64) {
if d := tb.Take(count); d > 0 {
tb.clock.Sleep(d)
}
}
为Pager添加速率限制:
func newPager(pageSize int, operater *EcsOperater) *pager {
req := ecs.CreateDescribeInstancesRequest()
req.PageSize = requests.NewInteger(pageSize)
return &pager{
...
tb: tokenbucket.NewBucket(time.Duration(1)*time.Second, 3),
}
}
type pager struct {
...
tb *tokenbucket.Bucket
}
next请求时 先获取token
func (p *pager) nextReq() *ecs.DescribeInstancesRequest {
// 等待一个可用token
p.tb.Wait(1)
p.log.Debugf("请求第%d页数据", p.number)
p.req.PageNumber = requests.NewInteger(p.number)
return p.req
}
最后验证是否生效
func newPager(pageSize int, operater *EcsOperater, rate int) *pager {
req := ecs.CreateDescribeInstancesRequest()
req.PageSize = requests.NewInteger(pageSize)
return &pager{
size: pageSize,
number: 1,
operater: operater,
req: req,
log: zap.L().Named("Pagger"),
tb: tokenbucket.NewBucket(time.Duration(rate)*time.Second, 1),
}
}
func NewPageQueryRequest() *PageQueryRequest {
return &PageQueryRequest{
Rate: 1,
}
}
type PageQueryRequest struct {
Rate int
}
func (o *EcsOperater) PageQuery(req *PageQueryRequest) host.Pager {
return newPager(20, o, req.Rate)
}
func TestQuery(t *testing.T) {
pager := operater.PageQuery(op.NewPageQueryRequest())
hasNext := true
for hasNext {
p := pager.Next()
hasNext = p.HasNext
fmt.Println(p.Data)
}
}
过程和阿里云一样 主要是要关注下数据转换
host表除了os_name采用utf8mb4字符集之外和demo比起来没有调整:
CREATE TABLE `host` (
`resource_id` varchar(64) NOT NULL,
`cpu` tinyint(4) NOT NULL,
`memory` int(13) NOT NULL,
`gpu_amount` tinyint(4) DEFAULT NULL,
`gpu_spec` varchar(255) DEFAULT NULL,
`os_type` varchar(255) DEFAULT NULL,
`os_name` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL,
`serial_number` varchar(120) DEFAULT NULL,
`image_id` char(64) DEFAULT NULL,
`internet_max_bandwidth_out` int(10) DEFAULT NULL,
`internet_max_bandwidth_in` int(10) DEFAULT NULL,
`key_pair_name` varchar(255) DEFAULT NULL,
`security_groups` varchar(255) DEFAULT NULL,
PRIMARY KEY (`resource_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=latin1
resource表和demo一样没有调整:
CREATE TABLE `resource` (
`id` char(64) CHARACTER SET latin1 NOT NULL,
`vendor` tinyint(1) NOT NULL,
`region` varchar(64) CHARACTER SET latin1 NOT NULL,
`zone` varchar(64) CHARACTER SET latin1 NOT NULL,
`create_at` bigint(13) NOT NULL,
`expire_at` bigint(13) DEFAULT NULL,
`category` varchar(64) CHARACTER SET latin1 NOT NULL,
`type` varchar(120) CHARACTER SET latin1 NOT NULL,
`instance_id` varchar(120) CHARACTER SET latin1 NOT NULL,
`name` varchar(255) NOT NULL,
`description` varchar(255) DEFAULT NULL,
`status` varchar(255) CHARACTER SET latin1 NOT NULL,
`update_at` bigint(13) DEFAULT NULL,
`sync_at` bigint(13) DEFAULT NULL,
`sync_accout` varchar(255) CHARACTER SET latin1 DEFAULT NULL,
`public_ip` varchar(64) CHARACTER SET latin1 DEFAULT NULL,
`private_ip` varchar(64) CHARACTER SET latin1 DEFAULT NULL,
`pay_type` varchar(255) CHARACTER SET latin1 DEFAULT NULL,
`describe_hash` varchar(255) NOT NULL,
`resource_hash` varchar(255) NOT NULL,
PRIMARY KEY (`id`) USING BTREE,
KEY `name` (`name`) USING BTREE,
KEY `status` (`status`) USING BTREE,
KEY `private_ip` (`public_ip`) USING BTREE,
KEY `public_ip` (`public_ip`) USING BTREE,
KEY `instance_id` (`instance_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
我们cmdb有一个核心功能叫做: 全局资源解索:
因此我们需要把基础数据独立出来, 专门准备一个Resource模块, 迁移之前host里面的关于resource的定义到:
const (
Unsuport Type = iota
HostResource
RdsResource
)
type Type int
const (
VendorAliYun Vendor = iota
VendorTencent
VendorHuaWei
VendorVsphere
)
type Vendor int
type Resource struct {
*Base
*Information
}
type Base struct {
Id string `json:"id"` // 全局唯一Id
SyncAt int64 `json:"sync_at"` // 同步时间
SecretID string `json:"secret_id"` // 用于同步的凭证ID
Vendor Vendor `json:"vendor"` // 厂商
ResourceType Type `json:"resource_type"` // 资源类型
Region string `json:"region"` // 地域
Zone string `json:"zone"` // 区域
CreateAt int64 `json:"create_at"` // 创建时间
InstanceId string `json:"instance_id"` // 实例ID
ResourceHash string `json:"resource_hash"` // 基础数据Hash
DescribeHash string `json:"describe_hash"` // 描述数据Hash
}
type Information struct {
ExpireAt int64 `json:"expire_at"` // 过期时间
Category string `json:"category"` // 种类
Type string `json:"type"` // 规格
Name string `json:"name"` // 名称
Description string `json:"description"` // 描述
Status string `json:"status"` // 服务商中的状态
Tags map[string]string `json:"tags"` // 标签
UpdateAt int64 `json:"update_at"` // 更新时间
SyncAccount string `json:"sync_accout"` // 同步的账号
PublicIP []string `json:"public_ip"` // 公网IP
PrivateIP []string `json:"private_ip"` // 内网IP
PayType string `json:"pay_type"` // 实例付费方式
}
定义 resource service接口
type Service interface {
Search(context.Context, *SearchRequest) (*ResourceSet, error)
}
type SearchRequest struct {
Vendor Vendor
ResourceType Type
}
type ResourceSet struct {
Items []*Resource `json:"items"`
Total int64 `json:"total"`
}
我们数据库直接使用字符串来存储, 这是反模式的, 你也可以设计独立的表来存储。
这里在入库和出库时,单独处理那些无法直接存储到数据库的数据结构:
func (i *Information) PrivateIPToString() string {
return strings.Join(i.PrivateIP, ",")
}
func (i *Information) PublicIPToString() string {
return strings.Join(i.PublicIP, ",")
}
func (i *Information) LoadPrivateIPString(s string) {
if s != "" {
i.PrivateIP = strings.Split(s, ",")
}
}
func (i *Information) LoadPublicIPString(s string) {
if s != "" {
i.PublicIP = strings.Split(s, ",")
}
}
存入:
// vendor h.Version.String()
_, err = stmt.Exec(
h.Id, h.Vendor, h.Region, h.Zone, h.CreateAt, h.ExpireAt, h.Category, h.Type, h.InstanceId,
h.Name, h.Description, h.Status, h.UpdateAt, h.SyncAt, h.SyncAccount, h.PublicIPToString(),
h.PrivateIPToString(), h.PayType, h.DescribeHash, h.ResourceHash,
)
读取
set := host.NewHostSet()
var (
publicIPList, privateIPList, keyPairNameList, securityGroupsList string
)
for rows.Next() {
ins := host.NewDefaultHost()
err := rows.Scan(
&ins.Id, &ins.Vendor, &ins.Region, &ins.Zone, &ins.CreateAt, &ins.ExpireAt,
&ins.Category, &ins.Type, &ins.InstanceId, &ins.Name, &ins.Description,
&ins.Status, &ins.UpdateAt, &ins.SyncAt, &ins.SyncAccount,
&publicIPList, &privateIPList, &ins.PayType, &ins.DescribeHash, &ins.ResourceHash, &ins.ResourceId,
&ins.CPU, &ins.Memory, &ins.GPUAmount, &ins.GPUSpec, &ins.OSType, &ins.OSName,
&ins.SerialNumber, &ins.ImageID, &ins.InternetMaxBandwidthOut, &ins.InternetMaxBandwidthIn,
&keyPairNameList, &securityGroupsList,
)
if err != nil {
return nil, exception.NewInternalServerError("query host error, %s", err.Error())
}
ins.LoadPrivateIPString(privateIPList)
ins.LoadPublicIPString(publicIPList)
ins.LoadKeyPairNameString(keyPairNameList)
ins.LoadSecurityGroupsString(securityGroupsList)
set.Add(ins)
}
通过Postman 测试我们重构后的Host模块, 确保其依然可以正常工作
到现在我们准备好了:
最直接的方法:
|----------| |-------|
| Provider | ---> | Host |
|----------| |-------|
那Provider的配置我们怎么读取?
配置文件?有多个账号怎么办?
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。