//package main // //import ( // "database/sql" // "fmt" // _ "github.com/taosdata/driver-go/v3/taosSql" // "log" // "math/rand" // "sync" // "time" //) // //var DateFmtYYYYMMDDHHmmss = "2006-01-02 15:04:05" // //func main() { // var taosDSN = "root:taosdata@tcp(192.168.0.153:6030)/test2" // taos, err := sql.Open("taosSql", taosDSN) // if err != nil { // log.Fatalln("failed to connect TDengine, err:", err) // return // } // defer taos.Close() // // // 测试插入数据 // now := time.Now().Add(-18 * time.Hour) // // list := getObject(100) // // fmt.Println("开始 ", time.Now().Format(DateFmtYYYYMMDDHHmmss)) // wg := sync.WaitGroup{} // for p := 0; p < len(list); p++ { // wg.Add(1) // go func(p int) { // defer wg.Done() // // for i := 40; i > 1; i-- { // num := rand.Intn(24) // day := now.AddDate(0, 0, -i).Add(time.Duration(num) * time.Hour) // // data := "" // for j := 0; j < 10001; j++ { // //for j := 0; j < 5; j++ { // num := 1 // addMill := rand.Intn(3000) + 1 // operation := rand.Intn(19) + 1 // operationType := rand.Intn(1) + 1 // // //fmt.Println(day.Add(time.Duration(j*22)*time.Minute).Format(DateFmtYYYYMMDDHHmmss), a) // data = fmt.Sprintf("%s (%d,%d)", data, day.Add(time.Duration(j*addMill)*time.Second).UnixMilli(), num) // if j%10000 == 0 { // //if j%2 == 0 { // // insert into 1_1_2 USING tx_metrics TAGS (1,1,2,4,1) values .... // data = fmt.Sprintf("insert into %d_%d_%d USING tx_metrics TAGS (%d, %d, %d, %d, %d) values%s", list[p].Chain, list[p].Uid, list[p].Pid, list[p].Chain, list[p].Uid, list[p].Pid, operation, operationType, data) // // _, err = taos.Exec(data) // if err != nil { // fmt.Println("failed to insert, err:", err) // fmt.Println("failed to insert, cup:", list[p]) // return // } // data = "" // } // } // } // // fmt.Println("完成 ", time.Now().Format(DateFmtYYYYMMDDHHmmss), " 当前cup: ", list[p]) // }(p) // wg.Wait() // fmt.Println("结束 ", time.Now().Format(DateFmtYYYYMMDDHHmmss)) // // } //} // //type TxObject struct { // Chain int // Uid int // Pid int // //Account string //} // //func getObject(num int) []TxObject { // var list []TxObject // for i := 0; i < num; i++ { // rand.Seed(time.Now().UnixNano()) // chain := rand.Intn(3) + 1 // user := rand.Intn(10) + 1 // project1 := rand.Intn(20) + 1 // project2 := rand.Intn(20) + 1 // list = append(list, TxObject{ // Chain: chain, // Uid: user, // Pid: project1, // }, TxObject{ // Chain: chain, // Uid: user, // Pid: project2, // }) // } // return list //} package main import ( "context" "database/sql" "errors" "fmt" "log" "math/rand" "strings" "time" "golang.org/x/sync/errgroup" _ "github.com/taosdata/driver-go/v3/taosSql" ) func Client() *sql.DB { var taosDSN = "root:taosdata@tcp(192.168.0.153:6030)/avata_1" taos, err := sql.Open("taosSql", taosDSN) if err != nil { log.Fatalln("failed to connect TDengine, err:", err) return nil } return taos } type TxGas struct { ChainID int UserID int ProjectID int Account string Timestamp int64 Gas int } func Producer(v chan<- *[]*TxGas) { defer close(v) rand.Seed(time.Now().UnixNano()) // 随机生成链账户 3000w // 对于每个链账户的不同 operation 都造一个 point group, ctx := errgroup.WithContext(context.Background()) for i := 1; i <= 1000; i++ { chainId := rand.Intn(3) + 1 userId := i prjId := i group.Go(func() error { for a := 0; a < 5000; a++ { points := []*TxGas{} gas := 0 select { case <-ctx.Done(): return nil case <-time.After(time.Second * 10): return errors.New("timeout 10s") default: } address := RandAddress("iaa", 39) // 每个链账户的 20 种操作类型随机出 30 个 for j := 10; j > 0; j-- { gas += rand.Intn(1e5) points = append(points, &TxGas{ ChainID: chainId, UserID: userId, ProjectID: prjId, Timestamp: time.Now().Add(-(time.Hour * time.Duration(j))).UnixMilli(), Account: address, Gas: gas, }) } v <- &points } return nil }) } err := group.Wait() if err != nil { log.Fatalf("group err: %v", err) return } } func Consumer(v <-chan *[]*TxGas) { client := Client() group, ctx := errgroup.WithContext(context.Background()) count := 0 // 30418 for i := 0; i < 100; i++ { group.Go(func() error { for points, ok := <-v; ok; { select { case <-ctx.Done(): return nil case <-time.After(time.Second * 10): log.Println(errors.New("timeout 10s")) //return errors.New("timeout 10s") default: } data := "" for _, p := range *points { data += fmt.Sprintf(" (%d,%d)", p.Timestamp, p.Gas) } data = fmt.Sprintf("insert into %s USING txs_gas TAGS (%d, %d, %d, '%s') values %s", (*points)[0].Account, (*points)[0].ChainID, (*points)[0].UserID, (*points)[0].ProjectID, (*points)[0].Account, data) //fmt.Println(data) _, err := client.Exec(data) if err != nil { fmt.Println("failed to insert, err:", err) continue } count += 100 fmt.Println("count:", count, " ,len of v :", len(v)) points, ok = <-v } return nil }) } err := group.Wait() if err != nil { log.Fatalf("group err: %v", err) return } } func main() { v := make(chan *[]*TxGas, 100) go Producer(v) Consumer(v) } func RandAddress(prefix string, n int) string { return prefix + RandomString(n) } func RandomString(l int) string { now := time.Now().Format("060102150405.000") nowFmt := strings.Replace(now, ".", "", -1) str := "0123456789abcdefghijklmnopqrstuvwxyz" bytes := []byte(str) var result []byte j := 0 r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(rand.Intn(100)))) for i := 0; i < l; i++ { if i%2 != 0 && j < len(nowFmt) { result = append(result, nowFmt[j]) j++ } else { result = append(result, bytes[r.Intn(len(bytes))]) } } return string(result) } // RandTime 随机生成近 n 天内的任意某天的时间点 func RandTime(n int64) time.Time { start := time.Now().Add(-time.Hour * 24 * time.Duration(n)) rand.Seed(time.Now().UnixNano()) hours := n * 24 return start.Add(time.Duration(rand.Intn(int(hours))) * time.Hour) }