class.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. package main
  2. import (
  3. "bytes"
  4. "database/sql"
  5. "encoding/json"
  6. "errors"
  7. "flag"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "log"
  12. "net/http"
  13. "net/url"
  14. "os"
  15. "os/signal"
  16. "time"
  17. log2 "github.com/sirupsen/logrus"
  18. "gorm.io/driver/mysql"
  19. "gorm.io/gorm"
  20. "gorm.io/gorm/logger"
  21. "gorm.io/gorm/schema"
  22. )
  23. var apiUrl, dsn, contracts, apiKey, apiSecret, token string
  24. var size, timeout, expired, executionTime, sleepTime int
  25. func main() {
  26. flag.StringVar(&apiUrl, "url", "https://test-dna.bitfactory.cn", "激活接口地址")
  27. flag.StringVar(&dsn, "dsn", "root:rootPassword@tcp(192.168.150.40:23306)/dna_test", "mysql连接地址")
  28. flag.StringVar(&contracts, "contract", "", "合约地址")
  29. flag.StringVar(&apiKey, "apiKey", "", "apiKey")
  30. flag.StringVar(&apiSecret, "apiSecret", "", "apiSecret")
  31. flag.IntVar(&size, "size", 1, "批处理数量")
  32. flag.IntVar(&timeout, "timeout", 3, "请求超时时间")
  33. flag.IntVar(&expired, "expired", 6000, "token有效时间,单位秒")
  34. flag.IntVar(&executionTime, "execution_time", 60, "mysql execution_time")
  35. flag.IntVar(&sleepTime, "sleep_time", 0, "sleep time")
  36. flag.Parse()
  37. if apiUrl == "" {
  38. log2.Error("apiUrl is required")
  39. return
  40. }
  41. if dsn == "" {
  42. log2.Error("dsn is required")
  43. return
  44. }
  45. if contracts == "" {
  46. log2.Error("contract is required")
  47. return
  48. }
  49. if apiKey == "" {
  50. log2.Error("apiKey is required")
  51. return
  52. }
  53. if apiSecret == "" {
  54. log2.Error("apiSecret is required")
  55. return
  56. }
  57. if size < 1 || size > 100 {
  58. log2.Error("size must be between 1 and 100")
  59. return
  60. }
  61. if sleepTime < 0 {
  62. log2.Error("sleep_time must be greater than 0 ")
  63. return
  64. }
  65. utcZone := time.FixedZone("UTC", 0)
  66. time.Local = utcZone
  67. // 连接mysql
  68. newLogger := logger.New(
  69. log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
  70. logger.Config{
  71. SlowThreshold: time.Second, // 慢 SQL 阈值
  72. LogLevel: logger.Silent, // Log level
  73. Colorful: false, // 禁用彩色打印
  74. },
  75. )
  76. dsn := fmt.Sprintf("%s?charset=utf8&parseTime=True&loc=Local&time_zone=%s&max_execution_time=%d", dsn, url.QueryEscape("'UTC'"), executionTime)
  77. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{Logger: newLogger,
  78. NamingStrategy: schema.NamingStrategy{
  79. TablePrefix: "t_",
  80. SingularTable: false,
  81. },
  82. })
  83. if err != nil {
  84. log2.WithError(err).Error("init db failed")
  85. return
  86. }
  87. ticker := time.NewTicker(time.Duration(expired) * time.Second)
  88. quit := make(chan os.Signal, 1)
  89. signal.Notify(quit, os.Interrupt)
  90. go func() {
  91. for {
  92. select {
  93. case <-ticker.C:
  94. // 更新token
  95. var err error
  96. token, err = getToken()
  97. if err != nil {
  98. log2.WithError(err).Error("update token failed")
  99. return
  100. }
  101. case <-quit:
  102. // 接收到退出通知 停止定时器和退出 Goroutine
  103. ticker.Stop()
  104. close(quit)
  105. return
  106. }
  107. }
  108. }()
  109. // 查询要处理的数据的最大主键id
  110. var total sql.NullInt64
  111. err = db.Model(&DnaClasse{}).Select("Max(id)").Where("status", 2).Scan(&total).Error
  112. if err != nil {
  113. log2.WithError(err).Error("query classes total number failed")
  114. return
  115. }
  116. if !total.Valid {
  117. log2.Error("there is no unverified data left")
  118. return
  119. }
  120. token, err = getToken()
  121. if err != nil {
  122. log2.WithError(err).Error("update token failed")
  123. return
  124. }
  125. var id int64
  126. for id < total.Int64 {
  127. time.Sleep(time.Millisecond * time.Duration(sleepTime))
  128. var list []DnaClasse
  129. err = db.Model(&DnaClasse{}).Where("status", 2).Where("id > ?", id).Limit(size).Order("id asc").Find(&list).Error
  130. if err != nil {
  131. log2.WithError(err).Error("query not active classes failed")
  132. continue
  133. }
  134. if len(list) == 0 {
  135. log2.Info("there is no unverified data left")
  136. break
  137. }
  138. sid := list[0].Id
  139. eid := list[len(list)-1].Id
  140. err = db.Transaction(func(tx *gorm.DB) error {
  141. var classReq []ClassReq
  142. // 更新本批类别状态为已激活
  143. result := tx.Model(&DnaClasse{}).Where("status", 2).Where("id > ?", id).Limit(size).Order("id asc").Update("status", 1)
  144. if result.Error != nil {
  145. log2.WithField("start", sid).WithField("end", eid).WithError(result.Error).Error("update classes status failed")
  146. return err
  147. }
  148. logger := log2.WithField("start", sid).WithField("end", eid)
  149. // 生成批量数据
  150. for j := 0; j < len(list); j++ {
  151. // 查询类别下nft总数
  152. var nftNumber int64
  153. err = tx.Model(&DnaNft{}).Where("class_id", list[j].ClassId).Where("status in (1,2,3)").Count(&nftNumber).Error
  154. if err != nil {
  155. log2.WithField("id", list[j].Id).WithError(err).Error("query classes total nft number failed")
  156. result := tx.Model(&DnaClasse{}).Where("id", list[j].Id).Updates(map[string]interface{}{
  157. "status": 2, // 更新状态为未认证
  158. "err_msg": fmt.Sprintf("query classes total nft number failed: %s", err.Error()),
  159. })
  160. if result.Error != nil {
  161. log2.WithField("id", list[j].Id).WithError(result.Error).Error("update classes status failed")
  162. return result.Error
  163. }
  164. continue
  165. }
  166. if list[j].Name == "" || list[j].CollectIssuer == "" || list[j].Url == "" {
  167. log2.WithField("id", list[j].Id).Error("classes name/collect_issuer/url is required")
  168. result = tx.Model(&DnaClasse{}).Where("id", list[j].Id).Updates(map[string]interface{}{
  169. "status": 3, // 更新状态为认证失败
  170. "err_msg": "classes name/collect_issuer/url is required",
  171. })
  172. if result.Error != nil {
  173. log2.WithField("id", list[j].Id).WithError(result.Error).Error("update classes status failed")
  174. return result.Error // 数据库错误 回滚
  175. }
  176. continue
  177. }
  178. if len([]rune(list[j].Name)) > 50 {
  179. log2.WithField("id", list[j].Id).Error("class name length illegal")
  180. result := tx.Model(&DnaClasse{}).Where("id", list[j].Id).Updates(map[string]interface{}{
  181. "status": 3, // 更新状态为未认证
  182. "err_msg": "class name length illegal",
  183. })
  184. if result.Error != nil {
  185. log2.WithField("id", list[j].Id).WithError(result.Error).Error("update classes status failed")
  186. return result.Error // 数据库错误 回滚
  187. }
  188. continue
  189. }
  190. classReq = append(classReq, ClassReq{
  191. SeriesName: list[j].Name,
  192. SeriesIssuer: list[j].CollectIssuer,
  193. ExternalUrl: list[j].Url,
  194. SeriesDes: list[j].Description,
  195. SeriesId: []string{list[j].ClassId},
  196. TotalDNA: nftNumber,
  197. AssetContracts: contracts,
  198. })
  199. }
  200. // 批量认证
  201. objJson, err := json.Marshal(&Req{
  202. Data: classReq,
  203. })
  204. if err != nil {
  205. logger.WithError(err).Error("marshal request body failed")
  206. return err
  207. }
  208. payload := bytes.NewReader(objJson)
  209. url := fmt.Sprintf("%s/auth/api/v1/series", apiUrl)
  210. req, err := http.NewRequest("POST", url, payload)
  211. if err != nil {
  212. logger.WithError(err).Error("build request failed")
  213. return err
  214. }
  215. req.Header.Add("Content-Type", "application/json")
  216. req.Header.Add("accessToken", token)
  217. client := &http.Client{Timeout: time.Duration(timeout) * time.Second}
  218. res, err := client.Do(req)
  219. if err != nil {
  220. logger.WithError(err).Error("request failed")
  221. return err
  222. }
  223. body, err := io.ReadAll(res.Body)
  224. if err != nil {
  225. logger.WithError(err).Error("read body failed")
  226. res.Body.Close()
  227. return err
  228. }
  229. res.Body.Close()
  230. var resp Resp
  231. err = json.Unmarshal(body, &resp)
  232. if err != nil {
  233. logger.WithError(err).Error("unmarshal response failed")
  234. return err
  235. }
  236. if resp.RetCode != http.StatusOK || resp.RetMsg != "ok" {
  237. logger.Error("active class failed: ", resp.RetMsg)
  238. return errors.New(resp.RetMsg)
  239. }
  240. return nil
  241. })
  242. if err != nil {
  243. // 更新状态
  244. result := db.Model(&DnaClasse{}).Where("status", 2).Where("id > ?", id).Limit(size).Order("id asc").Updates(map[string]interface{}{
  245. "status": 3, // 更新状态为认证失败
  246. "err_msg": err.Error(),
  247. })
  248. id = list[len(list)-1].Id
  249. if result.Error != nil {
  250. log2.WithField("start", sid).WithField("end", eid).WithError(result.Error).Error("update classes status failed")
  251. continue
  252. }
  253. log2.WithField("start", sid).WithField("end", eid).Info("本次处理区间")
  254. continue
  255. }
  256. id = list[len(list)-1].Id
  257. log2.WithField("start", sid).WithField("end", eid).Info("本次认证成功区间")
  258. }
  259. fmt.Println()
  260. log2.Info("========= finish ============")
  261. fmt.Println()
  262. quit <- os.Interrupt
  263. <-quit
  264. }
  265. func getToken() (string, error) {
  266. url := fmt.Sprintf("%s/registration/api/v2/getToken?apiKey=%s&apiSecret=%s", apiUrl, apiKey, apiSecret)
  267. req, err := http.NewRequest("GET", url, nil)
  268. if err != nil {
  269. log2.WithError(err).Error("build request failed")
  270. return "", err
  271. }
  272. req.Header.Add("Content-Type", "application/json")
  273. client := &http.Client{Timeout: time.Duration(timeout) * time.Second}
  274. res, err := client.Do(req)
  275. if err != nil {
  276. log2.WithError(err).Error("request failed")
  277. return "", err
  278. }
  279. body, err := ioutil.ReadAll(res.Body)
  280. if err != nil {
  281. log2.WithError(err).Error("read body failed")
  282. return "", err
  283. }
  284. var resp Resp
  285. err = json.Unmarshal(body, &resp)
  286. if err != nil {
  287. log2.WithError(err).Error("unmarshal response failed")
  288. return "", err
  289. }
  290. if resp.RetCode != http.StatusOK || resp.RetMsg != "ok" {
  291. log2.Error(resp.RetMsg)
  292. return "", err
  293. }
  294. return resp.AccessToken, nil
  295. }
  296. type TokenResp struct {
  297. Data []ClassReq `json:"data"`
  298. }
  299. type Req struct {
  300. Data []ClassReq `json:"data"`
  301. }
  302. type ClassReq struct {
  303. SeriesName string `json:"seriesName"`
  304. SeriesIssuer string `json:"seriesIssuer"`
  305. ExternalUrl string `json:"externalUrl"`
  306. SeriesDes string `json:"seriesDes"`
  307. SeriesId []string `json:"seriesId"`
  308. TotalDNA int64 `json:"totalDNA"`
  309. AssetContracts string `json:"asset_contracts"`
  310. }
  311. type Resp struct {
  312. RetCode int `json:"retCode"`
  313. RetMsg string `json:"retMsg"`
  314. AccessToken string `json:"accessToken"`
  315. }
  316. // DNA类别认证表
  317. type DnaClasse struct {
  318. Id int64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT;comment:主键id" json:"id"`
  319. ClassId string `gorm:"column:class_id;type:char(42);comment:类别ID;NOT NULL" json:"class_id"`
  320. Name string `gorm:"column:name;type:varchar(100);comment:类别名称;NOT NULL" json:"name"`
  321. Url string `gorm:"column:url;type:varchar(100);comment:类别url;NOT NULL" json:"url"`
  322. Description string `gorm:"column:description;type:varchar(200);comment:类别描述;NOT NULL" json:"description"`
  323. Status int `gorm:"column:status;type:tinyint(4);default:2;comment:dna认证状态 1:已认证 2:未认证;NOT NULL" json:"status"`
  324. CollectIssuer string `gorm:"column:collect_issuer;type:varchar(50);comment:集合发行方;NOT NULL" json:"collect_issuer"`
  325. CreatedAt time.Time `gorm:"<-:false"`
  326. UpdatedAt time.Time `gorm:"<-:false"`
  327. }
  328. // DNA NFT认证表
  329. type DnaNft struct {
  330. Id uint64 `gorm:"column:id;type:bigint(20) unsigned;primary_key;AUTO_INCREMENT" json:"id"`
  331. ClassId string `gorm:"column:class_id;type:char(42);comment:类别ID;NOT NULL" json:"class_id"`
  332. TokenId int64 `gorm:"column:token_id;type:bigint(20);default:0;comment:NFT ID;NOT NULL" json:"token_id"`
  333. Owner string `gorm:"column:owner;type:char(42);comment:资产拥有者;NOT NULL" json:"owner"`
  334. Name string `gorm:"column:name;type:varchar(100);comment:资产名称;NOT NULL" json:"name"`
  335. Url string `gorm:"column:url;type:varchar(200);comment:图片url;NOT NULL" json:"url"`
  336. DisplayUrl string `gorm:"column:display_url;type:varchar(200);comment:缩略图url;NOT NULL" json:"display_url"`
  337. Hash string `gorm:"column:hash;type:varchar(100);comment:图片哈希;NOT NULL" json:"hash"`
  338. Price float64 `gorm:"column:price;type:decimal(10,2);default:0.00;comment:发行加价格;NOT NULL" json:"price"`
  339. Description string `gorm:"column:description;type:varchar(255);comment:资产描述;NOT NULL" json:"description"`
  340. Status int `gorm:"column:status;type:tinyint(4);default:2;comment:dna认证状态 1:已认证 2:未认证;NOT NULL" json:"status"`
  341. Timestamp time.Time `gorm:"column:timestamp;type:datetime;comment:交易上链时间" json:"timestamp"`
  342. TxId int64 `gorm:"column:tx_id;type:bigint(20);default:0;comment:交易表主键id;NOT NULL" json:"tx_id"`
  343. CreatedAt time.Time `gorm:"<-:false"`
  344. UpdatedAt time.Time `gorm:"<-:false"`
  345. }