| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529 |
- package main
- import (
- "bytes"
- "database/sql"
- "encoding/json"
- "errors"
- "flag"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "net/http"
- "net/url"
- "os"
- "os/signal"
- "time"
- "github.com/shopspring/decimal"
- log2 "github.com/sirupsen/logrus"
- "gorm.io/driver/mysql"
- "gorm.io/gorm"
- "gorm.io/gorm/logger"
- "gorm.io/gorm/schema"
- )
// Package-level configuration. Everything except token and timeFmt is bound
// to a command-line flag in main.
var (
	// apiUrl is the base URL of the activation service, dsn the MySQL
	// connection string (without query parameters), apiKey/apiSecret the
	// credentials sent to the token endpoint, and token the access token
	// currently used for activation requests.
	apiUrl, dsn, apiKey, apiSecret, token string

	// size is the batch size, timeout the HTTP client timeout in seconds,
	// expired the token refresh interval in seconds, executionTime the value
	// passed as MySQL max_execution_time, and sleepTime the pause in
	// milliseconds before each batch.
	size, timeout, expired, executionTime, sleepTime int

	// timeFmt is the time.Format layout used for the mintTime request field.
	timeFmt = "2006-01-02 15:04:05"
)
- func main() {
- flag.StringVar(&apiUrl, "url", "https://test-dna.bitfactory.cn", "激活接口地址")
- flag.StringVar(&dsn, "dsn", "root:rootPassword@tcp(192.168.150.40:23306)/dna_test", "mysql连接地址")
- flag.StringVar(&apiKey, "apiKey", "", "apiKey")
- flag.StringVar(&apiSecret, "apiSecret", "", "apiSecret")
- flag.IntVar(&size, "size", 100, "批处理数量")
- flag.IntVar(&timeout, "timeout", 3, "请求超时时间")
- flag.IntVar(&expired, "expired", 3, "token有效时间,单位秒")
- flag.IntVar(&executionTime, "execution_time", 60, "mysql execution_time")
- flag.IntVar(&sleepTime, "sleep_time", 0, "sleep time")
- flag.Parse()
- if apiUrl == "" {
- log2.Error("apiUrl is required")
- return
- }
- if dsn == "" {
- log2.Error("dsn is required")
- return
- }
- if apiKey == "" {
- log2.Error("apiKey is required")
- return
- }
- if apiSecret == "" {
- log2.Error("apiSecret is required")
- return
- }
- if size < 1 || size > 100 {
- log2.Error("size must be between 1 and 100")
- return
- }
- if sleepTime < 0 {
- log2.Error("sleep_time must be greater than 0 ")
- return
- }
- utcZone := time.FixedZone("UTC", 0)
- time.Local = utcZone
- // 连接mysql
- newLogger := logger.New(
- log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
- logger.Config{
- SlowThreshold: time.Second, // 慢 SQL 阈值
- LogLevel: logger.Silent, // Log level
- Colorful: false, // 禁用彩色打印
- },
- )
- dsn := fmt.Sprintf("%s?charset=utf8&parseTime=True&loc=Local&time_zone=%s&max_execution_time=%d", dsn, url.QueryEscape("'UTC'"), executionTime)
- db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{Logger: newLogger,
- NamingStrategy: schema.NamingStrategy{
- TablePrefix: "t_",
- SingularTable: false,
- },
- })
- if err != nil {
- log2.WithError(err).Error("init db failed")
- return
- }
- ticker := time.NewTicker(time.Duration(expired) * time.Second)
- quit := make(chan os.Signal, 1)
- signal.Notify(quit, os.Interrupt)
- go func() {
- for {
- select {
- case <-ticker.C:
- // 更新token
- var err error
- token, err = getToken()
- if err != nil {
- log2.WithError(err).Error("update token failed")
- return
- }
- case <-quit:
- // 接收到退出通知 停止定时器和退出 Goroutine
- ticker.Stop()
- close(quit)
- return
- }
- }
- }()
- // 查询要处理的数据的最大主键id
- var total sql.NullInt64
- err = db.Model(&DnaNft{}).Select("Max(id)").Where("status", 2).Scan(&total).Error
- if err != nil {
- log2.WithError(err).Error("query nfts total number failed")
- return
- }
- if !total.Valid {
- log2.Info("there is no unverified data left")
- return
- }
- token, err = getToken()
- if err != nil {
- log2.WithError(err).Error("update token failed")
- return
- }
- var min, id int64
- for id < total.Int64 {
- time.Sleep(time.Millisecond * time.Duration(sleepTime))
- var list []DnaNft
- err = db.Model(&DnaNft{}).Where("status", 2).Where("id > ?", id).Limit(size).Order("id asc").Find(&list).Error
- if err != nil {
- log2.WithField("start", list[0].Id).WithField("end", list[len(list)-1].Id).WithError(err).Error("query not active nfts failed")
- continue
- }
- if len(list) == 0 {
- log2.Info("there is no unverified data left")
- break
- }
- err = db.Transaction(func(tx *gorm.DB) error {
- var nftReq []NFTReq
- // 更新本批NFT状态为已激活
- result := tx.Model(&DnaNft{}).Where("status", 2).Where("id > ?", id).Limit(size).Order("id asc").Update("status", 1)
- if result.Error != nil {
- log2.WithField("start", list[0].Id).WithField("end", list[len(list)-1].Id).WithError(result.Error).Error("update nft status failed")
- return err
- }
- logger := log2.WithField("start", list[0].Id).WithField("end", list[len(list)-1].Id)
- // 生成批量数据
- for j := 0; j < len(list); j++ {
- if list[j].Name == "" || list[j].Url == "" || list[j].DisplayUrl == "" || list[j].Hash == "" {
- log2.WithField("dna_nft_id", list[j].Id).Error("nft name/url/display_url/hash is required")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 3, // 更新状态为未认证
- "err_msg": "nft name/url/display_url/hash is required",
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- if len([]rune(list[j].Name)) > 50 {
- log2.WithField("dna_nft_id", list[j].Id).Error("nft name length illegal")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 3, // 更新状态为未认证
- "err_msg": "nft name length illegal",
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- if list[j].Price.LessThan(decimal.Zero) { //单价<0
- log2.WithField("dna_nft_id", list[j].Id).Error("nft price illegal")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 3, // 更新状态为未认证
- "err_msg": "nft price illegal",
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- // 检查class_id是否认证
- var dnaClass DnaClasse
- err = tx.Model(&DnaClasse{}).Where("class_id", list[j].ClassId).First(&dnaClass).Error
- if err != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query nft class status failed")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 2, // 更新状态为未认证
- "err_msg": fmt.Sprintf("query nft class status failed: %s", err.Error()),
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- if dnaClass.Status != 1 { // 类别未认证
- log2.WithField("dna_nft_id", list[j].Id).WithField("class_id", list[j].ClassId).WithError(err).Error("nft class unauthorized")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 3, // 更新状态为未认证
- "err_msg": "nft class unauthorized",
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- // 获取bid
- var nftBid NftBid
- err = tx.Model(&NftBid{}).Where("nft_id", list[j].Id).Find(&nftBid).Error
- if err != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query nft bid failed")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 2, // 更新状态为未认证
- "err_msg": fmt.Sprintf("query nft bid failed: %s", err.Error()),
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- if nftBid.Id == 0 {
- // 查询可用bid
- err = tx.Model(&NftBid{}).Where("nft_id", 0).Order("id asc").First(&nftBid).Error
- if err != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query available bid failed")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 2, // 更新状态为未认证
- "err_msg": fmt.Sprintf("query available bid failed: %s", err.Error()),
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- // 更新关联
- err = tx.Model(&NftBid{}).Where("id", nftBid.Id).Update("nft_id", list[j].Id).Error
- if err != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("add nft bid relation failed")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 2, // 更新状态为未认证
- "err_msg": fmt.Sprintf("add nft bid relation failed: %s", err.Error()),
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- }
- // 查询owner
- var account Account
- err = tx.Model(&Account{}).Where("hex_address", list[j].Owner).First(&account).Error
- if err != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query nft owner account_id failed")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 3, // 更新状态为未认证
- "err_msg": fmt.Sprintf("query nft owner account_id failed: %s", err.Error()),
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- var userAccount UserAccount
- err = tx.Model(&UserAccount{}).Where("account_id", account.Id).First(&userAccount).Error
- if err != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query nft user account failed")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 3, // 更新状态为未认证
- "err_msg": fmt.Sprintf("query nft user account failed: %s", err.Error()),
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- var user ProjectUser
- err = tx.Model(&ProjectUser{}).Where("user_id", userAccount.UserId).Where("project_id", userAccount.ProjectId).First(&user).Error
- if err != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(err).Error("query nft owner failed")
- result := tx.Model(&DnaNft{}).Where("id", list[j].Id).Updates(map[string]interface{}{
- "status": 3, // 更新状态为未认证
- "err_msg": fmt.Sprintf("query nft owner failed: %s", err.Error()),
- })
- if result.Error != nil {
- log2.WithField("dna_nft_id", list[j].Id).WithError(result.Error).Error("update nft status failed")
- return result.Error
- }
- continue
- }
- nftReq = append(nftReq, NFTReq{
- SeriesId: list[j].ClassId,
- DnaName: list[j].Name,
- DnaNumber: fmt.Sprintf("%d", list[j].TokenId),
- TokenBid: nftBid.Bid,
- DnaDes: list[j].Description,
- DnaType: "0", //图片
- Url: list[j].Url,
- DisplayUrl: list[j].DisplayUrl,
- Hash: list[j].Hash,
- MintTime: list[j].Timestamp.Add(8 * time.Hour).Format(timeFmt),
- Owner: user.Bid,
- DnaPrice: list[j].Price.InexactFloat64(),
- Extension: "",
- })
- }
- // 批量认证
- objJson, err := json.Marshal(&Req{
- Data: nftReq,
- })
- if err != nil {
- log2.WithError(err).Error("marshal request body failed")
- return err
- }
- payload := bytes.NewReader(objJson)
- url := fmt.Sprintf("%s/auth/api/v1/dna", apiUrl)
- req, err := http.NewRequest("POST", url, payload)
- if err != nil {
- log2.WithError(err).Error("build request failed")
- return err
- }
- req.Header.Add("Content-Type", "application/json")
- req.Header.Add("accessToken", token)
- client := &http.Client{Timeout: time.Duration(timeout) * time.Second}
- res, err := client.Do(req)
- if err != nil {
- logger.WithError(err).Error("request failed")
- return err
- }
- body, err := io.ReadAll(res.Body)
- if err != nil {
- logger.WithError(err).Error("read body failed")
- res.Body.Close()
- return err
- }
- res.Body.Close()
- var resp Resp
- err = json.Unmarshal(body, &resp)
- if err != nil {
- logger.WithError(err).Error("unmarshal response failed")
- return err
- }
- if resp.RetCode != http.StatusOK || resp.RetMsg != "ok" {
- logger.Error("active nfts failed: ", resp.RetMsg)
- return errors.New(resp.RetMsg)
- }
- return nil
- })
- if err != nil {
- // 更新状态
- result := db.Model(&DnaNft{}).Where("status", 2).Where("id > ?", id).Limit(size).Order("id asc").Updates(map[string]interface{}{
- "status": 3,
- "err_msg": err.Error(),
- })
- min = list[0].Id
- id = list[len(list)-1].Id
- if result.Error != nil {
- log2.WithField("start", list[0].Id).WithField("end", list[len(list)-1].Id).WithError(result.Error).Error("update nfts status failed")
- continue
- }
- log2.WithField("start", min).WithField("end", id).Info("本次处理区间")
- continue
- }
- min = list[0].Id
- id = list[len(list)-1].Id
- log2.WithField("start", min).WithField("end", id).Info("本次认证成功区间")
- }
- fmt.Println()
- log2.Info("========= finish ============")
- quit <- os.Interrupt
- <-quit
- }
- func getToken() (string, error) {
- url := fmt.Sprintf("%s/registration/api/v2/getToken?apiKey=%s&apiSecret=%s", apiUrl, apiKey, apiSecret)
- req, err := http.NewRequest("GET", url, nil)
- if err != nil {
- log2.WithError(err).Error("build request failed")
- return "", err
- }
- req.Header.Add("Content-Type", "application/json")
- client := &http.Client{Timeout: time.Duration(timeout) * time.Second}
- res, err := client.Do(req)
- if err != nil {
- log2.WithError(err).Error("request failed")
- return "", err
- }
- body, err := ioutil.ReadAll(res.Body)
- if err != nil {
- log2.WithError(err).Error("read body failed")
- return "", err
- }
- var resp Resp
- err = json.Unmarshal(body, &resp)
- if err != nil {
- log2.WithError(err).Error("unmarshal response failed")
- return "", err
- }
- if resp.RetCode != http.StatusOK || resp.RetMsg != "ok" {
- log2.Error(resp.RetMsg)
- return "", err
- }
- return resp.AccessToken, nil
- }
- type Req struct {
- Data []NFTReq `json:"data"`
- }
// NFTReq is one NFT entry of an activation request. The field comments
// describe how this program fills each value from a DnaNft row.
type NFTReq struct {
	// SeriesId is the NFT's class id.
	SeriesId string `json:"seriesId"`
	// DnaName is the NFT's name.
	DnaName string `json:"dnaName"`
	// DnaNumber is the NFT's token id rendered as a decimal string.
	DnaNumber string `json:"dnaNumber"`
	// TokenBid is the bid taken from the NftBid row linked to the NFT.
	TokenBid string `json:"tokenBid"`
	// DnaDes is the NFT's description.
	DnaDes string `json:"dnaDes"`
	// DnaType is always sent as "0" (image) by this program.
	DnaType string `json:"dnaType"`
	// Url is the image URL.
	Url string `json:"url"`
	// DisplayUrl is the thumbnail URL.
	DisplayUrl string `json:"displayUrl"`
	// Hash is the image hash.
	Hash string `json:"hash"`
	// MintTime is the row timestamp plus eight hours, formatted with timeFmt.
	MintTime string `json:"mintTime"`
	// Owner is the bid of the project user that owns the NFT.
	Owner string `json:"owner"`
	// DnaPrice is the NFT's price converted to float64.
	DnaPrice float64 `json:"dnaPrice"`
	// Extension is always sent empty by this program.
	Extension string `json:"extension"`
}
// Resp is the JSON envelope decoded from both the token endpoint and the
// activation endpoint. Callers treat a response as successful only when
// RetCode is 200 and RetMsg is "ok".
type Resp struct {
	// RetCode is the service's result code.
	RetCode int `json:"retCode"`
	// RetMsg is the service's result message.
	RetMsg string `json:"retMsg"`
	// AccessToken is read from token responses only.
	AccessToken string `json:"accessToken"`
}
// NftBid is a row of the bid pool. A bid whose NftId is 0 is treated as free
// by this program and can be claimed for an NFT.
// NOTE(review): presumably stored in t_nft_bids under the naming strategy set
// in main — confirm.
type NftBid struct {
	// Id is the primary key.
	Id uint64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:主键id" json:"id"`
	// Bid is the bid string sent as tokenBid in activation requests.
	Bid string `gorm:"column:bid;type:varchar(60);comment:星火bid;NOT NULL" json:"bid"`
	// NftId is the primary key of the linked t_dna_nfts row, 0 when unlinked.
	NftId int64 `gorm:"column:nft_id;type:bigint(20);default:0;comment:关联文昌链极速网的[ t_dna_nfts ]表主键id"`
	// CreatedAt and UpdatedAt carry GORM's `<-:false` tag (no write permission).
	CreatedAt time.Time `gorm:"<-:false"`
	UpdatedAt time.Time `gorm:"<-:false"`
}
- // DNA NFT认证表
- type DnaNft struct {
- Id int64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT" json:"id"`
- ProjectId uint64 `gorm:"column:project_id;type:bigint(20) unsigned;comment:项目 ID;NOT NULL" json:"project_id"`
- ClassId string `gorm:"column:class_id;type:char(42);comment:类别ID;NOT NULL" json:"class_id"`
- TokenId int64 `gorm:"column:token_id;type:bigint(20);default:0;comment:NFT ID;NOT NULL" json:"token_id"`
- Owner string `gorm:"column:owner;type:char(42);comment:资产拥有者;NOT NULL" json:"owner"`
- Name string `gorm:"column:name;type:varchar(100);comment:资产名称;NOT NULL" json:"name"`
- Url string `gorm:"column:url;type:varchar(200);comment:图片url;NOT NULL" json:"url"`
- DisplayUrl string `gorm:"column:display_url;type:varchar(200);comment:缩略图url;NOT NULL" json:"display_url"`
- Hash string `gorm:"column:hash;type:varchar(100);comment:图片哈希;NOT NULL" json:"hash"`
- Price decimal.Decimal `gorm:"column:price;type:decimal(10,2);default:0.00;comment:发行加价格;NOT NULL" json:"price"`
- Description string `gorm:"column:description;type:varchar(255);comment:资产描述;NOT NULL" json:"description"`
- Status int `gorm:"column:status;type:tinyint(4);default:2;comment:dna认证状态 1:已认证 2:未认证;NOT NULL" json:"status"`
- Timestamp time.Time `gorm:"column:timestamp;type:datetime;comment:交易上链时间" json:"timestamp"`
- TxId int64 `gorm:"column:tx_id;type:bigint(20);default:0;comment:交易表主键id;NOT NULL" json:"tx_id"`
- CreatedAt time.Time `gorm:"<-:false"`
- UpdatedAt time.Time `gorm:"<-:false"`
- }
// ProjectUser is a row of the table linking projects and users. Its Bid is
// sent as the owner of an NFT in activation requests.
type ProjectUser struct {
	// Id is the primary key.
	Id uint64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:ID" json:"id"`
	// ProjectId is the project ID.
	ProjectId uint64 `gorm:"column:project_id;type:bigint(20) unsigned;comment:项目 ID;NOT NULL" json:"project_id"`
	// UserId is the user ID.
	UserId uint64 `gorm:"column:user_id;type:bigint(20) unsigned;comment:用户ID ID;NOT NULL" json:"user_id"`
	// Bid is the user's bid.
	Bid string `gorm:"column:bid;type:varchar(100);comment:bid" json:"bid"`
}
// DnaClasse is a row of the DNA class verification table. An NFT is only
// submitted for activation when its class has Status 1.
// NOTE(review): the spelling presumably makes GORM's pluralised table name
// come out as t_dna_classes — confirm before renaming.
type DnaClasse struct {
	// Id is the primary key.
	Id int64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:主键id" json:"id"`
	// ClassId is the class ID.
	ClassId string `gorm:"column:class_id;type:char(42);comment:类别ID;NOT NULL" json:"class_id"`
	// Name is the class name.
	Name string `gorm:"column:name;type:varchar(100);comment:类别名称;NOT NULL" json:"name"`
	// Url is the class URL.
	Url string `gorm:"column:url;type:varchar(100);comment:类别url;NOT NULL" json:"url"`
	// Description is the class description.
	Description string `gorm:"column:description;type:varchar(200);comment:类别描述;NOT NULL" json:"description"`
	// Status is the DNA verification status: 1 verified, 2 unverified.
	Status int `gorm:"column:status;type:tinyint(4);default:2;comment:dna认证状态 1:已认证 2:未认证;NOT NULL" json:"status"`
	// CollectIssuer is the collection issuer.
	CollectIssuer string `gorm:"column:collect_issuer;type:varchar(50);comment:集合发行方;NOT NULL" json:"collect_issuer"`
	// CreatedAt and UpdatedAt carry GORM's `<-:false` tag (no write permission).
	CreatedAt time.Time `gorm:"<-:false"`
	UpdatedAt time.Time `gorm:"<-:false"`
}
// Account is a row of the chain account table. The owner of an NFT is
// resolved by matching DnaNft.Owner against HexAddress.
type Account struct {
	// Id is the primary key.
	Id uint64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:ID" json:"id"`
	// ProjectId is the project ID.
	ProjectId uint64 `gorm:"column:project_id;type:bigint(20) unsigned;default:0;comment:项目 ID;NOT NULL" json:"project_id"`
	// NativeAddress is the native account address.
	NativeAddress string `gorm:"column:native_address;type:char(42);comment:原生账户地址;NOT NULL" json:"native_address"`
	// HexAddress is the account address used for owner lookups.
	HexAddress string `gorm:"column:hex_address;type:char(46);comment:地址" json:"hex_address"`
}
// UserAccount is a row of the table linking users and chain accounts. It is
// looked up by AccountId to find the user and project that own an NFT.
type UserAccount struct {
	// Id is the primary key.
	Id uint64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:ID" json:"id"`
	// ProjectId maps to the project_id column.
	ProjectId uint64 `gorm:"column:project_id;type:bigint(20) unsigned;comment:用户 ID;NOT NULL" json:"project_id"`
	// UserId is the user ID.
	UserId uint64 `gorm:"column:user_id;type:bigint(20) unsigned;comment:用户 ID;NOT NULL" json:"user_id"`
	// AccountId is the chain account ID.
	AccountId uint64 `gorm:"column:account_id;type:bigint(20) unsigned;comment:链账户 ID;NOT NULL" json:"account_id"`
}
|