Skip to content

Commit

Permalink
Inbound
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangpeihao committed May 23, 2013
1 parent 302705a commit 5b8a95c
Show file tree
Hide file tree
Showing 7 changed files with 742 additions and 39 deletions.
19 changes: 13 additions & 6 deletions defines.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ const (

// Result codes
const (
RESULT_CONNECT_OK = "NetConnection.Connect.Success"
NETSTREAM_PLAY_START = "NetStream.Play.Start"
NETSTREAM_PUBLISH_START = "NetStream.Publish.Start"
RESULT_CONNECT_OK = "NetConnection.Connect.Success"
RESULT_CONNECT_REJECTED = "NetConnection.Connect.Rejected"
RESULT_CONNECT_OK_DESC = "Connection successed."
RESULT_CONNECT_REJECTED_DESC = "[ AccessManager.Reject ] : [ code=400 ] : "
NETSTREAM_PLAY_START = "NetStream.Play.Start"
NETSTREAM_PUBLISH_START = "NetStream.Publish.Start"
)

// Chunk stream ID
Expand Down Expand Up @@ -262,9 +265,11 @@ var (
//FLASH_PLAYER_VERSION_STRING = "LNX 10,0,32,18"
FLASH_PLAYER_VERSION_STRING = "LNX 9,0,124,2"
//FLASH_PLAYER_VERSION_STRING = "WIN 11,5,502,146"
SWF_URL_STRING = "http://localhost/1.swf"
PAGE_URL_STRING = "http://localhost/1.html"
MIN_BUFFER_LENGTH = uint32(256)
SWF_URL_STRING = "http://localhost/1.swf"
PAGE_URL_STRING = "http://localhost/1.html"
MIN_BUFFER_LENGTH = uint32(256)
FMS_VERSION = []byte{0x04, 0x05, 0x00, 0x01}
FMS_VERSION_STRING = "4,5,0,297"
)

const (
Expand All @@ -278,6 +283,8 @@ const (
DEFAULT_CAPABILITIES = float64(15)
DEFAULT_AUDIO_CODECS = float64(4071)
DEFAULT_VIDEO_CODECS = float64(252)
FMS_CAPBILITIES = uint32(255)
FMS_MODE = uint32(2)
)

type Writer interface {
Expand Down
96 changes: 96 additions & 0 deletions demo/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package main

import (
"flag"
"fmt"
"github.com/zhangpeihao/gortmp"
"github.com/zhangpeihao/log"
"os"
"os/signal"
"syscall"
)

const (
programName = "RtmpServer"
version = "0.0.1"
)

var (
address *string = flag.String("Address", ":1935", "The address to bind.")
mediaPath *string = flag.String("MediaPath", "./medias", "The media files folder.")
)

var (
status uint
)

type ServerHandler struct{}

// InboundConn handler funcions
func (handler *ServerHandler) OnStatus(ibConn rtmp.InboundConn) {
var err error
status, err = ibConn.Status()
fmt.Printf("@@@@@@@@@@@@@status: %d, err: %v\n", status, err)
}

func (handler *ServerHandler) OnStreamCreated(ibConn rtmp.InboundConn, stream rtmp.InboundStream) {
fmt.Printf("Stream created: %d\n", stream.ID())
}

// Conn handler functions
func (handler *ServerHandler) Closed() {
fmt.Printf("@@@@@@@@@@@@@Closed\n")
}

func (handler *ServerHandler) Received(message *rtmp.Message) {
}

func (handler *ServerHandler) ReceivedCommand(command *rtmp.Command) {
fmt.Printf("ReceviedCommand: %+v\n", command)
}

// Stream handle functions
func (handler *ServerHandler) OnPlayStart(stream rtmp.InboundStream) {
fmt.Printf("OnPlayStart\n")
}
func (handler *ServerHandler) OnPublishStart(stream rtmp.InboundStream) {
fmt.Printf("OnPublishStart\n")
}
func (handler *ServerHandler) OnReceiveAudio(stream rtmp.InboundStream, on bool) {
fmt.Printf("OnReceiveAudio: %b\n", on)
}
func (handler *ServerHandler) OnReceiveVideo(stream rtmp.InboundStream, on bool) {
fmt.Printf("OnReceiveVideo: %b\n", on)
}

// Server handler functions
func (handler *ServerHandler) NewConnection(ibConn rtmp.InboundConn, connectReq *rtmp.Command,
server *rtmp.Server) bool {
fmt.Printf("NewConnection\n")
ibConn.Attach(handler)
return true
}

func main() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "%s version[%s]\r\nUsage: %s [OPTIONS]\r\n", programName, version, os.Args[0])
flag.PrintDefaults()
}
flag.Parse()

l := log.NewLogger(".", "server", nil, 60, 3600*24, true)
rtmp.InitLogger(l)
defer l.Close()
handler := &ServerHandler{}
server, err := rtmp.NewServer("tcp", *address, handler)
if err != nil {
fmt.Println("NewServer error", err)
os.Exit(-1)
}
defer server.Close()

ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT)
sig := <-ch
fmt.Printf("Signal received: %v\n", sig)
}
103 changes: 100 additions & 3 deletions handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/zhangpeihao/log"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -89,6 +90,15 @@ func CalcDigestPos(buf []byte, offset uint32, mod_val uint32, add_val uint32) (d
return
}

func CalcDHPos(buf []byte, offset uint32, mod_val uint32, add_val uint32) (digest_pos uint32) {
var i uint32
for i = 0; i < 4; i++ {
digest_pos += uint32(buf[i+offset])
}
digest_pos = digest_pos%mod_val + add_val
return
}

func ValidateDigest(buf []byte, offset uint32) uint32 {
digestPos := CalcDigestPos(buf, offset, 728, offset+4)
// Create temp buffer
Expand All @@ -106,7 +116,7 @@ func ValidateDigest(buf []byte, offset uint32) uint32 {
return 0
}

func ImprintWithDigest(buf []byte) uint32 {
func ImprintWithDigest(buf []byte, key []byte) uint32 {
//digestPos := CalcDigestPos(buf, 772, 728, 776)
digestPos := CalcDigestPos(buf, 8, 728, 12)

Expand All @@ -115,7 +125,7 @@ func ImprintWithDigest(buf []byte) uint32 {
tmpBuf.Write(buf[:digestPos])
tmpBuf.Write(buf[digestPos+SHA256_DIGEST_LENGTH:])
// Generate the hash
tempHash, err := HMACsha256(tmpBuf.Bytes(), GENUINE_FP_KEY[:30])
tempHash, err := HMACsha256(tmpBuf.Bytes(), key)
if err != nil {
return 0
}
Expand Down Expand Up @@ -177,7 +187,7 @@ func Handshake(c net.Conn, br *bufio.Reader, bw *bufio.Writer, timeout time.Dura

// TODO: Create the DH public/private key, and use it in encryption mode

clientDigestOffset := ImprintWithDigest(c1)
clientDigestOffset := ImprintWithDigest(c1, GENUINE_FP_KEY[:30])
if clientDigestOffset == 0 {
return errors.New("ImprintWithDigest failed")
}
Expand Down Expand Up @@ -207,6 +217,8 @@ func Handshake(c net.Conn, br *bufio.Reader, bw *bufio.Writer, timeout time.Dura
}
_, err = io.ReadAtLeast(br, s1, RTMP_SIG_SIZE)
CheckError(err, "Handshake Read S1")
logger.ModulePrintf(logHandler, log.LOG_LEVEL_DEBUG,
"Handshake() FMS version is %d.%d.%d.%d", s1[4], s1[5], s1[6], s1[7])
// if s1[4] < 3 {
// return errors.New(fmt.Sprintf("FMS version is %d.%d.%d.%d, unsupported!", s1[4], s1[5], s1[6], s1[7]))
// }
Expand Down Expand Up @@ -262,3 +274,88 @@ func Handshake(c net.Conn, br *bufio.Reader, bw *bufio.Writer, timeout time.Dura

return
}

func SHandshake(c net.Conn, br *bufio.Reader, bw *bufio.Writer, timeout time.Duration) (err error) {
defer func() {
if r := recover(); r != nil {
err = r.(error)
}
}()
// Send S0+S1
err = bw.WriteByte(0x03)
CheckError(err, "SHandshake() Send S0")
s1 := CreateRandomBlock(RTMP_SIG_SIZE)
// Set Timestamp
// binary.BigEndian.PutUint32(s1, uint32(GetTimestamp()))
binary.BigEndian.PutUint32(s1, uint32(0))
// Set FlashPlayer version
for i := 0; i < 4; i++ {
s1[4+i] = FMS_VERSION[i]
}

serverDigestOffset := ImprintWithDigest(s1, GENUINE_FMS_KEY[:36])
if serverDigestOffset == 0 {
return errors.New("ImprintWithDigest failed")
}

_, err = bw.Write(s1)
CheckError(err, "SHandshake() Send S1")
if timeout > 0 {
c.SetWriteDeadline(time.Now().Add(timeout))
}
err = bw.Flush()
CheckError(err, "SHandshake() Flush S0+S1")

// Read C0
if timeout > 0 {
c.SetReadDeadline(time.Now().Add(timeout))
}
c0, err := br.ReadByte()
CheckError(err, "SHandshake() Read C0")
if c0 != 0x03 {
return errors.New(fmt.Sprintf("SHandshake() Got C0: %x", c0))
}

// Read C1
c1 := make([]byte, RTMP_SIG_SIZE)
if timeout > 0 {
c.SetReadDeadline(time.Now().Add(timeout))
}
_, err = io.ReadAtLeast(br, c1, RTMP_SIG_SIZE)
CheckError(err, "SHandshake Read C1")
logger.ModulePrintf(logHandler, log.LOG_LEVEL_DEBUG,
"SHandshake() Flash player version is %d.%d.%d.%d", c1[4], c1[5], c1[6], c1[7])

clientDHOffset := CalcDHPos(c1, 1532, 632, 772)

// Generate S2
digestResp, err := HMACsha256(c1[clientDHOffset:clientDHOffset+SHA256_DIGEST_LENGTH], GENUINE_FMS_KEY)
CheckError(err, "SHandshake Generate S2 HMACsha256 digestResp")

s2 := CreateRandomBlock(RTMP_SIG_SIZE)
signatureResp, err := HMACsha256(s2[:RTMP_SIG_SIZE-SHA256_DIGEST_LENGTH], digestResp)
CheckError(err, "SHandshake Generate S2 HMACsha256 signatureResp")
DumpBuffer("SHandshake signatureResp", signatureResp, 0)
for index, b := range signatureResp {
s2[RTMP_SIG_SIZE-SHA256_DIGEST_LENGTH+index] = b
}

// Send S2
_, err = bw.Write(s2)
CheckError(err, "SHandshake() Send S2")
if timeout > 0 {
c.SetWriteDeadline(time.Now().Add(timeout))
}
err = bw.Flush()
CheckError(err, "SHandshake() Flush S2")

// Read C2
if timeout > 0 {
c.SetReadDeadline(time.Now().Add(timeout))
}
c2 := make([]byte, RTMP_SIG_SIZE)
_, err = io.ReadAtLeast(br, c2, RTMP_SIG_SIZE)
CheckError(err, "SHandshake() Read C2")
// TODO: check C2
return
}
Loading

0 comments on commit 5b8a95c

Please sign in to comment.