package logic import ( "context" "encoding/json" "fmt" "github.com/furryboard/spider-scheduler/pkg/conf" "github.com/furryboard/spider-scheduler/pkg/dao" "github.com/furryboard/spider-scheduler/pkg/dao/model" "github.com/furryboard/spider-scheduler/pkg/exception" "github.com/furryboard/spider-scheduler/pkg/log" "github.com/furryboard/spider-scheduler/rpc" "github.com/furryboard/spider-scheduler/rpc/pb" "gorm.io/gorm" "strconv" "sync" "time" ) // GetValidUsers 分页获取用户信息(数据库) func GetValidUsers(page, pageSize int) ([]*model.Furry, error) { // 分页查询 var furries []*model.Furry //tx := dao.DB().Where("status = 0").Offset((page - 1) * pageSize).Limit(pageSize).Find(&furries) tx := dao.DB().Offset((page - 1) * pageSize).Limit(pageSize).Find(&furries) if tx.Error != nil { return nil, exception.ErrFetchFurries("分页获取Furry列表时失败:" + tx.Error.Error()) } return furries, nil } // GetUserInfoByUID 获取用户粉丝数 包括名称,性别,签名等 func GetUserInfoByUID(uid uint) (*pb.InfoReply_Data, error) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() info, err := rpc.SpiderCore().GetUserBasicInfo(ctx, &pb.InfoRequest{ Uid: uint64(uid), Cookie: conf.Conf.SpiderCore.BiliCookie, }) if err != nil { return nil, exception.InternalError("获取用户信息失败:" + err.Error()) } if info.Code != 200 { if info.Msg == "" { info.Msg = "Unknown" } return nil, exception.ErrFetchUserInfo( fmt.Sprintf("用户[%d]基础信息获取失败:%s(Code: %d)", uid, info.Msg, info.Code)) } return info.Data, nil } // IsUserExistsByUID 通过UID判断用户是否存在数据库 func IsUserExistsByUID(uid uint) (bool, error) { var count int64 tx := dao.DB().Model(&model.Furry{}).Where("uid = ?", uid).Count(&count) if tx.Error != nil { return false, exception.ErrFetchFurries(fmt.Sprintf("查找用户[%d]时发生错误:%s", uid, tx.Error.Error())) } if count == 0 { return false, nil } else { return true, nil } } // BatchGetUserInfo 批量获取用户基础信息 func BatchGetUserInfo(userIds *[]uint) []*pb.InfoReply_Data { poolSize := 8 sem := make(chan struct{}, poolSize) defer close(sem) wg := sync.WaitGroup{} results := make(chan *pb.InfoReply_Data, len(*userIds)) defer close(results) for _, uid := range *userIds { sem <- struct{}{} wg.Add(1) go func(uid uint) { <-sem defer wg.Done() info, err := GetUserInfoByUID(uid) if err != nil { log.Logger().Warnf("用户[%d]基础信息获取失败:%s。已忽略", uid, err) return } results <- info log.Logger().Debugf("获取用户[%d]基础信息成功", uid) }(uid) } wg.Wait() s := make([]*pb.InfoReply_Data, 0) resultLen := len(results) for i := 0; i < resultLen; i++ { s = append(s, <-results) } return s } func UpdateUserInfo(users []*model.Furry, infos []*pb.InfoReply_Data) (int, error) { // 以UID为索引转换为map infoMap := make(map[uint]*pb.InfoReply_Data) for _, info := range infos { infoMap[uint(info.Mid)] = info } // 检测变化 changelogs, changes, err := createChangelogs(users, infos) if err != nil { return 0, err } if len(changelogs) == 0 { // 无变化 return 0, nil } // 数据库事务 err = dao.DB().Transaction(func(tx *gorm.DB) error { for _, changed := range changes { // 更新furries表 if err = tx.Model(&model.Furry{ID: changed.ID}).Updates(changed).Error; err != nil { return err } } for _, changelog := range changelogs { // 插入changelogs表 if err = tx.Create(changelog).Error; err != nil { return err } } return nil }) if err != nil { return 0, err } return len(changelogs), nil } func userInfoDiff(user *model.Furry, info *pb.InfoReply_Data) (*model.Changelog, *model.Furry, error) { if uint64(user.UID) != info.Mid { return nil, nil, exception.ErrUserMismatch(fmt.Sprintf("对比用户ID不匹配:%d和%d", user.UID, info.Mid)) } changelog := model.Changelog{ Operation: "", Target: strconv.FormatUint(uint64(user.UID), 10), Result: "", } updated := model.Furry{ ID: user.ID, } if user.Name != info.Name { result, _ := json.Marshal(map[string]string{"previous_name": user.Name, "new_name": info.Name}) changelog.Operation = "UpdateName" changelog.Result = string(result) updated.Name = info.Name return &changelog, &updated, nil } else if info.LiveRoom != nil && user.LiveRoomID != uint(info.LiveRoom.Roomid) { updated.LiveRoomID = uint(info.LiveRoom.Roomid) changelog.Operation = "AddLiveRoom" changelog.Result = strconv.FormatUint(info.LiveRoom.Roomid, 10) return &changelog, &updated, nil } else { return nil, nil, nil } } func createChangelogs(users []*model.Furry, infos []*pb.InfoReply_Data) ([]*model.Changelog, []*model.Furry, error) { // 以UID为索引转换为map infoMap := make(map[uint]*pb.InfoReply_Data) for _, i := range infos { infoMap[uint(i.Mid)] = i } // 检测变化 changelogs := make([]*model.Changelog, 0) changes := make([]*model.Furry, 0) for _, u := range users { if info, ok := infoMap[u.UID]; ok { changelog, changed, err := userInfoDiff(u, info) if err != nil { return nil, nil, err } if changed == nil { // 无变化 continue } changes = append(changes, changed) changelogs = append(changelogs, changelog) } } return changelogs, changes, nil } // GetStatByUID 获取用户粉丝数 func GetStatByUID(uid uint) (*pb.StatReply_Data, error) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() info, err := rpc.SpiderCore().GetUserStat(ctx, &pb.StatRequest{Uid: uint64(uid)}) if err != nil { return nil, exception.InternalError("获取用户信息失败:" + err.Error()) } if info.Code != 200 { if info.Msg == "" { info.Msg = "Unknown" } return nil, exception.ErrFetchStat( fmt.Sprintf("用户[%d]粉丝数获取失败:%s(Code: %d)", uid, info.Msg, info.Code)) } return info.Data, nil } // BatchGetUserStat 批量获取用户粉丝数 func BatchGetUserStat(uids *[]uint) []*pb.StatReply_Data { poolSize := 8 sem := make(chan struct{}, poolSize) defer close(sem) wg := sync.WaitGroup{} results := make(chan *pb.StatReply_Data, len(*uids)) defer close(results) for _, uid := range *uids { sem <- struct{}{} wg.Add(1) go func(uid uint) { <-sem defer wg.Done() stat, err := GetStatByUID(uid) if err != nil { log.Logger().Warnf("用户[%d]粉丝数获取失败:%s。已忽略", uid, err) return } results <- stat log.Logger().Debugf("获取用户[%d]粉丝数成功", uid) }(uid) } wg.Wait() s := make([]*pb.StatReply_Data, 0) resultLen := len(results) for i := 0; i < resultLen; i++ { s = append(s, <-results) } return s } func UpdateFansToDB(stats []*pb.StatReply_Data) (int, error) { var err error // 抽取必要字段转为数据库模型 var models []*model.Fans for _, stat := range stats { models = append(models, &model.Fans{ UID: uint(stat.Mid), Fans: uint(stat.Follower), }) } // 数据库事务 err = dao.DB().Transaction(func(tx *gorm.DB) error { for _, m := range models { if err = tx.Create(m).Error; err != nil { return err } } return nil }) if err != nil { return 0, err } return len(stats), nil } func AddUser(user *model.Furry) error { tx := dao.DB().Create(user) return tx.Error }