嘴太笨了

Go数字孪生

项目背景:企业合作项目。负责设计开发数据采集与指令下发模块,实现车间物理设备与Unity模拟软件的实时状态映射。

这个项目是老师和科大讯飞与农科信创这个公司合作的一个数字化管理系统,这个项目虽然叫数字孪生但是您可以把它当作一个游戏前端+共享文档来看,客户端程序我们用的unity开发的,农科信创提供的了数字资产和传感器,科大讯飞提供了一部分深度学习模型,做的事包括产量预估、病虫害表型鉴定、采摘车的三维图像定位(通过ROS控制)、异常人员检测,这个项目我做的事就基于go做的后端,一个是监听、收集和预处理数据发送到模型端,一个是提供给unity接口支持多个用户去操作边缘设备(调整温度光照,要更改采摘车的采摘顺序),将模型处理后的统计数据、实体状态呈现给前端。

项目难点:根据传感器文档构建指令,指令是通过状态写入协议来进行的;处理多个用户操作同一个对象的冲突问题。

  • 后端主要基于Gin和Zinx开发,连接Unity端收发实时数据,提供用户控制端Web服务接口
  • 使用gRPC对序列化后的对象消息进行流式传输,保证服务器端与Unity端的高效同步
  • 使用MongoDB存储服务器端的统计数据、历史设备指令与数字工厂状态的日志文件提供历史状态查询与回滚操作
  1. gin做的是登陆认证、资产文件上分片上传/拉取功能,存储在mongodb,资产文件用gridFS。以及获取grpc通信的结构体解析成XML的功能,unity前端有一个功能是鼠标移动到实例上就有一个悬浮框显示所有属性
  2. 数据接收模块:传感器都是基于UDP的协议进行上传,频率有高有低,所以对一个传感器go了一个routine,在这个routine里接收解析所需的数据进行解析完,处理成结构体之后,用grpc的stream消息发到GPU服务器上部署的模型来处理。
  3. 用户在unity端的操作和指令下发是用zinx实现的,它是一个TCP长连接框架,然后服务器端去实现温度、土壤、种植区、采摘车这些实体对象与现实状态的同步,(比如控制土壤ph、二氧化碳浓度、控温、光谱开关、采摘车路线修正)包括位置、惯导的方位角,角速度线速度等(用MSO格式封装的报文),在用户端封装的控制指令以bytes的形式用zinx的DataPack封装,封装的控制指令转发到终端设备的service
  4. 如果有多个用户并发要对这个对象操作,在这个处理的goroutine中可能会造成冲突,我们维护了一个对象map,对这个map加锁处理,对于采摘车的话,是单独维护了一个处于三维坐标系下的对象。
  5. 用到channel的场景:一个预测服务中,对发送数据的goroutine是配套的,
  6. 历史状态查询的话,是根据mongo中存储各对象的历史状态,根据时间戳向前端的拉取。
  7. 是做用户在本地对传感器做操作后,连接的控制终端,比如采摘车的路径校准、和毫米波雷达的控制,主要包括DIFOP设备信息输出协议,用户配置写入协议UCWP
  8. 在服务器端连接mongodb,服务器数据收集、统计计算、上传数据库

Go 轻量聊天APP(Go/Kratos/GPT2)

项目角色:唯一贡献者
项目背景:该软件作为当时学习Go语言微服务框架与组件的个人练手项目。聊天以群组的形式进行,将基于GPT2的bot与聊天室服务拆分成微服务,主要包括用户信息服务、群组管理服务、消息传输服务、GPT会话存储服务四个。

  • 使用的微服务框架为Kratos,开发组件和工具主要包括Redis/Go-kit/Swagger
  • 遵循微服务部署流程,微服务使用k8s部署,dlv调试工具,godoc自动化生成文档等

数据库

1
2
3
4
5
6
用户表
用户id,密码,密码哈希值,salt
群组表
群组id, 用户id, 权限值
聊天记录表
key:userid, groupid,timestamp做的索引

缓存

用户信息服务

  • 查看个人/他人信息功能
  • 更改字段功能
  • 增加删除(注销)功能
  • 登陆模块:用户输入用户名、密码
    • 注册时提供密码,系统为用户生成唯一salt
    • 将密码的字符串salt与原字符串拼接,得到hash
    • 数据库存储 hash 和 salt
    • 登陆时将字符串与salt连接后的到hash’,判断hash’ == hash
    • 验证正确后,服务端使用jwt机制,进行API调用(库使用的jwt-go
  • jwt优点
    • 更适用CDN: 可以通过内容分发网络请求你服务端的所有资料
    • Token机制在服务端不需要存储session信息,因为Token 自身包含了所有登录用户的信息,只需要在客户端的cookie或本地介质存储状态信息.
    • 去耦: 不需要绑定到一个特定的身份验证方案。Token可以在任何地方生成,只要在你的API被调用的时候,你可以进行Token生成调用即可.
    • 更适用于移动应用: 当你的客户端是一个原生平台(iOS, Android,Windows 8等)时,Cookie是不被支持的(你需要通过Cookie容器进行处理),这时采用Token认证机制就会简单得多。

群组管理服务

  • 邀请加入用户byID
  • 踢出用户byID

    消息传输服务

  • 群组以聊天室为单位进行,一个聊天室对应一个Pod
  • 主要包括ChatRoomClient和ChatRoomServer
  • ChatRoomClient 主要负责与前端通信和处理与当前用户相关的信息
  • ChatRoomServer 则负责维护所有在线用户信息以及处理消息的分发
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    type ChatRoomClient struct {
    roomId int
    user User
    conn net.Conn //与前端维持的通信连接
    }
    type ChatRoomServer struct {
    clients map[string]*ChatRoomClient // 维护一个连接池
    rooms map[string][]*ChatRoomClient // 用于维护聊天室与客户端的关系
    }
    // SendMessage 方法用于发送消息给聊天室
    func (c *ChatRoomClient) SendMessage(message string) {
    c.server.BroadcastMessage(c.RoomID, c.UserID, c.UserName, message)
    }
    // Disconnect 方法处理客户端断开连接的情况
    func (c *ChatRoomClient) Disconnect() {
    c.server.RemoveClient(c)
    }
    // NewChatRoomServer 方法用于创建一个新的ChatRoomServer对象
    func NewChatRoomServer() *ChatRoomServer {
    return &ChatRoomServer{
    clients: make(map[string]*ChatRoomClient),
    rooms: make(map[string][]*ChatRoomClient),
    }
    }
    // AddClient 方法将一个客户端添加到服务器
    func (s *ChatRoomServer) AddClient(client *ChatRoomClient) {
    s.clients[client.ConnectionID] = client
    }

    // RemoveClient 方法将一个客户端从服务器移除
    func (s *ChatRoomServer) RemoveClient(client *ChatRoomClient) {
    delete(s.clients, client.ConnectionID)
    // 在此处也可以将客户端从对应的聊天室中移除
    }

    // JoinRoom 方法将一个客户端加入到指定聊天室中
    func (s *ChatRoomServer) JoinRoom(client *ChatRoomClient, roomID string) {
    s.rooms[roomID] = append(s.rooms[roomID], client)
    client.RoomID = roomID
    }

    // LeaveRoom 方法将一个客户端从指定聊天室中移除
    func (s *ChatRoomServer) LeaveRoom(client *ChatRoomClient, roomID string) {
    clients := s.rooms[roomID]
    for i, c := range clients {
    if c.ConnectionID == client.ConnectionID {
    // 从聊天室中移除客户端
    s.rooms[roomID] = append(clients[:i], clients[i+1:]...)
    client.RoomID = "" // 将客户端的聊天室ID清空
    break
    }
    }
    }

    // BroadcastMessage 方法向指定聊天室内的所有客户端广播消息
    func (s *ChatRoomServer) BroadcastMessage(roomID, userID, userName, message string) {
    clients := s.rooms[roomID]
    for _, client := range clients {
    go func(client *ChatRoomClient) {
    // 在协程中处理消息发送
    client.SendMessage(userID, userName, message)
    }(client)
    }
    }

消息传输的并发处理和消息分发问题是怎么解决的

消息发送时,一个客户端的sendmessage用一个goroutine进行

  • 可能出现的并发安全问题
    1. map互斥锁 在 ChatRoomServer 中的 rooms 和 clients 字典是共享的数据结构,多个goroutine可能会同时访问或修改这些数据结构,可能导致竞态条件(race condition)或数据不一致的问题。

kratos进行微服务间的通信

这些服务的通信都是一些很轻量的元数据,kratos的用protobuf定义的middleware

gRPC API 进行接口交互,服务架构需要使用统一的元信息(Metadata)传输进行微服务间的传递。 目前 gRPC 中可以携带元信息传递,原理是将元信息放入 HTTP Header 中,这样上游即可收到对应的元信息 信息。 因此在Kratos的设计上,也是通过 HTTP Header 进行传递。在框架中先将元信息包封装成key/value结构,然后携带到 Transport Header 中。

服务注册

创建一个 Registrar(以 consul 为例),将 Registrar 注入进 Kratos 应用实例中,Kratos 会自动完成实例注册和反注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import (
consul "github.com/go-kratos/kratos/contrib/registry/consul/v2"
"github.com/hashicorp/consul/api"
)

// new consul client
client, err := api.NewClient(api.DefaultConfig())
if err != nil {
panic(err)
}
// new reg with consul client
reg := consul.New(client)

app := kratos.New(
// service-name
kratos.Name(Name),
kratos.Version(Version),
kratos.Metadata(map[string]string{}),
kratos.Logger(logger),
kratos.Server(
hs,
gs,
),
// with registrar
kratos.Registrar(reg),
)

用的是kratos的负载均衡器

1
2
3
4
5
6
7
8
9
10
11
12
13
type Selector interface {
// Selector 内部维护的服务节点列表通过 Rebalancer 接口来更新
Rebalancer
// Select nodes
// if err == nil, selected and done must not be empty.
Select(ctx context.Context, opts ...SelectOption)
(selected Node, done DoneFunc, err error)
}

// 通过 Rebalancer 实现服务节点变更感知
type Rebalancer interface {
Apply(nodes []Node)
}

用的默认的wrr算法 Weighted round robin

rpc是什么 远程过程调用 和rmi的区别【RMI相当于RPC的一种实现】

  • RPC (Remote Procedure Call) 采用客户端/服务器方式 (请求/响应),发送请求到服务器端,服务端执行方法后返回结果。 优点是跨语言跨平台,缺点是编译期无法排错。返回的对象由外部数据表示

  • RMI (Remote Method Invocation) 客户端jvm调用服务端jvm的方法,直接获取远端方法的签名,进行调用。优点是强类型、编译期可检查错误;缺点是只限于java语言。返回的对象可以是java支持的所有对象名。

RPC的代理类stub,将消息序列化为RPCrequest/RPCresponse

RMI流程原理

  • 定义远程接口interface Rmiiface extends Remote

  • 实现一个远程接口类MyRemoteObject implements Rmiiface

  • 服务器端注册和查找远程对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class Server {
    public static void main(String[] args) {
    try {
    Rmiiface remoteObject = new MyRemoteObject();
    Registry registry = LocateRegistry.createRegistry(1099); // 默认 RMI 注册表端口号为 1099
    registry.bind("MyRemoteObject", remoteObject);
    System.out.println("Server is ready.");
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }
  • 客户端查找注册表来获取远程方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class Client {
    public static void main(String[] args) {
    try {
    Registry registry = LocateRegistry.getRegistry("localhost", 1099);
    MyRemoteInterface remoteObject = (MyRemoteInterface) registry.lookup("MyRemoteObject");
    String response = remoteObject.sayHello();
    System.out.println(response);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

通过JRMP协议来通信

grpc

基于protobuf序列化协议的通讯协议