我正在尝试在 golang 中设置单个服务器单个客户端连接,目标是在服务器和客户端之间建立最快的连接从服务器到客户端的速率需要
我正在尝试在 golang 中设置单服务器单客户端连接,目标是在服务器和客户端之间建立最快的连接,从服务器到客户端的速率需要足以以 60 fps 的速度传输全高清视频(压缩帧后约为 30Mbps)
使用以下代码片段时,我的实际传输速率为:以太网约为 11 Mbps,WiFi 约为 6 Mbps
我已经为客户端和服务器设置了多通道视频发送,服务器通道有效地将 WiFi 速度提高了 2 倍,但客户端通道实际上根本没有帮助,这是合乎逻辑的,因为客户端通道使用的代码的性质
以下代码的目标是使用 websocket 以最快的速度从服务器传输到客户端
main.go(服务器)=>
package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type Channel struct {
name string
buffer chan []byte
bytesSent int
mu sync.Mutex
}
type Server struct {
clients map[*websocket.Conn]bool
broadcast chan []byte
channels map[string]*Channel
mu sync.Mutex
}
func newServer() *Server {
return &Server{
clients: make(map[*websocket.Conn]bool),
broadcast: make(chan []byte),
channels: make(map[string]*Channel),
}
}
func (s *Server) handleConnections(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Fatalf("Failed to upgrade to WebSocket: %v", err)
}
defer ws.Close()
s.mu.Lock()
s.clients[ws] = true
s.mu.Unlock()
for {
_, msg, err := ws.ReadMessage()
if err != nil {
log.Printf("Error reading message: %v", err)
delete(s.clients, ws)
break
}
s.broadcast <- msg
}
}
func (s *Server) handleMessages() {
for {
msg := <-s.broadcast
for client := range s.clients {
err := client.WriteMessage(websocket.BinaryMessage, msg)
if err != nil {
log.Printf("Error writing message: %v", err)
client.Close()
delete(s.clients, client)
}
}
}
}
func (s *Server) createChannel(name string) {
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.channels[name]; !exists {
s.channels[name] = &Channel{name: name, buffer: make(chan []byte, 100)}
}
}
func (s *Server) startChannel(name string) {
s.mu.Lock()
channel, exists := s.channels[name]
s.mu.Unlock()
if !exists {
log.Printf("Channel %s does not exist", name)
return
}
go func() {
for msg := range channel.buffer {
s.broadcast <- append([]byte(name+": "), msg...)
channel.mu.Lock()
channel.bytesSent += len(msg)
channel.mu.Unlock()
}
}()
}
func (s *Server) sendToChannel(name string, msg []byte) {
s.mu.Lock()
channel, exists := s.channels[name]
s.mu.Unlock()
if !exists {
log.Printf("Channel %s does not exist", name)
return
}
channel.buffer <- msg
}
func main() {
server := newServer()
http.HandleFunc("/ws", server.handleConnections)
go server.handleMessages()
numChannels := 10 // Number of channels
for i := 1; i <= numChannels; i++ {
channelName := fmt.Sprintf("channel_%d", i)
server.createChannel(channelName)
server.startChannel(channelName)
}
// Queue for distributing data
dataQueue := make(chan []byte, 1000)
// Generate dummy data
go func() {
dummyData := make([]byte, 512*1024) // 0.5 MB of dummy data
ticker := time.NewTicker(time.Millisecond * 17) // 60 times per second
for {
select {
case <-ticker.C:
dataQueue <- dummyData
}
}
}()
// Distribute data among channels
go func() {
channelId := 1
for data := range dataQueue {
channelName := fmt.Sprintf("channel_%d", channelId)
server.sendToChannel(channelName, data)
channelId++
if channelId > numChannels {
channelId = 1
}
}
}()
// Log the total sent data every second
go func() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
totalSent := 0
for _, channel := range server.channels {
channel.mu.Lock()
totalSent += channel.bytesSent
channel.bytesSent = 0 // reset for the next second
channel.mu.Unlock()
}
log.Printf("Total Sent: %.2f MB/s", float64(totalSent)/1024/1024)
}
}()
log.Println("Server started on :8080")
err := http.ListenAndServe(":8080", nil)
if err != nil {
log.Fatalf("Server failed to start: %v", err)
}
}
main.go (客户端)
package main
import (
"log"
"net/url"
"sync"
"time"
"github.com/gorilla/websocket"
)
type ChannelStats struct {
sync.Mutex
bytesReceived int
}
func readFromWebSocket(u url.URL, messageChan chan []byte, done chan bool) {
u.Path = "/ws"
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Fatalf("Dial error: %v", err)
}
defer c.Close()
for {
_, message, err := c.ReadMessage()
if err != nil {
log.Printf("Read error: %v", err)
done <- true
return
}
messageChan <- message
}
}
func processMessages(stats *ChannelStats, messageChan chan []byte, wg *sync.WaitGroup, done chan bool) {
defer wg.Done()
for {
select {
case message := <-messageChan:
stats.Lock()
stats.bytesReceived += len(message)
stats.Unlock()
case <-done:
return
}
}
}
func main() {
u := url.URL{Scheme: "ws", Host: "192.168.0.86:8080"}
var wg sync.WaitGroup
done := make(chan bool)
messageChan := make(chan []byte, 100) // Buffered channel to hold messages
stats := &ChannelStats{}
numWorkers := 10 // Number of worker goroutines
// Start a single reader goroutine
go readFromWebSocket(u, messageChan, done)
// Start multiple worker goroutines
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go processMessages(stats, messageChan, &wg, done)
}
// Log stats every second
go func() {
for {
time.Sleep(1 * time.Second)
stats.Lock()
totalReceived := stats.bytesReceived
stats.bytesReceived = 0 // reset for the next second
stats.Unlock()
log.Printf("Total Received: %.2f MB/s", float64(totalReceived)/1024/1024)
}
}()
// Wait for all workers to finish
wg.Wait()
close(done)
}
我愿意接受有关用于执行此数据传输的技术/库的建议,请记住目标只是在 golang 中执行最快的数据传输