package main import ( "bytes" "database/sql" "encoding/json" "errors" "flag" "fmt" "io" "io/ioutil" "log" "net/http" "net/url" "os" "os/signal" "time" "github.com/shopspring/decimal" log2 "github.com/sirupsen/logrus" "gorm.io/driver/mysql" "gorm.io/gorm" "gorm.io/gorm/logger" "gorm.io/gorm/schema" ) var apiUrl, dsn, apiKey, apiSecret, token string var size, timeout, expired, executionTime, sleepTime int var timeFmt string = "2006-01-02 15:04:05" func main() { flag.StringVar(&apiUrl, "url", "https://test-dna.bitfactory.cn", "激活接口地址") flag.StringVar(&dsn, "dsn", "root:rootPassword@tcp(192.168.150.40:23306)/dna_test", "mysql连接地址") flag.StringVar(&apiKey, "apiKey", "", "apiKey") flag.StringVar(&apiSecret, "apiSecret", "", "apiSecret") flag.IntVar(&size, "size", 100, "批处理数量") flag.IntVar(&timeout, "timeout", 3, "请求超时时间") flag.IntVar(&expired, "expired", 3, "token有效时间,单位秒") flag.IntVar(&executionTime, "execution_time", 60, "mysql execution_time") flag.IntVar(&sleepTime, "sleep_time", 0, "sleep time") flag.Parse() if apiUrl == "" { log2.Error("apiUrl is required") return } if dsn == "" { log2.Error("dsn is required") return } if apiKey == "" { log2.Error("apiKey is required") return } if apiSecret == "" { log2.Error("apiSecret is required") return } if size < 1 || size > 100 { log2.Error("size must be between 1 and 100") return } if sleepTime < 0 { log2.Error("sleep_time must be greater than 0 ") return } utcZone := time.FixedZone("UTC", 0) time.Local = utcZone // 连接mysql newLogger := logger.New( log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer logger.Config{ SlowThreshold: time.Second, // 慢 SQL 阈值 LogLevel: logger.Silent, // Log level Colorful: false, // 禁用彩色打印 }, ) dsn := fmt.Sprintf("%s?charset=utf8&parseTime=True&loc=Local&time_zone=%s&max_execution_time=%d", dsn, url.QueryEscape("'UTC'"), executionTime) db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{Logger: newLogger, NamingStrategy: schema.NamingStrategy{ TablePrefix: "t_", SingularTable: false, }, }) if err != nil { log2.WithError(err).Error("init db failed") return } ticker := time.NewTicker(time.Duration(expired) * time.Second) quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt) go func() { for { select { case <-ticker.C: // 更新token var err error token, err = getToken() if err != nil { log2.WithError(err).Error("update token failed") return } case <-quit: // 接收到退出通知 停止定时器和退出 Goroutine ticker.Stop() close(quit) return } } }() // 查询要处理的数据的最大主键id var total sql.NullInt64 err = db.Model(&DnaNft{}).Select("Max(id)").Where("status", 2).Scan(&total).Error if err != nil { log2.WithError(err).Error("query nfts total number failed") return } if !total.Valid { log2.Info("there is no unverified data left") return } token, err = getToken() if err != nil { log2.WithError(err).Error("update token failed") return } var min, id int64 for id < total.Int64 { time.Sleep(time.Millisecond * time.Duration(sleepTime)) var list []DnaNft err = db.Model(&DnaNft{}).Where("status", 2).Where("id > ?", id).Limit(size).Order("id asc").Find(&list).Error if err != nil { log2.WithField("start", list[0].Id).WithField("end", list[len(list)-1].Id).WithError(err).Error("query not active nfts failed") continue } if len(list) == 0 { log2.Info("there is no unverified data left") break } err = db.Transaction(func(tx *gorm.DB) error { var nftReq []NFTReq // 更新本批NFT状态为已激活 result := tx.Model(&DnaNft{}).Where("status", 2).Where("id > ?", id).Limit(size).Order("id asc").Update("status", 1) if result.Error != nil { log2.WithField("start", list[0].Id).WithField("end", list[len(list)-1].Id).WithError(result.Error).Error("update nft status failed") return err } logger := log2.WithField("start", list[0].Id).WithField("end", list[len(list)-1].Id) // 生成批量数据 for j := 0; j < len(list); j++ { if list[j].Name == "" || list[j].Url == "" || list[j].DisplayUrl == "" || list[j].Hash == "" { log2.WithField("dna_nft_id", list[j].Id).Error("nft name/url/display_url/hash is required") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 3, // 更新状态为未认证 "err_msg": "nft name/url/display_url/hash is required", }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } if len([]rune(list[j].Name)) > 50 { log2.WithField("dna_nft_id", list[j].Id).Error("nft name length illegal") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 3, // 更新状态为未认证 "err_msg": "nft name length illegal", }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } if list[j].Price.LessThan(decimal.Zero) { //单价<0 log2.WithField("dna_nft_id", list[j].Id).Error("nft price illegal") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 3, // 更新状态为未认证 "err_msg": "nft price illegal", }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } // 检查class_id是否认证 var dnaClass DnaClasse err = tx.Model(&DnaClasse{}).Where("class_id", list[j].ClassId).First(&dnaClass).Error if err != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query nft class status failed") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 2, // 更新状态为未认证 "err_msg": fmt.Sprintf("query nft class status failed: %s", err.Error()), }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } if dnaClass.Status != 1 { // 类别未认证 log2.WithField("dna_nft_id", list[j].Id).WithField("class_id", list[j].ClassId).WithError(err).Error("nft class unauthorized") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 3, // 更新状态为未认证 "err_msg": "nft class unauthorized", }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } // 获取bid var nftBid NftBid err = tx.Model(&NftBid{}).Where("nft_id", list[j].Id).Find(&nftBid).Error if err != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query nft bid failed") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 2, // 更新状态为未认证 "err_msg": fmt.Sprintf("query nft bid failed: %s", err.Error()), }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } if nftBid.Id == 0 { // 查询可用bid err = tx.Model(&NftBid{}).Where("nft_id", 0).Order("id asc").First(&nftBid).Error if err != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query available bid failed") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 2, // 更新状态为未认证 "err_msg": fmt.Sprintf("query available bid failed: %s", err.Error()), }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } // 更新关联 err = tx.Model(&NftBid{}).Where("id", nftBid.Id).Update("nft_id", list[j].Id).Error if err != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("add nft bid relation failed") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 2, // 更新状态为未认证 "err_msg": fmt.Sprintf("add nft bid relation failed: %s", err.Error()), }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } } // 查询owner var account Account err = tx.Model(&Account{}).Where("hex_address", list[j].Owner).First(&account).Error if err != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query nft owner account_id failed") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 3, // 更新状态为未认证 "err_msg": fmt.Sprintf("query nft owner account_id failed: %s", err.Error()), }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } var userAccount UserAccount err = tx.Model(&UserAccount{}).Where("account_id", account.Id).First(&userAccount).Error if err != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query nft user account failed") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 3, // 更新状态为未认证 "err_msg": fmt.Sprintf("query nft user account failed: %s", err.Error()), }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } var user ProjectUser err = tx.Model(&ProjectUser{}).Where("user_id", userAccount.UserId).Where("project_id", userAccount.ProjectId).First(&user).Error if err != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query nft owner failed") result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{ "status": 3, // 更新状态为未认证 "err_msg": fmt.Sprintf("query nft owner failed: %s", err.Error()), }) if result.Error != nil { log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed") return result.Error } continue } nftReq = append(nftReq, NFTReq{ SeriesId: list[j].ClassId, DnaName: list[j].Name, DnaNumber: fmt.Sprintf("%d", list[j].TokenId), TokenBid: nftBid.Bid, DnaDes: list[j].Description, DnaType: "0", //图片 Url: list[j].Url, DisplayUrl: list[j].DisplayUrl, Hash: list[j].Hash, MintTime: list[j].Timestamp.Add(8 * time.Hour).Format(timeFmt), Owner: user.Bid, DnaPrice: list[j].Price.InexactFloat64(), Extension: "", }) } // 批量认证 objJson, err := json.Marshal(&Req{ Data: nftReq, }) if err != nil { log2.WithError(err).Error("marshal request body failed") return err } payload := bytes.NewReader(objJson) url := fmt.Sprintf("%s/auth/api/v1/dna", apiUrl) req, err := http.NewRequest("POST", url, payload) if err != nil { log2.WithError(err).Error("build request failed") return err } req.Header.Add("Content-Type", "application/json") req.Header.Add("accessToken", token) client := &http.Client{Timeout: time.Duration(timeout) * time.Second} res, err := client.Do(req) if err != nil { logger.WithError(err).Error("request failed") return err } body, err := io.ReadAll(res.Body) if err != nil { logger.WithError(err).Error("read body failed") res.Body.Close() return err } res.Body.Close() var resp Resp err = json.Unmarshal(body, &resp) if err != nil { logger.WithError(err).Error("unmarshal response failed") return err } if resp.RetCode != http.StatusOK || resp.RetMsg != "ok" { logger.Error("active nfts failed: ", resp.RetMsg) return errors.New(resp.RetMsg) } return nil }) if err != nil { // 更新状态 result := db.Model(&DnaNft{}).Where("status", 2).Where("id > ?", id).Limit(size).Order("id asc").Updates(map[string]interface{}{ "status": 3, "err_msg": err.Error(), }) min = list[0].Id id = list[len(list)-1].Id if result.Error != nil { log2.WithField("start", list[0].Id).WithField("end", list[len(list)-1].Id).WithError(result.Error).Error("update nfts status failed") continue } log2.WithField("start", min).WithField("end", id).Info("本次处理区间") continue } min = list[0].Id id = list[len(list)-1].Id log2.WithField("start", min).WithField("end", id).Info("本次认证成功区间") } fmt.Println() log2.Info("========= finish ============") quit <- os.Interrupt <-quit } func getToken() (string, error) { url := fmt.Sprintf("%s/registration/api/v2/getToken?apiKey=%s&apiSecret=%s", apiUrl, apiKey, apiSecret) req, err := http.NewRequest("GET", url, nil) if err != nil { log2.WithError(err).Error("build request failed") return "", err } req.Header.Add("Content-Type", "application/json") client := &http.Client{Timeout: time.Duration(timeout) * time.Second} res, err := client.Do(req) if err != nil { log2.WithError(err).Error("request failed") return "", err } body, err := ioutil.ReadAll(res.Body) if err != nil { log2.WithError(err).Error("read body failed") return "", err } var resp Resp err = json.Unmarshal(body, &resp) if err != nil { log2.WithError(err).Error("unmarshal response failed") return "", err } if resp.RetCode != http.StatusOK || resp.RetMsg != "ok" { log2.Error(resp.RetMsg) return "", err } return resp.AccessToken, nil } type Req struct { Data []NFTReq `json:"data"` } type NFTReq struct { SeriesId string `json:"seriesId"` DnaName string `json:"dnaName"` DnaNumber string `json:"dnaNumber"` TokenBid string `json:"tokenBid"` DnaDes string `json:"dnaDes"` DnaType string `json:"dnaType"` Url string `json:"url"` DisplayUrl string `json:"displayUrl"` Hash string `json:"hash"` MintTime string `json:"mintTime"` Owner string `json:"owner"` DnaPrice float64 `json:"dnaPrice"` Extension string `json:"extension"` } type Resp struct { RetCode int `json:"retCode"` RetMsg string `json:"retMsg"` AccessToken string `json:"accessToken"` } type NftBid struct { Id uint64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:主键id" json:"id"` Bid string `gorm:"column:bid;type:varchar(60);comment:星火bid;NOT NULL" json:"bid"` NftId int64 `gorm:"column:nft_id;type:bigint(20);default:0;comment:关联文昌链极速网的[ t_dna_nfts ]表主键id"` CreatedAt time.Time `gorm:"<-:false"` UpdatedAt time.Time `gorm:"<-:false"` } // DNA NFT认证表 type DnaNft struct { Id int64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT" json:"id"` ProjectId uint64 `gorm:"column:project_id;type:bigint(20) unsigned;comment:项目 ID;NOT NULL" json:"project_id"` ClassId string `gorm:"column:class_id;type:char(42);comment:类别ID;NOT NULL" json:"class_id"` TokenId int64 `gorm:"column:token_id;type:bigint(20);default:0;comment:NFT ID;NOT NULL" json:"token_id"` Owner string `gorm:"column:owner;type:char(42);comment:资产拥有者;NOT NULL" json:"owner"` Name string `gorm:"column:name;type:varchar(100);comment:资产名称;NOT NULL" json:"name"` Url string `gorm:"column:url;type:varchar(200);comment:图片url;NOT NULL" json:"url"` DisplayUrl string `gorm:"column:display_url;type:varchar(200);comment:缩略图url;NOT NULL" json:"display_url"` Hash string `gorm:"column:hash;type:varchar(100);comment:图片哈希;NOT NULL" json:"hash"` Price decimal.Decimal `gorm:"column:price;type:decimal(10,2);default:0.00;comment:发行加价格;NOT NULL" json:"price"` Description string `gorm:"column:description;type:varchar(255);comment:资产描述;NOT NULL" json:"description"` Status int `gorm:"column:status;type:tinyint(4);default:2;comment:dna认证状态 1:已认证 2:未认证;NOT NULL" json:"status"` Timestamp time.Time `gorm:"column:timestamp;type:datetime;comment:交易上链时间" json:"timestamp"` TxId int64 `gorm:"column:tx_id;type:bigint(20);default:0;comment:交易表主键id;NOT NULL" json:"tx_id"` CreatedAt time.Time `gorm:"<-:false"` UpdatedAt time.Time `gorm:"<-:false"` } // 项目和用户关联表 type ProjectUser struct { Id uint64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:ID" json:"id"` ProjectId uint64 `gorm:"column:project_id;type:bigint(20) unsigned;comment:项目 ID;NOT NULL" json:"project_id"` UserId uint64 `gorm:"column:user_id;type:bigint(20) unsigned;comment:用户ID ID;NOT NULL" json:"user_id"` Bid string `gorm:"column:bid;type:varchar(100);comment:bid" json:"bid"` } // DNA类别认证表 type DnaClasse struct { Id int64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:主键id" json:"id"` ClassId string `gorm:"column:class_id;type:char(42);comment:类别ID;NOT NULL" json:"class_id"` Name string `gorm:"column:name;type:varchar(100);comment:类别名称;NOT NULL" json:"name"` Url string `gorm:"column:url;type:varchar(100);comment:类别url;NOT NULL" json:"url"` Description string `gorm:"column:description;type:varchar(200);comment:类别描述;NOT NULL" json:"description"` Status int `gorm:"column:status;type:tinyint(4);default:2;comment:dna认证状态 1:已认证 2:未认证;NOT NULL" json:"status"` CollectIssuer string `gorm:"column:collect_issuer;type:varchar(50);comment:集合发行方;NOT NULL" json:"collect_issuer"` CreatedAt time.Time `gorm:"<-:false"` UpdatedAt time.Time `gorm:"<-:false"` } // 链账户表 type Account struct { Id uint64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:ID" json:"id"` ProjectId uint64 `gorm:"column:project_id;type:bigint(20) unsigned;default:0;comment:项目 ID;NOT NULL" json:"project_id"` NativeAddress string `gorm:"column:native_address;type:char(42);comment:原生账户地址;NOT NULL" json:"native_address"` HexAddress string `gorm:"column:hex_address;type:char(46);comment:地址" json:"hex_address"` } // 用户和链账户关联表 type UserAccount struct { Id uint64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:ID" json:"id"` ProjectId uint64 `gorm:"column:project_id;type:bigint(20) unsigned;comment:用户 ID;NOT NULL" json:"project_id"` UserId uint64 `gorm:"column:user_id;type:bigint(20) unsigned;comment:用户 ID;NOT NULL" json:"user_id"` AccountId uint64 `gorm:"column:account_id;type:bigint(20) unsigned;comment:链账户 ID;NOT NULL" json:"account_id"` }