This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 8c1d844e6 refactor(go): move command /foreign/go/internal/command
(#2881)
8c1d844e6 is described below
commit 8c1d844e6d44dc7388bcdf4169bbbce0223ec41c
Author: Chengxi Luo <[email protected]>
AuthorDate: Mon Mar 9 08:03:44 2026 -0400
refactor(go): move command /foreign/go/internal/command (#2881)
Follow-up to #2737
## Rationale
`foreign/go/contracts` currently contains many different types that
serve different purposes, such as data definitions and commands.
This PR separates the command-related types from the `contracts`
directory, moving them into `foreign/go/internal/command`, to improve
maintainability and readability. In addition, this separation will make
it easier to implement tests for the command serialization methods.
---
foreign/go/client/tcp/cluster.go | 7 +-
.../go/client/tcp/tcp_access_token_management.go | 7 +-
foreign/go/client/tcp/tcp_clients_management.go | 5 +-
.../go/client/tcp/tcp_consumer_group_management.go | 23 +--
foreign/go/client/tcp/tcp_core.go | 7 +-
foreign/go/client/tcp/tcp_messaging.go | 5 +-
foreign/go/client/tcp/tcp_offset_management.go | 7 +-
foreign/go/client/tcp/tcp_partition_management.go | 5 +-
foreign/go/client/tcp/tcp_session_management.go | 7 +-
foreign/go/client/tcp/tcp_stream_management.go | 11 +-
foreign/go/client/tcp/tcp_topic_management.go | 11 +-
foreign/go/client/tcp/tcp_user_management.go | 15 +-
foreign/go/client/tcp/tcp_utilities.go | 5 +-
foreign/go/contracts/access_tokens.go | 45 -----
foreign/go/contracts/change_password.go | 52 ------
foreign/go/contracts/cluster.go | 29 ----
foreign/go/contracts/command_codes.go | 71 --------
foreign/go/contracts/consumer_groups.go | 97 -----------
foreign/go/contracts/create_stream.go | 39 -----
foreign/go/contracts/create_topic.go | 91 ----------
foreign/go/contracts/create_user.go | 77 ---------
foreign/go/contracts/delete_stream.go | 30 ----
foreign/go/contracts/delete_topic.go | 31 ----
foreign/go/contracts/get_stats.go | 28 ---
foreign/go/contracts/get_stream.go | 30 ----
foreign/go/contracts/get_streams.go | 28 ---
foreign/go/contracts/get_topic.go | 31 ----
foreign/go/contracts/get_topics.go | 30 ----
foreign/go/contracts/get_user.go | 30 ----
foreign/go/contracts/identifier.go | 2 +-
.../contracts/{delete_user.go => identity_info.go} | 15 +-
foreign/go/contracts/{get_users.go => offset.go} | 12 +-
foreign/go/contracts/partitions.go | 60 -------
foreign/go/contracts/poll_messages.go | 93 ----------
foreign/go/contracts/poll_messages_test.go | 85 ---------
foreign/go/contracts/send_messages_test.go | 86 ---------
foreign/go/contracts/update_stream_test.go | 48 ------
foreign/go/contracts/update_topic.go | 76 --------
foreign/go/contracts/update_user_permissions.go | 63 -------
.../command/access_token.go} | 40 ++---
foreign/go/internal/command/code.go | 71 ++++++++
.../ping.go => internal/command/command.go} | 15 +-
.../command/consumer_group.go} | 94 ++++------
.../command/message.go} | 107 ++++++++++--
foreign/go/internal/command/message_test.go | 150 ++++++++++++++++
.../offsets.go => internal/command/offset.go} | 44 +++--
foreign/go/internal/command/partition.go | 84 +++++++++
.../go/{contracts => internal/command}/session.go | 19 +-
.../command/stream.go} | 66 ++++++-
.../command/stream_test.go} | 30 +++-
.../command.go => internal/command/system.go} | 57 ++++--
foreign/go/internal/command/topic.go | 185 ++++++++++++++++++++
.../command/topic_test.go} | 10 +-
.../{contracts => internal/command}/update_user.go | 16 +-
foreign/go/internal/command/user.go | 192 +++++++++++++++++++++
55 files changed, 1077 insertions(+), 1497 deletions(-)
diff --git a/foreign/go/client/tcp/cluster.go b/foreign/go/client/tcp/cluster.go
index f5d6a1d39..af3e33250 100644
--- a/foreign/go/client/tcp/cluster.go
+++ b/foreign/go/client/tcp/cluster.go
@@ -17,10 +17,13 @@
package tcp
-import iggcon "github.com/apache/iggy/foreign/go/contracts"
+import (
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
+ "github.com/apache/iggy/foreign/go/internal/command"
+)
func (c *IggyTcpClient) GetClusterMetadata() (*iggcon.ClusterMetadata, error) {
- response, err := c.do(&iggcon.GetClusterMetadata{})
+ response, err := c.do(&command.GetClusterMetadata{})
if err != nil {
return nil, err
}
diff --git a/foreign/go/client/tcp/tcp_access_token_management.go
b/foreign/go/client/tcp/tcp_access_token_management.go
index fdb487644..68d14a331 100644
--- a/foreign/go/client/tcp/tcp_access_token_management.go
+++ b/foreign/go/client/tcp/tcp_access_token_management.go
@@ -20,10 +20,11 @@ package tcp
import (
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
+ "github.com/apache/iggy/foreign/go/internal/command"
)
func (c *IggyTcpClient) CreatePersonalAccessToken(name string, expiry uint32)
(*iggcon.RawPersonalAccessToken, error) {
- buffer, err := c.do(&iggcon.CreatePersonalAccessToken{
+ buffer, err := c.do(&command.CreatePersonalAccessToken{
Name: name,
Expiry: expiry,
})
@@ -35,14 +36,14 @@ func (c *IggyTcpClient) CreatePersonalAccessToken(name
string, expiry uint32) (*
}
func (c *IggyTcpClient) DeletePersonalAccessToken(name string) error {
- _, err := c.do(&iggcon.DeletePersonalAccessToken{
+ _, err := c.do(&command.DeletePersonalAccessToken{
Name: name,
})
return err
}
func (c *IggyTcpClient) GetPersonalAccessTokens()
([]iggcon.PersonalAccessTokenInfo, error) {
- buffer, err := c.do(&iggcon.GetPersonalAccessTokens{})
+ buffer, err := c.do(&command.GetPersonalAccessTokens{})
if err != nil {
return nil, err
}
diff --git a/foreign/go/client/tcp/tcp_clients_management.go
b/foreign/go/client/tcp/tcp_clients_management.go
index cb93d66e8..81f0427ab 100644
--- a/foreign/go/client/tcp/tcp_clients_management.go
+++ b/foreign/go/client/tcp/tcp_clients_management.go
@@ -20,10 +20,11 @@ package tcp
import (
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
+ "github.com/apache/iggy/foreign/go/internal/command"
)
func (c *IggyTcpClient) GetClients() ([]iggcon.ClientInfo, error) {
- buffer, err := c.do(&iggcon.GetClients{})
+ buffer, err := c.do(&command.GetClients{})
if err != nil {
return nil, err
}
@@ -32,7 +33,7 @@ func (c *IggyTcpClient) GetClients() ([]iggcon.ClientInfo,
error) {
}
func (c *IggyTcpClient) GetClient(clientId uint32) (*iggcon.ClientInfoDetails,
error) {
- buffer, err := c.do(&iggcon.GetClient{ClientID: clientId})
+ buffer, err := c.do(&command.GetClient{ClientID: clientId})
if err != nil {
return nil, err
}
diff --git a/foreign/go/client/tcp/tcp_consumer_group_management.go
b/foreign/go/client/tcp/tcp_consumer_group_management.go
index deec1fe91..131dd1afb 100644
--- a/foreign/go/client/tcp/tcp_consumer_group_management.go
+++ b/foreign/go/client/tcp/tcp_consumer_group_management.go
@@ -21,10 +21,11 @@ import (
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
+ "github.com/apache/iggy/foreign/go/internal/command"
)
func (c *IggyTcpClient) GetConsumerGroups(streamId, topicId iggcon.Identifier)
([]iggcon.ConsumerGroup, error) {
- buffer, err := c.do(&iggcon.GetConsumerGroups{
+ buffer, err := c.do(&command.GetConsumerGroups{
StreamId: streamId,
TopicId: topicId,
})
@@ -36,8 +37,8 @@ func (c *IggyTcpClient) GetConsumerGroups(streamId, topicId
iggcon.Identifier) (
}
func (c *IggyTcpClient) GetConsumerGroup(streamId, topicId, groupId
iggcon.Identifier) (*iggcon.ConsumerGroupDetails, error) {
- buffer, err := c.do(&iggcon.GetConsumerGroup{
- TopicPath: iggcon.TopicPath{
+ buffer, err := c.do(&command.GetConsumerGroup{
+ TopicPath: command.TopicPath{
StreamId: streamId,
TopicId: topicId,
},
@@ -58,8 +59,8 @@ func (c *IggyTcpClient) CreateConsumerGroup(streamId
iggcon.Identifier, topicId
if MaxStringLength < len(name) || len(name) == 0 {
return nil, ierror.ErrInvalidConsumerGroupName
}
- buffer, err := c.do(&iggcon.CreateConsumerGroup{
- TopicPath: iggcon.TopicPath{
+ buffer, err := c.do(&command.CreateConsumerGroup{
+ TopicPath: command.TopicPath{
StreamId: streamId,
TopicId: topicId,
},
@@ -73,8 +74,8 @@ func (c *IggyTcpClient) CreateConsumerGroup(streamId
iggcon.Identifier, topicId
}
func (c *IggyTcpClient) DeleteConsumerGroup(streamId iggcon.Identifier,
topicId iggcon.Identifier, groupId iggcon.Identifier) error {
- _, err := c.do(&iggcon.DeleteConsumerGroup{
- TopicPath: iggcon.TopicPath{
+ _, err := c.do(&command.DeleteConsumerGroup{
+ TopicPath: command.TopicPath{
StreamId: streamId,
TopicId: topicId,
},
@@ -84,8 +85,8 @@ func (c *IggyTcpClient) DeleteConsumerGroup(streamId
iggcon.Identifier, topicId
}
func (c *IggyTcpClient) JoinConsumerGroup(streamId iggcon.Identifier, topicId
iggcon.Identifier, groupId iggcon.Identifier) error {
- _, err := c.do(&iggcon.JoinConsumerGroup{
- TopicPath: iggcon.TopicPath{
+ _, err := c.do(&command.JoinConsumerGroup{
+ TopicPath: command.TopicPath{
StreamId: streamId,
TopicId: topicId,
},
@@ -95,8 +96,8 @@ func (c *IggyTcpClient) JoinConsumerGroup(streamId
iggcon.Identifier, topicId ig
}
func (c *IggyTcpClient) LeaveConsumerGroup(streamId iggcon.Identifier, topicId
iggcon.Identifier, groupId iggcon.Identifier) error {
- _, err := c.do(&iggcon.LeaveConsumerGroup{
- TopicPath: iggcon.TopicPath{
+ _, err := c.do(&command.LeaveConsumerGroup{
+ TopicPath: command.TopicPath{
StreamId: streamId,
TopicId: topicId,
},
diff --git a/foreign/go/client/tcp/tcp_core.go
b/foreign/go/client/tcp/tcp_core.go
index c5a74b772..02128fb3f 100644
--- a/foreign/go/client/tcp/tcp_core.go
+++ b/foreign/go/client/tcp/tcp_core.go
@@ -30,6 +30,7 @@ import (
iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
+ "github.com/apache/iggy/foreign/go/internal/command"
"github.com/avast/retry-go"
)
@@ -237,7 +238,7 @@ func (c *IggyTcpClient) write(payload []byte) (int, error) {
}
// do sends the command to the Iggy server and returns the response.
-func (c *IggyTcpClient) do(cmd iggcon.Command) ([]byte, error) {
+func (c *IggyTcpClient) do(cmd command.Command) ([]byte, error) {
data, err := cmd.MarshalBinary()
if err != nil {
return nil, err
@@ -245,7 +246,7 @@ func (c *IggyTcpClient) do(cmd iggcon.Command) ([]byte,
error) {
return c.sendAndFetchResponse(data, cmd.Code())
}
-func (c *IggyTcpClient) sendAndFetchResponse(message []byte, command
iggcon.CommandCode) ([]byte, error) {
+func (c *IggyTcpClient) sendAndFetchResponse(message []byte, command
command.Code) ([]byte, error) {
c.mtx.Lock()
defer c.mtx.Unlock()
@@ -280,7 +281,7 @@ func (c *IggyTcpClient) sendAndFetchResponse(message
[]byte, command iggcon.Comm
return buffer, nil
}
-func createPayload(message []byte, command iggcon.CommandCode) []byte {
+func createPayload(message []byte, command command.Code) []byte {
messageLength := len(message) + 4
messageBytes := make([]byte, RequestInitialBytesLength+messageLength)
binary.LittleEndian.PutUint32(messageBytes[:4], uint32(messageLength))
diff --git a/foreign/go/client/tcp/tcp_messaging.go
b/foreign/go/client/tcp/tcp_messaging.go
index 22a5d008b..bc8174d92 100644
--- a/foreign/go/client/tcp/tcp_messaging.go
+++ b/foreign/go/client/tcp/tcp_messaging.go
@@ -21,6 +21,7 @@ import (
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
+ "github.com/apache/iggy/foreign/go/internal/command"
)
func (c *IggyTcpClient) SendMessages(
@@ -36,7 +37,7 @@ func (c *IggyTcpClient) SendMessages(
if len(messages) == 0 {
return ierror.ErrInvalidMessagesCount
}
- _, err := c.do(&iggcon.SendMessages{
+ _, err := c.do(&command.SendMessages{
Compression: c.MessageCompression,
StreamId: streamId,
TopicId: topicId,
@@ -55,7 +56,7 @@ func (c *IggyTcpClient) PollMessages(
autoCommit bool,
partitionId *uint32,
) (*iggcon.PolledMessage, error) {
- buffer, err := c.do(&iggcon.PollMessages{
+ buffer, err := c.do(&command.PollMessages{
StreamId: streamId,
TopicId: topicId,
Consumer: consumer,
diff --git a/foreign/go/client/tcp/tcp_offset_management.go
b/foreign/go/client/tcp/tcp_offset_management.go
index cec3c2c89..bc07582d9 100644
--- a/foreign/go/client/tcp/tcp_offset_management.go
+++ b/foreign/go/client/tcp/tcp_offset_management.go
@@ -20,10 +20,11 @@ package tcp
import (
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
+ "github.com/apache/iggy/foreign/go/internal/command"
)
func (c *IggyTcpClient) GetConsumerOffset(consumer iggcon.Consumer, streamId
iggcon.Identifier, topicId iggcon.Identifier, partitionId *uint32)
(*iggcon.ConsumerOffsetInfo, error) {
- buffer, err := c.do(&iggcon.GetConsumerOffset{
+ buffer, err := c.do(&command.GetConsumerOffset{
StreamId: streamId,
TopicId: topicId,
Consumer: consumer,
@@ -37,7 +38,7 @@ func (c *IggyTcpClient) GetConsumerOffset(consumer
iggcon.Consumer, streamId igg
}
func (c *IggyTcpClient) StoreConsumerOffset(consumer iggcon.Consumer, streamId
iggcon.Identifier, topicId iggcon.Identifier, offset uint64, partitionId
*uint32) error {
- _, err := c.do(&iggcon.StoreConsumerOffsetRequest{
+ _, err := c.do(&command.StoreConsumerOffsetRequest{
StreamId: streamId,
TopicId: topicId,
Offset: offset,
@@ -48,7 +49,7 @@ func (c *IggyTcpClient) StoreConsumerOffset(consumer
iggcon.Consumer, streamId i
}
func (c *IggyTcpClient) DeleteConsumerOffset(consumer iggcon.Consumer,
streamId iggcon.Identifier, topicId iggcon.Identifier, partitionId *uint32)
error {
- _, err := c.do(&iggcon.DeleteConsumerOffset{
+ _, err := c.do(&command.DeleteConsumerOffset{
Consumer: consumer,
StreamId: streamId,
TopicId: topicId,
diff --git a/foreign/go/client/tcp/tcp_partition_management.go
b/foreign/go/client/tcp/tcp_partition_management.go
index 5ed117164..e42c3be68 100644
--- a/foreign/go/client/tcp/tcp_partition_management.go
+++ b/foreign/go/client/tcp/tcp_partition_management.go
@@ -19,10 +19,11 @@ package tcp
import (
iggcon "github.com/apache/iggy/foreign/go/contracts"
+ "github.com/apache/iggy/foreign/go/internal/command"
)
func (c *IggyTcpClient) CreatePartitions(streamId iggcon.Identifier, topicId
iggcon.Identifier, partitionsCount uint32) error {
- _, err := c.do(&iggcon.CreatePartitions{
+ _, err := c.do(&command.CreatePartitions{
StreamId: streamId,
TopicId: topicId,
PartitionsCount: partitionsCount,
@@ -31,7 +32,7 @@ func (c *IggyTcpClient) CreatePartitions(streamId
iggcon.Identifier, topicId igg
}
func (c *IggyTcpClient) DeletePartitions(streamId iggcon.Identifier, topicId
iggcon.Identifier, partitionsCount uint32) error {
- _, err := c.do(&iggcon.DeletePartitions{
+ _, err := c.do(&command.DeletePartitions{
StreamId: streamId,
TopicId: topicId,
PartitionsCount: partitionsCount,
diff --git a/foreign/go/client/tcp/tcp_session_management.go
b/foreign/go/client/tcp/tcp_session_management.go
index 8b32b522e..5ec47805d 100644
--- a/foreign/go/client/tcp/tcp_session_management.go
+++ b/foreign/go/client/tcp/tcp_session_management.go
@@ -21,13 +21,14 @@ import (
"time"
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
+ "github.com/apache/iggy/foreign/go/internal/command"
"github.com/apache/iggy/foreign/go/internal/util"
iggcon "github.com/apache/iggy/foreign/go/contracts"
)
func (c *IggyTcpClient) LoginUser(username string, password string)
(*iggcon.IdentityInfo, error) {
- buffer, err := c.do(&iggcon.LoginUser{
+ buffer, err := c.do(&command.LoginUser{
Username: username,
Password: password,
})
@@ -50,7 +51,7 @@ func (c *IggyTcpClient) LoginUser(username string, password
string) (*iggcon.Ide
}
func (c *IggyTcpClient) LoginWithPersonalAccessToken(token string)
(*iggcon.IdentityInfo, error) {
- buffer, err := c.do(&iggcon.LoginWithPersonalAccessToken{
+ buffer, err := c.do(&command.LoginWithPersonalAccessToken{
Token: token,
})
if err != nil {
@@ -72,7 +73,7 @@ func (c *IggyTcpClient) LoginWithPersonalAccessToken(token
string) (*iggcon.Iden
}
func (c *IggyTcpClient) LogoutUser() error {
- _, err := c.do(&iggcon.LogoutUser{})
+ _, err := c.do(&command.LogoutUser{})
return err
}
diff --git a/foreign/go/client/tcp/tcp_stream_management.go
b/foreign/go/client/tcp/tcp_stream_management.go
index acc5b6202..9fbba5b4c 100644
--- a/foreign/go/client/tcp/tcp_stream_management.go
+++ b/foreign/go/client/tcp/tcp_stream_management.go
@@ -21,10 +21,11 @@ import (
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
+ "github.com/apache/iggy/foreign/go/internal/command"
)
func (c *IggyTcpClient) GetStreams() ([]iggcon.Stream, error) {
- buffer, err := c.do(&iggcon.GetStreams{})
+ buffer, err := c.do(&command.GetStreams{})
if err != nil {
return nil, err
}
@@ -33,7 +34,7 @@ func (c *IggyTcpClient) GetStreams() ([]iggcon.Stream, error)
{
}
func (c *IggyTcpClient) GetStream(streamId iggcon.Identifier)
(*iggcon.StreamDetails, error) {
- buffer, err := c.do(&iggcon.GetStream{
+ buffer, err := c.do(&command.GetStream{
StreamId: streamId,
})
if err != nil {
@@ -55,7 +56,7 @@ func (c *IggyTcpClient) CreateStream(name string)
(*iggcon.StreamDetails, error)
if len(name) == 0 || MaxStringLength < len(name) {
return nil, ierror.ErrInvalidStreamName
}
- buffer, err := c.do(&iggcon.CreateStream{Name: name})
+ buffer, err := c.do(&command.CreateStream{Name: name})
if err != nil {
return nil, err
}
@@ -71,11 +72,11 @@ func (c *IggyTcpClient) UpdateStream(streamId
iggcon.Identifier, name string) er
if len(name) > MaxStringLength || len(name) == 0 {
return ierror.ErrInvalidStreamName
}
- _, err := c.do(&iggcon.UpdateStream{StreamId: streamId, Name: name})
+ _, err := c.do(&command.UpdateStream{StreamId: streamId, Name: name})
return err
}
func (c *IggyTcpClient) DeleteStream(id iggcon.Identifier) error {
- _, err := c.do(&iggcon.DeleteStream{StreamId: id})
+ _, err := c.do(&command.DeleteStream{StreamId: id})
return err
}
diff --git a/foreign/go/client/tcp/tcp_topic_management.go
b/foreign/go/client/tcp/tcp_topic_management.go
index 0c69ac8e9..6bd007aa2 100644
--- a/foreign/go/client/tcp/tcp_topic_management.go
+++ b/foreign/go/client/tcp/tcp_topic_management.go
@@ -21,10 +21,11 @@ import (
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
+ "github.com/apache/iggy/foreign/go/internal/command"
)
func (c *IggyTcpClient) GetTopics(streamId iggcon.Identifier) ([]iggcon.Topic,
error) {
- buffer, err := c.do(&iggcon.GetTopics{StreamId: streamId})
+ buffer, err := c.do(&command.GetTopics{StreamId: streamId})
if err != nil {
return nil, err
}
@@ -33,7 +34,7 @@ func (c *IggyTcpClient) GetTopics(streamId iggcon.Identifier)
([]iggcon.Topic, e
}
func (c *IggyTcpClient) GetTopic(streamId iggcon.Identifier, topicId
iggcon.Identifier) (*iggcon.TopicDetails, error) {
- buffer, err := c.do(&iggcon.GetTopic{StreamId: streamId, TopicId:
topicId})
+ buffer, err := c.do(&command.GetTopic{StreamId: streamId, TopicId:
topicId})
if err != nil {
return nil, err
}
@@ -68,7 +69,7 @@ func (c *IggyTcpClient) CreateTopic(
return nil, ierror.ErrInvalidReplicationFactor
}
- buffer, err := c.do(&iggcon.CreateTopic{
+ buffer, err := c.do(&command.CreateTopic{
StreamId: streamId,
Name: name,
PartitionsCount: partitionsCount,
@@ -99,7 +100,7 @@ func (c *IggyTcpClient) UpdateTopic(
if replicationFactor != nil && *replicationFactor == 0 {
return ierror.ErrInvalidReplicationFactor
}
- _, err := c.do(&iggcon.UpdateTopic{
+ _, err := c.do(&command.UpdateTopic{
StreamId: streamId,
TopicId: topicId,
CompressionAlgorithm: compressionAlgorithm,
@@ -112,6 +113,6 @@ func (c *IggyTcpClient) UpdateTopic(
}
func (c *IggyTcpClient) DeleteTopic(streamId, topicId iggcon.Identifier) error
{
- _, err := c.do(&iggcon.DeleteTopic{StreamId: streamId, TopicId:
topicId})
+ _, err := c.do(&command.DeleteTopic{StreamId: streamId, TopicId:
topicId})
return err
}
diff --git a/foreign/go/client/tcp/tcp_user_management.go
b/foreign/go/client/tcp/tcp_user_management.go
index c9836ebf1..a3575d03c 100644
--- a/foreign/go/client/tcp/tcp_user_management.go
+++ b/foreign/go/client/tcp/tcp_user_management.go
@@ -21,10 +21,11 @@ import (
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
ierror "github.com/apache/iggy/foreign/go/errors"
+ "github.com/apache/iggy/foreign/go/internal/command"
)
func (c *IggyTcpClient) GetUser(identifier iggcon.Identifier)
(*iggcon.UserInfoDetails, error) {
- buffer, err := c.do(&iggcon.GetUser{Id: identifier})
+ buffer, err := c.do(&command.GetUser{Id: identifier})
if err != nil {
return nil, err
}
@@ -36,7 +37,7 @@ func (c *IggyTcpClient) GetUser(identifier iggcon.Identifier)
(*iggcon.UserInfoD
}
func (c *IggyTcpClient) GetUsers() ([]iggcon.UserInfo, error) {
- buffer, err := c.do(&iggcon.GetUsers{})
+ buffer, err := c.do(&command.GetUsers{})
if err != nil {
return nil, err
}
@@ -45,7 +46,7 @@ func (c *IggyTcpClient) GetUsers() ([]iggcon.UserInfo, error)
{
}
func (c *IggyTcpClient) CreateUser(username string, password string, status
iggcon.UserStatus, permissions *iggcon.Permissions) (*iggcon.UserInfoDetails,
error) {
- buffer, err := c.do(&iggcon.CreateUser{
+ buffer, err := c.do(&command.CreateUser{
Username: username,
Password: password,
Status: status,
@@ -62,7 +63,7 @@ func (c *IggyTcpClient) CreateUser(username string, password
string, status iggc
}
func (c *IggyTcpClient) UpdateUser(userID iggcon.Identifier, username *string,
status *iggcon.UserStatus) error {
- _, err := c.do(&iggcon.UpdateUser{
+ _, err := c.do(&command.UpdateUser{
UserID: userID,
Username: username,
Status: status,
@@ -71,14 +72,14 @@ func (c *IggyTcpClient) UpdateUser(userID
iggcon.Identifier, username *string, s
}
func (c *IggyTcpClient) DeleteUser(identifier iggcon.Identifier) error {
- _, err := c.do(&iggcon.DeleteUser{
+ _, err := c.do(&command.DeleteUser{
Id: identifier,
})
return err
}
func (c *IggyTcpClient) UpdatePermissions(userID iggcon.Identifier,
permissions *iggcon.Permissions) error {
- _, err := c.do(&iggcon.UpdatePermissions{
+ _, err := c.do(&command.UpdatePermissions{
UserID: userID,
Permissions: permissions,
})
@@ -86,7 +87,7 @@ func (c *IggyTcpClient) UpdatePermissions(userID
iggcon.Identifier, permissions
}
func (c *IggyTcpClient) ChangePassword(userID iggcon.Identifier,
currentPassword string, newPassword string) error {
- _, err := c.do(&iggcon.ChangePassword{
+ _, err := c.do(&command.ChangePassword{
UserID: userID,
CurrentPassword: currentPassword,
NewPassword: newPassword,
diff --git a/foreign/go/client/tcp/tcp_utilities.go
b/foreign/go/client/tcp/tcp_utilities.go
index bd5beaef3..0da1784cb 100644
--- a/foreign/go/client/tcp/tcp_utilities.go
+++ b/foreign/go/client/tcp/tcp_utilities.go
@@ -20,10 +20,11 @@ package tcp
import (
binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
+ "github.com/apache/iggy/foreign/go/internal/command"
)
func (c *IggyTcpClient) GetStats() (*iggcon.Stats, error) {
- buffer, err := c.do(&iggcon.GetStats{})
+ buffer, err := c.do(&command.GetStats{})
if err != nil {
return nil, err
}
@@ -35,6 +36,6 @@ func (c *IggyTcpClient) GetStats() (*iggcon.Stats, error) {
}
func (c *IggyTcpClient) Ping() error {
- _, err := c.do(&iggcon.Ping{})
+ _, err := c.do(&command.Ping{})
return err
}
diff --git a/foreign/go/contracts/access_tokens.go
b/foreign/go/contracts/access_tokens.go
index edb9b4ef3..85fc1713f 100644
--- a/foreign/go/contracts/access_tokens.go
+++ b/foreign/go/contracts/access_tokens.go
@@ -18,54 +18,9 @@
package iggcon
import (
- "encoding/binary"
"time"
)
-type CreatePersonalAccessToken struct {
- Name string `json:"Name"`
- Expiry uint32 `json:"Expiry"`
-}
-
-func (c *CreatePersonalAccessToken) Code() CommandCode {
- return CreateAccessTokenCode
-}
-
-func (c *CreatePersonalAccessToken) MarshalBinary() ([]byte, error) {
- length := 1 + len(c.Name) + 8
- bytes := make([]byte, length)
- bytes[0] = byte(len(c.Name))
- copy(bytes[1:], c.Name)
- binary.LittleEndian.PutUint32(bytes[len(bytes)-4:], c.Expiry)
- return bytes, nil
-}
-
-type DeletePersonalAccessToken struct {
- Name string `json:"Name"`
-}
-
-func (d *DeletePersonalAccessToken) Code() CommandCode {
- return DeleteAccessTokenCode
-}
-
-func (d *DeletePersonalAccessToken) MarshalBinary() ([]byte, error) {
- length := 1 + len(d.Name)
- bytes := make([]byte, length)
- bytes[0] = byte(len(d.Name))
- copy(bytes[1:], d.Name)
- return bytes, nil
-}
-
-type GetPersonalAccessTokens struct{}
-
-func (g *GetPersonalAccessTokens) Code() CommandCode {
- return GetAccessTokensCode
-}
-
-func (g *GetPersonalAccessTokens) MarshalBinary() ([]byte, error) {
- return []byte{}, nil
-}
-
type PersonalAccessTokenInfo struct {
Name string `json:"Name"`
Expiry *time.Time `json:"Expiry"`
diff --git a/foreign/go/contracts/change_password.go
b/foreign/go/contracts/change_password.go
deleted file mode 100644
index f2c8c1d85..000000000
--- a/foreign/go/contracts/change_password.go
+++ /dev/null
@@ -1,52 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type ChangePassword struct {
- UserID Identifier `json:"-"`
- CurrentPassword string `json:"CurrentPassword"`
- NewPassword string `json:"NewPassword"`
-}
-
-func (c *ChangePassword) Code() CommandCode {
- return ChangePasswordCode
-}
-
-func (c *ChangePassword) MarshalBinary() ([]byte, error) {
- userIdBytes, err := c.UserID.MarshalBinary()
- if err != nil {
- return nil, err
- }
- length := len(userIdBytes) + len(c.CurrentPassword) +
len(c.NewPassword) + 2
- bytes := make([]byte, length)
- position := 0
-
- copy(bytes[position:position+len(userIdBytes)], userIdBytes)
- position += len(userIdBytes)
-
- bytes[position] = byte(len(c.CurrentPassword))
- position++
- copy(bytes[position:position+len(c.CurrentPassword)], c.CurrentPassword)
- position += len(c.CurrentPassword)
-
- bytes[position] = byte(len(c.NewPassword))
- position++
- copy(bytes[position:position+len(c.NewPassword)], c.NewPassword)
-
- return bytes, nil
-}
diff --git a/foreign/go/contracts/cluster.go b/foreign/go/contracts/cluster.go
deleted file mode 100644
index 2e8e9808f..000000000
--- a/foreign/go/contracts/cluster.go
+++ /dev/null
@@ -1,29 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type GetClusterMetadata struct {
-}
-
-func (m *GetClusterMetadata) Code() CommandCode {
- return GetClusterMetadataCode
-}
-
-func (m *GetClusterMetadata) MarshalBinary() ([]byte, error) {
- return []byte{}, nil
-}
diff --git a/foreign/go/contracts/command_codes.go
b/foreign/go/contracts/command_codes.go
deleted file mode 100644
index 2340360e3..000000000
--- a/foreign/go/contracts/command_codes.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type CommandCode int
-
-const (
- PingCode CommandCode = 1
- GetStatsCode CommandCode = 10
- GetSnapshotFileCode CommandCode = 11
- GetClusterMetadataCode CommandCode = 12
- GetMeCode CommandCode = 20
- GetClientCode CommandCode = 21
- GetClientsCode CommandCode = 22
- GetUserCode CommandCode = 31
- GetUsersCode CommandCode = 32
- CreateUserCode CommandCode = 33
- DeleteUserCode CommandCode = 34
- UpdateUserCode CommandCode = 35
- UpdatePermissionsCode CommandCode = 36
- ChangePasswordCode CommandCode = 37
- LoginUserCode CommandCode = 38
- LogoutUserCode CommandCode = 39
- GetAccessTokensCode CommandCode = 41
- CreateAccessTokenCode CommandCode = 42
- DeleteAccessTokenCode CommandCode = 43
- LoginWithAccessTokenCode CommandCode = 44
- PollMessagesCode CommandCode = 100
- SendMessagesCode CommandCode = 101
- GetOffsetCode CommandCode = 120
- StoreOffsetCode CommandCode = 121
- DeleteConsumerOffsetCode CommandCode = 122
- GetStreamCode CommandCode = 200
- GetStreamsCode CommandCode = 201
- CreateStreamCode CommandCode = 202
- DeleteStreamCode CommandCode = 203
- UpdateStreamCode CommandCode = 204
- GetTopicCode CommandCode = 300
- GetTopicsCode CommandCode = 301
- CreateTopicCode CommandCode = 302
- DeleteTopicCode CommandCode = 303
- UpdateTopicCode CommandCode = 304
- CreatePartitionsCode CommandCode = 402
- DeletePartitionsCode CommandCode = 403
- GetGroupCode CommandCode = 600
- GetGroupsCode CommandCode = 601
- CreateGroupCode CommandCode = 602
- DeleteGroupCode CommandCode = 603
- JoinGroupCode CommandCode = 604
- LeaveGroupCode CommandCode = 605
-)
-
-// internal const int GET_PERSONAL_ACCESS_TOKENS_CODE = 41;
-// internal const int CREATE_PERSONAL_ACCESS_TOKEN_CODE = 42;
-// internal const int DELETE_PERSONAL_ACCESS_TOKEN_CODE = 43;
-// internal const int LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE = 44;
diff --git a/foreign/go/contracts/consumer_groups.go
b/foreign/go/contracts/consumer_groups.go
index 3863a6b8e..124a23e62 100644
--- a/foreign/go/contracts/consumer_groups.go
+++ b/foreign/go/contracts/consumer_groups.go
@@ -35,103 +35,6 @@ type ConsumerGroupMember struct {
Partitions []uint32
}
-type TopicPath struct {
- StreamId Identifier
- TopicId Identifier
-}
-
-type CreateConsumerGroup struct {
- TopicPath
- Name string
-}
-
-func (c *CreateConsumerGroup) Code() CommandCode {
- return CreateGroupCode
-}
-
-func (c *CreateConsumerGroup) MarshalBinary() ([]byte, error) {
- streamIdBytes, err := c.StreamId.MarshalBinary()
- if err != nil {
- return nil, err
- }
- topicIdBytes, err := c.TopicId.MarshalBinary()
- if err != nil {
- return nil, err
- }
- offset := len(streamIdBytes) + len(topicIdBytes)
- bytes := make([]byte, offset+1+len(c.Name))
- copy(bytes[0:len(streamIdBytes)], streamIdBytes)
- copy(bytes[len(streamIdBytes):offset], topicIdBytes)
- bytes[offset] = byte(len(c.Name))
- copy(bytes[offset+1:], c.Name)
- return bytes, nil
-}
-
-type DeleteConsumerGroup struct {
- TopicPath
- GroupId Identifier
-}
-
-func (d *DeleteConsumerGroup) Code() CommandCode {
- return DeleteGroupCode
-}
-
-func (d *DeleteConsumerGroup) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(d.StreamId, d.TopicId, d.GroupId)
-}
-
-type JoinConsumerGroup struct {
- TopicPath
- GroupId Identifier
-}
-
-func (j *JoinConsumerGroup) Code() CommandCode {
- return JoinGroupCode
-}
-
-func (j *JoinConsumerGroup) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(j.StreamId, j.TopicId, j.GroupId)
-}
-
-type LeaveConsumerGroup struct {
- TopicPath
- GroupId Identifier
-}
-
-func (l *LeaveConsumerGroup) Code() CommandCode {
- return LeaveGroupCode
-}
-
-func (l *LeaveConsumerGroup) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(l.StreamId, l.TopicId, l.GroupId)
-}
-
-type GetConsumerGroup struct {
- TopicPath
- GroupId Identifier
-}
-
-func (g *GetConsumerGroup) Code() CommandCode {
- return GetGroupCode
-}
-
-func (g *GetConsumerGroup) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(g.StreamId, g.TopicId, g.GroupId)
-}
-
-type GetConsumerGroups struct {
- StreamId Identifier
- TopicId Identifier
-}
-
-func (g *GetConsumerGroups) Code() CommandCode {
- return GetGroupsCode
-}
-
-func (g *GetConsumerGroups) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(g.StreamId, g.TopicId)
-}
-
type ConsumerGroupInfo struct {
StreamId uint32 `json:"streamId"`
TopicId uint32 `json:"topicId"`
diff --git a/foreign/go/contracts/create_stream.go
b/foreign/go/contracts/create_stream.go
deleted file mode 100644
index 9c4c1a631..000000000
--- a/foreign/go/contracts/create_stream.go
+++ /dev/null
@@ -1,39 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type CreateStream struct {
- Name string
-}
-
-const (
- nameLengthOffset = 0
- payloadOffset = 1
-)
-
-func (request *CreateStream) Code() CommandCode {
- return CreateStreamCode
-}
-
-func (request *CreateStream) MarshalBinary() ([]byte, error) {
- nameLength := len(request.Name)
- serialized := make([]byte, payloadOffset+nameLength)
- serialized[nameLengthOffset] = byte(nameLength)
- copy(serialized[payloadOffset:], request.Name)
- return serialized, nil
-}
diff --git a/foreign/go/contracts/create_topic.go
b/foreign/go/contracts/create_topic.go
deleted file mode 100644
index 92a799a86..000000000
--- a/foreign/go/contracts/create_topic.go
+++ /dev/null
@@ -1,91 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-import (
- "encoding/binary"
-)
-
-type CreateTopic struct {
- StreamId Identifier `json:"streamId"`
- PartitionsCount uint32 `json:"partitionsCount"`
- CompressionAlgorithm CompressionAlgorithm `json:"compressionAlgorithm"`
- MessageExpiry Duration `json:"messageExpiry"`
- MaxTopicSize uint64 `json:"maxTopicSize"`
- Name string `json:"name"`
- ReplicationFactor *uint8 `json:"replicationFactor"`
-}
-
-func (t *CreateTopic) Code() CommandCode {
- return CreateTopicCode
-}
-
-func (t *CreateTopic) MarshalBinary() ([]byte, error) {
- if t.ReplicationFactor == nil {
- t.ReplicationFactor = new(uint8)
- }
-
- streamIdBytes, err := t.StreamId.MarshalBinary()
- if err != nil {
- return nil, err
- }
- nameBytes := []byte(t.Name)
-
- totalLength := len(streamIdBytes) + // StreamId
- 4 + // PartitionsCount
- 1 + // CompressionAlgorithm
- 8 + // MessageExpiry
- 8 + // MaxTopicSize
- 1 + // ReplicationFactor
- 1 + // Name length
- len(nameBytes) // Name
- bytes := make([]byte, totalLength)
-
- position := 0
-
- // StreamId
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
-
- // PartitionsCount
- binary.LittleEndian.PutUint32(bytes[position:], t.PartitionsCount)
- position += 4
-
- // CompressionAlgorithm
- bytes[position] = byte(t.CompressionAlgorithm)
- position++
-
- // MessageExpiry
- binary.LittleEndian.PutUint64(bytes[position:], uint64(t.MessageExpiry))
- position += 8
-
- // MaxTopicSize
- binary.LittleEndian.PutUint64(bytes[position:], t.MaxTopicSize)
- position += 8
-
- // ReplicationFactor
- bytes[position] = *t.ReplicationFactor
- position++
-
- // Name
- bytes[position] = byte(len(nameBytes))
- position++
- copy(bytes[position:], nameBytes)
-
- return bytes, nil
-}
diff --git a/foreign/go/contracts/create_user.go
b/foreign/go/contracts/create_user.go
deleted file mode 100644
index df8298719..000000000
--- a/foreign/go/contracts/create_user.go
+++ /dev/null
@@ -1,77 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-import "encoding/binary"
-
-type CreateUser struct {
- Username string `json:"username"`
- Password string `json:"Password"`
- Status UserStatus `json:"Status"`
- Permissions *Permissions `json:"Permissions,omitempty"`
-}
-
-func (c *CreateUser) Code() CommandCode {
- return CreateUserCode
-}
-
-func (c *CreateUser) MarshalBinary() ([]byte, error) {
- capacity := 4 + len(c.Username) + len(c.Password)
- if c.Permissions != nil {
- capacity += 1 + 4 + c.Permissions.Size()
- }
-
- bytes := make([]byte, capacity)
- position := 0
-
- bytes[position] = byte(len(c.Username))
- position += 1
- copy(bytes[position:position+len(c.Username)], []byte(c.Username))
- position += len(c.Username)
-
- bytes[position] = byte(len(c.Password))
- position += 1
- copy(bytes[position:position+len(c.Password)], []byte(c.Password))
- position += len(c.Password)
-
- statusByte := byte(0)
- switch c.Status {
- case Active:
- statusByte = byte(1)
- case Inactive:
- statusByte = byte(2)
- }
- bytes[position] = statusByte
- position += 1
-
- if c.Permissions != nil {
- bytes[position] = byte(1)
- position += 1
- permissionsBytes, err := c.Permissions.MarshalBinary()
- if err != nil {
- return nil, err
- }
- binary.LittleEndian.PutUint32(bytes[position:position+4],
uint32(len(permissionsBytes)))
- position += 4
- copy(bytes[position:position+len(permissionsBytes)],
permissionsBytes)
- } else {
- bytes[position] = byte(0)
- }
-
- return bytes, nil
-}
diff --git a/foreign/go/contracts/delete_stream.go
b/foreign/go/contracts/delete_stream.go
deleted file mode 100644
index fad72e388..000000000
--- a/foreign/go/contracts/delete_stream.go
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type DeleteStream struct {
- StreamId Identifier
-}
-
-func (d *DeleteStream) Code() CommandCode {
- return DeleteStreamCode
-}
-
-func (d *DeleteStream) MarshalBinary() ([]byte, error) {
- return d.StreamId.MarshalBinary()
-}
diff --git a/foreign/go/contracts/delete_topic.go
b/foreign/go/contracts/delete_topic.go
deleted file mode 100644
index d5b062168..000000000
--- a/foreign/go/contracts/delete_topic.go
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type DeleteTopic struct {
- StreamId Identifier
- TopicId Identifier
-}
-
-func (d *DeleteTopic) Code() CommandCode {
- return DeleteTopicCode
-}
-
-func (d *DeleteTopic) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(d.StreamId, d.TopicId)
-}
diff --git a/foreign/go/contracts/get_stats.go
b/foreign/go/contracts/get_stats.go
deleted file mode 100644
index ec6e42106..000000000
--- a/foreign/go/contracts/get_stats.go
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type GetStats struct{}
-
-func (c *GetStats) Code() CommandCode {
- return GetStatsCode
-}
-
-func (c *GetStats) MarshalBinary() ([]byte, error) {
- return []byte{}, nil
-}
diff --git a/foreign/go/contracts/get_stream.go
b/foreign/go/contracts/get_stream.go
deleted file mode 100644
index 9cc1a7d18..000000000
--- a/foreign/go/contracts/get_stream.go
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type GetStream struct {
- StreamId Identifier
-}
-
-func (g *GetStream) Code() CommandCode {
- return GetStreamCode
-}
-
-func (g *GetStream) MarshalBinary() ([]byte, error) {
- return g.StreamId.MarshalBinary()
-}
diff --git a/foreign/go/contracts/get_streams.go
b/foreign/go/contracts/get_streams.go
deleted file mode 100644
index e8b0e6083..000000000
--- a/foreign/go/contracts/get_streams.go
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type GetStreams struct{}
-
-func (g *GetStreams) Code() CommandCode {
- return GetStreamsCode
-}
-
-func (g *GetStreams) MarshalBinary() ([]byte, error) {
- return []byte{}, nil
-}
diff --git a/foreign/go/contracts/get_topic.go
b/foreign/go/contracts/get_topic.go
deleted file mode 100644
index 1026bfd5c..000000000
--- a/foreign/go/contracts/get_topic.go
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type GetTopic struct {
- StreamId Identifier
- TopicId Identifier
-}
-
-func (g *GetTopic) Code() CommandCode {
- return GetTopicCode
-}
-
-func (g *GetTopic) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(g.StreamId, g.TopicId)
-}
diff --git a/foreign/go/contracts/get_topics.go
b/foreign/go/contracts/get_topics.go
deleted file mode 100644
index 23dbbd53d..000000000
--- a/foreign/go/contracts/get_topics.go
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type GetTopics struct {
- StreamId Identifier
-}
-
-func (g *GetTopics) Code() CommandCode {
- return GetTopicsCode
-}
-
-func (g *GetTopics) MarshalBinary() ([]byte, error) {
- return g.StreamId.MarshalBinary()
-}
diff --git a/foreign/go/contracts/get_user.go b/foreign/go/contracts/get_user.go
deleted file mode 100644
index 0ef25435c..000000000
--- a/foreign/go/contracts/get_user.go
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-type GetUser struct {
- Id Identifier
-}
-
-func (c *GetUser) Code() CommandCode {
- return GetUserCode
-}
-
-func (c *GetUser) MarshalBinary() ([]byte, error) {
- return c.Id.MarshalBinary()
-}
diff --git a/foreign/go/contracts/identifier.go
b/foreign/go/contracts/identifier.go
index 3938d490a..3b1ea7c13 100644
--- a/foreign/go/contracts/identifier.go
+++ b/foreign/go/contracts/identifier.go
@@ -103,7 +103,7 @@ func (id Identifier) AppendBinary(b []byte) ([]byte, error)
{
return b, nil
}
-func marshalIdentifiers(identifiers ...Identifier) ([]byte, error) {
+func MarshalIdentifiers(identifiers ...Identifier) ([]byte, error) {
size := 0
for i := 0; i < len(identifiers); i++ {
size += 2 + identifiers[i].Length
diff --git a/foreign/go/contracts/delete_user.go
b/foreign/go/contracts/identity_info.go
similarity index 80%
rename from foreign/go/contracts/delete_user.go
rename to foreign/go/contracts/identity_info.go
index 013f8e1ca..4a26d1927 100644
--- a/foreign/go/contracts/delete_user.go
+++ b/foreign/go/contracts/identity_info.go
@@ -17,14 +17,9 @@
package iggcon
-type DeleteUser struct {
- Id Identifier
-}
-
-func (d *DeleteUser) Code() CommandCode {
- return DeleteUserCode
-}
-
-func (d *DeleteUser) MarshalBinary() ([]byte, error) {
- return d.Id.MarshalBinary()
+type IdentityInfo struct {
+ // Unique identifier (numeric) of the user.
+ UserId uint32 `json:"userId"`
+ // The optional tokens, used only by HTTP transport.
+ AccessToken *string `json:"accessToken"`
}
diff --git a/foreign/go/contracts/get_users.go b/foreign/go/contracts/offset.go
similarity index 83%
rename from foreign/go/contracts/get_users.go
rename to foreign/go/contracts/offset.go
index 21c5b3de5..e2b0d4f57 100644
--- a/foreign/go/contracts/get_users.go
+++ b/foreign/go/contracts/offset.go
@@ -17,12 +17,8 @@
package iggcon
-type GetUsers struct{}
-
-func (g *GetUsers) Code() CommandCode {
- return GetUsersCode
-}
-
-func (g *GetUsers) MarshalBinary() ([]byte, error) {
- return []byte{}, nil
+type ConsumerOffsetInfo struct {
+ PartitionId uint32 `json:"partitionId"`
+ CurrentOffset uint64 `json:"currentOffset"`
+ StoredOffset uint64 `json:"storedOffset"`
}
diff --git a/foreign/go/contracts/partitions.go
b/foreign/go/contracts/partitions.go
index 00ab0244c..c7043d3c0 100644
--- a/foreign/go/contracts/partitions.go
+++ b/foreign/go/contracts/partitions.go
@@ -33,66 +33,6 @@ type PartitionContract struct {
SizeBytes uint64 `json:"sizeBytes"`
}
-type CreatePartitions struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- PartitionsCount uint32 `json:"partitionsCount"`
-}
-
-func (c *CreatePartitions) Code() CommandCode {
- return CreatePartitionsCode
-}
-
-func (c *CreatePartitions) MarshalBinary() ([]byte, error) {
- streamIdBytes, err := c.StreamId.MarshalBinary()
- if err != nil {
- return nil, err
- }
- topicIdBytes, err := c.TopicId.MarshalBinary()
- if err != nil {
- return nil, err
- }
- bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+4)
- position := 0
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:], topicIdBytes)
- position += len(topicIdBytes)
- binary.LittleEndian.PutUint32(bytes[position:position+4],
c.PartitionsCount)
-
- return bytes, nil
-}
-
-type DeletePartitions struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- PartitionsCount uint32 `json:"partitionsCount"`
-}
-
-func (d *DeletePartitions) Code() CommandCode {
- return DeletePartitionsCode
-}
-
-func (d *DeletePartitions) MarshalBinary() ([]byte, error) {
- streamIdBytes, err := d.StreamId.MarshalBinary()
- if err != nil {
- return nil, err
- }
- topicIdBytes, err := d.TopicId.MarshalBinary()
- if err != nil {
- return nil, err
- }
- bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+4)
- position := 0
- copy(bytes[position:], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:], topicIdBytes)
- position += len(topicIdBytes)
- binary.LittleEndian.PutUint32(bytes[position:position+4],
d.PartitionsCount)
-
- return bytes, nil
-}
-
type PartitioningKind int
const (
diff --git a/foreign/go/contracts/poll_messages.go
b/foreign/go/contracts/poll_messages.go
deleted file mode 100644
index efeaf8171..000000000
--- a/foreign/go/contracts/poll_messages.go
+++ /dev/null
@@ -1,93 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-import (
- "encoding/binary"
-)
-
-const (
- partitionPresenceSize = 1
- partitionFieldSize = 4
- partitionStrategySize = partitionPresenceSize + partitionFieldSize + 1
- offsetSize = 12
- commitFlagSize = 1
-)
-
-type PollMessages struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- Consumer Consumer `json:"consumer"`
- PartitionId *uint32 `json:"partitionId"`
- Strategy PollingStrategy `json:"pollingStrategy"`
- Count uint32 `json:"count"`
- AutoCommit bool `json:"autoCommit"`
-}
-
-func (m *PollMessages) Code() CommandCode {
- return PollMessagesCode
-}
-
-func (m *PollMessages) MarshalBinary() ([]byte, error) {
- consumerIdBytes, err := m.Consumer.Id.MarshalBinary()
- if err != nil {
- return nil, err
- }
- streamIdBytes, err := m.StreamId.MarshalBinary()
- if err != nil {
- return nil, err
- }
- topicIdBytes, err := m.TopicId.MarshalBinary()
- if err != nil {
- return nil, err
- }
- messageSize := 1 + len(consumerIdBytes) + len(streamIdBytes) +
len(topicIdBytes) + partitionStrategySize + offsetSize + commitFlagSize
- bytes := make([]byte, messageSize)
-
- bytes[0] = byte(m.Consumer.Kind)
- position := 1
- copy(bytes[position:position+len(consumerIdBytes)], consumerIdBytes)
- position += len(consumerIdBytes)
-
- copy(bytes[position:position+len(streamIdBytes)], streamIdBytes)
- position += len(streamIdBytes)
- copy(bytes[position:position+len(topicIdBytes)], topicIdBytes)
- position += len(topicIdBytes)
- if m.PartitionId != nil {
- bytes[position] = 1
- binary.LittleEndian.PutUint32(bytes[position+1:position+1+4],
*m.PartitionId)
- } else {
- bytes[position] = 0
- binary.LittleEndian.PutUint32(bytes[position+1:position+1+4], 0)
- }
- bytes[position+1+4] = byte(m.Strategy.Kind)
-
- position += partitionStrategySize
- binary.LittleEndian.PutUint64(bytes[position:position+8],
m.Strategy.Value)
- binary.LittleEndian.PutUint32(bytes[position+8:position+12], m.Count)
-
- position += offsetSize
-
- if m.AutoCommit {
- bytes[position] = 1
- } else {
- bytes[position] = 0
- }
-
- return bytes, nil
-}
diff --git a/foreign/go/contracts/poll_messages_test.go
b/foreign/go/contracts/poll_messages_test.go
deleted file mode 100644
index a1455432c..000000000
--- a/foreign/go/contracts/poll_messages_test.go
+++ /dev/null
@@ -1,85 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-import (
- "testing"
-)
-
-func TestSerialize_TcpFetchMessagesRequest(t *testing.T) {
- partitionId := uint32(123)
- consumerId, _ := NewIdentifier(uint32(42))
- streamId, _ := NewIdentifier("test_stream_id")
- topicId, _ := NewIdentifier("test_topic_id")
- // Create a sample PollMessages
- request := PollMessages{
- Consumer: NewSingleConsumer(consumerId),
- StreamId: streamId,
- TopicId: topicId,
- PartitionId: &partitionId,
- Strategy: FirstPollingStrategy(),
- Count: 100,
- AutoCommit: true,
- }
-
- // Serialize the request
- serialized, err := request.MarshalBinary()
- if err != nil {
- t.Error(err)
- }
-
- // Expected serialized bytes based on the provided sample request
- expected := []byte{
- 0x01, // Consumer Kind
- 0x01, // ConsumerId Kind (NumericId)
- 0x04, // ConsumerId Length (4)
- 0x2A, 0x00, 0x0, 0x0, // ConsumerId
-
- 0x02,
// StreamId Kind (StringId)
- 0x0E,
// StreamId Length (14)
- 0x74, 0x65, 0x73, 0x74, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6D, 0x5F, 0x69, 0x64, // StreamId
-
- 0x02,
// TopicId Kind (StringId)
- 0x0D,
// TopicId Length (13)
- 0x74, 0x65, 0x73, 0x74, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63,
0x5F, 0x69, 0x64, // TopicId
-
- 0x01, // Partition present
- 0x7B, 0x00, 0x00, 0x00, // PartitionId (123)
- 0x03, //
PollingStrategy Kind
- 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, //
PollingStrategy Value (0)
- 0x64, 0x00, 0x00, 0x00, // Count (100)
- 0x01, // AutoCommit
- }
-
- // Check if the serialized bytes match the expected bytes
- if !areBytesEqual(serialized, expected) {
- t.Errorf("Serialized bytes are incorrect.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
- }
-}
-
-func areBytesEqual(a, b []byte) bool {
- if len(a) != len(b) {
- return false
- }
- for i := range a {
- if a[i] != b[i] {
- return false
- }
- }
- return true
-}
diff --git a/foreign/go/contracts/send_messages_test.go
b/foreign/go/contracts/send_messages_test.go
deleted file mode 100644
index 1ddf4f831..000000000
--- a/foreign/go/contracts/send_messages_test.go
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-import (
- "bytes"
- "testing"
-
- "github.com/google/uuid"
-)
-
-func TestSerialize_SendMessagesRequest(t *testing.T) {
- message1 := generateTestMessage("data1")
- streamId, _ := NewIdentifier("test_stream_id")
- topicId, _ := NewIdentifier("test_topic_id")
- request := SendMessages{
- StreamId: streamId,
- TopicId: topicId,
- Partitioning: PartitionId(1),
- Messages: []IggyMessage{
- message1,
- },
- Compression: MESSAGE_COMPRESSION_NONE,
- }
-
- // Serialize the request
- serialized, err := request.MarshalBinary()
- if err != nil {
- t.Error(err)
- }
-
- // Expected serialized bytes based on the provided sample request
- expected := []byte{
- 0x29, 0x0, 0x0, 0x0, // metadataLength
- 0x02,
// StreamId Kind (StringId)
- 0x0E,
// StreamId Length (14)
- 0x74, 0x65, 0x73, 0x74, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6D, 0x5F, 0x69, 0x64, // StreamId
-
- 0x02,
// TopicId Kind (StringId)
- 0x0D,
// TopicId Length (13)
- 0x74, 0x65, 0x73, 0x74, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63,
0x5F, 0x69, 0x64, // TopicId
- 0x02, // PartitionIdKind
- 0x04, // Partitioning Length
- 0x01, 0x00, 0x00, 0x00, // PartitionId (123)
- 0x01, 0x0, 0x0, 0x0, // MessageCount
- 0, 0, 0, 0, 120, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // Index
(16*1) bytes
- }
- expected = append(expected, message1.Header.ToBytes()...)
- expected = append(expected, message1.Payload...)
- expected = append(expected, message1.UserHeaders...)
-
- // Check if the serialized bytes match the expected bytes
- if !bytes.Equal(serialized, expected) {
- t.Errorf("Serialized bytes are incorrect.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
- }
-}
-
-func createDefaultMessageHeaders() []HeaderEntry {
- return []HeaderEntry{
- {Key: HeaderKey{Kind: String, Value: []byte("HeaderKey1")},
Value: HeaderValue{Kind: String, Value: []byte("Value 1")}},
- {Key: HeaderKey{Kind: String, Value: []byte("HeaderKey2")},
Value: HeaderValue{Kind: Uint32, Value: []byte{0x01, 0x02, 0x03, 0x04}}},
- }
-}
-
-func generateTestMessage(payload string) IggyMessage {
- msg, _ := NewIggyMessage(
- []byte(payload),
- WithID(uuid.New()),
- WithUserHeaders(createDefaultMessageHeaders()))
- return msg
-}
diff --git a/foreign/go/contracts/update_stream_test.go
b/foreign/go/contracts/update_stream_test.go
deleted file mode 100644
index abb480bb0..000000000
--- a/foreign/go/contracts/update_stream_test.go
+++ /dev/null
@@ -1,48 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-import (
- "bytes"
- "testing"
-)
-
-func TestSerialize_UpdateStream(t *testing.T) {
- streamId, _ := NewIdentifier("stream")
- request := UpdateStream{
- StreamId: streamId,
- Name: "update_stream",
- }
-
- serialized1, err := request.MarshalBinary()
- if err != nil {
- t.Errorf("Failed to serialize UpdateStream: %v", err)
- }
-
- expected := []byte{
- 0x02, // StreamId Kind (StringId)
- 0x06, // StreamId Length (6)
- 0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, // StreamId Value ("stream")
- 0x0D,
// Name Length (13)
- 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5F, 0x73, 0x74, 0x72,
0x65, 0x61, 0x6D, // Name ("update_stream")
- }
-
- if !bytes.Equal(serialized1, expected) {
- t.Errorf("Test case 1 failed. \nExpected:\t%v\nGot:\t\t%v",
expected, serialized1)
- }
-}
diff --git a/foreign/go/contracts/update_topic.go
b/foreign/go/contracts/update_topic.go
deleted file mode 100644
index ef5de4863..000000000
--- a/foreign/go/contracts/update_topic.go
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-import (
- "encoding/binary"
-)
-
-type UpdateTopic struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- CompressionAlgorithm CompressionAlgorithm `json:"compressionAlgorithm"`
- MessageExpiry Duration `json:"messageExpiry"`
- MaxTopicSize uint64 `json:"maxTopicSize"`
- ReplicationFactor *uint8 `json:"replicationFactor"`
- Name string `json:"name"`
-}
-
-func (u *UpdateTopic) Code() CommandCode {
- return UpdateTopicCode
-}
-
-func (u *UpdateTopic) MarshalBinary() ([]byte, error) {
- if u.ReplicationFactor == nil {
- u.ReplicationFactor = new(uint8)
- }
- streamIdBytes, err := u.StreamId.MarshalBinary()
- if err != nil {
- return nil, err
- }
- topicIdBytes, err := u.TopicId.MarshalBinary()
- if err != nil {
- return nil, err
- }
-
- buffer := make([]byte,
19+len(streamIdBytes)+len(topicIdBytes)+len(u.Name))
-
- offset := 0
-
- offset += copy(buffer[offset:], streamIdBytes)
- offset += copy(buffer[offset:], topicIdBytes)
-
- buffer[offset] = byte(u.CompressionAlgorithm)
- offset++
-
- binary.LittleEndian.PutUint64(buffer[offset:], uint64(u.MessageExpiry))
- offset += 8
-
- binary.LittleEndian.PutUint64(buffer[offset:], u.MaxTopicSize)
- offset += 8
-
- buffer[offset] = *u.ReplicationFactor
- offset++
-
- buffer[offset] = uint8(len(u.Name))
- offset++
-
- copy(buffer[offset:], u.Name)
-
- return buffer, nil
-}
diff --git a/foreign/go/contracts/update_user_permissions.go
b/foreign/go/contracts/update_user_permissions.go
deleted file mode 100644
index 952bd14d0..000000000
--- a/foreign/go/contracts/update_user_permissions.go
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package iggcon
-
-import "encoding/binary"
-
-type UpdatePermissions struct {
- UserID Identifier `json:"-"`
- Permissions *Permissions `json:"Permissions,omitempty"`
-}
-
-func (u *UpdatePermissions) Code() CommandCode {
- return UpdatePermissionsCode
-}
-
-func (u *UpdatePermissions) MarshalBinary() ([]byte, error) {
- userIdBytes, err := u.UserID.MarshalBinary()
- if err != nil {
- return nil, err
- }
- length := len(userIdBytes)
-
- if u.Permissions != nil {
- length += 1 + 4 + u.Permissions.Size()
- }
-
- bytes := make([]byte, length)
- position := 0
-
- copy(bytes[position:position+len(userIdBytes)], userIdBytes)
- position += len(userIdBytes)
-
- if u.Permissions != nil {
- bytes[position] = 1
- position++
- permissionsBytes, err := u.Permissions.MarshalBinary()
- if err != nil {
- return nil, err
- }
- binary.LittleEndian.PutUint32(bytes[position:position+4],
uint32(len(permissionsBytes)))
- position += 4
- copy(bytes[position:position+len(permissionsBytes)],
permissionsBytes)
- } else {
- bytes[position] = 0
- }
-
- return bytes, nil
-}
diff --git a/foreign/go/contracts/access_tokens.go
b/foreign/go/internal/command/access_token.go
similarity index 80%
copy from foreign/go/contracts/access_tokens.go
copy to foreign/go/internal/command/access_token.go
index edb9b4ef3..68260a698 100644
--- a/foreign/go/contracts/access_tokens.go
+++ b/foreign/go/internal/command/access_token.go
@@ -15,19 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
-import (
- "encoding/binary"
- "time"
-)
+import "encoding/binary"
type CreatePersonalAccessToken struct {
Name string `json:"Name"`
Expiry uint32 `json:"Expiry"`
}
-func (c *CreatePersonalAccessToken) Code() CommandCode {
+func (c *CreatePersonalAccessToken) Code() Code {
return CreateAccessTokenCode
}
@@ -40,11 +37,21 @@ func (c *CreatePersonalAccessToken) MarshalBinary()
([]byte, error) {
return bytes, nil
}
+type GetPersonalAccessTokens struct{}
+
+func (g *GetPersonalAccessTokens) Code() Code {
+ return GetAccessTokensCode
+}
+
+func (g *GetPersonalAccessTokens) MarshalBinary() ([]byte, error) {
+ return []byte{}, nil
+}
+
type DeletePersonalAccessToken struct {
Name string `json:"Name"`
}
-func (d *DeletePersonalAccessToken) Code() CommandCode {
+func (d *DeletePersonalAccessToken) Code() Code {
return DeleteAccessTokenCode
}
@@ -55,22 +62,3 @@ func (d *DeletePersonalAccessToken) MarshalBinary() ([]byte,
error) {
copy(bytes[1:], d.Name)
return bytes, nil
}
-
-type GetPersonalAccessTokens struct{}
-
-func (g *GetPersonalAccessTokens) Code() CommandCode {
- return GetAccessTokensCode
-}
-
-func (g *GetPersonalAccessTokens) MarshalBinary() ([]byte, error) {
- return []byte{}, nil
-}
-
-type PersonalAccessTokenInfo struct {
- Name string `json:"Name"`
- Expiry *time.Time `json:"Expiry"`
-}
-
-type RawPersonalAccessToken struct {
- Token string `json:"token"`
-}
diff --git a/foreign/go/internal/command/code.go
b/foreign/go/internal/command/code.go
new file mode 100644
index 000000000..e4775eb2f
--- /dev/null
+++ b/foreign/go/internal/command/code.go
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package command
+
+type Code int
+
+const (
+ PingCode Code = 1
+ GetStatsCode Code = 10
+ GetSnapshotFileCode Code = 11
+ GetClusterMetadataCode Code = 12
+ GetMeCode Code = 20
+ GetClientCode Code = 21
+ GetClientsCode Code = 22
+ GetUserCode Code = 31
+ GetUsersCode Code = 32
+ CreateUserCode Code = 33
+ DeleteUserCode Code = 34
+ UpdateUserCode Code = 35
+ UpdatePermissionsCode Code = 36
+ ChangePasswordCode Code = 37
+ LoginUserCode Code = 38
+ LogoutUserCode Code = 39
+ GetAccessTokensCode Code = 41
+ CreateAccessTokenCode Code = 42
+ DeleteAccessTokenCode Code = 43
+ LoginWithAccessTokenCode Code = 44
+ PollMessagesCode Code = 100
+ SendMessagesCode Code = 101
+ GetOffsetCode Code = 120
+ StoreOffsetCode Code = 121
+ DeleteConsumerOffsetCode Code = 122
+ GetStreamCode Code = 200
+ GetStreamsCode Code = 201
+ CreateStreamCode Code = 202
+ DeleteStreamCode Code = 203
+ UpdateStreamCode Code = 204
+ GetTopicCode Code = 300
+ GetTopicsCode Code = 301
+ CreateTopicCode Code = 302
+ DeleteTopicCode Code = 303
+ UpdateTopicCode Code = 304
+ CreatePartitionsCode Code = 402
+ DeletePartitionsCode Code = 403
+ GetGroupCode Code = 600
+ GetGroupsCode Code = 601
+ CreateGroupCode Code = 602
+ DeleteGroupCode Code = 603
+ JoinGroupCode Code = 604
+ LeaveGroupCode Code = 605
+)
+
+// internal const int GET_PERSONAL_ACCESS_TOKENS_CODE = 41;
+// internal const int CREATE_PERSONAL_ACCESS_TOKEN_CODE = 42;
+// internal const int DELETE_PERSONAL_ACCESS_TOKEN_CODE = 43;
+// internal const int LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE = 44;
diff --git a/foreign/go/contracts/ping.go
b/foreign/go/internal/command/command.go
similarity index 82%
rename from foreign/go/contracts/ping.go
rename to foreign/go/internal/command/command.go
index dd43a6aad..c78404d5a 100644
--- a/foreign/go/contracts/ping.go
+++ b/foreign/go/internal/command/command.go
@@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
-type Ping struct{}
+import (
+ "encoding"
+)
-func (p *Ping) Code() CommandCode {
- return PingCode
-}
+type Command interface {
+ // Code returns the command code associated with this command.
+ Code() Code
-func (p *Ping) MarshalBinary() ([]byte, error) {
- return []byte{}, nil
+ encoding.BinaryMarshaler
}
diff --git a/foreign/go/contracts/consumer_groups.go
b/foreign/go/internal/command/consumer_group.go
similarity index 61%
copy from foreign/go/contracts/consumer_groups.go
copy to foreign/go/internal/command/consumer_group.go
index 3863a6b8e..d78d2403f 100644
--- a/foreign/go/contracts/consumer_groups.go
+++ b/foreign/go/internal/command/consumer_group.go
@@ -15,29 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
-type ConsumerGroup struct {
- Id uint32 `json:"id"`
- Name string `json:"name"`
- PartitionsCount uint32 `json:"partitionsCount"`
- MembersCount uint32 `json:"membersCount"`
-}
-
-type ConsumerGroupDetails struct {
- ConsumerGroup
- Members []ConsumerGroupMember
-}
-
-type ConsumerGroupMember struct {
- ID uint32
- PartitionsCount uint32
- Partitions []uint32
-}
+import iggcon "github.com/apache/iggy/foreign/go/contracts"
type TopicPath struct {
- StreamId Identifier
- TopicId Identifier
+ StreamId iggcon.Identifier
+ TopicId iggcon.Identifier
}
type CreateConsumerGroup struct {
@@ -45,7 +29,7 @@ type CreateConsumerGroup struct {
Name string
}
-func (c *CreateConsumerGroup) Code() CommandCode {
+func (c *CreateConsumerGroup) Code() Code {
return CreateGroupCode
}
@@ -67,73 +51,67 @@ func (c *CreateConsumerGroup) MarshalBinary() ([]byte,
error) {
return bytes, nil
}
-type DeleteConsumerGroup struct {
+type GetConsumerGroup struct {
TopicPath
- GroupId Identifier
+ GroupId iggcon.Identifier
}
-func (d *DeleteConsumerGroup) Code() CommandCode {
- return DeleteGroupCode
+func (g *GetConsumerGroup) Code() Code {
+ return GetGroupCode
}
-func (d *DeleteConsumerGroup) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(d.StreamId, d.TopicId, d.GroupId)
+func (g *GetConsumerGroup) MarshalBinary() ([]byte, error) {
+ return iggcon.MarshalIdentifiers(g.StreamId, g.TopicId, g.GroupId)
+}
+
+type GetConsumerGroups struct {
+ StreamId iggcon.Identifier
+ TopicId iggcon.Identifier
+}
+
+func (g *GetConsumerGroups) Code() Code {
+ return GetGroupsCode
+}
+
+func (g *GetConsumerGroups) MarshalBinary() ([]byte, error) {
+ return iggcon.MarshalIdentifiers(g.StreamId, g.TopicId)
}
type JoinConsumerGroup struct {
TopicPath
- GroupId Identifier
+ GroupId iggcon.Identifier
}
-func (j *JoinConsumerGroup) Code() CommandCode {
+func (j *JoinConsumerGroup) Code() Code {
return JoinGroupCode
}
func (j *JoinConsumerGroup) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(j.StreamId, j.TopicId, j.GroupId)
+ return iggcon.MarshalIdentifiers(j.StreamId, j.TopicId, j.GroupId)
}
type LeaveConsumerGroup struct {
TopicPath
- GroupId Identifier
+ GroupId iggcon.Identifier
}
-func (l *LeaveConsumerGroup) Code() CommandCode {
+func (l *LeaveConsumerGroup) Code() Code {
return LeaveGroupCode
}
func (l *LeaveConsumerGroup) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(l.StreamId, l.TopicId, l.GroupId)
+ return iggcon.MarshalIdentifiers(l.StreamId, l.TopicId, l.GroupId)
}
-type GetConsumerGroup struct {
+type DeleteConsumerGroup struct {
TopicPath
- GroupId Identifier
-}
-
-func (g *GetConsumerGroup) Code() CommandCode {
- return GetGroupCode
-}
-
-func (g *GetConsumerGroup) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(g.StreamId, g.TopicId, g.GroupId)
+ GroupId iggcon.Identifier
}
-type GetConsumerGroups struct {
- StreamId Identifier
- TopicId Identifier
-}
-
-func (g *GetConsumerGroups) Code() CommandCode {
- return GetGroupsCode
-}
-
-func (g *GetConsumerGroups) MarshalBinary() ([]byte, error) {
- return marshalIdentifiers(g.StreamId, g.TopicId)
+func (d *DeleteConsumerGroup) Code() Code {
+ return DeleteGroupCode
}
-type ConsumerGroupInfo struct {
- StreamId uint32 `json:"streamId"`
- TopicId uint32 `json:"topicId"`
- GroupId uint32 `json:"groupId"`
+func (d *DeleteConsumerGroup) MarshalBinary() ([]byte, error) {
+ return iggcon.MarshalIdentifiers(d.StreamId, d.TopicId, d.GroupId)
}
diff --git a/foreign/go/contracts/send_messages.go
b/foreign/go/internal/command/message.go
similarity index 53%
rename from foreign/go/contracts/send_messages.go
rename to foreign/go/internal/command/message.go
index 3effccfc3..262156b8d 100644
--- a/foreign/go/contracts/send_messages.go
+++ b/foreign/go/internal/command/message.go
@@ -15,45 +15,53 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
import (
"encoding/binary"
+ "github.com/apache/iggy/foreign/go/contracts"
"github.com/klauspost/compress/s2"
)
+const (
+ partitionPresenceSize = 1
+ partitionFieldSize = 4
+ partitionStrategySize = partitionPresenceSize + partitionFieldSize + 1
+ offsetSize = 12
+ commitFlagSize = 1
+ indexSize = 16
+)
+
type SendMessages struct {
- Compression IggyMessageCompression
+ Compression iggcon.IggyMessageCompression
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- Partitioning Partitioning `json:"partitioning"`
- Messages []IggyMessage `json:"messages"`
+ StreamId iggcon.Identifier `json:"streamId"`
+ TopicId iggcon.Identifier `json:"topicId"`
+ Partitioning iggcon.Partitioning `json:"partitioning"`
+ Messages []iggcon.IggyMessage `json:"messages"`
}
-const indexSize = 16
-
-func (s *SendMessages) Code() CommandCode {
+func (s *SendMessages) Code() Code {
return SendMessagesCode
}
func (s *SendMessages) MarshalBinary() ([]byte, error) {
for i, message := range s.Messages {
switch s.Compression {
- case MESSAGE_COMPRESSION_S2:
+ case iggcon.MESSAGE_COMPRESSION_S2:
if len(message.Payload) < 32 {
break
}
s.Messages[i].Payload = s2.Encode(nil, message.Payload)
message.Header.PayloadLength =
uint32(len(message.Payload))
- case MESSAGE_COMPRESSION_S2_BETTER:
+ case iggcon.MESSAGE_COMPRESSION_S2_BETTER:
if len(message.Payload) < 32 {
break
}
s.Messages[i].Payload = s2.EncodeBetter(nil,
message.Payload)
message.Header.PayloadLength =
uint32(len(message.Payload))
- case MESSAGE_COMPRESSION_S2_BEST:
+ case iggcon.MESSAGE_COMPRESSION_S2_BEST:
if len(message.Payload) < 32 {
break
}
@@ -118,13 +126,13 @@ func (s *SendMessages) MarshalBinary() ([]byte, error) {
msgSize := uint32(0)
for _, message := range s.Messages {
- copy(bytes[position:position+MessageHeaderSize],
message.Header.ToBytes())
-
copy(bytes[position+MessageHeaderSize:position+MessageHeaderSize+int(message.Header.PayloadLength)],
message.Payload)
- position += MessageHeaderSize +
int(message.Header.PayloadLength)
+ copy(bytes[position:position+iggcon.MessageHeaderSize],
message.Header.ToBytes())
+
copy(bytes[position+iggcon.MessageHeaderSize:position+iggcon.MessageHeaderSize+int(message.Header.PayloadLength)],
message.Payload)
+ position += iggcon.MessageHeaderSize +
int(message.Header.PayloadLength)
copy(bytes[position:position+int(message.Header.UserHeaderLength)],
message.UserHeaders)
position += int(message.Header.UserHeaderLength)
- msgSize += MessageHeaderSize + message.Header.PayloadLength +
message.Header.UserHeaderLength
+ msgSize += iggcon.MessageHeaderSize +
message.Header.PayloadLength + message.Header.UserHeaderLength
binary.LittleEndian.PutUint32(bytes[currentIndexPosition:currentIndexPosition+4],
0)
binary.LittleEndian.PutUint32(bytes[currentIndexPosition+4:currentIndexPosition+8],
uint32(msgSize))
@@ -135,10 +143,73 @@ func (s *SendMessages) MarshalBinary() ([]byte, error) {
return bytes, nil
}
-func calculateMessageBytesCount(messages []IggyMessage) int {
+func calculateMessageBytesCount(messages []iggcon.IggyMessage) int {
count := 0
for _, msg := range messages {
- count += MessageHeaderSize + len(msg.Payload) +
len(msg.UserHeaders)
+ count += iggcon.MessageHeaderSize + len(msg.Payload) +
len(msg.UserHeaders)
}
return count
}
+
+type PollMessages struct {
+ StreamId iggcon.Identifier `json:"streamId"`
+ TopicId iggcon.Identifier `json:"topicId"`
+ Consumer iggcon.Consumer `json:"consumer"`
+ PartitionId *uint32 `json:"partitionId"`
+ Strategy iggcon.PollingStrategy `json:"pollingStrategy"`
+ Count uint32 `json:"count"`
+ AutoCommit bool `json:"autoCommit"`
+}
+
+func (m *PollMessages) Code() Code {
+ return PollMessagesCode
+}
+
+func (m *PollMessages) MarshalBinary() ([]byte, error) {
+ consumerIdBytes, err := m.Consumer.Id.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ streamIdBytes, err := m.StreamId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ topicIdBytes, err := m.TopicId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ messageSize := 1 + len(consumerIdBytes) + len(streamIdBytes) +
len(topicIdBytes) + partitionStrategySize + offsetSize + commitFlagSize
+ bytes := make([]byte, messageSize)
+
+ bytes[0] = byte(m.Consumer.Kind)
+ position := 1
+ copy(bytes[position:position+len(consumerIdBytes)], consumerIdBytes)
+ position += len(consumerIdBytes)
+
+ copy(bytes[position:position+len(streamIdBytes)], streamIdBytes)
+ position += len(streamIdBytes)
+ copy(bytes[position:position+len(topicIdBytes)], topicIdBytes)
+ position += len(topicIdBytes)
+ if m.PartitionId != nil {
+ bytes[position] = 1
+ binary.LittleEndian.PutUint32(bytes[position+1:position+1+4],
*m.PartitionId)
+ } else {
+ bytes[position] = 0
+ binary.LittleEndian.PutUint32(bytes[position+1:position+1+4], 0)
+ }
+ bytes[position+1+4] = byte(m.Strategy.Kind)
+
+ position += partitionStrategySize
+ binary.LittleEndian.PutUint64(bytes[position:position+8],
m.Strategy.Value)
+ binary.LittleEndian.PutUint32(bytes[position+8:position+12], m.Count)
+
+ position += offsetSize
+
+ if m.AutoCommit {
+ bytes[position] = 1
+ } else {
+ bytes[position] = 0
+ }
+
+ return bytes, nil
+}
diff --git a/foreign/go/internal/command/message_test.go
b/foreign/go/internal/command/message_test.go
new file mode 100644
index 000000000..305f5274b
--- /dev/null
+++ b/foreign/go/internal/command/message_test.go
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package command
+
+import (
+ "bytes"
+ "testing"
+
+ "github.com/apache/iggy/foreign/go/contracts"
+ "github.com/google/uuid"
+)
+
+func TestSerialize_TcpFetchMessagesRequest(t *testing.T) {
+ partitionId := uint32(123)
+ consumerId, _ := iggcon.NewIdentifier(uint32(42))
+ streamId, _ := iggcon.NewIdentifier("test_stream_id")
+ topicId, _ := iggcon.NewIdentifier("test_topic_id")
+ // Create a sample PollMessages
+ request := PollMessages{
+ Consumer: iggcon.NewSingleConsumer(consumerId),
+ StreamId: streamId,
+ TopicId: topicId,
+ PartitionId: &partitionId,
+ Strategy: iggcon.FirstPollingStrategy(),
+ Count: 100,
+ AutoCommit: true,
+ }
+
+ // Serialize the request
+ serialized, err := request.MarshalBinary()
+ if err != nil {
+ t.Error(err)
+ }
+
+ // Expected serialized bytes based on the provided sample request
+ expected := []byte{
+ 0x01, // Consumer Kind
+ 0x01, // ConsumerId Kind (NumericId)
+ 0x04, // ConsumerId Length (4)
+ 0x2A, 0x00, 0x0, 0x0, // ConsumerId
+
+ 0x02,
// StreamId Kind (StringId)
+ 0x0E,
// StreamId Length (14)
+ 0x74, 0x65, 0x73, 0x74, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6D, 0x5F, 0x69, 0x64, // StreamId
+
+ 0x02,
// TopicId Kind (StringId)
+ 0x0D,
// TopicId Length (13)
+ 0x74, 0x65, 0x73, 0x74, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63,
0x5F, 0x69, 0x64, // TopicId
+
+ 0x01, // Partition present
+ 0x7B, 0x00, 0x00, 0x00, // PartitionId (123)
+ 0x03, //
PollingStrategy Kind
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, //
PollingStrategy Value (0)
+ 0x64, 0x00, 0x00, 0x00, // Count (100)
+ 0x01, // AutoCommit
+ }
+
+ // Check if the serialized bytes match the expected bytes
+ if !areBytesEqual(serialized, expected) {
+ t.Errorf("Serialized bytes are incorrect.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
+ }
+}
+
+func areBytesEqual(a, b []byte) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ for i := range a {
+ if a[i] != b[i] {
+ return false
+ }
+ }
+ return true
+}
+
+func TestSerialize_SendMessagesRequest(t *testing.T) {
+ message1 := generateTestMessage("data1")
+ streamId, _ := iggcon.NewIdentifier("test_stream_id")
+ topicId, _ := iggcon.NewIdentifier("test_topic_id")
+ request := SendMessages{
+ StreamId: streamId,
+ TopicId: topicId,
+ Partitioning: iggcon.PartitionId(1),
+ Messages: []iggcon.IggyMessage{
+ message1,
+ },
+ Compression: iggcon.MESSAGE_COMPRESSION_NONE,
+ }
+
+ // Serialize the request
+ serialized, err := request.MarshalBinary()
+ if err != nil {
+ t.Error(err)
+ }
+
+ // Expected serialized bytes based on the provided sample request
+ expected := []byte{
+ 0x29, 0x0, 0x0, 0x0, // metadataLength
+ 0x02,
// StreamId Kind (StringId)
+ 0x0E,
// StreamId Length (14)
+ 0x74, 0x65, 0x73, 0x74, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6D, 0x5F, 0x69, 0x64, // StreamId
+
+ 0x02,
// TopicId Kind (StringId)
+ 0x0D,
// TopicId Length (13)
+ 0x74, 0x65, 0x73, 0x74, 0x5F, 0x74, 0x6F, 0x70, 0x69, 0x63,
0x5F, 0x69, 0x64, // TopicId
+ 0x02, // PartitionIdKind
+ 0x04, // Partitioning Length
+ 0x01, 0x00, 0x00, 0x00, // PartitionId (1)
+ 0x01, 0x0, 0x0, 0x0, // MessageCount
+ 0, 0, 0, 0, 120, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // Index
(16*1) bytes
+ }
+ expected = append(expected, message1.Header.ToBytes()...)
+ expected = append(expected, message1.Payload...)
+ expected = append(expected, message1.UserHeaders...)
+
+ // Check if the serialized bytes match the expected bytes
+ if !bytes.Equal(serialized, expected) {
+ t.Errorf("Serialized bytes are incorrect.
\nExpected:\t%v\nGot:\t\t%v", expected, serialized)
+ }
+}
+
+func createDefaultMessageHeaders() []iggcon.HeaderEntry {
+ return []iggcon.HeaderEntry{
+ {Key: iggcon.HeaderKey{Kind: iggcon.String, Value:
[]byte("HeaderKey1")}, Value: iggcon.HeaderValue{Kind: iggcon.String, Value:
[]byte("Value 1")}},
+ {Key: iggcon.HeaderKey{Kind: iggcon.String, Value:
[]byte("HeaderKey2")}, Value: iggcon.HeaderValue{Kind: iggcon.Uint32, Value:
[]byte{0x01, 0x02, 0x03, 0x04}}},
+ }
+}
+
+func generateTestMessage(payload string) iggcon.IggyMessage {
+ msg, _ := iggcon.NewIggyMessage(
+ []byte(payload),
+ iggcon.WithID(uuid.New()),
+ iggcon.WithUserHeaders(createDefaultMessageHeaders()))
+ return msg
+}
diff --git a/foreign/go/contracts/offsets.go
b/foreign/go/internal/command/offset.go
similarity index 83%
rename from foreign/go/contracts/offsets.go
rename to foreign/go/internal/command/offset.go
index 8726a5957..ed187f8aa 100644
--- a/foreign/go/contracts/offsets.go
+++ b/foreign/go/internal/command/offset.go
@@ -15,19 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
-import "encoding/binary"
+import (
+ "encoding/binary"
+
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
+)
type StoreConsumerOffsetRequest struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- Consumer Consumer `json:"consumer"`
- PartitionId *uint32 `json:"partitionId"`
- Offset uint64 `json:"offset"`
+ StreamId iggcon.Identifier `json:"streamId"`
+ TopicId iggcon.Identifier `json:"topicId"`
+ Consumer iggcon.Consumer `json:"consumer"`
+ PartitionId *uint32 `json:"partitionId"`
+ Offset uint64 `json:"offset"`
}
-func (s *StoreConsumerOffsetRequest) Code() CommandCode {
+func (s *StoreConsumerOffsetRequest) Code() Code {
return StoreOffsetCode
}
@@ -66,13 +70,13 @@ func (s *StoreConsumerOffsetRequest) MarshalBinary()
([]byte, error) {
}
type GetConsumerOffset struct {
- StreamId Identifier `json:"streamId"`
- TopicId Identifier `json:"topicId"`
- Consumer Consumer `json:"consumer"`
- PartitionId *uint32 `json:"partitionId"`
+ StreamId iggcon.Identifier `json:"streamId"`
+ TopicId iggcon.Identifier `json:"topicId"`
+ Consumer iggcon.Consumer `json:"consumer"`
+ PartitionId *uint32 `json:"partitionId"`
}
-func (g *GetConsumerOffset) Code() CommandCode {
+func (g *GetConsumerOffset) Code() Code {
return GetOffsetCode
}
@@ -109,20 +113,14 @@ func (g *GetConsumerOffset) MarshalBinary() ([]byte,
error) {
return bytes, nil
}
-type ConsumerOffsetInfo struct {
- PartitionId uint32 `json:"partitionId"`
- CurrentOffset uint64 `json:"currentOffset"`
- StoredOffset uint64 `json:"storedOffset"`
-}
-
type DeleteConsumerOffset struct {
- Consumer Consumer
- StreamId Identifier
- TopicId Identifier
+ Consumer iggcon.Consumer
+ StreamId iggcon.Identifier
+ TopicId iggcon.Identifier
PartitionId *uint32
}
-func (d *DeleteConsumerOffset) Code() CommandCode {
+func (d *DeleteConsumerOffset) Code() Code {
return DeleteConsumerOffsetCode
}
diff --git a/foreign/go/internal/command/partition.go
b/foreign/go/internal/command/partition.go
new file mode 100644
index 000000000..e7b2757da
--- /dev/null
+++ b/foreign/go/internal/command/partition.go
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package command
+
+import (
+ "encoding/binary"
+
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
+)
+
+type CreatePartitions struct {
+ StreamId iggcon.Identifier `json:"streamId"`
+ TopicId iggcon.Identifier `json:"topicId"`
+ PartitionsCount uint32 `json:"partitionsCount"`
+}
+
+func (c *CreatePartitions) Code() Code {
+ return CreatePartitionsCode
+}
+
+func (c *CreatePartitions) MarshalBinary() ([]byte, error) {
+ streamIdBytes, err := c.StreamId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ topicIdBytes, err := c.TopicId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+4)
+ position := 0
+ copy(bytes[position:], streamIdBytes)
+ position += len(streamIdBytes)
+ copy(bytes[position:], topicIdBytes)
+ position += len(topicIdBytes)
+ binary.LittleEndian.PutUint32(bytes[position:position+4],
c.PartitionsCount)
+
+ return bytes, nil
+}
+
+type DeletePartitions struct {
+ StreamId iggcon.Identifier `json:"streamId"`
+ TopicId iggcon.Identifier `json:"topicId"`
+ PartitionsCount uint32 `json:"partitionsCount"`
+}
+
+func (d *DeletePartitions) Code() Code {
+ return DeletePartitionsCode
+}
+
+func (d *DeletePartitions) MarshalBinary() ([]byte, error) {
+ streamIdBytes, err := d.StreamId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ topicIdBytes, err := d.TopicId.MarshalBinary()
+ if err != nil {
+ return nil, err
+ }
+ bytes := make([]byte, len(streamIdBytes)+len(topicIdBytes)+4)
+ position := 0
+ copy(bytes[position:], streamIdBytes)
+ position += len(streamIdBytes)
+ copy(bytes[position:], topicIdBytes)
+ position += len(topicIdBytes)
+ binary.LittleEndian.PutUint32(bytes[position:position+4],
d.PartitionsCount)
+
+ return bytes, nil
+}
diff --git a/foreign/go/contracts/session.go
b/foreign/go/internal/command/session.go
similarity index 86%
rename from foreign/go/contracts/session.go
rename to foreign/go/internal/command/session.go
index 4e62ab88c..606ce8821 100644
--- a/foreign/go/contracts/session.go
+++ b/foreign/go/internal/command/session.go
@@ -15,16 +15,18 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
-import "encoding/binary"
+import (
+ "encoding/binary"
+)
type LoginUser struct {
Username string `json:"username"`
Password string `json:"password"`
}
-func (lu *LoginUser) Code() CommandCode {
+func (lu *LoginUser) Code() Code {
return LoginUserCode
}
@@ -71,7 +73,7 @@ type LoginWithPersonalAccessToken struct {
Token string `json:"token"`
}
-func (lw *LoginWithPersonalAccessToken) Code() CommandCode {
+func (lw *LoginWithPersonalAccessToken) Code() Code {
return LoginWithAccessTokenCode
}
@@ -83,16 +85,9 @@ func (lw *LoginWithPersonalAccessToken) MarshalBinary()
([]byte, error) {
return bytes, nil
}
-type IdentityInfo struct {
- // Unique identifier (numeric) of the user.
- UserId uint32 `json:"userId"`
- // The optional tokens, used only by HTTP transport.
- AccessToken *string `json:"accessToken"`
-}
-
type LogoutUser struct{}
-func (lu *LogoutUser) Code() CommandCode {
+func (lu *LogoutUser) Code() Code {
return LogoutUserCode
}
diff --git a/foreign/go/contracts/update_stream.go
b/foreign/go/internal/command/stream.go
similarity index 50%
rename from foreign/go/contracts/update_stream.go
rename to foreign/go/internal/command/stream.go
index 2da4869ad..d3c3c4098 100644
--- a/foreign/go/contracts/update_stream.go
+++ b/foreign/go/internal/command/stream.go
@@ -15,16 +15,62 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
+
+import iggcon "github.com/apache/iggy/foreign/go/contracts"
+
+// Byte layout used when serializing CreateStream.
+const (
+	// nameLengthOffset is the index of the single byte holding the name length.
+	nameLengthOffset = 0
+	// payloadOffset is the index at which the name bytes start, directly
+	// after the length byte.
+	payloadOffset = nameLengthOffset + 1
+)
+
+// CreateStream carries the stream name that is sent with CreateStreamCode.
+type CreateStream struct {
+	Name string
+}
+
+// Code reports CreateStreamCode as this command's code.
+func (request *CreateStream) Code() Code {
+	return CreateStreamCode
+}
+
+// MarshalBinary encodes the command as one length byte (at nameLengthOffset)
+// followed by the bytes of Name (starting at payloadOffset). The returned
+// error is always nil.
+func (request *CreateStream) MarshalBinary() ([]byte, error) {
+	name := request.Name
+	payload := make([]byte, payloadOffset+len(name))
+
+	// The length prefix is a single byte, so only the low 8 bits of
+	// len(name) are stored.
+	payload[nameLengthOffset] = byte(len(name))
+	copy(payload[payloadOffset:], name)
+
+	return payload, nil
+}
+
+// GetStream carries the stream identifier that is sent with GetStreamCode.
+type GetStream struct {
+	StreamId iggcon.Identifier
+}
+
+// Code reports GetStreamCode as this command's code.
+func (g *GetStream) Code() Code {
+	return GetStreamCode
+}
+
+// MarshalBinary returns the binary form of StreamId; the payload consists of
+// nothing else.
+func (g *GetStream) MarshalBinary() ([]byte, error) {
+	payload, err := g.StreamId.MarshalBinary()
+	return payload, err
+}
+
+// GetStreams is the field-less command sent with GetStreamsCode.
+type GetStreams struct{}
+
+// Code reports GetStreamsCode as this command's code.
+func (g *GetStreams) Code() Code {
+	return GetStreamsCode
+}
+
+// MarshalBinary returns an empty, non-nil payload and a nil error.
+func (g *GetStreams) MarshalBinary() ([]byte, error) {
+	return make([]byte, 0), nil
+}
type UpdateStream struct {
- StreamId Identifier `json:"streamId"`
- Name string `json:"name"`
+ StreamId iggcon.Identifier `json:"streamId"`
+ Name string `json:"name"`
}
-func (u *UpdateStream) Code() CommandCode {
+func (u *UpdateStream) Code() Code {
return UpdateStreamCode
}
+
func (u *UpdateStream) MarshalBinary() ([]byte, error) {
streamIdBytes, err := u.StreamId.MarshalBinary()
if err != nil {
@@ -38,3 +84,15 @@ func (u *UpdateStream) MarshalBinary() ([]byte, error) {
copy(bytes[position+1:], u.Name)
return bytes, nil
}
+
+// DeleteStream carries the stream identifier that is sent with
+// DeleteStreamCode.
+type DeleteStream struct {
+	StreamId iggcon.Identifier
+}
+
+// Code reports DeleteStreamCode as this command's code.
+func (d *DeleteStream) Code() Code {
+	return DeleteStreamCode
+}
+
+// MarshalBinary returns the binary form of StreamId; the payload consists of
+// nothing else.
+func (d *DeleteStream) MarshalBinary() ([]byte, error) {
+	payload, err := d.StreamId.MarshalBinary()
+	return payload, err
+}
diff --git a/foreign/go/contracts/create_stream_test.go
b/foreign/go/internal/command/stream_test.go
similarity index 66%
rename from foreign/go/contracts/create_stream_test.go
rename to foreign/go/internal/command/stream_test.go
index 19a9c15f9..b3859fef3 100644
--- a/foreign/go/contracts/create_stream_test.go
+++ b/foreign/go/internal/command/stream_test.go
@@ -15,11 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
import (
+ "bytes"
"reflect"
"testing"
+
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
)
func TestSerialize_TcpCreateStreamRequest(t *testing.T) {
@@ -52,3 +55,28 @@ func TestSerialize_TcpCreateStreamRequest(t *testing.T) {
t.Errorf("Payload is incorrect. Expected: %v, Got: %v",
expectedPayload, serialized[payloadOffset:])
}
}
+
+// TestSerialize_UpdateStream checks that UpdateStream.MarshalBinary emits the
+// marshalled stream identifier followed by a one-byte name length and the
+// name bytes.
+func TestSerialize_UpdateStream(t *testing.T) {
+	streamId, idErr := iggcon.NewIdentifier("stream")
+	if idErr != nil {
+		t.Fatalf("Failed to create stream identifier: %v", idErr)
+	}
+	request := UpdateStream{
+		StreamId: streamId,
+		Name:     "update_stream",
+	}
+
+	// Stop immediately on failure: comparing a nil payload against the
+	// expected bytes would only add a second, misleading error.
+	serialized1, err := request.MarshalBinary()
+	if err != nil {
+		t.Fatalf("Failed to serialize UpdateStream: %v", err)
+	}
+
+	expected := []byte{
+		0x02,                               // StreamId Kind (StringId)
+		0x06,                               // StreamId Length (6)
+		0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, // StreamId Value ("stream")
+		0x0D, // Name Length (13)
+		0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x5F, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, // Name ("update_stream")
+	}
+
+	if !bytes.Equal(serialized1, expected) {
+		t.Errorf("Test case 1 failed. \nExpected:\t%v\nGot:\t\t%v", expected, serialized1)
+	}
+}
diff --git a/foreign/go/contracts/command.go
b/foreign/go/internal/command/system.go
similarity index 65%
rename from foreign/go/contracts/command.go
rename to foreign/go/internal/command/system.go
index 2c0bbdbfb..778d7043f 100644
--- a/foreign/go/contracts/command.go
+++ b/foreign/go/internal/command/system.go
@@ -15,23 +15,27 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
-import (
- "encoding"
- "encoding/binary"
-)
+import "encoding/binary"
-type Command interface {
- // Code returns the command code associated with this command.
- Code() CommandCode
+type GetClient struct {
+ ClientID uint32
+}
- encoding.BinaryMarshaler
+func (c *GetClient) Code() Code {
+ return GetClientCode
+}
+
+func (c *GetClient) MarshalBinary() ([]byte, error) {
+ bytes := make([]byte, 4)
+ binary.LittleEndian.PutUint32(bytes, c.ClientID)
+ return bytes, nil
}
type GetClients struct{}
-func (c *GetClients) Code() CommandCode {
+func (c *GetClients) Code() Code {
return GetClientsCode
}
@@ -39,16 +43,33 @@ func (c *GetClients) MarshalBinary() ([]byte, error) {
return []byte{}, nil
}
-type GetClient struct {
- ClientID uint32
+type GetClusterMetadata struct {
}
-func (c *GetClient) Code() CommandCode {
- return GetClientCode
+func (m *GetClusterMetadata) Code() Code {
+ return GetClusterMetadataCode
}
-func (c *GetClient) MarshalBinary() ([]byte, error) {
- bytes := make([]byte, 4)
- binary.LittleEndian.PutUint32(bytes, c.ClientID)
- return bytes, nil
+func (m *GetClusterMetadata) MarshalBinary() ([]byte, error) {
+ return []byte{}, nil
+}
+
+// GetStats is the field-less command sent with GetStatsCode.
+type GetStats struct{}
+
+// Code reports GetStatsCode as this command's code.
+func (c *GetStats) Code() Code {
+	return GetStatsCode
+}
+
+// MarshalBinary returns an empty, non-nil payload and a nil error.
+func (c *GetStats) MarshalBinary() ([]byte, error) {
+	return make([]byte, 0), nil
+}
+
+// Ping is the field-less command sent with PingCode.
+type Ping struct{}
+
+// Code reports PingCode as this command's code.
+func (p *Ping) Code() Code {
+ return PingCode
+}
+
+// MarshalBinary returns an empty, non-nil payload and a nil error.
+func (p *Ping) MarshalBinary() ([]byte, error) {
+ return []byte{}, nil
}
diff --git a/foreign/go/internal/command/topic.go
b/foreign/go/internal/command/topic.go
new file mode 100644
index 000000000..1ce8a988d
--- /dev/null
+++ b/foreign/go/internal/command/topic.go
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package command
+
+import (
+ "encoding/binary"
+
+ "github.com/apache/iggy/foreign/go/contracts"
+)
+
+// CreateTopic carries the fields that are sent with CreateTopicCode.
+// ReplicationFactor is optional: a nil pointer is encoded as 0.
+type CreateTopic struct {
+	StreamId             iggcon.Identifier           `json:"streamId"`
+	PartitionsCount      uint32                      `json:"partitionsCount"`
+	CompressionAlgorithm iggcon.CompressionAlgorithm `json:"compressionAlgorithm"`
+	MessageExpiry        iggcon.Duration             `json:"messageExpiry"`
+	MaxTopicSize         uint64                      `json:"maxTopicSize"`
+	Name                 string                      `json:"name"`
+	ReplicationFactor    *uint8                      `json:"replicationFactor"`
+}
+
+// Code reports CreateTopicCode as this command's code.
+func (t *CreateTopic) Code() Code {
+	return CreateTopicCode
+}
+
+// MarshalBinary encodes the command in this order: marshalled StreamId,
+// PartitionsCount (4 bytes, little-endian), CompressionAlgorithm (1 byte),
+// MessageExpiry (8 bytes, little-endian), MaxTopicSize (8 bytes,
+// little-endian), ReplicationFactor (1 byte), name length (1 byte) and the
+// name bytes. An error from marshalling StreamId is returned unchanged.
+//
+// The receiver is not modified: a nil ReplicationFactor is encoded as 0
+// without being written back into the struct.
+func (t *CreateTopic) MarshalBinary() ([]byte, error) {
+	// Resolve the optional field into a local so that marshalling stays free
+	// of side effects on the caller's value.
+	var replicationFactor uint8
+	if t.ReplicationFactor != nil {
+		replicationFactor = *t.ReplicationFactor
+	}
+
+	streamIdBytes, err := t.StreamId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	nameBytes := []byte(t.Name)
+
+	totalLength := len(streamIdBytes) + // StreamId
+		4 + // PartitionsCount
+		1 + // CompressionAlgorithm
+		8 + // MessageExpiry
+		8 + // MaxTopicSize
+		1 + // ReplicationFactor
+		1 + // Name length
+		len(nameBytes) // Name
+	bytes := make([]byte, totalLength)
+
+	position := 0
+
+	// StreamId
+	copy(bytes[position:], streamIdBytes)
+	position += len(streamIdBytes)
+
+	// PartitionsCount
+	binary.LittleEndian.PutUint32(bytes[position:], t.PartitionsCount)
+	position += 4
+
+	// CompressionAlgorithm
+	bytes[position] = byte(t.CompressionAlgorithm)
+	position++
+
+	// MessageExpiry
+	binary.LittleEndian.PutUint64(bytes[position:], uint64(t.MessageExpiry))
+	position += 8
+
+	// MaxTopicSize
+	binary.LittleEndian.PutUint64(bytes[position:], t.MaxTopicSize)
+	position += 8
+
+	// ReplicationFactor
+	bytes[position] = replicationFactor
+	position++
+
+	// Name: the length prefix is a single byte, so only the low 8 bits of
+	// len(nameBytes) are stored.
+	bytes[position] = byte(len(nameBytes))
+	position++
+	copy(bytes[position:], nameBytes)
+
+	return bytes, nil
+}
+
+// GetTopic carries the stream and topic identifiers that are sent with
+// GetTopicCode.
+type GetTopic struct {
+	StreamId iggcon.Identifier
+	TopicId  iggcon.Identifier
+}
+
+// Code reports GetTopicCode as this command's code.
+func (g *GetTopic) Code() Code {
+	return GetTopicCode
+}
+
+// MarshalBinary returns whatever iggcon.MarshalIdentifiers produces for
+// StreamId and TopicId, passed in that order.
+func (g *GetTopic) MarshalBinary() ([]byte, error) {
+	payload, err := iggcon.MarshalIdentifiers(g.StreamId, g.TopicId)
+	return payload, err
+}
+
+// GetTopics carries the stream identifier that is sent with GetTopicsCode.
+type GetTopics struct {
+	StreamId iggcon.Identifier
+}
+
+// Code reports GetTopicsCode as this command's code.
+func (g *GetTopics) Code() Code {
+	return GetTopicsCode
+}
+
+// MarshalBinary returns the binary form of StreamId; the payload consists of
+// nothing else.
+func (g *GetTopics) MarshalBinary() ([]byte, error) {
+	payload, err := g.StreamId.MarshalBinary()
+	return payload, err
+}
+
+// DeleteTopic carries the stream and topic identifiers that are sent with
+// DeleteTopicCode.
+type DeleteTopic struct {
+	StreamId iggcon.Identifier
+	TopicId  iggcon.Identifier
+}
+
+// Code reports DeleteTopicCode as this command's code.
+func (d *DeleteTopic) Code() Code {
+	return DeleteTopicCode
+}
+
+// MarshalBinary returns whatever iggcon.MarshalIdentifiers produces for
+// StreamId and TopicId, passed in that order.
+func (d *DeleteTopic) MarshalBinary() ([]byte, error) {
+	payload, err := iggcon.MarshalIdentifiers(d.StreamId, d.TopicId)
+	return payload, err
+}
+
+// UpdateTopic carries the fields that are sent with UpdateTopicCode.
+// ReplicationFactor is optional: a nil pointer is encoded as 0.
+type UpdateTopic struct {
+	StreamId             iggcon.Identifier           `json:"streamId"`
+	TopicId              iggcon.Identifier           `json:"topicId"`
+	CompressionAlgorithm iggcon.CompressionAlgorithm `json:"compressionAlgorithm"`
+	MessageExpiry        iggcon.Duration             `json:"messageExpiry"`
+	MaxTopicSize         uint64                      `json:"maxTopicSize"`
+	ReplicationFactor    *uint8                      `json:"replicationFactor"`
+	Name                 string                      `json:"name"`
+}
+
+// Code reports UpdateTopicCode as this command's code.
+func (u *UpdateTopic) Code() Code {
+	return UpdateTopicCode
+}
+
+// MarshalBinary encodes the command in this order: marshalled StreamId,
+// marshalled TopicId, CompressionAlgorithm (1 byte), MessageExpiry (8 bytes,
+// little-endian), MaxTopicSize (8 bytes, little-endian), ReplicationFactor
+// (1 byte), name length (1 byte) and the name bytes. An error from
+// marshalling either identifier is returned unchanged.
+//
+// The receiver is not modified: a nil ReplicationFactor is encoded as 0
+// without being written back into the struct.
+func (u *UpdateTopic) MarshalBinary() ([]byte, error) {
+	// Resolve the optional field into a local so that marshalling stays free
+	// of side effects on the caller's value.
+	var replicationFactor uint8
+	if u.ReplicationFactor != nil {
+		replicationFactor = *u.ReplicationFactor
+	}
+
+	streamIdBytes, err := u.StreamId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	topicIdBytes, err := u.TopicId.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	// 19 fixed bytes: compression (1) + expiry (8) + max size (8) +
+	// replication factor (1) + name length (1).
+	buffer := make([]byte, 19+len(streamIdBytes)+len(topicIdBytes)+len(u.Name))
+
+	offset := 0
+
+	offset += copy(buffer[offset:], streamIdBytes)
+	offset += copy(buffer[offset:], topicIdBytes)
+
+	buffer[offset] = byte(u.CompressionAlgorithm)
+	offset++
+
+	binary.LittleEndian.PutUint64(buffer[offset:], uint64(u.MessageExpiry))
+	offset += 8
+
+	binary.LittleEndian.PutUint64(buffer[offset:], u.MaxTopicSize)
+	offset += 8
+
+	buffer[offset] = replicationFactor
+	offset++
+
+	// The length prefix is a single byte, so only the low 8 bits of
+	// len(u.Name) are stored.
+	buffer[offset] = uint8(len(u.Name))
+	offset++
+
+	copy(buffer[offset:], u.Name)
+
+	return buffer, nil
+}
diff --git a/foreign/go/contracts/update_topic_test.go
b/foreign/go/internal/command/topic_test.go
similarity index 91%
rename from foreign/go/contracts/update_topic_test.go
rename to foreign/go/internal/command/topic_test.go
index 319b7b43c..12357ed4d 100644
--- a/foreign/go/contracts/update_topic_test.go
+++ b/foreign/go/internal/command/topic_test.go
@@ -15,21 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
import (
"bytes"
"testing"
+
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
)
func TestSerialize_UpdateTopic(t *testing.T) {
- streamId, _ := NewIdentifier("stream")
- topicId, _ := NewIdentifier(uint32(1))
+ streamId, _ := iggcon.NewIdentifier("stream")
+ topicId, _ := iggcon.NewIdentifier(uint32(1))
request := UpdateTopic{
StreamId: streamId,
TopicId: topicId,
Name: "update_topic",
- MessageExpiry: 100 * Microsecond,
+ MessageExpiry: 100 * iggcon.Microsecond,
MaxTopicSize: 100,
}
diff --git a/foreign/go/contracts/update_user.go
b/foreign/go/internal/command/update_user.go
similarity index 86%
rename from foreign/go/contracts/update_user.go
rename to foreign/go/internal/command/update_user.go
index 1dad2db14..a20d647fb 100644
--- a/foreign/go/contracts/update_user.go
+++ b/foreign/go/internal/command/update_user.go
@@ -15,15 +15,17 @@
// specific language governing permissions and limitations
// under the License.
-package iggcon
+package command
+
+import "github.com/apache/iggy/foreign/go/contracts"
type UpdateUser struct {
- UserID Identifier `json:"-"`
- Username *string `json:"username"`
- Status *UserStatus `json:"userStatus"`
+ UserID iggcon.Identifier `json:"-"`
+ Username *string `json:"username"`
+ Status *iggcon.UserStatus `json:"userStatus"`
}
-func (u *UpdateUser) Code() CommandCode {
+func (u *UpdateUser) Code() Code {
return UpdateUserCode
}
@@ -71,9 +73,9 @@ func (u *UpdateUser) MarshalBinary() ([]byte, error) {
position++
statusByte := byte(0)
switch *u.Status {
- case Active:
+ case iggcon.Active:
statusByte = 1
- case Inactive:
+ case iggcon.Inactive:
statusByte = 2
}
bytes[position] = statusByte
diff --git a/foreign/go/internal/command/user.go
b/foreign/go/internal/command/user.go
new file mode 100644
index 000000000..628140a43
--- /dev/null
+++ b/foreign/go/internal/command/user.go
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package command
+
+import (
+ "encoding/binary"
+
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
+)
+
+// CreateUser carries the fields that are sent with CreateUserCode.
+// Permissions is optional; a nil pointer is encoded as a zero flag byte.
+type CreateUser struct {
+	Username    string              `json:"username"`
+	Password    string              `json:"Password"`
+	Status      iggcon.UserStatus   `json:"Status"`
+	Permissions *iggcon.Permissions `json:"Permissions,omitempty"`
+}
+
+// Code reports CreateUserCode as this command's code.
+func (c *CreateUser) Code() Code {
+	return CreateUserCode
+}
+
+// MarshalBinary encodes the command in this order: username length (1 byte),
+// username, password length (1 byte), password, status (1 byte) and a
+// permissions flag (1 byte). When Permissions is set the flag is 1 and is
+// followed by the marshalled permissions length (4 bytes, little-endian) and
+// the marshalled permissions; otherwise the flag is 0. An error from
+// marshalling the permissions is returned unchanged.
+func (c *CreateUser) MarshalBinary() ([]byte, error) {
+	// 4 fixed bytes: username length, password length, status, flag.
+	size := 4 + len(c.Username) + len(c.Password)
+	if c.Permissions != nil {
+		// NOTE(review): the flag byte is already counted in the 4 above, so
+		// the extra 1 here looks like one unused trailing byte - confirm
+		// against Permissions.Size().
+		size += 1 + 4 + c.Permissions.Size()
+	}
+	payload := make([]byte, size)
+
+	// Each length prefix is a single byte, so only the low 8 bits are stored.
+	payload[0] = byte(len(c.Username))
+	offset := 1
+	offset += copy(payload[offset:], c.Username)
+
+	payload[offset] = byte(len(c.Password))
+	offset++
+	offset += copy(payload[offset:], c.Password)
+
+	// Active is written as 1 and Inactive as 2; any other status stays 0.
+	var status byte
+	switch c.Status {
+	case iggcon.Active:
+		status = 1
+	case iggcon.Inactive:
+		status = 2
+	}
+	payload[offset] = status
+	offset++
+
+	if c.Permissions == nil {
+		payload[offset] = 0
+		return payload, nil
+	}
+
+	payload[offset] = 1
+	offset++
+	permissions, err := c.Permissions.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	binary.LittleEndian.PutUint32(payload[offset:offset+4], uint32(len(permissions)))
+	offset += 4
+	copy(payload[offset:offset+len(permissions)], permissions)
+
+	return payload, nil
+}
+
+// GetUser carries the identifier that is sent with GetUserCode.
+type GetUser struct {
+	Id iggcon.Identifier
+}
+
+// Code reports GetUserCode as this command's code.
+func (c *GetUser) Code() Code {
+	return GetUserCode
+}
+
+// MarshalBinary returns the binary form of Id; the payload consists of
+// nothing else.
+func (c *GetUser) MarshalBinary() ([]byte, error) {
+	payload, err := c.Id.MarshalBinary()
+	return payload, err
+}
+
+// GetUsers is the field-less command sent with GetUsersCode.
+type GetUsers struct{}
+
+// Code reports GetUsersCode as this command's code.
+func (g *GetUsers) Code() Code {
+	return GetUsersCode
+}
+
+// MarshalBinary returns an empty, non-nil payload and a nil error.
+func (g *GetUsers) MarshalBinary() ([]byte, error) {
+	return make([]byte, 0), nil
+}
+
+// UpdatePermissions carries the user identifier and the optional permissions
+// that are sent with UpdatePermissionsCode. A nil Permissions pointer is
+// encoded as a zero flag byte.
+type UpdatePermissions struct {
+	UserID      iggcon.Identifier   `json:"-"`
+	Permissions *iggcon.Permissions `json:"Permissions,omitempty"`
+}
+
+// Code reports UpdatePermissionsCode as this command's code.
+func (u *UpdatePermissions) Code() Code {
+	return UpdatePermissionsCode
+}
+
+// MarshalBinary encodes the command as the marshalled UserID followed by a
+// permissions flag (1 byte). When Permissions is set the flag is 1 and is
+// followed by the marshalled permissions length (4 bytes, little-endian) and
+// the marshalled permissions; otherwise the flag is 0 and ends the payload.
+// An error from marshalling UserID or Permissions is returned unchanged.
+func (u *UpdatePermissions) MarshalBinary() ([]byte, error) {
+	userIdBytes, err := u.UserID.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	// The flag byte is written on both paths, so it always counts towards
+	// the length. Reserving it only when Permissions was set made the nil
+	// case index one past the end of the buffer and panic.
+	length := len(userIdBytes) + 1
+	if u.Permissions != nil {
+		length += 4 + u.Permissions.Size()
+	}
+
+	bytes := make([]byte, length)
+	position := 0
+
+	copy(bytes[position:position+len(userIdBytes)], userIdBytes)
+	position += len(userIdBytes)
+
+	if u.Permissions == nil {
+		bytes[position] = 0
+		return bytes, nil
+	}
+
+	bytes[position] = 1
+	position++
+	permissionsBytes, err := u.Permissions.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	binary.LittleEndian.PutUint32(bytes[position:position+4], uint32(len(permissionsBytes)))
+	position += 4
+	copy(bytes[position:position+len(permissionsBytes)], permissionsBytes)
+
+	return bytes, nil
+}
+
+// ChangePassword carries the user identifier and the current and new
+// passwords that are sent with ChangePasswordCode.
+type ChangePassword struct {
+	UserID          iggcon.Identifier `json:"-"`
+	CurrentPassword string            `json:"CurrentPassword"`
+	NewPassword     string            `json:"NewPassword"`
+}
+
+// Code reports ChangePasswordCode as this command's code.
+func (c *ChangePassword) Code() Code {
+	return ChangePasswordCode
+}
+
+// MarshalBinary encodes the command as the marshalled UserID, followed by
+// the current password and then the new password, each preceded by a
+// one-byte length. An error from marshalling UserID is returned unchanged.
+func (c *ChangePassword) MarshalBinary() ([]byte, error) {
+	userID, err := c.UserID.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+
+	// 2 extra bytes: one length prefix per password.
+	payload := make([]byte, len(userID)+len(c.CurrentPassword)+len(c.NewPassword)+2)
+	offset := copy(payload, userID)
+
+	// Each length prefix is a single byte, so only the low 8 bits are stored.
+	payload[offset] = byte(len(c.CurrentPassword))
+	offset++
+	offset += copy(payload[offset:], c.CurrentPassword)
+
+	payload[offset] = byte(len(c.NewPassword))
+	offset++
+	copy(payload[offset:], c.NewPassword)
+
+	return payload, nil
+}
+
+// DeleteUser carries the identifier that is sent with DeleteUserCode.
+type DeleteUser struct {
+	Id iggcon.Identifier
+}
+
+// Code reports DeleteUserCode as this command's code.
+func (d *DeleteUser) Code() Code {
+	return DeleteUserCode
+}
+
+// MarshalBinary returns the binary form of Id; the payload consists of
+// nothing else.
+func (d *DeleteUser) MarshalBinary() ([]byte, error) {
+	payload, err := d.Id.MarshalBinary()
+	return payload, err
+}