完成用户名更新,粉丝数获取的基础功能

This commit is contained in:
2022-12-20 21:45:52 +08:00
commit 511d889557
28 changed files with 2084 additions and 0 deletions

79
pkg/conf/conf.go Normal file
View File

@@ -0,0 +1,79 @@
package conf
import (
"encoding/json"
"github.com/BurntSushi/toml"
"github.com/eigeen/furryboard/spider-scheduler/pkg/log"
"github.com/eigeen/furryboard/spider-scheduler/pkg/util"
"github.com/mcuadros/go-defaults"
"os"
)
type Config struct {
Common Common `toml:"common"`
Database Database `toml:"database"`
SpiderCore SpiderCore `toml:"spider_core"`
}
type Common struct {
}
type Database struct {
Host string `toml:"host" default:"localhost"`
Port uint16 `toml:"port" default:"5432"`
User string `toml:"user" default:"user"`
Password string `toml:"password" default:"password"`
DB string `toml:"db" default:"furryboard"`
}
type SpiderCore struct {
Host string `toml:"host" default:"localhost"`
Port uint16 `toml:"port" default:"9996"`
}
var (
Conf *Config
)
// 释放默认配置文件
func releaseConfig(file string, config *Config) {
f, err := util.CreateNestedFile(file)
if err != nil {
log.Logger().Fatalf("创建默认配置文件失败: %s", err.Error())
}
encoder := toml.NewEncoder(f)
err = encoder.Encode(config)
if err != nil {
log.Logger().Fatalf("创建默认配置文件失败: %s", err.Error())
}
}
func InitConfig(file string) {
Conf = &Config{}
defaults.SetDefaults(Conf)
if util.NotExists(file) {
// 默认配置文件不存在则创建
if file == "config.toml" {
releaseConfig(file, Conf)
log.Logger().Infof("已创建默认配置文件: %s请修改配置文件内容后重新启动", file)
os.Exit(0)
} else {
log.Logger().Fatalf("配置文件不存在: %s", file)
}
}
// 解析配置文件
_, err := toml.DecodeFile(file, &Conf)
if err != nil {
log.Logger().Fatalf("配置文件解析出错: %s", err.Error())
}
// 配置预检查
jsonConfig, err := json.Marshal(Conf)
if err == nil {
log.Logger().Debugf("配置文件已加载: %s", string(jsonConfig))
}
}

77
pkg/dao/dao.go Normal file
View File

@@ -0,0 +1,77 @@
package dao
import (
"fmt"
"github.com/eigeen/furryboard/spider-scheduler/pkg/conf"
"github.com/eigeen/furryboard/spider-scheduler/pkg/dao/model"
"github.com/eigeen/furryboard/spider-scheduler/pkg/log"
"github.com/sirupsen/logrus"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"sync"
"time"
)
var (
once sync.Once
db *gorm.DB
)
func DB() *gorm.DB {
once.Do(initDB)
return db
}
type LogrusWriter struct {
log *logrus.Logger
}
func (w *LogrusWriter) Printf(format string, v ...interface{}) {
logStr := fmt.Sprintf(format, v...)
w.log.WithField("method", "GORM").Warn(logStr)
}
func NewLogger() *LogrusWriter {
return &LogrusWriter{log: log.Logger()}
}
func initDB() {
var err error
dsn := fmt.Sprintf(
"host=%s user=%s password=%s dbname=%s port=%d sslmode=disable TimeZone=Asia/Shanghai",
conf.Conf.Database.Host,
conf.Conf.Database.User,
conf.Conf.Database.Password,
conf.Conf.Database.DB,
conf.Conf.Database.Port,
)
// >= 1s SQL慢查询
slowLogger := logger.New(NewLogger(), logger.Config{
SlowThreshold: time.Second * 1,
LogLevel: logger.Warn,
})
db, err = gorm.Open(postgres.Open(dsn), &gorm.Config{
Logger: slowLogger,
})
if err != nil {
log.Logger().Fatal("创建数据库连接失败:", err)
}
autoMigrate()
}
func autoMigrate() {
var err error
err = db.AutoMigrate(&model.Changelog{})
err = db.AutoMigrate(&model.Fans{})
if err != nil {
return
}
if err != nil {
log.Logger().Fatal("关联数据表失败:", err)
}
}

View File

@@ -0,0 +1,10 @@
package model
import "gorm.io/gorm"
type Changelog struct {
gorm.Model
Operation string `gorm:"index;not null;default:''"`
Target string `gorm:"index;not null;default:''"`
Result string `gorm:"not null;default:''"`
}

13
pkg/dao/model/fans.go Normal file
View File

@@ -0,0 +1,13 @@
package model
import "gorm.io/gorm"
type Fans struct {
gorm.Model
UID uint `gorm:"index;not null"`
Fans uint `gorm:"index;not null;default:0"`
}
func (Fans) TableName() string {
return "fans"
}

12
pkg/dao/model/furry.go Normal file
View File

@@ -0,0 +1,12 @@
package model
type Furry struct {
ID uint `gorm:"primaryKey"`
UID uint `gorm:"uniqueIndex;not null"`
Name string `gorm:"index;not null;default:''"`
Status int `gorm:"index;not null;default:0"`
}
func (Furry) TableName() string {
return "furries"
}

15
pkg/exception/base.go Normal file
View File

@@ -0,0 +1,15 @@
package exception
import (
"fmt"
)
type BaseError struct {
Code int
Msg string
ShortMsg string
}
func (err BaseError) Error() string {
return fmt.Sprintf("%d - 错误: %s - 原因:%s", err.Code, err.ShortMsg, err.Msg)
}

26
pkg/exception/business.go Normal file
View File

@@ -0,0 +1,26 @@
package exception
func InternalError(msg string) BaseError {
shortMsg := "系统异常,请联系管理员"
return BaseError{Code: 500, Msg: msg, ShortMsg: shortMsg}
}
func ErrFetchUserInfo(msg string) BaseError {
shortMsg := "用户信息获取失败"
return BaseError{Code: 5001, Msg: msg, ShortMsg: shortMsg}
}
func ErrFetchFurries(msg string) BaseError {
shortMsg := "获取Furry列表失败"
return BaseError{Code: 5002, Msg: msg, ShortMsg: shortMsg}
}
func ErrUserMismatch(msg string) BaseError {
shortMsg := "操作的两个用户不匹配"
return BaseError{Code: 5003, Msg: msg, ShortMsg: shortMsg}
}
func ErrFetchStat(msg string) BaseError {
shortMsg := "获取用户统计信息失败"
return BaseError{Code: 5004, Msg: msg, ShortMsg: shortMsg}
}

32
pkg/log/logger.go Normal file
View File

@@ -0,0 +1,32 @@
package log
import (
nested "github.com/antonfisher/nested-logrus-formatter"
"github.com/sirupsen/logrus"
)
var logger *logrus.Logger
func Logger() *logrus.Logger {
return logger
}
func InitLogger(debug bool) {
log := logrus.New()
log.SetFormatter(&nested.Formatter{
FieldsOrder: []string{"method", "url", "statusCode", "spendTime"},
HideKeys: true,
NoFieldsColors: true,
TimestampFormat: "2006-01-02 15:04:05.000",
})
var lvl logrus.Level
switch debug {
case true:
lvl = logrus.DebugLevel
case false:
lvl = logrus.InfoLevel
}
log.SetLevel(lvl)
logger = log
}

12
pkg/logic/config.toml Normal file
View File

@@ -0,0 +1,12 @@
[common]
[database]
host = "localhost"
port = 5432
user = "user"
password = "password"
db = "furryboard"
[spider_core]
host = "localhost"
port = 9101

242
pkg/logic/user.go Normal file
View File

@@ -0,0 +1,242 @@
package logic
import (
"context"
"encoding/json"
"fmt"
"github.com/eigeen/furryboard/spider-scheduler/pkg/dao"
"github.com/eigeen/furryboard/spider-scheduler/pkg/dao/model"
"github.com/eigeen/furryboard/spider-scheduler/pkg/exception"
"github.com/eigeen/furryboard/spider-scheduler/pkg/log"
"github.com/eigeen/furryboard/spider-scheduler/rpc"
"github.com/eigeen/furryboard/spider-scheduler/rpc/pb"
"gorm.io/gorm"
"strconv"
"sync"
"time"
)
// GetUsers 分页获取用户信息(数据库)
func GetUsers(page, pageSize int) ([]*model.Furry, error) {
// 分页查询
var furries []*model.Furry
tx := dao.DB().Offset((page - 1) * pageSize).Limit(pageSize).Find(&furries)
if tx.Error != nil {
return nil, exception.ErrFetchFurries("分页获取Furry列表时失败" + tx.Error.Error())
}
return furries, nil
}
// GetUserInfoByUID 获取用户粉丝数 包括名称,性别,签名等
func GetUserInfoByUID(uid uint) (*pb.InfoReply_Data, error) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
info, err := rpc.SpiderCore().GetUserBasicInfo(ctx, &pb.InfoRequest{Uid: uint64(uid)})
if err != nil {
return nil, exception.InternalError("获取用户信息失败:" + err.Error())
}
if info.Code != 200 {
if info.Msg == "" {
info.Msg = "Unknown"
}
return nil, exception.ErrFetchUserInfo(
fmt.Sprintf("用户[%d]基础信息获取失败:%s(Code: %d)", uid, info.Msg, info.Code))
}
return info.Data, nil
}
// BatchGetUserInfo 批量获取用户基础信息
func BatchGetUserInfo(userIds *[]uint) []*pb.InfoReply_Data {
poolSize := 8
sem := make(chan struct{}, poolSize)
defer close(sem)
wg := sync.WaitGroup{}
results := make(chan *pb.InfoReply_Data, len(*userIds))
defer close(results)
for _, uid := range *userIds {
sem <- struct{}{}
wg.Add(1)
go func(uid uint) {
<-sem
defer wg.Done()
info, err := GetUserInfoByUID(uid)
if err != nil {
log.Logger().Warnf("用户[%d]基础信息获取失败:%s。已忽略", uid, err)
return
}
results <- info
log.Logger().Debugf("获取用户[%d]基础信息成功", uid)
}(uid)
}
wg.Wait()
s := make([]*pb.InfoReply_Data, 0)
resultLen := len(results)
for i := 0; i < resultLen; i++ {
s = append(s, <-results)
}
return s
}
func UpdateUserInfo(users []*model.Furry, infos []*pb.InfoReply_Data) (int, error) {
// 以UID为索引转换为map
infoMap := make(map[uint]*pb.InfoReply_Data)
for _, i := range infos {
infoMap[uint(i.Mid)] = i
}
// 检测变化
changelogs, changes, err := createChangelogs(users, infos)
if err != nil {
return 0, err
}
if len(changelogs) == 0 { // 无变化
return 0, nil
}
// 数据库事务
err = dao.DB().Transaction(func(tx *gorm.DB) error {
for _, changed := range changes {
// 更新furries表
if err = tx.Model(&model.Furry{ID: changed.ID}).Updates(changed).Error; err != nil {
return err
}
}
for _, changelog := range changelogs {
// 插入changelogs表
if err = tx.Create(changelog).Error; err != nil {
return err
}
}
return nil
})
if err != nil {
return 0, err
}
return len(changelogs), nil
}
func userInfoDiff(user *model.Furry, info *pb.InfoReply_Data) (*model.Changelog, *model.Furry, error) {
if uint64(user.UID) != info.Mid {
return nil, nil, exception.ErrUserMismatch(fmt.Sprintf("对比用户ID不匹配%d和%d", user.UID, info.Mid))
}
changelog := model.Changelog{
Operation: "UpdateName",
Target: strconv.FormatUint(uint64(user.UID), 10),
Result: "",
}
updated := model.Furry{
ID: user.ID,
}
if user.Name != info.Name {
result, _ := json.Marshal(map[string]string{"previous_name": user.Name, "new_name": info.Name})
changelog.Result = string(result)
updated.Name = info.Name
return &changelog, &updated, nil
} else {
return nil, nil, nil
}
}
func createChangelogs(users []*model.Furry, infos []*pb.InfoReply_Data) ([]*model.Changelog, []*model.Furry, error) {
// 以UID为索引转换为map
infoMap := make(map[uint]*pb.InfoReply_Data)
for _, i := range infos {
infoMap[uint(i.Mid)] = i
}
// 检测变化
changelogs := make([]*model.Changelog, 0)
changes := make([]*model.Furry, 0)
for _, u := range users {
if info, ok := infoMap[u.UID]; ok {
changelog, changed, err := userInfoDiff(u, info)
if err != nil {
return nil, nil, err
}
if changed == nil { // 无变化
continue
}
changes = append(changes, changed)
changelogs = append(changelogs, changelog)
}
}
return changelogs, changes, nil
}
// GetStatByUID 获取用户粉丝数
func GetStatByUID(uid uint) (*pb.StatReply_Data, error) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
info, err := rpc.SpiderCore().GetUserStat(ctx, &pb.StatRequest{Uid: uint64(uid)})
if err != nil {
return nil, exception.InternalError("获取用户信息失败:" + err.Error())
}
if info.Code != 200 {
if info.Msg == "" {
info.Msg = "Unknown"
}
return nil, exception.ErrFetchStat(
fmt.Sprintf("用户[%d]粉丝数获取失败:%s(Code: %d)", uid, info.Msg, info.Code))
}
return info.Data, nil
}
// BatchGetUserStat 批量获取用户粉丝数
func BatchGetUserStat(uids *[]uint) []*pb.StatReply_Data {
poolSize := 8
sem := make(chan struct{}, poolSize)
defer close(sem)
wg := sync.WaitGroup{}
results := make(chan *pb.StatReply_Data, len(*uids))
defer close(results)
for _, uid := range *uids {
sem <- struct{}{}
wg.Add(1)
go func(uid uint) {
<-sem
defer wg.Done()
stat, err := GetStatByUID(uid)
if err != nil {
log.Logger().Warnf("用户[%d]粉丝数获取失败:%s。已忽略", uid, err)
return
}
results <- stat
log.Logger().Debugf("获取用户[%d]粉丝数成功", uid)
}(uid)
}
wg.Wait()
s := make([]*pb.StatReply_Data, 0)
resultLen := len(results)
for i := 0; i < resultLen; i++ {
s = append(s, <-results)
}
return s
}
func UpdateFansToDB(stats []*pb.StatReply_Data) (int, error) {
var err error
// 抽取必要字段转为数据库模型
var models []*model.Fans
for _, stat := range stats {
models = append(models, &model.Fans{
UID: uint(stat.Mid),
Fans: uint(stat.Follower),
})
}
// 数据库事务
err = dao.DB().Transaction(func(tx *gorm.DB) error {
for _, m := range models {
if err = tx.Create(m).Error; err != nil {
return err
}
}
return nil
})
if err != nil {
return 0, err
}
return len(stats), nil
}

33
pkg/logic/user_test.go Normal file
View File

@@ -0,0 +1,33 @@
package logic
import (
"github.com/eigeen/furryboard/spider-scheduler/pkg/conf"
"github.com/eigeen/furryboard/spider-scheduler/pkg/log"
"testing"
)
func BeforeTesting() {
log.InitLogger(false)
conf.InitConfig("config.toml")
}
func TestGetUserInfoByUID(t *testing.T) {
BeforeTesting()
info, err := GetUserInfoByUID(686127)
if err != nil {
t.Error(err)
return
}
t.Log(info)
}
func TestBatchGetUserInfo(t *testing.T) {
BeforeTesting()
userIds := []uint{1635354787, 10504485, 272460888, 474481910, 82191626}
infos := BatchGetUserInfo(&userIds)
if len(infos) != len(userIds) {
t.Errorf("结果长度不正确:应为%d实际为%d", len(userIds), len(infos))
return
}
}

12
pkg/task/config.toml Normal file
View File

@@ -0,0 +1,12 @@
[common]
[database]
host = "localhost"
port = 5432
user = "postgres"
password = "postrootpwd"
db = "furryboard"
[spider_core]
host = "localhost"
port = 9101

103
pkg/task/user.go Normal file
View File

@@ -0,0 +1,103 @@
package task
import (
"github.com/eigeen/furryboard/spider-scheduler/pkg/dao"
"github.com/eigeen/furryboard/spider-scheduler/pkg/dao/model"
"github.com/eigeen/furryboard/spider-scheduler/pkg/exception"
"github.com/eigeen/furryboard/spider-scheduler/pkg/log"
"github.com/eigeen/furryboard/spider-scheduler/pkg/logic"
"math"
"time"
)
// UpdateUserInfo 更新数据库内用户信息
// 会对原始表进行更改并将更改记录写入changelogs
func UpdateUserInfo() {
// 预查询数量
// TODO: 预查询数量和分页策略功能可复用
var count int64
tx := dao.DB().Model(&model.Furry{}).Count(&count)
if tx.Error != nil {
err := exception.ErrFetchFurries("获取furries数量失败" + tx.Error.Error())
log.Logger().Errorf("获取furries数量失败%s", err)
return
}
pageSize := 32
maxPage := int(math.Ceil(float64(count) / float64(pageSize)))
for page := 0; page <= maxPage; page++ {
// 获取目标用户列表(分页)
users, err := logic.GetUsers(page, pageSize)
if err != nil {
log.Logger().Errorf("获取用户列表时发生错误:%s", err)
continue
}
// 已获取完毕
if users == nil {
return
}
// 抽取uid列表
var uids []uint
for _, user := range users {
uids = append(uids, user.UID)
}
// 通过API获取用户信息
infos := logic.BatchGetUserInfo(&uids)
// 检查更新内容更新furries表插入changelogs使用事务
c, err := logic.UpdateUserInfo(users, infos)
if err != nil {
log.Logger().Errorf("更新数据时发生错误:%s", err)
continue
}
log.Logger().Infof("成功更新了%d个用户信息", c)
// delay
time.Sleep(2 * time.Second)
}
}
func UpdateFans() {
// TODO: 与UpdateUserInfo有大量重复可抽离复用代码
// 预查询数量
var count int64
tx := dao.DB().Model(&model.Furry{}).Count(&count)
if tx.Error != nil {
err := exception.ErrFetchFurries("获取furries数量失败" + tx.Error.Error())
log.Logger().Errorf("获取furries数量失败%s", err)
return
}
pageSize := 4
maxPage := int(math.Ceil(float64(count) / float64(pageSize)))
for page := 0; page <= maxPage; page++ {
// 获取目标用户列表(分页)
users, err := logic.GetUsers(page, pageSize)
if err != nil {
log.Logger().Errorf("获取用户列表时发生错误:%s", err)
continue
}
// 已获取完毕
if users == nil {
return
}
// 抽取uid列表
var uids []uint
for _, user := range users {
uids = append(uids, user.UID)
}
// 通过API获取用户统计信息
stats := logic.BatchGetUserStat(&uids)
// 插入表
c, err := logic.UpdateFansToDB(stats)
if err != nil {
log.Logger().Errorf("更新粉丝数量时发生错误:%s", err)
return
}
log.Logger().Infof("成功更新了%d个用户粉丝数", c)
time.Sleep(2 * time.Second)
}
}
func UpdateFansAndInfo() {
}

22
pkg/task/user_test.go Normal file
View File

@@ -0,0 +1,22 @@
package task
import (
"github.com/eigeen/furryboard/spider-scheduler/pkg/conf"
"github.com/eigeen/furryboard/spider-scheduler/pkg/log"
"testing"
)
func BeforeTesting() {
log.InitLogger(false)
conf.InitConfig("config.toml")
}
func TestUpdateUserInfo(t *testing.T) {
BeforeTesting()
UpdateUserInfo()
}
func TestUpdateFans(t *testing.T) {
BeforeTesting()
UpdateFans()
}

30
pkg/util/file.go Normal file
View File

@@ -0,0 +1,30 @@
package util
import (
"os"
"path/filepath"
)
func Exists(name string) bool {
if _, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
func NotExists(name string) bool {
return !Exists(name)
}
func CreateNestedFile(path string) (*os.File, error) {
basePath := filepath.Dir(path)
if !Exists(basePath) {
err := os.MkdirAll(basePath, 0700)
if err != nil {
return nil, err
}
}
return os.Create(path)
}

36
pkg/util/rand.go Normal file
View File

@@ -0,0 +1,36 @@
package util
import (
"math/rand"
"time"
"unsafe"
)
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
var src = rand.NewSource(time.Now().UnixNano())
const (
// 6 bits to represent a letter index
letterIdBits = 6
// All 1-bits as many as letterIdBits
letterIdMask = 1<<letterIdBits - 1
letterIdMax = 63 / letterIdBits
)
func RandStr(n int) string {
b := make([]byte, n)
// A rand.Int63() generates 63 random bits, enough for letterIdMax letters!
for i, cache, remain := n-1, src.Int63(), letterIdMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdMax
}
if idx := int(cache & letterIdMask); idx < len(letters) {
b[i] = letters[idx]
i--
}
cache >>= letterIdBits
remain--
}
return *(*string)(unsafe.Pointer(&b))
}

21
pkg/util/slice.go Normal file
View File

@@ -0,0 +1,21 @@
package util
func InStringSlice(sl []string, ele string) bool {
slMap := convertStrSlice2Map(sl)
return inMap(slMap, ele)
}
// ConvertStrSlice2Map 将字符串 slice 转为 map[string]struct{}。
func convertStrSlice2Map(sl []string) map[string]struct{} {
set := make(map[string]struct{}, len(sl))
for _, v := range sl {
set[v] = struct{}{}
}
return set
}
// InMap 判断字符串是否在 map 中。
func inMap(m map[string]struct{}, s string) bool {
_, ok := m[s]
return ok
}

23
pkg/util/time.go Normal file
View File

@@ -0,0 +1,23 @@
package util
import "time"
func ToSQLTimeFormat(t time.Time) string {
return t.Format("2006-01-02 15:04:05")
}
func MustParseSQLTime(timeStr string) time.Time {
timeObj, err := time.ParseInLocation("2006-01-02 15:04:05", timeStr, time.Local)
if err != nil {
panic(err)
}
return timeObj
}
func IsSQLTimeFormat(timeStr string) bool {
_, err := time.ParseInLocation("2006-01-02 15:04:05", timeStr, time.Local)
if err != nil {
return false
}
return true
}