diff --git a/protocol/mongo.go b/protocol/mongo.go index ebad146..eced043 100644 --- a/protocol/mongo.go +++ b/protocol/mongo.go @@ -5,33 +5,34 @@ import ( "encoding/binary" "encoding/json" "fmt" + "io" + "github.com/fatih/color" "github.com/kevwan/tproxy/display" "github.com/mongodb/mongo-go-driver/bson" - "io" ) const ( - OP_REPLY = 1 //Reply to a client request. responseTo is set. - OP_UPDATE = 2001 //Update document. - OP_INSERT = 2002 //Insert new document. - RESERVED = 2003 //Formerly used for OP_GET_BY_OID. - OP_QUERY = 2004 //Query a collection. - OP_GET_MORE = 2005 //Get more data from a query. See Cursors. - OP_DELETE = 2006 //Delete documents. - OP_KILL_CURSORS = 2007 //Notify database that the client has finished with the cursor. - OP_COMMAND = 2010 //Cluster internal protocol representing a command request. - OP_COMMANDREPLY = 2011 //Cluster internal protocol representing a reply to an OP_COMMAND. - OP_MSG = 2013 //Send a message using the format introduced in MongoDB 3.6. + OpReply = 1 // Reply to a client request. responseTo is set. + OpUpdate = 2001 // Update document. + OpInsert = 2002 // Insert new document. + Reserved = 2003 // Formerly used for OP_GET_BY_OID. + OpQuery = 2004 // Query a collection. + OpGetMore = 2005 // Get more data from a query. See Cursors. + OpDelete = 2006 // Delete documents. + OpKillCursors = 2007 // Notify database that the client has finished with the cursor. + OpCommand = 2010 // Cluster internal protocol representing a command request. + OpCommandreply = 2011 // Cluster internal protocol representing a reply to an OP_COMMAND. + OpMsg = 2013 // Send a message using the format introduced in MongoDB 3.6. ) type mongoInterop struct { } type packet struct { - IsClientFlow bool //client->server + IsClientFlow bool // client->server MessageLength int - OpCode int //request type + OpCode int // request type Payload io.Reader } @@ -51,14 +52,12 @@ func (mongo *mongoInterop) Dump(r io.Reader, source string, id int, quiet bool) func resolveClientPacket(pk *packet) { var msg string switch pk.OpCode { - case OP_UPDATE: - zero := readInt32(pk.Payload) + case OpUpdate: + _ = readInt32(pk.Payload) fullCollectionName := readString(pk.Payload) - flags := readInt32(pk.Payload) + _ = readInt32(pk.Payload) selector := readBson2Json(pk.Payload) update := readBson2Json(pk.Payload) - _ = zero - _ = flags msg = fmt.Sprintf(" [Update] [coll:%s] %v %v", fullCollectionName, @@ -66,25 +65,21 @@ func resolveClientPacket(pk *packet) { update, ) - case OP_INSERT: - flags := readInt32(pk.Payload) + case OpInsert: + _ = readInt32(pk.Payload) fullCollectionName := readString(pk.Payload) command := readBson2Json(pk.Payload) - _ = flags msg = fmt.Sprintf(" [Insert] [coll:%s] %v", fullCollectionName, command, ) - case OP_QUERY: - flags := readInt32(pk.Payload) + case OpQuery: + _ = readInt32(pk.Payload) fullCollectionName := readString(pk.Payload) - numberToSkip := readInt32(pk.Payload) - numberToReturn := readInt32(pk.Payload) - _ = flags - _ = numberToSkip - _ = numberToReturn + _ = readInt32(pk.Payload) + _ = readInt32(pk.Payload) command := readBson2Json(pk.Payload) selector := readBson2Json(pk.Payload) @@ -95,7 +90,7 @@ func resolveClientPacket(pk *packet) { selector, ) - case OP_COMMAND: + case OpCommand: database := readString(pk.Payload) commandName := readString(pk.Payload) metaData := readBson2Json(pk.Payload) @@ -110,12 +105,11 @@ func resolveClientPacket(pk *packet) { inputDocs, ) - case OP_GET_MORE: - zero := readInt32(pk.Payload) + case OpGetMore: + _ = readInt32(pk.Payload) fullCollectionName := readString(pk.Payload) numberToReturn := readInt32(pk.Payload) cursorId := readInt64(pk.Payload) - _ = zero msg = fmt.Sprintf(" [Query more] [coll:%s] [num of reply:%v] [cursor:%v]", fullCollectionName, @@ -123,20 +117,18 @@ func resolveClientPacket(pk *packet) { cursorId, ) - case OP_DELETE: - zero := readInt32(pk.Payload) + case OpDelete: + _ = readInt32(pk.Payload) fullCollectionName := readString(pk.Payload) - flags := readInt32(pk.Payload) + _ = readInt32(pk.Payload) selector := readBson2Json(pk.Payload) - _ = zero - _ = flags msg = fmt.Sprintf(" [Delete] [coll:%s] %v", fullCollectionName, selector, ) - case OP_MSG: + case OpMsg: return default: return @@ -146,12 +138,12 @@ func resolveClientPacket(pk *packet) { } func newPacket(source string, r io.Reader) *packet { - //read pk + // read pk var pk *packet var err error pk, err = parsePacket(r) - //stream close + // stream close if err == io.EOF { display.PrintlnWithTime(" close") return nil @@ -174,7 +166,7 @@ func parsePacket(r io.Reader) (*packet, error) { var buf bytes.Buffer p := &packet{} - //header + // header header := make([]byte, 16) if _, err := io.ReadFull(r, header); err != nil { return nil, err @@ -239,30 +231,31 @@ func readString(r io.Reader) string { } func readBson2Json(r io.Reader) string { - //read len + // read len docLen := readInt32(r) if docLen == 0 { return "" } - //document []byte + // document []byte docBytes := make([]byte, int(docLen)) binary.LittleEndian.PutUint32(docBytes, uint32(docLen)) if _, err := io.ReadFull(r, docBytes[4:]); err != nil { panic(err) } - //resolve document + // resolve document var bsn bson.M err := bson.Unmarshal(docBytes, &bsn) if err != nil { panic(err) } - //format to Json - jsonStr, err := json.Marshal(bsn) + // format to Json + b, err := json.Marshal(bsn) if err != nil { - return fmt.Sprintf("{\"error\":%s}", err.Error()) + return fmt.Sprintf("{\"error\":%q}", err.Error()) } - return string(jsonStr) + + return string(b) } diff --git a/protocol/redis.go b/protocol/redis.go index 335b33f..7cdfb10 100644 --- a/protocol/redis.go +++ b/protocol/redis.go @@ -2,10 +2,11 @@ package protocol import ( "bufio" - "github.com/kevwan/tproxy/display" "io" "strconv" "strings" + + "github.com/kevwan/tproxy/display" ) type redisInterop struct { diff --git a/tproxy.go b/tproxy.go index a9b39aa..f994190 100644 --- a/tproxy.go +++ b/tproxy.go @@ -16,7 +16,7 @@ func main() { localHost = flag.String("l", "localhost", "Local address to listen on") remote = flag.String("r", "", "Remote address (host:port) to connect") delay = flag.Duration("d", 0, "the delay to relay packets") - protocol = flag.String("t", "", "The type of protocol, currently support grpc") + protocol = flag.String("t", "", "The type of protocol, currently support http2, grpc, redis and mongodb") stat = flag.Bool("s", false, "Enable statistics") quiet = flag.Bool("q", false, "Quiet mode, only prints connection open/close and stats, default false")