Steve,

Have you looked at grouping your putAll() requests so that each group aligns with
one of Geode’s buckets?  In your application code, you can compute the hash for each
data item and self-partition the entries.  This lets you send the requests
on separate threads in parallel while optimizing network traffic.

I have seen this used for very high-throughput ingest use cases.
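
Roughly, the client-side grouping could look something like this (untested
sketch; it assumes the region uses the default hashing, i.e. key.hashCode()
modulo the region's total-num-buckets, and TOTAL_BUCKETS and the pool size are
placeholders to tune for your setup):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.geode.cache.Region;

public class BucketAlignedIngest {

  // Placeholder: must match the region's total-num-buckets setting.
  private static final int TOTAL_BUCKETS = 113;

  public static void ingest(Region<String, Object> region, Map<String, Object> data)
      throws InterruptedException {
    // Self-partition the entries: one map per target bucket.
    Map<Integer, Map<String, Object>> groups = new HashMap<>();
    for (Map.Entry<String, Object> e : data.entrySet()) {
      int bucketId = Math.abs(e.getKey().hashCode() % TOTAL_BUCKETS);
      groups.computeIfAbsent(bucketId, b -> new HashMap<>()).put(e.getKey(), e.getValue());
    }

    // Send each bucket-aligned group as its own putAll, in parallel.
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, Math.min(groups.size(), 16)));
    for (Map<String, Object> group : groups.values()) {
      pool.submit(() -> region.putAll(group));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.MINUTES);
  }
}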

Anthony


> On Apr 16, 2020, at 11:09 AM, Anilkumar Gingade <aging...@pivotal.io> wrote:
> 
>>> PutAllPRMessage.*
> 
> These are internal APIs/message protocols used to handle PartitionedRegion
> messages.
> The messages are sent from originator node to peer nodes to operate on a
> given partitioned region; not intended as application APIs.
> 
> We could consider looking at the code that determines the bucket id for each
> of the putAll keys. If there is routing info that identifies a common data
> store (bucket), the code could be optimized there...
> 
> My recommendation is still to use the existing APIs and try to tune the
> putAll map size. By reducing the map size, you will be pushing small chunks
> of data to the server while the remaining data is prepared on the client,
> which keeps both client and server busy at the same time. You can also look
> at tuning the socket buffer size to fit your data size so that the data is
> written/read in a single chunk.
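> 
> Something along these lines, for example (rough sketch only; it assumes you
> already have the region handle and the full data map, and the chunk size is
> something to tune):
> 
> // Push the data in small putAll chunks instead of one big map, so the
> // client can prepare the next chunk while the server applies the last one.
> static void chunkedPutAll(org.apache.geode.cache.Region<String, Object> region,
>     java.util.Map<String, Object> data, int chunkSize) {
>   java.util.Map<String, Object> chunk = new java.util.HashMap<>();
>   for (java.util.Map.Entry<String, Object> e : data.entrySet()) {
>     chunk.put(e.getKey(), e.getValue());
>     if (chunk.size() == chunkSize) {
>       region.putAll(chunk);
>       chunk.clear();
>     }
>   }
>   if (!chunk.isEmpty()) {
>     region.putAll(chunk); // remainder
>   }
> }
> 
> Call it with a small chunk size first (say 100) and measure from there.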
> 
> -Anil
> 
> 
> On Wed, Apr 15, 2020 at 7:01 PM steve mathew <steve.mathe...@gmail.com>
> wrote:
> 
>> Anil, yes, it's a kind of custom hash (it calculates the hash over all
>> fields of a row). I have to stick to the predefined mechanism based on which
>> the source files are generated.
>> 
>> It would be a great help if someone could point me to any available
>> *server-side internal API that provides bucket-level data ingestion, if
>> any*. While exploring, I came across
>> "*PartitionRegion.sendMsgByBucket(bucketId, PutAllPRMessage)*". Does this
>> API internally take care of redundancy (ingestion into secondary buckets on
>> peer nodes)?
>> 
>> Can someone also explain
>> *PutAllPRMessage.operateOnPartitionedRegion(ClusterDistributionManager
>> dm, PartitionedRegion pr, ..)*? It seems this handles a putAll message from
>> a peer. When is this required?
>> 
>> Thanks
>> 
>> Steve M.
>> 
>> On Wed, Apr 15, 2020 at 11:06 PM Anilkumar Gingade <aging...@pivotal.io>
>> wrote:
>> 
>>> About the API: I would not recommend using bucketId in the API, as it is
>>> internal, and there are other internal/external APIs that rely on the
>>> bucket-id calculation, which could be compromised here.
>>> 
>>> Instead of adding new APIs, probably looking at minimizing/reducing the
>>> time spent may be a good start.
>>> 
>>> BucketRegion.waitUntilLocked - A putAll thread could spend time here when
>>> there are multiple threads acting upon the same bucket; one way to reduce
>>> this is by tuning the putAll size. Can you try changing your putAll size
>>> (say, start with 100)?
>>> 
>>> I am wondering about the time spent in hashCode(); is it custom code?
>>> 
>>> If you want to create the buckets upfront, you can try calling the
>> method:
>>> PartitionRegionHelper.assignBucketsToPartitions().
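>>> 
>>> For example (sketch only; it assumes a cache handle and a region named
>>> "myRegion", and that this runs on a member hosting the partitioned region,
>>> e.g. from a deployed function):
>>> 
>>> import org.apache.geode.cache.Region;
>>> import org.apache.geode.cache.partition.PartitionRegionHelper;
>>> 
>>> Region<String, Object> region = cache.getRegion("myRegion"); // assumed name
>>> PartitionRegionHelper.assignBucketsToPartitions(region); // create all buckets up front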
>>> 
>>> -Anil
>>> 
>>> 
>>> On Wed, Apr 15, 2020 at 8:37 AM steve mathew <steve.mathe...@gmail.com>
>>> wrote:
>>> 
>>>> Thanks Dan, Anil and Udo for your inputs. Extremely sorry for the late
>>>> reply, as I took a bit of time to explore and understand Geode internals.
>>>> 
>>>> It seems the BucketRegion/Bucket terminology is not exposed to the user,
>>>> but I am still trying to achieve something that is uncommon and for which
>>>> no client API is exposed.
>>>> 
>>>> *Details about the use case / client*
>>>> - Multi-threaded client - each task performs data ingestion into a
>>>> specific bucket. Each task knows the bucket number to ingest data into;
>>>> in short, the client knows the task-->bucket mapping.
>>>> - Each task iteratively ingests data in batches (configurable, e.g. 1000
>>>> records) into the bucket assigned to it.
>>>> - Parallelism is achieved by running multiple tasks concurrently.
>>>> 
>>>> 
>>>> *When I tried the existing R.putAll() API, I observed slow performance;
>>>> the related observations are:*
>>>> - A few tasks take quite a long time (a thread dump shows the thread
>>>> WAITING on BucketRegion.waitUntilLocked), hence the overall client takes
>>>> longer.
>>>> - Code profiling shows a good amount of time spent on hash-code
>>>> calculation. It seems key.hashCode() gets calculated on both client and
>>>> server, which is not required for my use case as the task-->bucket
>>>> mapping is known beforehand.
>>>> - The putAll() client implementation takes care of parallelism (using a
>>>> PR-metadata-enabled thread pool and reshuffling the keys internally), but
>>>> in my case that is taken care of by multiple tasks, one per bucket, within
>>>> my client.
>>>> 
>>>> *I have forked the Geode codebase and am trying to extend it by providing
>>>> a client API like:*
>>>> //Region.java
>>>> /**
>>>> * putAll records in the specified bucket
>>>> */
>>>> *public void putAll(int bucketId, map) *
>>>> 
>>>> I have already added the client-side message and related code (similar to
>>>> putAllOp and its impl), and I am adding the server-side code/BaseCommand,
>>>> similar to the putAll code path (cmdExecute()/virtualPut() etc.). *Is
>>>> there any (internal) API that provides bucket-specific putAll and also
>>>> takes care of redundancy - secondary bucket ingestion - which I can
>>>> use/hook directly?*
>>>> 
>>>> It seems that if I isolate bucket creation from the actual put flow
>>>> (create the buckets prior to the putAll call), it may work better in my
>>>> scenario. Hence:
>>>> *Are there any recommendations for creating buckets explicitly prior to
>>>> the actual PUT, rather than lazily within the putAll flow? Is there any
>>>> internal API available for this that can be used, or other means like FE
>>>> etc.?*
>>>> 
>>>> *Data processing/retrieval:* I am not going to use the get/getAll APIs,
>>>> but will process the data using function execution (FE) and the querying
>>>> mechanism once I achieve bucket-specific ingestion.
>>>> 
>>>> 
>>>> *Any overall thoughts on this API implementation?*
>>>> 
>>>> Looking forward to the inputs..
>>>> Thanks in advance.
>>>> 
>>>> *Steve M*
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Sat, Apr 11, 2020 at 7:12 PM Udo Kohlmeyer <u...@vmware.com.invalid>
>>>> wrote:
>>>> 
>>>>> Hi there Steve,
>>>>> 
>>>>> Firstly, you are correct: the pattern you are describing is not
>>>>> recommended and possibly not even correctly supported. I've seen many
>>>>> implementations of Geode systems and none of them ever needed to do what
>>>>> you are intending to do. It seems like you are willing to go through A
>>>>> LOT of effort for a benefit I don't immediately see.
>>>>> 
>>>>> Also, I'm confused about which part of the "hashing" you are trying to
>>>>> avoid. You will ALWAYS have the hashing overhead. At the very least the
>>>>> key will have to be hashed for put() and later on for get().
>>>>> As for the "file-per-bucket" request, there will always be some form of
>>>>> bucket resolution that needs to happen, be it a custom PartitionResolver
>>>>> or the default partition bucket resolver.
>>>>> 
>>>>> In the code that Dan provided, you now have to manage the bucket
>> number
>>>>> explicitly in the client. When you insert data, you have to provide
>> the
>>>>> correct bucket number and if you retrieve the data, you have to
>> provide
>>>> the
>>>>> correct bucket number, otherwise you will get "null" back. So this means
>>>>> your client has to manage the bucket numbers, because every subsequent
>>>>> put/get that does not provide the bucket number will possibly result in
>>>>> some failure. In short, EVERY key operation (put/get) will require a
>>>>> bucketNumber to function correctly, as the PartitionResolver is used.
>>>>> 
>>>>> Maybe we can better aid you toward a suitable solution by understanding
>>>>> WHAT you are trying to achieve and WHAT you are trying to avoid.
>>>>> 
>>>>> So in short, you will NOT avoid hashing, as a Region will always hash the
>>>>> key, regardless of how you load your data. Think of a Region as a big
>>>>> distributed HashMap. Hashing is in its DNA and inner workings. The only
>>>>> step you'd avoid is the bucket allocation calculation, which, tbh, is
>>>>> lightweight:
>>>>> 
>>>>> `bucketNumber = (hashcode % totalNumberBuckets) + 1`
>>>>> 
>>>>> --Udo
>>>>> 
>>>>> On 4/10/20, 3:52 PM, "steve mathew" <steve.mathe...@gmail.com>
>> wrote:
>>>>> 
>>>>>    Thanks Dan for your quick response.
>>>>> 
>>>>>    Though this may not be a recommended pattern, here I am targeting a
>>>>>    bucket-specific putAll and want to exclude hashing, as it turns out to
>>>>>    be an overhead in my scenario.
>>>>>    Is this achievable? How should I define a PartitionResolver that works
>>>>>    generically and returns the respective bucket for a specific file?
>>>>>    What will be impacted if I opt for this route (fixed partitioning per
>>>>>    file)? I can think of horizontal scalability, since the buckets are
>>>>>    made fixed... thoughts?
>>>>> 
>>>>> 
>>>>>    -Steve M.
>>>>> 
>>>>> 
>>>>>    On Sat, Apr 11, 2020, 1:54 AM Dan Smith <dsm...@pivotal.io>
>> wrote:
>>>>> 
>>>>>> Hi Steve,
>>>>>> 
>>>>>> The bucket that data goes into is generally determined by the
>>> key.
>>>>> So for
>>>>>> example if your data in File-0 is all for customer X, you can
>>>> include
>>>>>> Customer X in your region key and implement a PartitionResolver
>>>> that
>>>>>> extracts the customer from your region key and returns it.
>> Geode
>>>>> will then
>>>>>> group all of the data for Customer X into a single bucket.
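>>>>>> 
>>>>>> A minimal resolver along those lines (illustrative sketch only; it
>>>>>> assumes keys of the form "customerId|orderId"):
>>>>>> 
>>>>>> import org.apache.geode.cache.EntryOperation;
>>>>>> import org.apache.geode.cache.PartitionResolver;
>>>>>> 
>>>>>> public class CustomerPartitionResolver implements PartitionResolver<String, Object> {
>>>>>>   @Override
>>>>>>   public Object getRoutingObject(EntryOperation<String, Object> op) {
>>>>>>     // Route on the customer portion of the key; Geode hashes this value
>>>>>>     // to pick the bucket, so all keys for one customer land together.
>>>>>>     String key = op.getKey();
>>>>>>     return key.substring(0, key.indexOf('|'));
>>>>>>   }
>>>>>> 
>>>>>>   @Override
>>>>>>   public String getName() {
>>>>>>     return "CustomerPartitionResolver";
>>>>>>   }
>>>>>> 
>>>>>>   @Override
>>>>>>   public void close() {}
>>>>>> }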
>>>>>> 
>>>>>> You generally shouldn't have to target a specific bucket number (e.g.
>>>>>> bucket 0). But technically you can, just by returning an integer from
>>>>>> your PartitionResolver. If you return the integer 0, your data will go
>>>>>> into bucket 0. Usually it's just better to return your partition key
>>>>>> (e.g. "Customer X") and let Geode hash that to some bucket number.
>>>>>> 
>>>>>> -Dan
>>>>>> 
>>>>>> On Fri, Apr 10, 2020 at 11:04 AM steve mathew <
>>>>> steve.mathe...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hello Geode devs and users,
>>>>>>> 
>>>>>>> I have a set of files populated with data, fairly distributed. I want
>>>>>>> to put each file's data in a specific bucket,
>>>>>>> like PutAll File-0 data into Geode bucket B0
>>>>>>>      PutAll File-1 data into Geode bucket B1
>>>>>>> 
>>>>>>>      and so on...
>>>>>>> 
>>>>>>> How can I achieve this using the Geode client?
>>>>>>> 
>>>>>>> Can I achieve this using a PartitionResolver or some other means?
>>>>>>> 
>>>>>>> Thanks in advance
>>>>>>> 
>>>>>>> -Steve M.
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
