This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 2eefbc3ca refactor(go): Introduce Command interface to improve Go client request handling (#2737)
2eefbc3ca is described below
commit 2eefbc3ca412c6489d311b65ac34121bb77d889f
Author: Chengxi Luo <[email protected]>
AuthorDate: Mon Mar 2 04:01:35 2026 -0500
refactor(go): Introduce Command interface to improve Go client request handling (#2737)
---
.../binary_request_serializer.go | 410 ---------------------
.../binary_response_deserializer.go | 8 +-
.../binary_serialization/identifier_serializer.go | 54 ---
.../update_topic_serializer.go | 68 ----
.../go/client/tcp/tcp_access_token_management.go | 8 +-
foreign/go/client/tcp/tcp_clients_management.go | 5 +-
.../go/client/tcp/tcp_consumer_group_management.go | 53 ++-
foreign/go/client/tcp/tcp_messaging.go | 11 +-
foreign/go/client/tcp/tcp_offset_management.go | 9 +-
foreign/go/client/tcp/tcp_partition_management.go | 7 +-
foreign/go/client/tcp/tcp_session_management.go | 10 +-
foreign/go/client/tcp/tcp_stream_management.go | 16 +-
foreign/go/client/tcp/tcp_topic_management.go | 20 +-
foreign/go/client/tcp/tcp_user_management.go | 22 +-
foreign/go/client/tcp/tcp_utilities.go | 4 +-
foreign/go/contracts/access_tokens.go | 44 ++-
foreign/go/contracts/change_password.go | 52 +++
foreign/go/contracts/command.go | 29 +-
foreign/go/contracts/consumer.go | 11 +
foreign/go/contracts/consumer_groups.go | 111 +++++-
.../create_stream.go} | 14 +-
.../create_stream_test.go} | 12 +-
.../create_topic.go} | 48 +--
foreign/go/contracts/create_user.go | 77 ++++
.../delete_stream.go} | 18 +-
.../go/contracts/{command.go => delete_topic.go} | 14 +-
.../go/contracts/{command.go => delete_user.go} | 13 +-
foreign/go/contracts/{command.go => get_stats.go} | 11 +-
foreign/go/contracts/{command.go => get_stream.go} | 13 +-
.../go/contracts/{command.go => get_streams.go} | 11 +-
foreign/go/contracts/{command.go => get_topic.go} | 14 +-
foreign/go/contracts/{command.go => get_topics.go} | 13 +-
foreign/go/contracts/{command.go => get_user.go} | 13 +-
foreign/go/contracts/{command.go => get_users.go} | 11 +-
foreign/go/contracts/identifier.go | 32 ++
.../identifier_test.go} | 25 +-
foreign/go/contracts/login.go | 36 --
foreign/go/contracts/offsets.go | 118 +++++-
foreign/go/contracts/partitions.go | 60 ++-
foreign/go/contracts/{command.go => ping.go} | 11 +-
.../poll_messages.go} | 57 +--
.../poll_messages_test.go} | 23 +-
.../send_messages.go} | 72 ++--
.../send_messages_test.go} | 38 +-
.../session.go} | 53 ++-
.../update_stream.go} | 28 +-
.../update_stream_test.go} | 16 +-
foreign/go/contracts/update_topic.go | 76 ++++
.../update_topic_test.go} | 20 +-
foreign/go/contracts/update_user.go | 85 +++++
foreign/go/contracts/update_user_permissions.go | 63 ++++
foreign/go/contracts/users.go | 118 ++++--
52 files changed, 1244 insertions(+), 921 deletions(-)
diff --git a/foreign/go/binary_serialization/binary_request_serializer.go b/foreign/go/binary_serialization/binary_request_serializer.go
deleted file mode 100644
index 412aae18c..000000000
--- a/foreign/go/binary_serialization/binary_request_serializer.go
+++ /dev/null
@@ -1,410 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package binaryserialization
-
-import (
- "encoding/binary"
-
- iggcon "github.com/apache/iggy/foreign/go/contracts"
-)
-
-func CreateGroup(request iggcon.CreateConsumerGroupRequest) []byte {
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- offset := len(streamIdBytes) + len(topicIdBytes)
- bytes := make([]byte, offset+1+len(request.Name))
- copy(bytes[0:len(streamIdBytes)], streamIdBytes)
- copy(bytes[len(streamIdBytes):offset], topicIdBytes)
- bytes[offset] = byte(len(request.Name))
- copy(bytes[offset+1:], request.Name)
- return bytes
-}
-
-func UpdateOffset(request iggcon.StoreConsumerOffsetRequest) []byte {
- hasPartition := byte(0)
- var partition uint32 = 0
- if request.PartitionId != nil {
- hasPartition = 1
- partition = *request.PartitionId
- }
- consumerBytes := SerializeConsumer(request.Consumer)
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- // consumer + stream_id + topic_id + hasPartition(1) + partition(4) + offset(8)
- bytes := make([]byte,
len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+13)
- position := 0
- copy(bytes[position:], consumerBytes)
- position += len(consumerBytes)
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:], topicIdBytes)
- position += len(topicIdBytes)
- bytes[position] = hasPartition
- binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition)
- binary.LittleEndian.PutUint64(bytes[position+5:position+13],
uint64(request.Offset))
- return bytes
-}
-
-func GetOffset(request iggcon.GetConsumerOffsetRequest) []byte {
- hasPartition := byte(0)
- var partition uint32 = 0
- if request.PartitionId != nil {
- hasPartition = 1
- partition = *request.PartitionId
- }
- consumerBytes := SerializeConsumer(request.Consumer)
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- // consumer + stream_id + topic_id + hasPartition(1) + partition(4)
- bytes := make([]byte,
len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+5)
- position := 0
- copy(bytes[position:], consumerBytes)
- position += len(consumerBytes)
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:], topicIdBytes)
- position += len(topicIdBytes)
- bytes[position] = hasPartition
- binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition)
- return bytes
-}
-
-func DeleteOffset(request iggcon.DeleteConsumerOffsetRequest) []byte {
- hasPartition := byte(0)
- var partition uint32 = 0
- if request.PartitionId != nil {
- hasPartition = 1
- partition = *request.PartitionId
- }
- consumerBytes := SerializeConsumer(request.Consumer)
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- // consumer + stream_id + topic_id + hasPartition(1) + partition(4)
- bytes := make([]byte,
len(consumerBytes)+len(streamIdBytes)+len(topicIdBytes)+5)
- position := 0
- copy(bytes[position:], consumerBytes)
- position += len(consumerBytes)
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:], topicIdBytes)
- position += len(topicIdBytes)
- bytes[position] = hasPartition
- binary.LittleEndian.PutUint32(bytes[position+1:position+5], partition)
- return bytes
-}
-
-func CreatePartitions(request iggcon.CreatePartitionsRequest) []byte {
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+4)
- position := 0
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:], topicIdBytes)
- position += len(topicIdBytes)
- binary.LittleEndian.PutUint32(bytes[position:position+4],
uint32(request.PartitionsCount))
-
- return bytes
-}
-
-func DeletePartitions(request iggcon.DeletePartitionsRequest) []byte {
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+4)
- position := 0
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:], topicIdBytes)
- position += len(topicIdBytes)
- binary.LittleEndian.PutUint32(bytes[position:position+4],
uint32(request.PartitionsCount))
-
- return bytes
-}
-
-//USERS
-
-func SerializeCreateUserRequest(request iggcon.CreateUserRequest) []byte {
- capacity := 4 + len(request.Username) + len(request.Password)
- if request.Permissions != nil {
- capacity += 1 + 4 +
CalculatePermissionsSize(request.Permissions)
- }
-
- bytes := make([]byte, capacity)
- position := 0
-
- bytes[position] = byte(len(request.Username))
- position += 1
- copy(bytes[position:position+len(request.Username)],
[]byte(request.Username))
- position += len(request.Username)
-
- bytes[position] = byte(len(request.Password))
- position += 1
- copy(bytes[position:position+len(request.Password)],
[]byte(request.Password))
- position += len(request.Password)
-
- statusByte := byte(0)
- switch request.Status {
- case iggcon.Active:
- statusByte = byte(1)
- case iggcon.Inactive:
- statusByte = byte(2)
- }
- bytes[position] = statusByte
- position += 1
-
- if request.Permissions != nil {
- bytes[position] = byte(1)
- position += 1
- permissionsBytes := GetBytesFromPermissions(request.Permissions)
- binary.LittleEndian.PutUint32(bytes[position:position+4],
uint32(len(permissionsBytes)))
- position += 4
- copy(bytes[position:position+len(permissionsBytes)],
permissionsBytes)
- } else {
- bytes[position] = byte(0)
- }
-
- return bytes
-}
-
-func GetBytesFromPermissions(data *iggcon.Permissions) []byte {
- size := CalculatePermissionsSize(data)
- bytes := make([]byte, size)
-
- bytes[0] = boolToByte(data.Global.ManageServers)
- bytes[1] = boolToByte(data.Global.ReadServers)
- bytes[2] = boolToByte(data.Global.ManageUsers)
- bytes[3] = boolToByte(data.Global.ReadUsers)
- bytes[4] = boolToByte(data.Global.ManageStreams)
- bytes[5] = boolToByte(data.Global.ReadStreams)
- bytes[6] = boolToByte(data.Global.ManageTopics)
- bytes[7] = boolToByte(data.Global.ReadTopics)
- bytes[8] = boolToByte(data.Global.PollMessages)
- bytes[9] = boolToByte(data.Global.SendMessages)
-
- position := 10
-
- if data.Streams != nil {
- bytes[position] = byte(1)
- position += 1
-
- for streamID, stream := range data.Streams {
-
binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(streamID))
- position += 4
-
- bytes[position] = boolToByte(stream.ManageStream)
- bytes[position+1] = boolToByte(stream.ReadStream)
- bytes[position+2] = boolToByte(stream.ManageTopics)
- bytes[position+3] = boolToByte(stream.ReadTopics)
- bytes[position+4] = boolToByte(stream.PollMessages)
- bytes[position+5] = boolToByte(stream.SendMessages)
- position += 6
-
- if stream.Topics != nil {
- bytes[position] = byte(1)
- position += 1
-
- for topicID, topic := range stream.Topics {
-
binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(topicID))
- position += 4
-
- bytes[position] =
boolToByte(topic.ManageTopic)
- bytes[position+1] =
boolToByte(topic.ReadTopic)
- bytes[position+2] =
boolToByte(topic.PollMessages)
- bytes[position+3] =
boolToByte(topic.SendMessages)
- position += 4
-
- bytes[position] = byte(0)
- position += 1
- }
- } else {
- bytes[position] = byte(0)
- position += 1
- }
- }
- } else {
- bytes[0] = byte(0)
- }
-
- return bytes
-}
-
-func CalculatePermissionsSize(data *iggcon.Permissions) int {
- size := 10
-
- if data.Streams != nil {
- size += 1
-
- for _, stream := range data.Streams {
- size += 4
- size += 6
- size += 1
-
- if stream.Topics != nil {
- size += 1
- size += len(stream.Topics) * 9
- } else {
- size += 1
- }
- }
- } else {
- size += 1
- }
-
- return size
-}
-
-func boolToByte(b bool) byte {
- if b {
- return 1
- }
- return 0
-}
-
-func SerializeUpdateUser(request iggcon.UpdateUserRequest) []byte {
- userIdBytes := SerializeIdentifier(request.UserID)
- length := len(userIdBytes)
-
- if request.Username == nil {
- request.Username = new(string)
- }
-
- username := *request.Username
-
- if len(username) != 0 {
- length += 2 + len(username)
- }
-
- if request.Status != nil {
- length += 2
- }
-
- bytes := make([]byte, length+1)
- position := 0
-
- copy(bytes[position:position+len(userIdBytes)], userIdBytes)
- position += len(userIdBytes)
-
- if len(username) != 0 {
- bytes[position] = 1
- position++
- bytes[position] = byte(len(username))
- position++
- copy(bytes[position:position+len(username)], username)
- position += len(username)
- } else {
- bytes[position] = 0
- position++
- }
-
- if request.Status != nil {
- bytes[position] = 1
- position++
- statusByte := byte(0)
- switch *request.Status {
- case iggcon.Active:
- statusByte = 1
- case iggcon.Inactive:
- statusByte = 2
- }
- bytes[position] = statusByte
- } else {
- bytes[position] = 0
- }
-
- return bytes
-}
-
-func SerializeChangePasswordRequest(request iggcon.ChangePasswordRequest)
[]byte {
- userIdBytes := SerializeIdentifier(request.UserID)
- length := len(userIdBytes) + len(request.CurrentPassword) +
len(request.NewPassword) + 2
- bytes := make([]byte, length)
- position := 0
-
- copy(bytes[position:position+len(userIdBytes)], userIdBytes)
- position += len(userIdBytes)
-
- bytes[position] = byte(len(request.CurrentPassword))
- position++
- copy(bytes[position:position+len(request.CurrentPassword)],
[]byte(request.CurrentPassword))
- position += len(request.CurrentPassword)
-
- bytes[position] = byte(len(request.NewPassword))
- position++
- copy(bytes[position:position+len(request.NewPassword)],
[]byte(request.NewPassword))
-
- return bytes
-}
-
-func SerializeUpdateUserPermissionsRequest(request
iggcon.UpdatePermissionsRequest) []byte {
- userIdBytes := SerializeIdentifier(request.UserID)
- length := len(userIdBytes)
-
- if request.Permissions != nil {
- length += 1 + 4 + CalculatePermissionsSize(request.Permissions)
- }
-
- bytes := make([]byte, length)
- position := 0
-
- copy(bytes[position:position+len(userIdBytes)], userIdBytes)
- position += len(userIdBytes)
-
- if request.Permissions != nil {
- bytes[position] = 1
- position++
- permissionsBytes := GetBytesFromPermissions(request.Permissions)
- binary.LittleEndian.PutUint32(bytes[position:position+4],
uint32(len(permissionsBytes)))
- position += 4
- copy(bytes[position:position+len(permissionsBytes)],
permissionsBytes)
- } else {
- bytes[position] = 0
- }
-
- return bytes
-}
-
-func SerializeUint32(value uint32) []byte {
- bytes := make([]byte, 4)
- binary.LittleEndian.PutUint32(bytes, value)
- return bytes
-}
-
-func SerializeLoginWithPersonalAccessToken(request
iggcon.LoginWithPersonalAccessTokenRequest) []byte {
- length := 1 + len(request.Token)
- bytes := make([]byte, length)
- bytes[0] = byte(len(request.Token))
- copy(bytes[1:], []byte(request.Token))
- return bytes
-}
-
-func SerializeDeletePersonalAccessToken(request
iggcon.DeletePersonalAccessTokenRequest) []byte {
- length := 1 + len(request.Name)
- bytes := make([]byte, length)
- bytes[0] = byte(len(request.Name))
- copy(bytes[1:], []byte(request.Name))
- return bytes
-}
-
-func SerializeCreatePersonalAccessToken(request
iggcon.CreatePersonalAccessTokenRequest) []byte {
- length := 1 + len(request.Name) + 8
- bytes := make([]byte, length)
- bytes[0] = byte(len(request.Name))
- copy(bytes[1:], []byte(request.Name))
- binary.LittleEndian.PutUint32(bytes[len(bytes)-4:], request.Expiry)
- return bytes
-}
diff --git a/foreign/go/binary_serialization/binary_response_deserializer.go b/foreign/go/binary_serialization/binary_response_deserializer.go
index 033de643b..81bd35153 100644
--- a/foreign/go/binary_serialization/binary_response_deserializer.go
+++ b/foreign/go/binary_serialization/binary_response_deserializer.go
@@ -545,12 +545,12 @@ func DeserializeClient(payload []byte)
*iggcon.ClientInfoDetails {
for i := uint32(0); i < clientInfo.ConsumerGroupsCount; i++ {
streamId := binary.LittleEndian.Uint32(payload[position
: position+4])
topicId :=
binary.LittleEndian.Uint32(payload[position+4 : position+8])
- consumerGroupId :=
binary.LittleEndian.Uint32(payload[position+8 : position+12])
+ groupId :=
binary.LittleEndian.Uint32(payload[position+8 : position+12])
consumerGroup := iggcon.ConsumerGroupInfo{
- StreamId: streamId,
- TopicId: topicId,
- ConsumerGroupId: consumerGroupId,
+ StreamId: streamId,
+ TopicId: topicId,
+ GroupId: groupId,
}
consumerGroups = append(consumerGroups, consumerGroup)
position += 12
diff --git a/foreign/go/binary_serialization/identifier_serializer.go b/foreign/go/binary_serialization/identifier_serializer.go
deleted file mode 100644
index 91b1efee2..000000000
--- a/foreign/go/binary_serialization/identifier_serializer.go
+++ /dev/null
@@ -1,54 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package binaryserialization
-
-import (
- iggcon "github.com/apache/iggy/foreign/go/contracts"
-)
-
-func SerializeIdentifier(identifier iggcon.Identifier) []byte {
- bytes := make([]byte, identifier.Length+2)
- bytes[0] = byte(identifier.Kind)
- bytes[1] = byte(identifier.Length)
- copy(bytes[2:], identifier.Value)
- return bytes
-}
-
-func SerializeIdentifiers(identifiers ...iggcon.Identifier) []byte {
- size := 0
- for i := 0; i < len(identifiers); i++ {
- size += 2 + identifiers[i].Length
- }
- bytes := make([]byte, size)
- position := 0
-
- for i := 0; i < len(identifiers); i++ {
- copy(bytes[position:position+2+identifiers[i].Length],
SerializeIdentifier(identifiers[i]))
- position += 2 + identifiers[i].Length
- }
-
- return bytes
-}
-
-func SerializePartitioning(partitioning iggcon.Partitioning) []byte {
- bytes := make([]byte, 2+partitioning.Length)
- bytes[0] = byte(partitioning.Kind)
- bytes[1] = byte(partitioning.Length)
- copy(bytes[2:], partitioning.Value)
- return bytes
-}
diff --git a/foreign/go/binary_serialization/update_topic_serializer.go b/foreign/go/binary_serialization/update_topic_serializer.go
deleted file mode 100644
index 2005cb080..000000000
--- a/foreign/go/binary_serialization/update_topic_serializer.go
+++ /dev/null
@@ -1,68 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package binaryserialization
-
-import (
- "encoding/binary"
-
- iggcon "github.com/apache/iggy/foreign/go/contracts"
-)
-
-type TcpUpdateTopicRequest struct {
- StreamId iggcon.Identifier `json:"streamId"`
- TopicId iggcon.Identifier `json:"topicId"`
- CompressionAlgorithm iggcon.CompressionAlgorithm
`json:"compressionAlgorithm"`
- MessageExpiry iggcon.Duration `json:"messageExpiry"`
- MaxTopicSize uint64 `json:"maxTopicSize"`
- ReplicationFactor *uint8
`json:"replicationFactor"`
- Name string `json:"name"`
-}
-
-func (request *TcpUpdateTopicRequest) Serialize() []byte {
- if request.ReplicationFactor == nil {
- request.ReplicationFactor = new(uint8)
- }
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
-
- buffer := make([]byte,
19+len(streamIdBytes)+len(topicIdBytes)+len(request.Name))
-
- offset := 0
-
- offset += copy(buffer[offset:], streamIdBytes)
- offset += copy(buffer[offset:], topicIdBytes)
-
- buffer[offset] = byte(request.CompressionAlgorithm)
- offset++
-
- binary.LittleEndian.PutUint64(buffer[offset:],
uint64(request.MessageExpiry))
- offset += 8
-
- binary.LittleEndian.PutUint64(buffer[offset:], request.MaxTopicSize)
- offset += 8
-
- buffer[offset] = *request.ReplicationFactor
- offset++
-
- buffer[offset] = uint8(len(request.Name))
- offset++
-
- copy(buffer[offset:], request.Name)
-
- return buffer
-}
diff --git a/foreign/go/client/tcp/tcp_access_token_management.go
b/foreign/go/client/tcp/tcp_access_token_management.go
index 84f67a498..fdb487644 100644
--- a/foreign/go/client/tcp/tcp_access_token_management.go
+++ b/foreign/go/client/tcp/tcp_access_token_management.go
@@ -23,11 +23,10 @@ import (
)
func (c *IggyTcpClient) CreatePersonalAccessToken(name string, expiry uint32)
(*iggcon.RawPersonalAccessToken, error) {
- message :=
binaryserialization.SerializeCreatePersonalAccessToken(iggcon.CreatePersonalAccessTokenRequest{
+ buffer, err := c.do(&iggcon.CreatePersonalAccessToken{
Name: name,
Expiry: expiry,
})
- buffer, err := c.sendAndFetchResponse(message,
iggcon.CreateAccessTokenCode)
if err != nil {
return nil, err
}
@@ -36,15 +35,14 @@ func (c *IggyTcpClient) CreatePersonalAccessToken(name
string, expiry uint32) (*
}
func (c *IggyTcpClient) DeletePersonalAccessToken(name string) error {
- message :=
binaryserialization.SerializeDeletePersonalAccessToken(iggcon.DeletePersonalAccessTokenRequest{
+ _, err := c.do(&iggcon.DeletePersonalAccessToken{
Name: name,
})
- _, err := c.sendAndFetchResponse(message, iggcon.DeleteAccessTokenCode)
return err
}
func (c *IggyTcpClient) GetPersonalAccessTokens()
([]iggcon.PersonalAccessTokenInfo, error) {
- buffer, err := c.sendAndFetchResponse([]byte{},
iggcon.GetAccessTokensCode)
+ buffer, err := c.do(&iggcon.GetPersonalAccessTokens{})
if err != nil {
return nil, err
}
diff --git a/foreign/go/client/tcp/tcp_clients_management.go
b/foreign/go/client/tcp/tcp_clients_management.go
index ea0dddd8d..cb93d66e8 100644
--- a/foreign/go/client/tcp/tcp_clients_management.go
+++ b/foreign/go/client/tcp/tcp_clients_management.go
@@ -23,7 +23,7 @@ import (
)
func (c *IggyTcpClient) GetClients() ([]iggcon.ClientInfo, error) {
- buffer, err := c.sendAndFetchResponse([]byte{}, iggcon.GetClientsCode)
+ buffer, err := c.do(&iggcon.GetClients{})
if err != nil {
return nil, err
}
@@ -32,8 +32,7 @@ func (c *IggyTcpClient) GetClients() ([]iggcon.ClientInfo,
error) {
}
func (c *IggyTcpClient) GetClient(clientId uint32) (*iggcon.ClientInfoDetails,
error) {
- message := binaryserialization.SerializeUint32(clientId)
- buffer, err := c.sendAndFetchResponse(message, iggcon.GetClientCode)
+ buffer, err := c.do(&iggcon.GetClient{ClientID: clientId})
if err != nil {
return nil, err
}
diff --git a/foreign/go/client/tcp/tcp_consumer_group_management.go
b/foreign/go/client/tcp/tcp_consumer_group_management.go
index 83a85a667..deec1fe91 100644
--- a/foreign/go/client/tcp/tcp_consumer_group_management.go
+++ b/foreign/go/client/tcp/tcp_consumer_group_management.go
@@ -24,8 +24,10 @@ import (
)
func (c *IggyTcpClient) GetConsumerGroups(streamId, topicId iggcon.Identifier)
([]iggcon.ConsumerGroup, error) {
- message := binaryserialization.SerializeIdentifiers(streamId, topicId)
- buffer, err := c.sendAndFetchResponse(message, iggcon.GetGroupsCode)
+ buffer, err := c.do(&iggcon.GetConsumerGroups{
+ StreamId: streamId,
+ TopicId: topicId,
+ })
if err != nil {
return nil, err
}
@@ -34,8 +36,13 @@ func (c *IggyTcpClient) GetConsumerGroups(streamId, topicId
iggcon.Identifier) (
}
func (c *IggyTcpClient) GetConsumerGroup(streamId, topicId, groupId
iggcon.Identifier) (*iggcon.ConsumerGroupDetails, error) {
- message := binaryserialization.SerializeIdentifiers(streamId, topicId,
groupId)
- buffer, err := c.sendAndFetchResponse(message, iggcon.GetGroupCode)
+ buffer, err := c.do(&iggcon.GetConsumerGroup{
+ TopicPath: iggcon.TopicPath{
+ StreamId: streamId,
+ TopicId: topicId,
+ },
+ GroupId: groupId,
+ })
if err != nil {
return nil, err
}
@@ -51,12 +58,13 @@ func (c *IggyTcpClient) CreateConsumerGroup(streamId
iggcon.Identifier, topicId
if MaxStringLength < len(name) || len(name) == 0 {
return nil, ierror.ErrInvalidConsumerGroupName
}
- message :=
binaryserialization.CreateGroup(iggcon.CreateConsumerGroupRequest{
- StreamId: streamId,
- TopicId: topicId,
- Name: name,
+ buffer, err := c.do(&iggcon.CreateConsumerGroup{
+ TopicPath: iggcon.TopicPath{
+ StreamId: streamId,
+ TopicId: topicId,
+ },
+ Name: name,
})
- buffer, err := c.sendAndFetchResponse(message, iggcon.CreateGroupCode)
if err != nil {
return nil, err
}
@@ -65,19 +73,34 @@ func (c *IggyTcpClient) CreateConsumerGroup(streamId
iggcon.Identifier, topicId
}
func (c *IggyTcpClient) DeleteConsumerGroup(streamId iggcon.Identifier,
topicId iggcon.Identifier, groupId iggcon.Identifier) error {
- message := binaryserialization.SerializeIdentifiers(streamId, topicId,
groupId)
- _, err := c.sendAndFetchResponse(message, iggcon.DeleteGroupCode)
+ _, err := c.do(&iggcon.DeleteConsumerGroup{
+ TopicPath: iggcon.TopicPath{
+ StreamId: streamId,
+ TopicId: topicId,
+ },
+ GroupId: groupId,
+ })
return err
}
func (c *IggyTcpClient) JoinConsumerGroup(streamId iggcon.Identifier, topicId
iggcon.Identifier, groupId iggcon.Identifier) error {
- message := binaryserialization.SerializeIdentifiers(streamId, topicId,
groupId)
- _, err := c.sendAndFetchResponse(message, iggcon.JoinGroupCode)
+ _, err := c.do(&iggcon.JoinConsumerGroup{
+ TopicPath: iggcon.TopicPath{
+ StreamId: streamId,
+ TopicId: topicId,
+ },
+ GroupId: groupId,
+ })
return err
}
func (c *IggyTcpClient) LeaveConsumerGroup(streamId iggcon.Identifier, topicId
iggcon.Identifier, groupId iggcon.Identifier) error {
- message := binaryserialization.SerializeIdentifiers(streamId, topicId,
groupId)
- _, err := c.sendAndFetchResponse(message, iggcon.LeaveGroupCode)
+ _, err := c.do(&iggcon.LeaveConsumerGroup{
+ TopicPath: iggcon.TopicPath{
+ StreamId: streamId,
+ TopicId: topicId,
+ },
+ GroupId: groupId,
+ })
return err
}
diff --git a/foreign/go/client/tcp/tcp_messaging.go
b/foreign/go/client/tcp/tcp_messaging.go
index 69a301019..22a5d008b 100644
--- a/foreign/go/client/tcp/tcp_messaging.go
+++ b/foreign/go/client/tcp/tcp_messaging.go
@@ -36,13 +36,13 @@ func (c *IggyTcpClient) SendMessages(
if len(messages) == 0 {
return ierror.ErrInvalidMessagesCount
}
- serializedRequest := binaryserialization.TcpSendMessagesRequest{
+ _, err := c.do(&iggcon.SendMessages{
+ Compression: c.MessageCompression,
StreamId: streamId,
TopicId: topicId,
Partitioning: partitioning,
Messages: messages,
- }
- _, err :=
c.sendAndFetchResponse(serializedRequest.Serialize(c.MessageCompression),
iggcon.SendMessagesCode)
+ })
return err
}
@@ -55,7 +55,7 @@ func (c *IggyTcpClient) PollMessages(
autoCommit bool,
partitionId *uint32,
) (*iggcon.PolledMessage, error) {
- serializedRequest := binaryserialization.TcpFetchMessagesRequest{
+ buffer, err := c.do(&iggcon.PollMessages{
StreamId: streamId,
TopicId: topicId,
Consumer: consumer,
@@ -63,8 +63,7 @@ func (c *IggyTcpClient) PollMessages(
Strategy: strategy,
Count: count,
PartitionId: partitionId,
- }
- buffer, err := c.sendAndFetchResponse(serializedRequest.Serialize(),
iggcon.PollMessagesCode)
+ })
if err != nil {
return nil, err
}
diff --git a/foreign/go/client/tcp/tcp_offset_management.go
b/foreign/go/client/tcp/tcp_offset_management.go
index 0c87bd1fc..cec3c2c89 100644
--- a/foreign/go/client/tcp/tcp_offset_management.go
+++ b/foreign/go/client/tcp/tcp_offset_management.go
@@ -23,13 +23,12 @@ import (
)
func (c *IggyTcpClient) GetConsumerOffset(consumer iggcon.Consumer, streamId
iggcon.Identifier, topicId iggcon.Identifier, partitionId *uint32)
(*iggcon.ConsumerOffsetInfo, error) {
- message :=
binaryserialization.GetOffset(iggcon.GetConsumerOffsetRequest{
+ buffer, err := c.do(&iggcon.GetConsumerOffset{
StreamId: streamId,
TopicId: topicId,
Consumer: consumer,
PartitionId: partitionId,
})
- buffer, err := c.sendAndFetchResponse(message, iggcon.GetOffsetCode)
if err != nil {
return nil, err
}
@@ -38,24 +37,22 @@ func (c *IggyTcpClient) GetConsumerOffset(consumer
iggcon.Consumer, streamId igg
}
func (c *IggyTcpClient) StoreConsumerOffset(consumer iggcon.Consumer, streamId
iggcon.Identifier, topicId iggcon.Identifier, offset uint64, partitionId
*uint32) error {
- message :=
binaryserialization.UpdateOffset(iggcon.StoreConsumerOffsetRequest{
+ _, err := c.do(&iggcon.StoreConsumerOffsetRequest{
StreamId: streamId,
TopicId: topicId,
Offset: offset,
Consumer: consumer,
PartitionId: partitionId,
})
- _, err := c.sendAndFetchResponse(message, iggcon.StoreOffsetCode)
return err
}
func (c *IggyTcpClient) DeleteConsumerOffset(consumer iggcon.Consumer,
streamId iggcon.Identifier, topicId iggcon.Identifier, partitionId *uint32)
error {
- message :=
binaryserialization.DeleteOffset(iggcon.DeleteConsumerOffsetRequest{
+ _, err := c.do(&iggcon.DeleteConsumerOffset{
Consumer: consumer,
StreamId: streamId,
TopicId: topicId,
PartitionId: partitionId,
})
- _, err := c.sendAndFetchResponse(message,
iggcon.DeleteConsumerOffsetCode)
return err
}
diff --git a/foreign/go/client/tcp/tcp_partition_management.go
b/foreign/go/client/tcp/tcp_partition_management.go
index 0a910240d..5ed117164 100644
--- a/foreign/go/client/tcp/tcp_partition_management.go
+++ b/foreign/go/client/tcp/tcp_partition_management.go
@@ -18,26 +18,23 @@
package tcp
import (
- binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
)
func (c *IggyTcpClient) CreatePartitions(streamId iggcon.Identifier, topicId
iggcon.Identifier, partitionsCount uint32) error {
- message :=
binaryserialization.CreatePartitions(iggcon.CreatePartitionsRequest{
+ _, err := c.do(&iggcon.CreatePartitions{
StreamId: streamId,
TopicId: topicId,
PartitionsCount: partitionsCount,
})
- _, err := c.sendAndFetchResponse(message, iggcon.CreatePartitionsCode)
return err
}
func (c *IggyTcpClient) DeletePartitions(streamId iggcon.Identifier, topicId
iggcon.Identifier, partitionsCount uint32) error {
- message :=
binaryserialization.DeletePartitions(iggcon.DeletePartitionsRequest{
+ _, err := c.do(&iggcon.DeletePartitions{
StreamId: streamId,
TopicId: topicId,
PartitionsCount: partitionsCount,
})
- _, err := c.sendAndFetchResponse(message, iggcon.DeletePartitionsCode)
return err
}
diff --git a/foreign/go/client/tcp/tcp_session_management.go
b/foreign/go/client/tcp/tcp_session_management.go
index e446ad40f..8b32b522e 100644
--- a/foreign/go/client/tcp/tcp_session_management.go
+++ b/foreign/go/client/tcp/tcp_session_management.go
@@ -27,11 +27,10 @@ import (
)
func (c *IggyTcpClient) LoginUser(username string, password string)
(*iggcon.IdentityInfo, error) {
- serializedRequest := binaryserialization.TcpLogInRequest{
+ buffer, err := c.do(&iggcon.LoginUser{
Username: username,
Password: password,
- }
- buffer, err := c.sendAndFetchResponse(serializedRequest.Serialize(),
iggcon.LoginUserCode)
+ })
if err != nil {
return nil, err
}
@@ -51,10 +50,9 @@ func (c *IggyTcpClient) LoginUser(username string, password
string) (*iggcon.Ide
}
func (c *IggyTcpClient) LoginWithPersonalAccessToken(token string)
(*iggcon.IdentityInfo, error) {
- message :=
binaryserialization.SerializeLoginWithPersonalAccessToken(iggcon.LoginWithPersonalAccessTokenRequest{
+ buffer, err := c.do(&iggcon.LoginWithPersonalAccessToken{
Token: token,
})
- buffer, err := c.sendAndFetchResponse(message,
iggcon.LoginWithAccessTokenCode)
if err != nil {
return nil, err
}
@@ -74,7 +72,7 @@ func (c *IggyTcpClient) LoginWithPersonalAccessToken(token
string) (*iggcon.Iden
}
func (c *IggyTcpClient) LogoutUser() error {
- _, err := c.sendAndFetchResponse([]byte{}, iggcon.LogoutUserCode)
+ _, err := c.do(&iggcon.LogoutUser{})
return err
}
diff --git a/foreign/go/client/tcp/tcp_stream_management.go
b/foreign/go/client/tcp/tcp_stream_management.go
index e8ab8711a..acc5b6202 100644
--- a/foreign/go/client/tcp/tcp_stream_management.go
+++ b/foreign/go/client/tcp/tcp_stream_management.go
@@ -24,7 +24,7 @@ import (
)
func (c *IggyTcpClient) GetStreams() ([]iggcon.Stream, error) {
- buffer, err := c.sendAndFetchResponse([]byte{}, iggcon.GetStreamsCode)
+ buffer, err := c.do(&iggcon.GetStreams{})
if err != nil {
return nil, err
}
@@ -33,8 +33,9 @@ func (c *IggyTcpClient) GetStreams() ([]iggcon.Stream, error)
{
}
func (c *IggyTcpClient) GetStream(streamId iggcon.Identifier)
(*iggcon.StreamDetails, error) {
- message := binaryserialization.SerializeIdentifier(streamId)
- buffer, err := c.sendAndFetchResponse(message, iggcon.GetStreamCode)
+ buffer, err := c.do(&iggcon.GetStream{
+ StreamId: streamId,
+ })
if err != nil {
return nil, err
}
@@ -54,8 +55,7 @@ func (c *IggyTcpClient) CreateStream(name string)
(*iggcon.StreamDetails, error)
if len(name) == 0 || MaxStringLength < len(name) {
return nil, ierror.ErrInvalidStreamName
}
- serializedRequest := binaryserialization.TcpCreateStreamRequest{Name:
name}
- buffer, err := c.sendAndFetchResponse(serializedRequest.Serialize(),
iggcon.CreateStreamCode)
+ buffer, err := c.do(&iggcon.CreateStream{Name: name})
if err != nil {
return nil, err
}
@@ -71,13 +71,11 @@ func (c *IggyTcpClient) UpdateStream(streamId
iggcon.Identifier, name string) er
if len(name) > MaxStringLength || len(name) == 0 {
return ierror.ErrInvalidStreamName
}
- serializedRequest :=
binaryserialization.TcpUpdateStreamRequest{StreamId: streamId, Name: name}
- _, err := c.sendAndFetchResponse(serializedRequest.Serialize(),
iggcon.UpdateStreamCode)
+ _, err := c.do(&iggcon.UpdateStream{StreamId: streamId, Name: name})
return err
}
func (c *IggyTcpClient) DeleteStream(id iggcon.Identifier) error {
- message := binaryserialization.SerializeIdentifier(id)
- _, err := c.sendAndFetchResponse(message, iggcon.DeleteStreamCode)
+ _, err := c.do(&iggcon.DeleteStream{StreamId: id})
return err
}
diff --git a/foreign/go/client/tcp/tcp_topic_management.go
b/foreign/go/client/tcp/tcp_topic_management.go
index 3bea2b2a5..0c69ac8e9 100644
--- a/foreign/go/client/tcp/tcp_topic_management.go
+++ b/foreign/go/client/tcp/tcp_topic_management.go
@@ -24,8 +24,7 @@ import (
)
func (c *IggyTcpClient) GetTopics(streamId iggcon.Identifier) ([]iggcon.Topic,
error) {
- message := binaryserialization.SerializeIdentifier(streamId)
- buffer, err := c.sendAndFetchResponse(message, iggcon.GetTopicsCode)
+ buffer, err := c.do(&iggcon.GetTopics{StreamId: streamId})
if err != nil {
return nil, err
}
@@ -34,8 +33,7 @@ func (c *IggyTcpClient) GetTopics(streamId iggcon.Identifier)
([]iggcon.Topic, e
}
func (c *IggyTcpClient) GetTopic(streamId iggcon.Identifier, topicId
iggcon.Identifier) (*iggcon.TopicDetails, error) {
- message := binaryserialization.SerializeIdentifiers(streamId, topicId)
- buffer, err := c.sendAndFetchResponse(message, iggcon.GetTopicCode)
+ buffer, err := c.do(&iggcon.GetTopic{StreamId: streamId, TopicId:
topicId})
if err != nil {
return nil, err
}
@@ -70,7 +68,7 @@ func (c *IggyTcpClient) CreateTopic(
return nil, ierror.ErrInvalidReplicationFactor
}
- serializedRequest := binaryserialization.TcpCreateTopicRequest{
+ buffer, err := c.do(&iggcon.CreateTopic{
StreamId: streamId,
Name: name,
PartitionsCount: partitionsCount,
@@ -78,8 +76,7 @@ func (c *IggyTcpClient) CreateTopic(
MessageExpiry: messageExpiry,
MaxTopicSize: maxTopicSize,
ReplicationFactor: replicationFactor,
- }
- buffer, err := c.sendAndFetchResponse(serializedRequest.Serialize(),
iggcon.CreateTopicCode)
+ })
if err != nil {
return nil, err
}
@@ -102,20 +99,19 @@ func (c *IggyTcpClient) UpdateTopic(
if replicationFactor != nil && *replicationFactor == 0 {
return ierror.ErrInvalidReplicationFactor
}
- serializedRequest := binaryserialization.TcpUpdateTopicRequest{
+ _, err := c.do(&iggcon.UpdateTopic{
StreamId: streamId,
TopicId: topicId,
CompressionAlgorithm: compressionAlgorithm,
MessageExpiry: messageExpiry,
MaxTopicSize: maxTopicSize,
ReplicationFactor: replicationFactor,
- Name: name}
- _, err := c.sendAndFetchResponse(serializedRequest.Serialize(),
iggcon.UpdateTopicCode)
+ Name: name,
+ })
return err
}
func (c *IggyTcpClient) DeleteTopic(streamId, topicId iggcon.Identifier) error
{
- message := binaryserialization.SerializeIdentifiers(streamId, topicId)
- _, err := c.sendAndFetchResponse(message, iggcon.DeleteTopicCode)
+ _, err := c.do(&iggcon.DeleteTopic{StreamId: streamId, TopicId:
topicId})
return err
}
diff --git a/foreign/go/client/tcp/tcp_user_management.go
b/foreign/go/client/tcp/tcp_user_management.go
index bf53331d9..c9836ebf1 100644
--- a/foreign/go/client/tcp/tcp_user_management.go
+++ b/foreign/go/client/tcp/tcp_user_management.go
@@ -24,8 +24,7 @@ import (
)
func (c *IggyTcpClient) GetUser(identifier iggcon.Identifier)
(*iggcon.UserInfoDetails, error) {
- message := binaryserialization.SerializeIdentifier(identifier)
- buffer, err := c.sendAndFetchResponse(message, iggcon.GetUserCode)
+ buffer, err := c.do(&iggcon.GetUser{Id: identifier})
if err != nil {
return nil, err
}
@@ -37,7 +36,7 @@ func (c *IggyTcpClient) GetUser(identifier iggcon.Identifier)
(*iggcon.UserInfoD
}
func (c *IggyTcpClient) GetUsers() ([]iggcon.UserInfo, error) {
- buffer, err := c.sendAndFetchResponse([]byte{}, iggcon.GetUsersCode)
+ buffer, err := c.do(&iggcon.GetUsers{})
if err != nil {
return nil, err
}
@@ -46,13 +45,12 @@ func (c *IggyTcpClient) GetUsers() ([]iggcon.UserInfo,
error) {
}
func (c *IggyTcpClient) CreateUser(username string, password string, status
iggcon.UserStatus, permissions *iggcon.Permissions) (*iggcon.UserInfoDetails,
error) {
- message :=
binaryserialization.SerializeCreateUserRequest(iggcon.CreateUserRequest{
+ buffer, err := c.do(&iggcon.CreateUser{
Username: username,
Password: password,
Status: status,
Permissions: permissions,
})
- buffer, err := c.sendAndFetchResponse(message, iggcon.CreateUserCode)
if err != nil {
return nil, err
}
@@ -64,36 +62,34 @@ func (c *IggyTcpClient) CreateUser(username string,
password string, status iggc
}
func (c *IggyTcpClient) UpdateUser(userID iggcon.Identifier, username *string,
status *iggcon.UserStatus) error {
- message :=
binaryserialization.SerializeUpdateUser(iggcon.UpdateUserRequest{
+ _, err := c.do(&iggcon.UpdateUser{
UserID: userID,
Username: username,
Status: status,
})
- _, err := c.sendAndFetchResponse(message, iggcon.UpdateUserCode)
return err
}
func (c *IggyTcpClient) DeleteUser(identifier iggcon.Identifier) error {
- message := binaryserialization.SerializeIdentifier(identifier)
- _, err := c.sendAndFetchResponse(message, iggcon.DeleteUserCode)
+ _, err := c.do(&iggcon.DeleteUser{
+ Id: identifier,
+ })
return err
}
func (c *IggyTcpClient) UpdatePermissions(userID iggcon.Identifier,
permissions *iggcon.Permissions) error {
- message :=
binaryserialization.SerializeUpdateUserPermissionsRequest(iggcon.UpdatePermissionsRequest{
+ _, err := c.do(&iggcon.UpdatePermissions{
UserID: userID,
Permissions: permissions,
})
- _, err := c.sendAndFetchResponse(message, iggcon.UpdatePermissionsCode)
return err
}
func (c *IggyTcpClient) ChangePassword(userID iggcon.Identifier,
currentPassword string, newPassword string) error {
- message :=
binaryserialization.SerializeChangePasswordRequest(iggcon.ChangePasswordRequest{
+ _, err := c.do(&iggcon.ChangePassword{
UserID: userID,
CurrentPassword: currentPassword,
NewPassword: newPassword,
})
- _, err := c.sendAndFetchResponse(message, iggcon.ChangePasswordCode)
return err
}
diff --git a/foreign/go/client/tcp/tcp_utilities.go
b/foreign/go/client/tcp/tcp_utilities.go
index e9d667203..bd5beaef3 100644
--- a/foreign/go/client/tcp/tcp_utilities.go
+++ b/foreign/go/client/tcp/tcp_utilities.go
@@ -23,7 +23,7 @@ import (
)
func (c *IggyTcpClient) GetStats() (*iggcon.Stats, error) {
- buffer, err := c.sendAndFetchResponse([]byte{}, iggcon.GetStatsCode)
+ buffer, err := c.do(&iggcon.GetStats{})
if err != nil {
return nil, err
}
@@ -35,6 +35,6 @@ func (c *IggyTcpClient) GetStats() (*iggcon.Stats, error) {
}
func (c *IggyTcpClient) Ping() error {
- _, err := c.sendAndFetchResponse([]byte{}, iggcon.PingCode)
+ _, err := c.do(&iggcon.Ping{})
return err
}
diff --git a/foreign/go/contracts/access_tokens.go
b/foreign/go/contracts/access_tokens.go
index 773a5d77a..edb9b4ef3 100644
--- a/foreign/go/contracts/access_tokens.go
+++ b/foreign/go/contracts/access_tokens.go
@@ -17,17 +17,55 @@
package iggcon
-import "time"
+import (
+ "encoding/binary"
+ "time"
+)
-type CreatePersonalAccessTokenRequest struct {
+type CreatePersonalAccessToken struct {
Name string `json:"Name"`
Expiry uint32 `json:"Expiry"`
}
-type DeletePersonalAccessTokenRequest struct {
+// Code returns the command code for creating a personal access token.
+func (c *CreatePersonalAccessToken) Code() CommandCode {
+	return CreateAccessTokenCode
+}
+
+// MarshalBinary encodes the command as
+// [name length: 1 byte][name][expiry: 8 bytes, little-endian].
+//
+// The expiry is written as a 64-bit value at the start of its 8-byte field.
+// Writing the 32-bit value into the last 4 bytes of that field, as was done
+// before, makes a reader that decodes one little-endian uint64 see
+// Expiry << 32 instead of Expiry.
+// NOTE(review): assumes the server decodes the field as a single
+// little-endian uint64; confirm against the protocol definition.
+//
+// The name length is stored in a single byte and is not validated here, so a
+// name longer than 255 bytes produces a truncated length prefix.
+func (c *CreatePersonalAccessToken) MarshalBinary() ([]byte, error) {
+	const expirySize = 8
+	payload := make([]byte, 1+len(c.Name)+expirySize)
+	payload[0] = byte(len(c.Name))
+	offset := 1 + copy(payload[1:], c.Name)
+	binary.LittleEndian.PutUint64(payload[offset:], uint64(c.Expiry))
+	return payload, nil
+}
+
+type DeletePersonalAccessToken struct {
Name string `json:"Name"`
}
+// Code returns the command code for deleting a personal access token.
+func (d *DeletePersonalAccessToken) Code() CommandCode { return DeleteAccessTokenCode }
+
+// MarshalBinary encodes the command as [name length: 1 byte][name].
+// The length prefix is the low 8 bits of len(Name).
+func (d *DeletePersonalAccessToken) MarshalBinary() ([]byte, error) {
+	payload := make([]byte, 0, 1+len(d.Name))
+	payload = append(payload, byte(len(d.Name)))
+	payload = append(payload, d.Name...)
+	return payload, nil
+}
+
+// GetPersonalAccessTokens is the command that lists personal access tokens.
+// It carries no fields.
+type GetPersonalAccessTokens struct{}
+
+// Code returns the command code for listing personal access tokens.
+func (g *GetPersonalAccessTokens) Code() CommandCode { return GetAccessTokensCode }
+
+// MarshalBinary returns an empty, non-nil payload.
+func (g *GetPersonalAccessTokens) MarshalBinary() ([]byte, error) {
+	return make([]byte, 0), nil
+}
+
type PersonalAccessTokenInfo struct {
Name string `json:"Name"`
Expiry *time.Time `json:"Expiry"`
diff --git a/foreign/go/contracts/change_password.go
b/foreign/go/contracts/change_password.go
new file mode 100644
index 000000000..f2c8c1d85
--- /dev/null
+++ b/foreign/go/contracts/change_password.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iggcon
+
+// ChangePassword is the command that replaces a user's password.
+type ChangePassword struct {
+	UserID          Identifier `json:"-"`
+	CurrentPassword string     `json:"CurrentPassword"`
+	NewPassword     string     `json:"NewPassword"`
+}
+
+// Code returns the command code for changing a password.
+func (c *ChangePassword) Code() CommandCode { return ChangePasswordCode }
+
+// MarshalBinary encodes the command as
+// [user identifier][current password length: 1 byte][current password]
+// [new password length: 1 byte][new password].
+// Each length prefix is the low 8 bits of the corresponding string length.
+func (c *ChangePassword) MarshalBinary() ([]byte, error) {
+	idBytes, err := c.UserID.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	// Two extra bytes hold the one-byte length prefixes of the passwords.
+	payload := make([]byte, 0, len(idBytes)+len(c.CurrentPassword)+len(c.NewPassword)+2)
+	payload = append(payload, idBytes...)
+	payload = append(payload, byte(len(c.CurrentPassword)))
+	payload = append(payload, c.CurrentPassword...)
+	payload = append(payload, byte(len(c.NewPassword)))
+	payload = append(payload, c.NewPassword...)
+	return payload, nil
+}
diff --git a/foreign/go/contracts/command.go b/foreign/go/contracts/command.go
index d8b735da0..2c0bbdbfb 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/command.go
@@ -17,7 +17,10 @@
package iggcon
-import "encoding"
+import (
+ "encoding"
+ "encoding/binary"
+)
type Command interface {
// Code returns the command code associated with this command.
@@ -25,3 +28,27 @@ type Command interface {
encoding.BinaryMarshaler
}
+
+// GetClients is the command that lists clients. It carries no fields.
+type GetClients struct{}
+
+// Code returns the command code for listing clients.
+func (c *GetClients) Code() CommandCode { return GetClientsCode }
+
+// MarshalBinary returns an empty, non-nil payload.
+func (c *GetClients) MarshalBinary() ([]byte, error) {
+	return make([]byte, 0), nil
+}
+
+// GetClient is the command that fetches a single client by its numeric ID.
+type GetClient struct {
+	ClientID uint32
+}
+
+// Code returns the command code for fetching a client.
+func (c *GetClient) Code() CommandCode { return GetClientCode }
+
+// MarshalBinary encodes ClientID as 4 little-endian bytes.
+func (c *GetClient) MarshalBinary() ([]byte, error) {
+	var payload [4]byte
+	binary.LittleEndian.PutUint32(payload[:], c.ClientID)
+	return payload[:], nil
+}
diff --git a/foreign/go/contracts/consumer.go b/foreign/go/contracts/consumer.go
index 84195ebba..4d9bf10cc 100644
--- a/foreign/go/contracts/consumer.go
+++ b/foreign/go/contracts/consumer.go
@@ -52,3 +52,14 @@ func NewGroupConsumer(id Identifier) Consumer {
Id: id,
}
}
+
+// MarshalBinary encodes the consumer as [kind: 1 byte][identifier bytes].
+// Any error from marshalling the identifier is returned unchanged.
+func (c Consumer) MarshalBinary() ([]byte, error) {
+	id, err := c.Id.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	payload := make([]byte, 1+len(id))
+	payload[0] = uint8(c.Kind)
+	copy(payload[1:], id)
+	return payload, nil
+}
diff --git a/foreign/go/contracts/consumer_groups.go
b/foreign/go/contracts/consumer_groups.go
index 5018fd037..3863a6b8e 100644
--- a/foreign/go/contracts/consumer_groups.go
+++ b/foreign/go/contracts/consumer_groups.go
@@ -35,32 +35,105 @@ type ConsumerGroupMember struct {
Partitions []uint32
}
-type CreateConsumerGroupRequest struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- Name string `json:"name"`
+type TopicPath struct {
+ StreamId Identifier
+ TopicId Identifier
}
-type DeleteConsumerGroupRequest struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- ConsumerGroupId Identifier `json:"consumerGroupId"`
+type CreateConsumerGroup struct {
+ TopicPath
+ Name string
}
-type JoinConsumerGroupRequest struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- ConsumerGroupId Identifier `json:"consumerGroupId"`
+func (c *CreateConsumerGroup) Code() CommandCode {
+ return CreateGroupCode
}
-type LeaveConsumerGroupRequest struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- ConsumerGroupId Identifier `json:"consumerGroupId"`
+// MarshalBinary encodes the command as
+// [stream identifier][topic identifier][name length: 1 byte][name].
+// The length prefix is the low 8 bits of len(Name).
+func (c *CreateConsumerGroup) MarshalBinary() ([]byte, error) {
+	stream, err := c.StreamId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	topic, err := c.TopicId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	payload := make([]byte, 0, len(stream)+len(topic)+1+len(c.Name))
+	payload = append(payload, stream...)
+	payload = append(payload, topic...)
+	payload = append(payload, byte(len(c.Name)))
+	payload = append(payload, c.Name...)
+	return payload, nil
+}
+
+// DeleteConsumerGroup is the command that deletes the consumer group named
+// by a stream, topic and group identifier.
+type DeleteConsumerGroup struct {
+	TopicPath
+	GroupId Identifier
+}
+
+// Code returns the command code for deleting a consumer group.
+func (d *DeleteConsumerGroup) Code() CommandCode { return DeleteGroupCode }
+
+// MarshalBinary encodes the stream, topic and group identifiers, in that
+// order, using marshalIdentifiers.
+func (d *DeleteConsumerGroup) MarshalBinary() ([]byte, error) {
+	path := d.TopicPath
+	return marshalIdentifiers(path.StreamId, path.TopicId, d.GroupId)
+}
+
+// JoinConsumerGroup is the command that joins the consumer group named by a
+// stream, topic and group identifier.
+type JoinConsumerGroup struct {
+	TopicPath
+	GroupId Identifier
+}
+
+// Code returns the command code for joining a consumer group.
+func (j *JoinConsumerGroup) Code() CommandCode { return JoinGroupCode }
+
+// MarshalBinary encodes the stream, topic and group identifiers, in that
+// order, using marshalIdentifiers.
+func (j *JoinConsumerGroup) MarshalBinary() ([]byte, error) {
+	path := j.TopicPath
+	return marshalIdentifiers(path.StreamId, path.TopicId, j.GroupId)
+}
+
+// LeaveConsumerGroup is the command that leaves the consumer group named by
+// a stream, topic and group identifier.
+type LeaveConsumerGroup struct {
+	TopicPath
+	GroupId Identifier
+}
+
+// Code returns the command code for leaving a consumer group.
+func (l *LeaveConsumerGroup) Code() CommandCode { return LeaveGroupCode }
+
+// MarshalBinary encodes the stream, topic and group identifiers, in that
+// order, using marshalIdentifiers.
+func (l *LeaveConsumerGroup) MarshalBinary() ([]byte, error) {
+	path := l.TopicPath
+	return marshalIdentifiers(path.StreamId, path.TopicId, l.GroupId)
+}
+
+// GetConsumerGroup is the command that fetches the consumer group named by a
+// stream, topic and group identifier.
+type GetConsumerGroup struct {
+	TopicPath
+	GroupId Identifier
+}
+
+// Code returns the command code for fetching a consumer group.
+func (g *GetConsumerGroup) Code() CommandCode { return GetGroupCode }
+
+// MarshalBinary encodes the stream, topic and group identifiers, in that
+// order, using marshalIdentifiers.
+func (g *GetConsumerGroup) MarshalBinary() ([]byte, error) {
+	path := g.TopicPath
+	return marshalIdentifiers(path.StreamId, path.TopicId, g.GroupId)
+}
+
+type GetConsumerGroups struct {
+ StreamId Identifier
+ TopicId Identifier
+}
+
+func (g *GetConsumerGroups) Code() CommandCode {
+ return GetGroupsCode
+}
+
+func (g *GetConsumerGroups) MarshalBinary() ([]byte, error) {
+ return marshalIdentifiers(g.StreamId, g.TopicId)
}
type ConsumerGroupInfo struct {
- StreamId uint32 `json:"streamId"`
- TopicId uint32 `json:"topicId"`
- ConsumerGroupId uint32 `json:"consumerGroupId"`
+ StreamId uint32 `json:"streamId"`
+ TopicId uint32 `json:"topicId"`
+ GroupId uint32 `json:"groupId"`
}
diff --git a/foreign/go/binary_serialization/create_stream_serializer.go
b/foreign/go/contracts/create_stream.go
similarity index 79%
rename from foreign/go/binary_serialization/create_stream_serializer.go
rename to foreign/go/contracts/create_stream.go
index be12c4b16..9c4c1a631 100644
--- a/foreign/go/binary_serialization/create_stream_serializer.go
+++ b/foreign/go/contracts/create_stream.go
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
-type TcpCreateStreamRequest struct {
+type CreateStream struct {
Name string
}
@@ -26,10 +26,14 @@ const (
payloadOffset = 1
)
-func (request *TcpCreateStreamRequest) Serialize() []byte {
+func (request *CreateStream) Code() CommandCode {
+ return CreateStreamCode
+}
+
+func (request *CreateStream) MarshalBinary() ([]byte, error) {
nameLength := len(request.Name)
serialized := make([]byte, payloadOffset+nameLength)
serialized[nameLengthOffset] = byte(nameLength)
- copy(serialized[payloadOffset:], []byte(request.Name))
- return serialized
+ copy(serialized[payloadOffset:], request.Name)
+ return serialized, nil
}
diff --git a/foreign/go/binary_serialization/create_stream_serializer_test.go
b/foreign/go/contracts/create_stream_test.go
similarity index 89%
rename from foreign/go/binary_serialization/create_stream_serializer_test.go
rename to foreign/go/contracts/create_stream_test.go
index 45a520df1..19a9c15f9 100644
--- a/foreign/go/binary_serialization/create_stream_serializer_test.go
+++ b/foreign/go/contracts/create_stream_test.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
import (
"reflect"
@@ -23,14 +23,16 @@ import (
)
func TestSerialize_TcpCreateStreamRequest(t *testing.T) {
- // Create a sample TcpCreateStreamRequest
- request := TcpCreateStreamRequest{
+ // Create a sample CreateStream
+ request := CreateStream{
Name: "test_stream",
}
// Serialize the request
- serialized := request.Serialize()
-
+ serialized, err := request.MarshalBinary()
+ if err != nil {
+ t.Errorf("Failed to serialize CreateStream: %v", err)
+ }
// Expected serialized bytes
expectedNameLength := byte(11) // Length of "test_stream"
expectedPayload := []byte("test_stream")
diff --git a/foreign/go/binary_serialization/create_topic_serializer.go
b/foreign/go/contracts/create_topic.go
similarity index 55%
rename from foreign/go/binary_serialization/create_topic_serializer.go
rename to foreign/go/contracts/create_topic.go
index fc5f5f9ec..92a799a86 100644
--- a/foreign/go/binary_serialization/create_topic_serializer.go
+++ b/foreign/go/contracts/create_topic.go
@@ -15,30 +15,36 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
import (
"encoding/binary"
- iggcon "github.com/apache/iggy/foreign/go/contracts"
)
-type TcpCreateTopicRequest struct {
- StreamId iggcon.Identifier `json:"streamId"`
- PartitionsCount uint32
`json:"partitionsCount"`
- CompressionAlgorithm iggcon.CompressionAlgorithm
`json:"compressionAlgorithm"`
- MessageExpiry iggcon.Duration `json:"messageExpiry"`
- MaxTopicSize uint64 `json:"maxTopicSize"`
- Name string `json:"name"`
- ReplicationFactor *uint8
`json:"replicationFactor"`
+type CreateTopic struct {
+ StreamId Identifier `json:"streamId"`
+ PartitionsCount uint32 `json:"partitionsCount"`
+ CompressionAlgorithm CompressionAlgorithm `json:"compressionAlgorithm"`
+ MessageExpiry Duration `json:"messageExpiry"`
+ MaxTopicSize uint64 `json:"maxTopicSize"`
+ Name string `json:"name"`
+ ReplicationFactor *uint8 `json:"replicationFactor"`
}
-func (request *TcpCreateTopicRequest) Serialize() []byte {
- if request.ReplicationFactor == nil {
- request.ReplicationFactor = new(uint8)
+func (t *CreateTopic) Code() CommandCode {
+ return CreateTopicCode
+}
+
+func (t *CreateTopic) MarshalBinary() ([]byte, error) {
+ if t.ReplicationFactor == nil {
+ t.ReplicationFactor = new(uint8)
}
- streamIdBytes := SerializeIdentifier(request.StreamId)
- nameBytes := []byte(request.Name)
+ streamIdBytes, err := t.StreamId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ nameBytes := []byte(t.Name)
totalLength := len(streamIdBytes) + // StreamId
4 + // PartitionsCount
@@ -57,23 +63,23 @@ func (request *TcpCreateTopicRequest) Serialize() []byte {
position += len(streamIdBytes)
// PartitionsCount
- binary.LittleEndian.PutUint32(bytes[position:], request.PartitionsCount)
+ binary.LittleEndian.PutUint32(bytes[position:], t.PartitionsCount)
position += 4
// CompressionAlgorithm
- bytes[position] = byte(request.CompressionAlgorithm)
+ bytes[position] = byte(t.CompressionAlgorithm)
position++
// MessageExpiry
- binary.LittleEndian.PutUint64(bytes[position:],
uint64(request.MessageExpiry))
+ binary.LittleEndian.PutUint64(bytes[position:], uint64(t.MessageExpiry))
position += 8
// MaxTopicSize
- binary.LittleEndian.PutUint64(bytes[position:], request.MaxTopicSize)
+ binary.LittleEndian.PutUint64(bytes[position:], t.MaxTopicSize)
position += 8
// ReplicationFactor
- bytes[position] = *request.ReplicationFactor
+ bytes[position] = *t.ReplicationFactor
position++
// Name
@@ -81,5 +87,5 @@ func (request *TcpCreateTopicRequest) Serialize() []byte {
position++
copy(bytes[position:], nameBytes)
- return bytes
+ return bytes, nil
}
diff --git a/foreign/go/contracts/create_user.go
b/foreign/go/contracts/create_user.go
new file mode 100644
index 000000000..df8298719
--- /dev/null
+++ b/foreign/go/contracts/create_user.go
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iggcon
+
+import "encoding/binary"
+
+// CreateUser is the command that creates a user with the given credentials,
+// status and optional permissions.
+type CreateUser struct {
+	Username    string       `json:"username"`
+	Password    string       `json:"Password"`
+	Status      UserStatus   `json:"Status"`
+	Permissions *Permissions `json:"Permissions,omitempty"`
+}
+
+// Code returns the command code for creating a user.
+func (c *CreateUser) Code() CommandCode {
+	return CreateUserCode
+}
+
+// MarshalBinary encodes the command as
+// [username length: 1 byte][username][password length: 1 byte][password]
+// [status: 1 byte][has permissions: 1 byte], followed, only when Permissions
+// is set, by [permissions length: 4 bytes, little-endian][permissions].
+//
+// Status is encoded as 1 for Active, 2 for Inactive and 0 for anything else.
+// Each string length prefix is the low 8 bits of the string length.
+//
+// Permissions are marshalled before the buffer is allocated so the buffer is
+// sized from the actual payload. The earlier sizing counted the
+// has-permissions flag twice and relied on Permissions.Size(), which left a
+// stray trailing zero byte after the permissions.
+// NOTE(review): assumes the server does not depend on that extra trailing
+// byte; confirm against the protocol definition.
+func (c *CreateUser) MarshalBinary() ([]byte, error) {
+	var permissionsBytes []byte
+	if c.Permissions != nil {
+		var err error
+		permissionsBytes, err = c.Permissions.MarshalBinary()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// Fixed part: username length, password length, status, permissions flag.
+	capacity := 4 + len(c.Username) + len(c.Password)
+	if c.Permissions != nil {
+		capacity += 4 + len(permissionsBytes)
+	}
+
+	bytes := make([]byte, capacity)
+	position := 0
+
+	bytes[position] = byte(len(c.Username))
+	position++
+	position += copy(bytes[position:], c.Username)
+
+	bytes[position] = byte(len(c.Password))
+	position++
+	position += copy(bytes[position:], c.Password)
+
+	// Unknown statuses keep the zero byte the buffer was created with.
+	switch c.Status {
+	case Active:
+		bytes[position] = 1
+	case Inactive:
+		bytes[position] = 2
+	}
+	position++
+
+	if c.Permissions == nil {
+		// The has-permissions flag is already 0 in the zeroed buffer.
+		return bytes, nil
+	}
+
+	bytes[position] = 1
+	position++
+	binary.LittleEndian.PutUint32(bytes[position:], uint32(len(permissionsBytes)))
+	position += 4
+	copy(bytes[position:], permissionsBytes)
+
+	return bytes, nil
+}
diff --git a/foreign/go/binary_serialization/consumer_serializer.go
b/foreign/go/contracts/delete_stream.go
similarity index 71%
rename from foreign/go/binary_serialization/consumer_serializer.go
rename to foreign/go/contracts/delete_stream.go
index a359ed51f..fad72e388 100644
--- a/foreign/go/binary_serialization/consumer_serializer.go
+++ b/foreign/go/contracts/delete_stream.go
@@ -15,14 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
-import iggcon "github.com/apache/iggy/foreign/go/contracts"
+type DeleteStream struct {
+ StreamId Identifier
+}
+
+func (d *DeleteStream) Code() CommandCode {
+ return DeleteStreamCode
+}
-func SerializeConsumer(consumer iggcon.Consumer) []byte {
- idBytes := SerializeIdentifier(consumer.Id)
- bytes := make([]byte, 0, 1+len(idBytes))
- bytes = append(bytes, uint8(consumer.Kind))
- bytes = append(bytes, idBytes...)
- return bytes
+func (d *DeleteStream) MarshalBinary() ([]byte, error) {
+ return d.StreamId.MarshalBinary()
}
diff --git a/foreign/go/contracts/command.go
b/foreign/go/contracts/delete_topic.go
similarity index 77%
copy from foreign/go/contracts/command.go
copy to foreign/go/contracts/delete_topic.go
index d8b735da0..d5b062168 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/delete_topic.go
@@ -17,11 +17,15 @@
package iggcon
-import "encoding"
+type DeleteTopic struct {
+ StreamId Identifier
+ TopicId Identifier
+}
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+func (d *DeleteTopic) Code() CommandCode {
+ return DeleteTopicCode
+}
- encoding.BinaryMarshaler
+func (d *DeleteTopic) MarshalBinary() ([]byte, error) {
+ return marshalIdentifiers(d.StreamId, d.TopicId)
}
diff --git a/foreign/go/contracts/command.go
b/foreign/go/contracts/delete_user.go
similarity index 80%
copy from foreign/go/contracts/command.go
copy to foreign/go/contracts/delete_user.go
index d8b735da0..013f8e1ca 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/delete_user.go
@@ -17,11 +17,14 @@
package iggcon
-import "encoding"
+type DeleteUser struct {
+ Id Identifier
+}
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+func (d *DeleteUser) Code() CommandCode {
+ return DeleteUserCode
+}
- encoding.BinaryMarshaler
+func (d *DeleteUser) MarshalBinary() ([]byte, error) {
+ return d.Id.MarshalBinary()
}
diff --git a/foreign/go/contracts/command.go b/foreign/go/contracts/get_stats.go
similarity index 83%
copy from foreign/go/contracts/command.go
copy to foreign/go/contracts/get_stats.go
index d8b735da0..ec6e42106 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/get_stats.go
@@ -17,11 +17,12 @@
package iggcon
-import "encoding"
+type GetStats struct{}
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+func (c *GetStats) Code() CommandCode {
+ return GetStatsCode
+}
- encoding.BinaryMarshaler
+func (c *GetStats) MarshalBinary() ([]byte, error) {
+ return []byte{}, nil
}
diff --git a/foreign/go/contracts/command.go
b/foreign/go/contracts/get_stream.go
similarity index 80%
copy from foreign/go/contracts/command.go
copy to foreign/go/contracts/get_stream.go
index d8b735da0..9cc1a7d18 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/get_stream.go
@@ -17,11 +17,14 @@
package iggcon
-import "encoding"
+type GetStream struct {
+ StreamId Identifier
+}
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+func (g *GetStream) Code() CommandCode {
+ return GetStreamCode
+}
- encoding.BinaryMarshaler
+func (g *GetStream) MarshalBinary() ([]byte, error) {
+ return g.StreamId.MarshalBinary()
}
diff --git a/foreign/go/contracts/command.go
b/foreign/go/contracts/get_streams.go
similarity index 82%
copy from foreign/go/contracts/command.go
copy to foreign/go/contracts/get_streams.go
index d8b735da0..e8b0e6083 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/get_streams.go
@@ -17,11 +17,12 @@
package iggcon
-import "encoding"
+type GetStreams struct{}
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+func (g *GetStreams) Code() CommandCode {
+ return GetStreamsCode
+}
- encoding.BinaryMarshaler
+func (g *GetStreams) MarshalBinary() ([]byte, error) {
+ return []byte{}, nil
}
diff --git a/foreign/go/contracts/command.go b/foreign/go/contracts/get_topic.go
similarity index 77%
copy from foreign/go/contracts/command.go
copy to foreign/go/contracts/get_topic.go
index d8b735da0..1026bfd5c 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/get_topic.go
@@ -17,11 +17,15 @@
package iggcon
-import "encoding"
+type GetTopic struct {
+ StreamId Identifier
+ TopicId Identifier
+}
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+func (g *GetTopic) Code() CommandCode {
+ return GetTopicCode
+}
- encoding.BinaryMarshaler
+func (g *GetTopic) MarshalBinary() ([]byte, error) {
+ return marshalIdentifiers(g.StreamId, g.TopicId)
}
diff --git a/foreign/go/contracts/command.go
b/foreign/go/contracts/get_topics.go
similarity index 80%
copy from foreign/go/contracts/command.go
copy to foreign/go/contracts/get_topics.go
index d8b735da0..23dbbd53d 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/get_topics.go
@@ -17,11 +17,14 @@
package iggcon
-import "encoding"
+type GetTopics struct {
+ StreamId Identifier
+}
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+func (g *GetTopics) Code() CommandCode {
+ return GetTopicsCode
+}
- encoding.BinaryMarshaler
+func (g *GetTopics) MarshalBinary() ([]byte, error) {
+ return g.StreamId.MarshalBinary()
}
diff --git a/foreign/go/contracts/command.go b/foreign/go/contracts/get_user.go
similarity index 81%
copy from foreign/go/contracts/command.go
copy to foreign/go/contracts/get_user.go
index d8b735da0..0ef25435c 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/get_user.go
@@ -17,11 +17,14 @@
package iggcon
-import "encoding"
+type GetUser struct {
+ Id Identifier
+}
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+func (c *GetUser) Code() CommandCode {
+ return GetUserCode
+}
- encoding.BinaryMarshaler
+func (c *GetUser) MarshalBinary() ([]byte, error) {
+ return c.Id.MarshalBinary()
}
diff --git a/foreign/go/contracts/command.go b/foreign/go/contracts/get_users.go
similarity index 83%
copy from foreign/go/contracts/command.go
copy to foreign/go/contracts/get_users.go
index d8b735da0..21c5b3de5 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/get_users.go
@@ -17,11 +17,12 @@
package iggcon
-import "encoding"
+type GetUsers struct{}
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+func (g *GetUsers) Code() CommandCode {
+ return GetUsersCode
+}
- encoding.BinaryMarshaler
+func (g *GetUsers) MarshalBinary() ([]byte, error) {
+ return []byte{}, nil
}
diff --git a/foreign/go/contracts/identifier.go
b/foreign/go/contracts/identifier.go
index 0e3401acb..3938d490a 100644
--- a/foreign/go/contracts/identifier.go
+++ b/foreign/go/contracts/identifier.go
@@ -88,3 +88,35 @@ func (id Identifier) String() (string, error) {
return string(id.Value), nil
}
+
+// MarshalBinary encodes the identifier as a kind byte, a length byte and then
+// the value bytes. The result is always id.Length+2 bytes long, and the
+// returned error is always nil.
+func (id Identifier) MarshalBinary() ([]byte, error) {
+	const headerSize = 2 // kind + length
+	encoded := make([]byte, headerSize+id.Length)
+	encoded[0], encoded[1] = byte(id.Kind), byte(id.Length)
+	copy(encoded[headerSize:], id.Value)
+	return encoded, nil
+}
+
+// AppendBinary appends the kind byte, the length byte and the value bytes of
+// the identifier to b and returns the extended slice. The returned error is
+// always nil.
+func (id Identifier) AppendBinary(b []byte) ([]byte, error) {
+	header := [...]byte{byte(id.Kind), byte(id.Length)}
+	b = append(b, header[:]...)
+	return append(b, id.Value...), nil
+}
+
+// marshalIdentifiers concatenates the binary encodings of the given
+// identifiers, in order, into a single buffer. Capacity is reserved up front
+// from each identifier's Length field plus its two header bytes.
+func marshalIdentifiers(identifiers ...Identifier) ([]byte, error) {
+	capacity := 0
+	for _, id := range identifiers {
+		capacity += id.Length + 2
+	}
+
+	out := make([]byte, 0, capacity)
+	for _, id := range identifiers {
+		next, err := id.AppendBinary(out)
+		if err != nil {
+			return nil, err
+		}
+		out = next
+	}
+
+	return out, nil
+}
diff --git a/foreign/go/binary_serialization/identifier_serializer_test.go
b/foreign/go/contracts/identifier_test.go
similarity index 82%
rename from foreign/go/binary_serialization/identifier_serializer_test.go
rename to foreign/go/contracts/identifier_test.go
index 06fde46e1..a83a89367 100644
--- a/foreign/go/binary_serialization/identifier_serializer_test.go
+++ b/foreign/go/contracts/identifier_test.go
@@ -15,23 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
import (
+ "bytes"
"errors"
"testing"
ierror "github.com/apache/iggy/foreign/go/errors"
-
- iggcon "github.com/apache/iggy/foreign/go/contracts"
)
func TestSerializeIdentifier_StringId(t *testing.T) {
// Test case for StringId
- identifier, _ := iggcon.NewIdentifier("Hello")
+ identifier, _ := NewIdentifier("Hello")
// Serialize the identifier
- serialized := SerializeIdentifier(identifier)
+ serialized, err := identifier.MarshalBinary()
+ if err != nil {
+ t.Errorf("Error serializing identifier: %v", err)
+ }
// Expected serialized bytes for StringId
expected := []byte{
@@ -41,17 +43,20 @@ func TestSerializeIdentifier_StringId(t *testing.T) {
}
// Check if the serialized bytes match the expected bytes
- if !areBytesEqual(serialized, expected) {
+ if !bytes.Equal(serialized, expected) {
t.Errorf("Serialized bytes are incorrect for StringId.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
}
}
func TestSerializeIdentifier_NumericId(t *testing.T) {
// Test case for NumericId
- identifier, _ := iggcon.NewIdentifier(uint32(123))
+ identifier, _ := NewIdentifier(uint32(123))
// Serialize the identifier
- serialized := SerializeIdentifier(identifier)
+ serialized, err := identifier.MarshalBinary()
+ if err != nil {
+ t.Errorf("Error serializing identifier: %v", err)
+ }
// Expected serialized bytes for NumericId
expected := []byte{
@@ -61,14 +66,14 @@ func TestSerializeIdentifier_NumericId(t *testing.T) {
}
// Check if the serialized bytes match the expected bytes
- if !areBytesEqual(serialized, expected) {
+ if !bytes.Equal(serialized, expected) {
t.Errorf("Serialized bytes are incorrect for NumericId.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
}
}
func TestSerializeIdentifier_EmptyStringId(t *testing.T) {
// Test case for an empty StringId
- _, err := iggcon.NewIdentifier("")
+ _, err := NewIdentifier("")
// Check if the serialized bytes match the expected bytes
if !errors.Is(err, ierror.ErrInvalidIdentifier) {
diff --git a/foreign/go/contracts/login.go b/foreign/go/contracts/login.go
deleted file mode 100644
index 31c60b9ed..000000000
--- a/foreign/go/contracts/login.go
+++ /dev/null
@@ -1,36 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type LoginUserRequest struct {
- Username string `json:"username"`
- Password string `json:"password"`
- Version string `json:"version,omitempty"`
- Context string `json:"context,omitempty"`
-}
-
-type LoginWithPersonalAccessTokenRequest struct {
- Token string `json:"token"`
-}
-
-type IdentityInfo struct {
- // Unique identifier (numeric) of the user.
- UserId uint32 `json:"userId"`
- // The optional tokens, used only by HTTP transport.
- AccessToken *string `json:"accessToken"`
-}
diff --git a/foreign/go/contracts/offsets.go b/foreign/go/contracts/offsets.go
index d85c25f4f..8726a5957 100644
--- a/foreign/go/contracts/offsets.go
+++ b/foreign/go/contracts/offsets.go
@@ -17,6 +17,8 @@
package iggcon
+import "encoding/binary"
+
type StoreConsumerOffsetRequest struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
@@ -25,22 +27,134 @@ type StoreConsumerOffsetRequest struct {
Offset uint64 `json:"offset"`
}
-type GetConsumerOffsetRequest struct {
+// Code returns StoreOffsetCode, the command code for this request.
+func (s *StoreConsumerOffsetRequest) Code() CommandCode {
+ return StoreOffsetCode
+}
+
+// MarshalBinary encodes the request as the consumer, stream identifier and
+// topic identifier encodings followed by a 13-byte trailer: a partition
+// presence flag (1 byte), the partition id (4 bytes, little-endian; zero when
+// PartitionId is nil) and the offset (8 bytes, little-endian).
+func (s *StoreConsumerOffsetRequest) MarshalBinary() ([]byte, error) {
+	var (
+		partitionFlag byte
+		partitionId   uint32
+	)
+	if s.PartitionId != nil {
+		partitionFlag, partitionId = 1, *s.PartitionId
+	}
+
+	consumer, err := s.Consumer.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	stream, err := s.StreamId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	topic, err := s.TopicId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	const trailerSize = 1 + 4 + 8 // hasPartition + partition + offset
+	prefixSize := len(consumer) + len(stream) + len(topic)
+
+	payload := make([]byte, 0, prefixSize+trailerSize)
+	payload = append(payload, consumer...)
+	payload = append(payload, stream...)
+	payload = append(payload, topic...)
+
+	// Extend over the still-zeroed trailer and fill it in place.
+	payload = payload[:prefixSize+trailerSize]
+	trailer := payload[prefixSize:]
+	trailer[0] = partitionFlag
+	binary.LittleEndian.PutUint32(trailer[1:5], partitionId)
+	binary.LittleEndian.PutUint64(trailer[5:13], s.Offset)
+	return payload, nil
+}
+
+type GetConsumerOffset struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
Consumer Consumer `json:"consumer"`
PartitionId *uint32 `json:"partitionId"`
}
+// Code returns GetOffsetCode, the command code for this command.
+func (g *GetConsumerOffset) Code() CommandCode {
+ return GetOffsetCode
+}
+
+// MarshalBinary encodes the command as the consumer, stream identifier and
+// topic identifier encodings followed by a 5-byte trailer: a partition
+// presence flag (1 byte) and the partition id (4 bytes, little-endian; zero
+// when PartitionId is nil).
+func (g *GetConsumerOffset) MarshalBinary() ([]byte, error) {
+	var (
+		partitionFlag byte
+		partitionId   uint32
+	)
+	if g.PartitionId != nil {
+		partitionFlag, partitionId = 1, *g.PartitionId
+	}
+
+	consumer, err := g.Consumer.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	stream, err := g.StreamId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	topic, err := g.TopicId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	const trailerSize = 1 + 4 // hasPartition + partition
+	prefixSize := len(consumer) + len(stream) + len(topic)
+
+	payload := make([]byte, 0, prefixSize+trailerSize)
+	payload = append(payload, consumer...)
+	payload = append(payload, stream...)
+	payload = append(payload, topic...)
+
+	// Extend over the still-zeroed trailer and fill it in place.
+	payload = payload[:prefixSize+trailerSize]
+	trailer := payload[prefixSize:]
+	trailer[0] = partitionFlag
+	binary.LittleEndian.PutUint32(trailer[1:5], partitionId)
+	return payload, nil
+}
+
type ConsumerOffsetInfo struct {
PartitionId uint32 `json:"partitionId"`
CurrentOffset uint64 `json:"currentOffset"`
StoredOffset uint64 `json:"storedOffset"`
}
-type DeleteConsumerOffsetRequest struct {
+type DeleteConsumerOffset struct {
Consumer Consumer
StreamId Identifier
TopicId Identifier
PartitionId *uint32
}
+
+// Code returns DeleteConsumerOffsetCode, the command code for this command.
+func (d *DeleteConsumerOffset) Code() CommandCode {
+ return DeleteConsumerOffsetCode
+}
+
+// MarshalBinary encodes the command as the consumer, stream identifier and
+// topic identifier encodings followed by a 5-byte trailer: a partition
+// presence flag (1 byte) and the partition id (4 bytes, little-endian; zero
+// when PartitionId is nil).
+func (d *DeleteConsumerOffset) MarshalBinary() ([]byte, error) {
+	var (
+		partitionFlag byte
+		partitionId   uint32
+	)
+	if d.PartitionId != nil {
+		partitionFlag, partitionId = 1, *d.PartitionId
+	}
+
+	consumer, err := d.Consumer.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	stream, err := d.StreamId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	topic, err := d.TopicId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	const trailerSize = 1 + 4 // hasPartition + partition
+	prefixSize := len(consumer) + len(stream) + len(topic)
+
+	payload := make([]byte, 0, prefixSize+trailerSize)
+	payload = append(payload, consumer...)
+	payload = append(payload, stream...)
+	payload = append(payload, topic...)
+
+	// Extend over the still-zeroed trailer and fill it in place.
+	payload = payload[:prefixSize+trailerSize]
+	trailer := payload[prefixSize:]
+	trailer[0] = partitionFlag
+	binary.LittleEndian.PutUint32(trailer[1:5], partitionId)
+	return payload, nil
+}
diff --git a/foreign/go/contracts/partitions.go
b/foreign/go/contracts/partitions.go
index ff32a1708..00ab0244c 100644
--- a/foreign/go/contracts/partitions.go
+++ b/foreign/go/contracts/partitions.go
@@ -33,18 +33,66 @@ type PartitionContract struct {
SizeBytes uint64 `json:"sizeBytes"`
}
-type CreatePartitionsRequest struct {
+type CreatePartitions struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
PartitionsCount uint32 `json:"partitionsCount"`
}
-type DeletePartitionsRequest struct {
+// Code returns CreatePartitionsCode, the command code for this command.
+func (c *CreatePartitions) Code() CommandCode {
+ return CreatePartitionsCode
+}
+
+// MarshalBinary encodes the command as the stream identifier, the topic
+// identifier and PartitionsCount as a 4-byte little-endian integer.
+func (c *CreatePartitions) MarshalBinary() ([]byte, error) {
+	stream, err := c.StreamId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	topic, err := c.TopicId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	const countSize = 4 // uint32
+	idsSize := len(stream) + len(topic)
+
+	payload := make([]byte, 0, idsSize+countSize)
+	payload = append(payload, stream...)
+	payload = append(payload, topic...)
+
+	// Extend over the reserved tail and write the count into it.
+	payload = payload[:idsSize+countSize]
+	binary.LittleEndian.PutUint32(payload[idsSize:], c.PartitionsCount)
+
+	return payload, nil
+}
+
+type DeletePartitions struct {
StreamId Identifier `json:"streamId"`
TopicId Identifier `json:"topicId"`
PartitionsCount uint32 `json:"partitionsCount"`
}
+// Code returns DeletePartitionsCode, the command code for this command.
+func (d *DeletePartitions) Code() CommandCode {
+ return DeletePartitionsCode
+}
+
+// MarshalBinary encodes the command as the stream identifier, the topic
+// identifier and PartitionsCount as a 4-byte little-endian integer.
+func (d *DeletePartitions) MarshalBinary() ([]byte, error) {
+	stream, err := d.StreamId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	topic, err := d.TopicId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	const countSize = 4 // uint32
+	idsSize := len(stream) + len(topic)
+
+	payload := make([]byte, 0, idsSize+countSize)
+	payload = append(payload, stream...)
+	payload = append(payload, topic...)
+
+	// Extend over the reserved tail and write the count into it.
+	payload = payload[:idsSize+countSize]
+	binary.LittleEndian.PutUint32(payload[idsSize:], d.PartitionsCount)
+
+	return payload, nil
+}
+
type PartitioningKind int
const (
@@ -130,3 +178,11 @@ func EntityIdGuid(value uuid.UUID) Partitioning {
Value: bytes,
}
}
+
+// MarshalBinary encodes the partitioning as a kind byte, a length byte and
+// then the value bytes. The result is always p.Length+2 bytes long, and the
+// returned error is always nil.
+func (p Partitioning) MarshalBinary() ([]byte, error) {
+	const headerSize = 2 // kind + length
+	encoded := make([]byte, headerSize+p.Length)
+	encoded[0], encoded[1] = byte(p.Kind), byte(p.Length)
+	copy(encoded[headerSize:], p.Value)
+	return encoded, nil
+}
diff --git a/foreign/go/contracts/command.go b/foreign/go/contracts/ping.go
similarity index 84%
copy from foreign/go/contracts/command.go
copy to foreign/go/contracts/ping.go
index d8b735da0..dd43a6aad 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/contracts/ping.go
@@ -17,11 +17,12 @@
package iggcon
-import "encoding"
+type Ping struct{}
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+func (p *Ping) Code() CommandCode {
+ return PingCode
+}
- encoding.BinaryMarshaler
+func (p *Ping) MarshalBinary() ([]byte, error) {
+ return []byte{}, nil
}
diff --git
a/foreign/go/binary_serialization/fetch_messages_request_serializer.go
b/foreign/go/contracts/poll_messages.go
similarity index 62%
rename from foreign/go/binary_serialization/fetch_messages_request_serializer.go
rename to foreign/go/contracts/poll_messages.go
index 798918e48..efeaf8171 100644
--- a/foreign/go/binary_serialization/fetch_messages_request_serializer.go
+++ b/foreign/go/contracts/poll_messages.go
@@ -15,12 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
import (
"encoding/binary"
-
- iggcon "github.com/apache/iggy/foreign/go/contracts"
)
const (
@@ -31,24 +29,37 @@ const (
commitFlagSize = 1
)
-type TcpFetchMessagesRequest struct {
- StreamId iggcon.Identifier `json:"streamId"`
- TopicId iggcon.Identifier `json:"topicId"`
- Consumer iggcon.Consumer `json:"consumer"`
- PartitionId *uint32 `json:"partitionId"`
- Strategy iggcon.PollingStrategy `json:"pollingStrategy"`
- Count uint32 `json:"count"`
- AutoCommit bool `json:"autoCommit"`
+type PollMessages struct {
+ StreamId Identifier `json:"streamId"`
+ TopicId Identifier `json:"topicId"`
+ Consumer Consumer `json:"consumer"`
+ PartitionId *uint32 `json:"partitionId"`
+ Strategy PollingStrategy `json:"pollingStrategy"`
+ Count uint32 `json:"count"`
+ AutoCommit bool `json:"autoCommit"`
+}
+
+func (m *PollMessages) Code() CommandCode {
+ return PollMessagesCode
}
-func (request *TcpFetchMessagesRequest) Serialize() []byte {
- consumerIdBytes := SerializeIdentifier(request.Consumer.Id)
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
+func (m *PollMessages) MarshalBinary() ([]byte, error) {
+ consumerIdBytes, err := m.Consumer.Id.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ streamIdBytes, err := m.StreamId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ topicIdBytes, err := m.TopicId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
messageSize := 1 + len(consumerIdBytes) + len(streamIdBytes) +
len(topicIdBytes) + partitionStrategySize + offsetSize + commitFlagSize
bytes := make([]byte, messageSize)
- bytes[0] = byte(request.Consumer.Kind)
+ bytes[0] = byte(m.Consumer.Kind)
position := 1
copy(bytes[position:position+len(consumerIdBytes)], consumerIdBytes)
position += len(consumerIdBytes)
@@ -57,26 +68,26 @@ func (request *TcpFetchMessagesRequest) Serialize() []byte {
position += len(streamIdBytes)
copy(bytes[position:position+len(topicIdBytes)], topicIdBytes)
position += len(topicIdBytes)
- if request.PartitionId != nil {
+ if m.PartitionId != nil {
bytes[position] = 1
- binary.LittleEndian.PutUint32(bytes[position+1:position+1+4],
*request.PartitionId)
+ binary.LittleEndian.PutUint32(bytes[position+1:position+1+4],
*m.PartitionId)
} else {
bytes[position] = 0
binary.LittleEndian.PutUint32(bytes[position+1:position+1+4], 0)
}
- bytes[position+1+4] = byte(request.Strategy.Kind)
+ bytes[position+1+4] = byte(m.Strategy.Kind)
position += partitionStrategySize
- binary.LittleEndian.PutUint64(bytes[position:position+8],
request.Strategy.Value)
- binary.LittleEndian.PutUint32(bytes[position+8:position+12],
request.Count)
+ binary.LittleEndian.PutUint64(bytes[position:position+8],
m.Strategy.Value)
+ binary.LittleEndian.PutUint32(bytes[position+8:position+12], m.Count)
position += offsetSize
- if request.AutoCommit {
+ if m.AutoCommit {
bytes[position] = 1
} else {
bytes[position] = 0
}
- return bytes
+ return bytes, nil
}
diff --git
a/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
b/foreign/go/contracts/poll_messages_test.go
similarity index 85%
rename from
foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
rename to foreign/go/contracts/poll_messages_test.go
index 0c46d0ca2..a1455432c 100644
--- a/foreign/go/binary_serialization/fetch_messages_request_serializer_test.go
+++ b/foreign/go/contracts/poll_messages_test.go
@@ -15,32 +15,33 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
import (
"testing"
-
- iggcon "github.com/apache/iggy/foreign/go/contracts"
)
func TestSerialize_TcpFetchMessagesRequest(t *testing.T) {
partitionId := uint32(123)
- consumerId, _ := iggcon.NewIdentifier(uint32(42))
- streamId, _ := iggcon.NewIdentifier("test_stream_id")
- topicId, _ := iggcon.NewIdentifier("test_topic_id")
- // Create a sample TcpFetchMessagesRequest
- request := TcpFetchMessagesRequest{
- Consumer: iggcon.NewSingleConsumer(consumerId),
+ consumerId, _ := NewIdentifier(uint32(42))
+ streamId, _ := NewIdentifier("test_stream_id")
+ topicId, _ := NewIdentifier("test_topic_id")
+ // Create a sample PollMessages
+ request := PollMessages{
+ Consumer: NewSingleConsumer(consumerId),
StreamId: streamId,
TopicId: topicId,
PartitionId: &partitionId,
- Strategy: iggcon.FirstPollingStrategy(),
+ Strategy: FirstPollingStrategy(),
Count: 100,
AutoCommit: true,
}
// Serialize the request
- serialized := request.Serialize()
+ serialized, err := request.MarshalBinary()
+ if err != nil {
+ t.Error(err)
+ }
// Expected serialized bytes based on the provided sample request
expected := []byte{
diff --git
a/foreign/go/binary_serialization/send_messages_request_serializer.go
b/foreign/go/contracts/send_messages.go
similarity index 62%
rename from foreign/go/binary_serialization/send_messages_request_serializer.go
rename to foreign/go/contracts/send_messages.go
index 2ee145d41..3effccfc3 100644
--- a/foreign/go/binary_serialization/send_messages_request_serializer.go
+++ b/foreign/go/contracts/send_messages.go
@@ -15,60 +15,74 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
import (
"encoding/binary"
- iggcon "github.com/apache/iggy/foreign/go/contracts"
"github.com/klauspost/compress/s2"
)
-type TcpSendMessagesRequest struct {
- StreamId iggcon.Identifier `json:"streamId"`
- TopicId iggcon.Identifier `json:"topicId"`
- Partitioning iggcon.Partitioning `json:"partitioning"`
- Messages []iggcon.IggyMessage `json:"messages"`
+type SendMessages struct {
+ Compression IggyMessageCompression
+
+ StreamId Identifier `json:"streamId"`
+ TopicId Identifier `json:"topicId"`
+ Partitioning Partitioning `json:"partitioning"`
+ Messages []IggyMessage `json:"messages"`
}
const indexSize = 16
-func (request *TcpSendMessagesRequest) Serialize(compression
iggcon.IggyMessageCompression) []byte {
- for i, message := range request.Messages {
- switch compression {
- case iggcon.MESSAGE_COMPRESSION_S2:
+func (s *SendMessages) Code() CommandCode {
+ return SendMessagesCode
+}
+
+func (s *SendMessages) MarshalBinary() ([]byte, error) {
+ for i, message := range s.Messages {
+ switch s.Compression {
+ case MESSAGE_COMPRESSION_S2:
if len(message.Payload) < 32 {
break
}
- request.Messages[i].Payload = s2.Encode(nil,
message.Payload)
+ s.Messages[i].Payload = s2.Encode(nil, message.Payload)
message.Header.PayloadLength =
uint32(len(message.Payload))
- case iggcon.MESSAGE_COMPRESSION_S2_BETTER:
+ case MESSAGE_COMPRESSION_S2_BETTER:
if len(message.Payload) < 32 {
break
}
- request.Messages[i].Payload = s2.EncodeBetter(nil,
message.Payload)
+ s.Messages[i].Payload = s2.EncodeBetter(nil,
message.Payload)
message.Header.PayloadLength =
uint32(len(message.Payload))
- case iggcon.MESSAGE_COMPRESSION_S2_BEST:
+ case MESSAGE_COMPRESSION_S2_BEST:
if len(message.Payload) < 32 {
break
}
- request.Messages[i].Payload = s2.EncodeBest(nil,
message.Payload)
+ s.Messages[i].Payload = s2.EncodeBest(nil,
message.Payload)
message.Header.PayloadLength =
uint32(len(message.Payload))
}
}
- streamIdBytes := SerializeIdentifier(request.StreamId)
- topicIdBytes := SerializeIdentifier(request.TopicId)
- partitioningBytes := SerializePartitioning(request.Partitioning)
+ streamIdBytes, err := s.StreamId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ topicIdBytes, err := s.TopicId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ partitioningBytes, err := s.Partitioning.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
metadataLenFieldSize := 4 // uint32
- messageCount := len(request.Messages)
+ messageCount := len(s.Messages)
messagesCountFieldSize := 4 // uint32
metadataLen := len(streamIdBytes) +
len(topicIdBytes) +
len(partitioningBytes) +
messagesCountFieldSize
indexesSize := messageCount * indexSize
- messageBytesCount := calculateMessageBytesCount(request.Messages)
+ messageBytesCount := calculateMessageBytesCount(s.Messages)
totalSize := metadataLenFieldSize +
len(streamIdBytes) +
len(topicIdBytes) +
@@ -103,14 +117,14 @@ func (request *TcpSendMessagesRequest)
Serialize(compression iggcon.IggyMessageC
position += indexesSize
msgSize := uint32(0)
- for _, message := range request.Messages {
- copy(bytes[position:position+iggcon.MessageHeaderSize],
message.Header.ToBytes())
-
copy(bytes[position+iggcon.MessageHeaderSize:position+iggcon.MessageHeaderSize+int(message.Header.PayloadLength)],
message.Payload)
- position += iggcon.MessageHeaderSize +
int(message.Header.PayloadLength)
+ for _, message := range s.Messages {
+ copy(bytes[position:position+MessageHeaderSize],
message.Header.ToBytes())
+
copy(bytes[position+MessageHeaderSize:position+MessageHeaderSize+int(message.Header.PayloadLength)],
message.Payload)
+ position += MessageHeaderSize +
int(message.Header.PayloadLength)
copy(bytes[position:position+int(message.Header.UserHeaderLength)],
message.UserHeaders)
position += int(message.Header.UserHeaderLength)
- msgSize += iggcon.MessageHeaderSize +
message.Header.PayloadLength + message.Header.UserHeaderLength
+ msgSize += MessageHeaderSize + message.Header.PayloadLength +
message.Header.UserHeaderLength
binary.LittleEndian.PutUint32(bytes[currentIndexPosition:currentIndexPosition+4],
0)
binary.LittleEndian.PutUint32(bytes[currentIndexPosition+4:currentIndexPosition+8],
uint32(msgSize))
@@ -118,13 +132,13 @@ func (request *TcpSendMessagesRequest)
Serialize(compression iggcon.IggyMessageC
currentIndexPosition += indexSize
}
- return bytes
+ return bytes, nil
}
-func calculateMessageBytesCount(messages []iggcon.IggyMessage) int {
+func calculateMessageBytesCount(messages []IggyMessage) int {
count := 0
for _, msg := range messages {
- count += iggcon.MessageHeaderSize + len(msg.Payload) +
len(msg.UserHeaders)
+ count += MessageHeaderSize + len(msg.Payload) +
len(msg.UserHeaders)
}
return count
}
diff --git
a/foreign/go/binary_serialization/send_messages_request_serializer_test.go
b/foreign/go/contracts/send_messages_test.go
similarity index 70%
rename from
foreign/go/binary_serialization/send_messages_request_serializer_test.go
rename to foreign/go/contracts/send_messages_test.go
index c319e11ae..1ddf4f831 100644
--- a/foreign/go/binary_serialization/send_messages_request_serializer_test.go
+++ b/foreign/go/contracts/send_messages_test.go
@@ -15,30 +15,34 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
import (
+ "bytes"
"testing"
- iggcon "github.com/apache/iggy/foreign/go/contracts"
"github.com/google/uuid"
)
func TestSerialize_SendMessagesRequest(t *testing.T) {
message1 := generateTestMessage("data1")
- streamId, _ := iggcon.NewIdentifier("test_stream_id")
- topicId, _ := iggcon.NewIdentifier("test_topic_id")
- request := TcpSendMessagesRequest{
+ streamId, _ := NewIdentifier("test_stream_id")
+ topicId, _ := NewIdentifier("test_topic_id")
+ request := SendMessages{
StreamId: streamId,
TopicId: topicId,
- Partitioning: iggcon.PartitionId(1),
- Messages: []iggcon.IggyMessage{
+ Partitioning: PartitionId(1),
+ Messages: []IggyMessage{
message1,
},
+ Compression: MESSAGE_COMPRESSION_NONE,
}
// Serialize the request
- serialized := request.Serialize(iggcon.MESSAGE_COMPRESSION_NONE)
+ serialized, err := request.MarshalBinary()
+ if err != nil {
+ t.Error(err)
+ }
// Expected serialized bytes based on the provided sample request
expected := []byte{
@@ -61,22 +65,22 @@ func TestSerialize_SendMessagesRequest(t *testing.T) {
expected = append(expected, message1.UserHeaders...)
// Check if the serialized bytes match the expected bytes
- if !areBytesEqual(serialized, expected) {
+ if !bytes.Equal(serialized, expected) {
t.Errorf("Serialized bytes are incorrect.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
}
}
-func createDefaultMessageHeaders() []iggcon.HeaderEntry {
- return []iggcon.HeaderEntry{
- {Key: iggcon.HeaderKey{Kind: iggcon.String, Value:
[]byte("HeaderKey1")}, Value: iggcon.HeaderValue{Kind: iggcon.String, Value:
[]byte("Value 1")}},
- {Key: iggcon.HeaderKey{Kind: iggcon.String, Value:
[]byte("HeaderKey2")}, Value: iggcon.HeaderValue{Kind: iggcon.Uint32, Value:
[]byte{0x01, 0x02, 0x03, 0x04}}},
+func createDefaultMessageHeaders() []HeaderEntry {
+ return []HeaderEntry{
+ {Key: HeaderKey{Kind: String, Value: []byte("HeaderKey1")},
Value: HeaderValue{Kind: String, Value: []byte("Value 1")}},
+ {Key: HeaderKey{Kind: String, Value: []byte("HeaderKey2")},
Value: HeaderValue{Kind: Uint32, Value: []byte{0x01, 0x02, 0x03, 0x04}}},
}
}
-func generateTestMessage(payload string) iggcon.IggyMessage {
- msg, _ := iggcon.NewIggyMessage(
+func generateTestMessage(payload string) IggyMessage {
+ msg, _ := NewIggyMessage(
[]byte(payload),
- iggcon.WithID(uuid.New()),
- iggcon.WithUserHeaders(createDefaultMessageHeaders()))
+ WithID(uuid.New()),
+ WithUserHeaders(createDefaultMessageHeaders()))
return msg
}
diff --git a/foreign/go/binary_serialization/log_in_request_serializer.go
b/foreign/go/contracts/session.go
similarity index 62%
rename from foreign/go/binary_serialization/log_in_request_serializer.go
rename to foreign/go/contracts/session.go
index bd714be6b..4e62ab88c 100644
--- a/foreign/go/binary_serialization/log_in_request_serializer.go
+++ b/foreign/go/contracts/session.go
@@ -15,20 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
-import (
- "encoding/binary"
-)
+import (
+	"encoding/binary"
+	"fmt"
+)
-type TcpLogInRequest struct {
+type LoginUser struct {
Username string `json:"username"`
Password string `json:"password"`
}
-func (request *TcpLogInRequest) Serialize() []byte {
- usernameBytes := []byte(request.Username)
- passwordBytes := []byte(request.Password)
+func (lu *LoginUser) Code() CommandCode {
+ return LoginUserCode
+}
+
+func (lu *LoginUser) MarshalBinary() ([]byte, error) {
+ usernameBytes := []byte(lu.Username)
+ passwordBytes := []byte(lu.Password)
versionBytes := []byte("")
contextBytes := []byte("")
@@ -62,5 +64,38 @@ func (request *TcpLogInRequest) Serialize() []byte {
position += 4
copy(result[position:], contextBytes)
- return result
+ return result, nil
+}
+
+// LoginWithPersonalAccessToken is the command sent with
+// LoginWithAccessTokenCode; its payload carries Token.
+type LoginWithPersonalAccessToken struct {
+ Token string `json:"token"`
+}
+
+// Code returns LoginWithAccessTokenCode, the command code for this command.
+func (lw *LoginWithPersonalAccessToken) Code() CommandCode {
+ return LoginWithAccessTokenCode
+}
+
+// MarshalBinary encodes the command as a single length byte followed by the
+// raw token bytes.
+//
+// The length prefix is one byte wide, so a token longer than 255 bytes cannot
+// be represented. An error is returned in that case instead of silently
+// wrapping the length and emitting a frame whose prefix disagrees with its
+// payload.
+func (lw *LoginWithPersonalAccessToken) MarshalBinary() ([]byte, error) {
+	const maxTokenLength = 255 // largest value the one-byte length prefix can hold
+	tokenLength := len(lw.Token)
+	if tokenLength > maxTokenLength {
+		return nil, fmt.Errorf("personal access token is %d bytes, maximum is %d", tokenLength, maxTokenLength)
+	}
+	bytes := make([]byte, 1+tokenLength)
+	bytes[0] = byte(tokenLength)
+	copy(bytes[1:], lw.Token)
+	return bytes, nil
+}
+
+// IdentityInfo holds the numeric id of a user together with an optional
+// access token.
+type IdentityInfo struct {
+ // Unique identifier (numeric) of the user.
+ UserId uint32 `json:"userId"`
+ // The optional tokens, used only by HTTP transport.
+ AccessToken *string `json:"accessToken"`
+}
+
+// LogoutUser is the field-less command sent with LogoutUserCode; it marshals
+// to an empty payload.
+type LogoutUser struct{}
+
+// Code returns LogoutUserCode, the command code for this command.
+func (lu *LogoutUser) Code() CommandCode {
+ return LogoutUserCode
+}
+
+func (lu *LogoutUser) MarshalBinary() ([]byte, error) {
+ return []byte{}, nil
}
diff --git a/foreign/go/binary_serialization/update_stream_serializer.go
b/foreign/go/contracts/update_stream.go
similarity index 69%
rename from foreign/go/binary_serialization/update_stream_serializer.go
rename to foreign/go/contracts/update_stream.go
index c6a95bc94..2da4869ad 100644
--- a/foreign/go/binary_serialization/update_stream_serializer.go
+++ b/foreign/go/contracts/update_stream.go
@@ -15,24 +15,26 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
-import (
- iggcon "github.com/apache/iggy/foreign/go/contracts"
-)
-
-type TcpUpdateStreamRequest struct {
- StreamId iggcon.Identifier `json:"streamId"`
- Name string `json:"name"`
+type UpdateStream struct {
+ StreamId Identifier `json:"streamId"`
+ Name string `json:"name"`
}
-func (request *TcpUpdateStreamRequest) Serialize() []byte {
- streamIdBytes := SerializeIdentifier(request.StreamId)
- nameLength := len(request.Name)
+func (u *UpdateStream) Code() CommandCode {
+ return UpdateStreamCode
+}
+func (u *UpdateStream) MarshalBinary() ([]byte, error) {
+ streamIdBytes, err := u.StreamId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ nameLength := len(u.Name)
bytes := make([]byte, len(streamIdBytes)+1+nameLength)
copy(bytes[0:len(streamIdBytes)], streamIdBytes)
position := len(streamIdBytes)
bytes[position] = byte(nameLength)
- copy(bytes[position+1:], []byte(request.Name))
- return bytes
+ copy(bytes[position+1:], u.Name)
+ return bytes, nil
}
diff --git a/foreign/go/binary_serialization/update_stream_serializer_test.go
b/foreign/go/contracts/update_stream_test.go
similarity index 85%
rename from foreign/go/binary_serialization/update_stream_serializer_test.go
rename to foreign/go/contracts/update_stream_test.go
index ab0f22300..abb480bb0 100644
--- a/foreign/go/binary_serialization/update_stream_serializer_test.go
+++ b/foreign/go/contracts/update_stream_test.go
@@ -15,22 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
import (
+ "bytes"
"testing"
-
- iggcon "github.com/apache/iggy/foreign/go/contracts"
)
func TestSerialize_UpdateStream(t *testing.T) {
- streamId, _ := iggcon.NewIdentifier("stream")
- request := TcpUpdateStreamRequest{
+ streamId, _ := NewIdentifier("stream")
+ request := UpdateStream{
StreamId: streamId,
Name: "update_stream",
}
- serialized1 := request.Serialize()
+ serialized1, err := request.MarshalBinary()
+ if err != nil {
+ t.Errorf("Failed to serialize UpdateStream: %v", err)
+ }
expected := []byte{
0x02, // StreamId Kind (StringId)
@@ -40,7 +42,7 @@ func TestSerialize_UpdateStream(t *testing.T) {
0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5F, 0x73, 0x74, 0x72,
0x65, 0x61, 0x6D, // Name ("update_stream")
}
- if !areBytesEqual(serialized1, expected) {
+ if !bytes.Equal(serialized1, expected) {
t.Errorf("Test case 1 failed. \nExpected:\t%v\nGot:\t\t%v",
expected, serialized1)
}
}
diff --git a/foreign/go/contracts/update_topic.go
b/foreign/go/contracts/update_topic.go
new file mode 100644
index 000000000..ef5de4863
--- /dev/null
+++ b/foreign/go/contracts/update_topic.go
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iggcon
+
+import (
+ "encoding/binary"
+)
+
+// UpdateTopic is the command whose Code is UpdateTopicCode.
+//
+// MarshalBinary writes the fields in this order: StreamId, TopicId,
+// CompressionAlgorithm, MessageExpiry, MaxTopicSize, ReplicationFactor and
+// finally Name prefixed by a one-byte length. A nil ReplicationFactor is
+// written as 0.
+type UpdateTopic struct {
+	StreamId             Identifier           `json:"streamId"`
+	TopicId              Identifier           `json:"topicId"`
+	CompressionAlgorithm CompressionAlgorithm `json:"compressionAlgorithm"`
+	MessageExpiry        Duration             `json:"messageExpiry"`
+	MaxTopicSize         uint64               `json:"maxTopicSize"`
+	ReplicationFactor    *uint8               `json:"replicationFactor"`
+	Name                 string               `json:"name"`
+}
+
+// Code returns UpdateTopicCode, the CommandCode paired with this command.
+func (u *UpdateTopic) Code() CommandCode {
+	return UpdateTopicCode
+}
+
+// MarshalBinary encodes the command into its binary layout:
+//
+//	stream id | topic id | compression (1) | message expiry (8, LE) |
+//	max topic size (8, LE) | replication factor (1) | name length (1) | name
+//
+// A nil ReplicationFactor is encoded as 0. The receiver is never modified.
+// Any error returned by marshalling StreamId or TopicId is passed through
+// unchanged.
+//
+// NOTE(review): the name length is stored in a single byte, so a Name longer
+// than 255 bytes gets a truncated length prefix. Presumably callers validate
+// the name before marshalling; confirm.
+func (u *UpdateTopic) MarshalBinary() ([]byte, error) {
+	streamIdBytes, err := u.StreamId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	topicIdBytes, err := u.TopicId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	// Resolve the optional field into a local rather than allocating a value
+	// on the receiver: marshalling must not change the command, otherwise a
+	// nil ReplicationFactor silently becomes a pointer to 0 for later readers.
+	var replicationFactor uint8
+	if u.ReplicationFactor != nil {
+		replicationFactor = *u.ReplicationFactor
+	}
+
+	// compression (1) + expiry (8) + max size (8) + replication factor (1) +
+	// name length (1).
+	const fixedSize = 1 + 8 + 8 + 1 + 1
+	buffer := make([]byte, fixedSize+len(streamIdBytes)+len(topicIdBytes)+len(u.Name))
+
+	offset := copy(buffer, streamIdBytes)
+	offset += copy(buffer[offset:], topicIdBytes)
+
+	buffer[offset] = byte(u.CompressionAlgorithm)
+	offset++
+
+	binary.LittleEndian.PutUint64(buffer[offset:], uint64(u.MessageExpiry))
+	offset += 8
+
+	binary.LittleEndian.PutUint64(buffer[offset:], u.MaxTopicSize)
+	offset += 8
+
+	buffer[offset] = replicationFactor
+	offset++
+
+	buffer[offset] = uint8(len(u.Name))
+	offset++
+
+	copy(buffer[offset:], u.Name)
+
+	return buffer, nil
+}
diff --git a/foreign/go/binary_serialization/update_topic_serializer_test.go
b/foreign/go/contracts/update_topic_test.go
similarity index 85%
rename from foreign/go/binary_serialization/update_topic_serializer_test.go
rename to foreign/go/contracts/update_topic_test.go
index f9df22222..319b7b43c 100644
--- a/foreign/go/binary_serialization/update_topic_serializer_test.go
+++ b/foreign/go/contracts/update_topic_test.go
@@ -15,26 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-package binaryserialization
+package iggcon
import (
+ "bytes"
"testing"
-
- iggcon "github.com/apache/iggy/foreign/go/contracts"
)
func TestSerialize_UpdateTopic(t *testing.T) {
- streamId, _ := iggcon.NewIdentifier("stream")
- topicId, _ := iggcon.NewIdentifier(uint32(1))
- request := TcpUpdateTopicRequest{
+ streamId, _ := NewIdentifier("stream")
+ topicId, _ := NewIdentifier(uint32(1))
+ request := UpdateTopic{
StreamId: streamId,
TopicId: topicId,
Name: "update_topic",
- MessageExpiry: 100 * iggcon.Microsecond,
+ MessageExpiry: 100 * Microsecond,
MaxTopicSize: 100,
}
- serialized1 := request.Serialize()
+ serialized1, err := request.MarshalBinary()
+ if err != nil {
+ t.Errorf("Failed to serialize UpdateTopic: %v", err)
+ }
expected := []byte{
0x02, // StreamId Kind (StringId)
@@ -51,7 +53,7 @@ func TestSerialize_UpdateTopic(t *testing.T) {
0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5F, 0x74, 0x6F, 0x70,
0x69, 0x63, // Name ("update_topic")
}
- if !areBytesEqual(serialized1, expected) {
+ if !bytes.Equal(serialized1, expected) {
t.Errorf("Test case 1 failed. \nExpected:\t%v\nGot:\t\t%v",
expected, serialized1)
}
}
diff --git a/foreign/go/contracts/update_user.go
b/foreign/go/contracts/update_user.go
new file mode 100644
index 000000000..1dad2db14
--- /dev/null
+++ b/foreign/go/contracts/update_user.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iggcon
+
+// UpdateUser is the command whose Code is UpdateUserCode.
+//
+// UserID is excluded from the JSON form. Username and Status are optional:
+// MarshalBinary encodes a nil or empty Username, and a nil Status, as
+// "not present".
+type UpdateUser struct {
+	UserID   Identifier  `json:"-"`
+	Username *string     `json:"username"`
+	Status   *UserStatus `json:"userStatus"`
+}
+
+// Code returns UpdateUserCode, the CommandCode paired with this command.
+func (u *UpdateUser) Code() CommandCode {
+	return UpdateUserCode
+}
+
+// MarshalBinary encodes the command into its binary layout:
+//
+//	user id | has username (1) | [username length (1) | username] |
+//	has status (1) | [status (1)]
+//
+// A nil or empty Username and a nil Status are each encoded as a single 0
+// flag byte with no payload. Status is written as 1 for Active, 2 for
+// Inactive and 0 for any other value. The receiver is never modified.
+// Any error returned by marshalling UserID is passed through unchanged.
+func (u *UpdateUser) MarshalBinary() ([]byte, error) {
+	userIdBytes, err := u.UserID.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	// Read the optional username into a local; the command itself must not
+	// be altered by marshalling it.
+	var username string
+	if u.Username != nil {
+		username = *u.Username
+	}
+	hasUsername := len(username) != 0
+	hasStatus := u.Status != nil
+
+	// Both presence flags are always written, so they are always counted.
+	// Sizing the buffer from the optional payloads alone leaves no room for
+	// the second flag when neither field is set.
+	length := len(userIdBytes) + 2
+	if hasUsername {
+		length += 1 + len(username)
+	}
+	if hasStatus {
+		length++
+	}
+
+	buf := make([]byte, length)
+	position := copy(buf, userIdBytes)
+
+	if hasUsername {
+		buf[position] = 1
+		buf[position+1] = byte(len(username))
+		position += 2
+		position += copy(buf[position:], username)
+	} else {
+		// The "has username" flag is already 0 in the fresh buffer.
+		position++
+	}
+
+	if hasStatus {
+		buf[position] = 1
+		switch *u.Status {
+		case Active:
+			buf[position+1] = 1
+		case Inactive:
+			buf[position+1] = 2
+		}
+	}
+
+	return buf, nil
+}
diff --git a/foreign/go/contracts/update_user_permissions.go
b/foreign/go/contracts/update_user_permissions.go
new file mode 100644
index 000000000..952bd14d0
--- /dev/null
+++ b/foreign/go/contracts/update_user_permissions.go
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iggcon
+
+import "encoding/binary"
+
+// UpdatePermissions is the command whose Code is UpdatePermissionsCode.
+//
+// UserID is excluded from the JSON form; Permissions is optional and is
+// omitted from JSON when nil.
+type UpdatePermissions struct {
+	UserID      Identifier   `json:"-"`
+	Permissions *Permissions `json:"Permissions,omitempty"`
+}
+
+// Code returns UpdatePermissionsCode, the CommandCode paired with this
+// command.
+func (u *UpdatePermissions) Code() CommandCode {
+	return UpdatePermissionsCode
+}
+
+// MarshalBinary encodes the command into its binary layout:
+//
+//	user id | has permissions (1) | [permissions length (4, LE) | permissions]
+//
+// A nil Permissions is encoded as the user id followed by a single 0 flag
+// byte. Errors from marshalling UserID or Permissions are passed through
+// unchanged.
+func (u *UpdatePermissions) MarshalBinary() ([]byte, error) {
+	userIdBytes, err := u.UserID.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	if u.Permissions == nil {
+		// The presence flag is written even when there is nothing to send,
+		// so the buffer needs one byte beyond the identifier. The flag keeps
+		// the zero value the fresh buffer already holds.
+		buf := make([]byte, len(userIdBytes)+1)
+		copy(buf, userIdBytes)
+		return buf, nil
+	}
+
+	permissionsBytes, err := u.Permissions.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	// flag (1) + length prefix (4) + payload.
+	buf := make([]byte, len(userIdBytes)+1+4+len(permissionsBytes))
+	position := copy(buf, userIdBytes)
+
+	buf[position] = 1
+	position++
+
+	binary.LittleEndian.PutUint32(buf[position:], uint32(len(permissionsBytes)))
+	position += 4
+
+	copy(buf[position:], permissionsBytes)
+
+	return buf, nil
+}
diff --git a/foreign/go/contracts/users.go b/foreign/go/contracts/users.go
index 3fcc051ce..4667dfc2c 100644
--- a/foreign/go/contracts/users.go
+++ b/foreign/go/contracts/users.go
@@ -17,29 +17,7 @@
package iggcon
-type ChangePasswordRequest struct {
- UserID Identifier `json:"-"`
- CurrentPassword string `json:"CurrentPassword"`
- NewPassword string `json:"NewPassword"`
-}
-
-type UpdatePermissionsRequest struct {
- UserID Identifier `json:"-"`
- Permissions *Permissions `json:"Permissions,omitempty"`
-}
-
-type UpdateUserRequest struct {
- UserID Identifier `json:"-"`
- Username *string `json:"username"`
- Status *UserStatus `json:"userStatus"`
-}
-
-type CreateUserRequest struct {
- Username string `json:"username"`
- Password string `json:"Password"`
- Status UserStatus `json:"Status"`
- Permissions *Permissions `json:"Permissions,omitempty"`
-}
+import "encoding/binary"
type UserInfo struct {
Id uint32 `json:"Id"`
@@ -65,6 +43,100 @@ type Permissions struct {
Streams map[int]*StreamPermissions `json:"Streams,omitempty"`
}
+// MarshalBinary encodes the permissions into a buffer of exactly p.Size()
+// bytes, laid out as:
+//
+//	10 global flags | has streams (1) |
+//	  per stream: id (4, LE) | 6 stream flags | has topics (1) |
+//	    per topic: id (4, LE) | 4 topic flags | more topics (1) |
+//	  more streams (1)
+//
+// Every flag occupies one byte holding 0 or 1. The "more topics" and
+// "more streams" bytes are 1 when another entry follows and 0 after the last
+// one. A nil or empty Streams map, and a nil or empty Topics map, are encoded
+// as "none" (flag 0), because a set "has" flag must be followed by at least
+// one entry. Map iteration order, and therefore entry order, is unspecified.
+//
+// NOTE(review): the continuation-flag semantics follow the byte that Size
+// reserves after every stream and topic; confirm against the server's binary
+// protocol definition.
+func (p *Permissions) MarshalBinary() ([]byte, error) {
+	buf := make([]byte, p.Size())
+
+	globals := [...]bool{
+		p.Global.ManageServers,
+		p.Global.ReadServers,
+		p.Global.ManageUsers,
+		p.Global.ReadUsers,
+		p.Global.ManageStreams,
+		p.Global.ReadStreams,
+		p.Global.ManageTopics,
+		p.Global.ReadTopics,
+		p.Global.PollMessages,
+		p.Global.SendMessages,
+	}
+	for i, allowed := range globals {
+		buf[i] = boolToByte(allowed)
+	}
+	position := len(globals)
+
+	if len(p.Streams) == 0 {
+		// The "has streams" flag at buf[position] is already 0. Nothing else
+		// may be touched here: the global flags above must survive intact.
+		return buf, nil
+	}
+	buf[position] = 1
+	position++
+
+	remainingStreams := len(p.Streams)
+	for streamID, stream := range p.Streams {
+		remainingStreams--
+
+		binary.LittleEndian.PutUint32(buf[position:position+4], uint32(streamID))
+		position += 4
+
+		buf[position] = boolToByte(stream.ManageStream)
+		buf[position+1] = boolToByte(stream.ReadStream)
+		buf[position+2] = boolToByte(stream.ManageTopics)
+		buf[position+3] = boolToByte(stream.ReadTopics)
+		buf[position+4] = boolToByte(stream.PollMessages)
+		buf[position+5] = boolToByte(stream.SendMessages)
+		position += 6
+
+		if len(stream.Topics) > 0 {
+			buf[position] = 1
+			position++
+
+			remainingTopics := len(stream.Topics)
+			for topicID, topic := range stream.Topics {
+				remainingTopics--
+
+				binary.LittleEndian.PutUint32(buf[position:position+4], uint32(topicID))
+				position += 4
+
+				buf[position] = boolToByte(topic.ManageTopic)
+				buf[position+1] = boolToByte(topic.ReadTopic)
+				buf[position+2] = boolToByte(topic.PollMessages)
+				buf[position+3] = boolToByte(topic.SendMessages)
+				position += 4
+
+				// Tell the reader whether another topic follows.
+				buf[position] = boolToByte(remainingTopics > 0)
+				position++
+			}
+		} else {
+			// "has topics" flag stays 0.
+			position++
+		}
+
+		// Tell the reader whether another stream follows; without this byte
+		// the next stream's id would be read in its place.
+		buf[position] = boolToByte(remainingStreams > 0)
+		position++
+	}
+
+	return buf, nil
+}
+
+// Size returns the number of bytes MarshalBinary allocates for p: 10 global
+// flag bytes plus one streams flag, and, when Streams is non-nil, 12 bytes
+// per stream plus 9 bytes per topic of that stream.
+func (p *Permissions) Size() int {
+	const (
+		globalBytes = 10
+		// stream id (4) + stream flags (6) + two single-byte markers.
+		streamBytes = 4 + 6 + 1 + 1
+		topicBytes  = 9
+	)
+
+	// One byte for the streams flag is counted whether or not Streams is set.
+	total := globalBytes + 1
+	if p.Streams == nil {
+		return total
+	}
+
+	for _, stream := range p.Streams {
+		// len of a nil Topics map is 0, so both cases collapse to one
+		// expression.
+		total += streamBytes + topicBytes*len(stream.Topics)
+	}
+	return total
+}
+
+// boolToByte maps true to 1 and false to 0.
+func boolToByte(b bool) byte {
+	var flag byte
+	if b {
+		flag = 1
+	}
+	return flag
+}
+
type GlobalPermissions struct {
ManageServers bool `json:"ManageServers"`
ReadServers bool `json:"ReadServers"`