Steve,

Have you looked at grouping your putAll() requests so that each group aligns with Geode's buckets? In your application code, you can determine the hash for each data item and self-partition the entries. This lets you send the requests on separate threads in parallel while optimizing network traffic.
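A minimal client-side sketch of that self-partitioning approach (the region name "exampleRegion", the locator address, String keys/values, and the group count of 113 are assumptions for illustration, not anything from this thread):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;

public class GroupedPutAll {

  // Number of groups to self-partition into; aligning it with the region's
  // total-num-buckets is an assumption - adjust to your configuration.
  private static final int NUM_GROUPS = 113;

  public static void main(String[] args) throws Exception {
    ClientCache cache =
        new ClientCacheFactory().addPoolLocator("localhost", 10334).create();
    Region<String, String> region = cache
        .<String, String>createClientRegionFactory(ClientRegionShortcut.PROXY)
        .create("exampleRegion"); // hypothetical region name

    Map<String, String> data = loadData(); // entries read from the source files

    // Self-partition: group the entries by a hash computed in application code.
    List<Map<String, String>> groups = new ArrayList<>();
    for (int i = 0; i < NUM_GROUPS; i++) {
      groups.add(new HashMap<>());
    }
    for (Map.Entry<String, String> e : data.entrySet()) {
      int group = Math.abs(e.getKey().hashCode() % NUM_GROUPS);
      groups.get(group).put(e.getKey(), e.getValue());
    }

    // Send each non-empty group from its own thread so the putAll calls run in parallel.
    ExecutorService pool = Executors.newFixedThreadPool(8);
    List<Future<?>> futures = new ArrayList<>();
    for (Map<String, String> group : groups) {
      if (!group.isEmpty()) {
        futures.add(pool.submit(() -> region.putAll(group)));
      }
    }
    for (Future<?> f : futures) {
      f.get(); // surface any ingest failure
    }
    pool.shutdown();
    cache.close();
  }

  private static Map<String, String> loadData() {
    return new HashMap<>(); // placeholder for reading the source files
  }
}

Whether a given group actually lines up with a single Geode bucket depends on the region's total-num-buckets and its partition resolver, so the grouping above is only illustrative of the pattern.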
I have seen this used for very high-throughput ingest use cases.

Anthony

> On Apr 16, 2020, at 11:09 AM, Anilkumar Gingade <aging...@pivotal.io> wrote:
>
>>> PutAllPRMessage.*
>
> These are internal APIs/message protocols used to handle PartitionedRegion
> messages. The messages are sent from the originator node to peer nodes to
> operate on a given partitioned region; they are not intended as application APIs.
>
> We could consider looking at the code which determines the bucket id for each
> of the putAll keys. If there is routing info that identifies a common data
> store (bucket), the code could be optimized there...
>
> My recommendation is still to use the existing APIs and try to tune the
> putAll map size. By reducing the map size, you will be pushing small chunks
> of data to the server while the remaining data is acted upon (at the client),
> which keeps both client and server busy at the same time. You can also look
> at tuning the socket buffer size to fit your data size so that the data is
> written/read in a single chunk.
>
> -Anil
>
>
> On Wed, Apr 15, 2020 at 7:01 PM steve mathew <steve.mathe...@gmail.com> wrote:
>
>> Anil, yes it's a kind of custom hash (which involves calculating the hash on all
>> fields of a row). I have to stick to the predefined mechanism based on which the
>> source files are generated.
>>
>> It would be a great help if someone could point me to any available *server-side
>> internal API that provides bucket-level data ingestion*. While exploring, I came
>> across "*PartitionedRegion.sendMsgByBucket(bucketId, PutAllPRMessage)*".
>> Does this API internally take care of redundancy
>> (ingestion into secondary buckets on peer nodes)?
>>
>> Can someone explain
>> *PutAllPRMessage.operateOnPartitionedRegion(ClusterDistributionManager dm,
>> PartitionedRegion pr, ...)*? It seems this handles a putAll message from a peer.
>> When is this required?
>>
>> Thanks
>>
>> Steve M.
>>
>> On Wed, Apr 15, 2020 at 11:06 PM Anilkumar Gingade <aging...@pivotal.io> wrote:
>>
>>> About the API: I would not recommend using bucketId in the API, as it is internal
>>> and there are other internal/external APIs that rely on bucket id
>>> calculations, which could be compromised here.
>>>
>>> Instead of adding new APIs, probably looking at minimizing/reducing the
>>> time spent may be a good start.
>>>
>>> BucketRegion.waitUntilLocked - A putAll thread could spend time here when
>>> there are multiple threads acting upon the same bucket; one way to reduce
>>> this is by tuning the putAll size. Can you try changing your putAll size
>>> (say, start with 100)?
>>>
>>> I am wondering about the time spent in hashCode(); is it custom code?
>>>
>>> If you want to create the buckets upfront, you can try calling the method
>>> PartitionRegionHelper.assignBucketsToPartitions().
>>>
>>> -Anil
>>>
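A minimal server-side sketch of the upfront bucket creation Anil mentions, assuming a partitioned region named "exampleRegion" defined on a server member (the region name and shortcut are assumptions):

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.partition.PartitionRegionHelper;

public class PreCreateBuckets {
  public static void main(String[] args) {
    // Runs on a server member; assumes the partitioned region "exampleRegion".
    Cache cache = new CacheFactory().create();
    Region<String, String> region = cache
        .<String, String>createRegionFactory(RegionShortcut.PARTITION)
        .create("exampleRegion");

    // Eagerly create all buckets (primary and redundant copies) so that the
    // first putAll does not pay the lazy bucket-creation cost.
    PartitionRegionHelper.assignBucketsToPartitions(region);
  }
}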
>>> On Wed, Apr 15, 2020 at 8:37 AM steve mathew <steve.mathe...@gmail.com> wrote:
>>>
>>>> Thanks Dan, Anil and Udo for your inputs. Extremely sorry for the late reply,
>>>> as I took a bit of time to explore and understand Geode internals.
>>>>
>>>> It seems the BucketRegion/Bucket terminology is not exposed to the user, but I
>>>> am still trying to achieve something uncommon, for which no client API is
>>>> exposed.
>>>>
>>>> *Details about the use case / client*
>>>> - Multi-threaded client - each task performs data ingestion into a specific bucket.
>>>>   Each task knows the bucket number to ingest data into; in short, the client knows
>>>>   the task-->bucket mapping.
>>>> - Each task iteratively ingests data in batches (configurable) of 1000
>>>>   records into the bucket assigned to it.
>>>> - Parallelism is achieved by running multiple tasks concurrently.
>>>>
>>>> *When I tried the existing Region.putAll() API, I observed slow performance;
>>>> the related observations are:*
>>>> - A few tasks take quite a long time (a thread dump shows the thread WAITING on
>>>>   BucketRegion.waitUntilLocked), hence the overall client takes longer.
>>>> - Code profiling shows a good amount of time spent in hash-code
>>>>   calculation. It seems key.hashCode() gets calculated on both client and
>>>>   server, which is not required for my use case as the task-->bucket mapping is
>>>>   known beforehand.
>>>> - The putAll() client implementation takes care of parallelism (using a
>>>>   PR-metadata-enabled thread pool and reshuffling the keys internally), but in
>>>>   my case that is taken care of by multiple tasks, one per bucket, within my
>>>>   client.
>>>>
>>>> *I have forked the Geode codebase and am trying to extend it by providing a
>>>> client API like:*
>>>> // Region.java
>>>> /**
>>>>  * putAll records into the specified bucket
>>>>  */
>>>> *public void putAll(int bucketId, Map<K, V> map)*
>>>>
>>>> I have already added the client-side message and related code (similar to putAllOp
>>>> and its impl), and I am adding the server-side code/BaseCommand, similar to the
>>>> putAll code path (cmdExecute()/virtualPut() etc.). *Is there any (internal) API
>>>> that provides bucket-specific putAll and takes care of redundancy -
>>>> secondary bucket ingestion - as well, which I can use/hook directly?*
>>>>
>>>> It seems that if I isolate bucket creation from the actual put flow (create the
>>>> bucket prior to the putAll call) it may work better in my scenario, hence:
>>>> *Are there any recommendations for creating buckets explicitly prior to the
>>>> actual PUT, rather than lazily within the putAll flow on the actual PUT? Is there
>>>> any internal API available for this that can be used, or other means like FE etc.?*
>>>>
>>>> *Data processing/retrieval:* I am not going to use the get/getAll APIs but
>>>> will process the data using FE and the querying mechanism once I achieve
>>>> bucket-specific ingestion.
>>>>
>>>> *Overall thoughts on this API impl?*
>>>>
>>>> Looking forward to the inputs.
>>>> Thanks in advance.
>>>>
>>>> *Steve M*
>>>>
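One possible way to trim the hashCode() cost observed above is to compute the expensive all-fields hash once when the key is constructed and reuse it afterwards, so later calls (including on the server after deserialization) become a field read. This caching idea is only a sketch and is not something proposed in the thread; the field names are hypothetical:

import java.io.Serializable;
import java.util.Arrays;

// Sketch of a region key that computes its (expensive, all-fields) hash once
// at construction time. The field layout is made up for illustration.
public class RowKey implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String[] fields;   // all row fields that participate in the hash
  private final int cachedHash;    // computed once, serialized with the key

  public RowKey(String... fields) {
    this.fields = fields.clone();
    this.cachedHash = Arrays.hashCode(this.fields);
  }

  @Override
  public int hashCode() {
    return cachedHash; // no recomputation on subsequent calls
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof RowKey)) {
      return false;
    }
    return Arrays.equals(fields, ((RowKey) o).fields);
  }
}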
>>>> On Sat, Apr 11, 2020 at 7:12 PM Udo Kohlmeyer <u...@vmware.com.invalid> wrote:
>>>>
>>>>> Hi there Steve,
>>>>>
>>>>> Firstly, you are correct, the pattern you are describing is not
>>>>> recommended and possibly not even correctly supported. I've seen many
>>>>> implementations of Geode systems and none of them ever needed to do what
>>>>> you are intending to do. It seems like you are willing to go through A LOT of
>>>>> effort for a benefit I don't immediately see.
>>>>>
>>>>> Also, I'm confused about what part of the "hashing" you are trying to avoid.
>>>>> You will ALWAYS have the hashing overhead. At the very least the key will
>>>>> have to be hashed for put() and later on for get().
>>>>> As for the "file-per-bucket" request, there will always be some form of
>>>>> bucket resolution that needs to happen, be it a custom PartitionResolver
>>>>> or the default partition bucket resolver.
>>>>>
>>>>> In the code that Dan provided, you now have to manage the bucket number
>>>>> explicitly in the client. When you insert data, you have to provide the
>>>>> correct bucket number, and if you retrieve the data, you have to provide the
>>>>> correct bucket number, otherwise you will get "null" back. So this means
>>>>> your client has to manage the bucket numbers, because every subsequent
>>>>> put/get that does not provide the bucket number will possibly result in
>>>>> some failure. In short, EVERY key operation (put/get) will require a
>>>>> bucketNumber to function correctly, as the PartitionResolver is used.
>>>>>
>>>>> Maybe we can aid you better in finding a suitable solution by understanding
>>>>> WHAT you are trying to achieve and WHAT you are trying to avoid.
>>>>>
>>>>> So in short, you will NOT avoid hashing, as a Region will always hash the
>>>>> key, regardless of how you load your data. Think of a Region as a big
>>>>> distributed HashMap. Hashing is in its DNA and inner workings. The only
>>>>> step you'd avoid is the bucket allocation calculation, which, tbh, is
>>>>> lightweight:
>>>>>
>>>>> `bucketNumber = (hashcode % totalNumberBuckets) + 1`
>>>>>
>>>>> --Udo
>>>>>
>>>>> On 4/10/20, 3:52 PM, "steve mathew" <steve.mathe...@gmail.com> wrote:
>>>>>
>>>>>     Thanks Dan for your quick response.
>>>>>
>>>>>     Though this may not be a recommended pattern, here I am targeting a
>>>>>     bucket-specific putAll and want to exclude hashing, as it turns out to be
>>>>>     an overhead in my scenario.
>>>>>     Is this achievable? How should I define a PartitionResolver that works
>>>>>     generically and returns the respective bucket for a specific file?
>>>>>     What will be impacted if I opt for this route (fixed partitioning per file)?
>>>>>     I can think of horizontal scalability, as the buckets are made fixed... thoughts?
>>>>>
>>>>>     -Steve M.
>>>>>
>>>>>
>>>>> On Sat, Apr 11, 2020, 1:54 AM Dan Smith <dsm...@pivotal.io> wrote:
>>>>>
>>>>>> Hi Steve,
>>>>>>
>>>>>> The bucket that data goes into is generally determined by the key. So, for
>>>>>> example, if your data in File-0 is all for customer X, you can include
>>>>>> Customer X in your region key and implement a PartitionResolver that
>>>>>> extracts the customer from your region key and returns it. Geode will then
>>>>>> group all of the data for Customer X into a single bucket.
>>>>>>
>>>>>> You generally shouldn't have to target a specific bucket number (e.g. bucket
>>>>>> 0). But technically you can, just by returning an integer from your
>>>>>> PartitionResolver. If you return the integer 0, your data will go into
>>>>>> bucket 0. Usually it's just better to return your partition key (e.g.
>>>>>> "Customer X") and let Geode hash that to some bucket number.
>>>>>>
>>>>>> -Dan
>>>>>>
>>>>>> On Fri, Apr 10, 2020 at 11:04 AM steve mathew <steve.mathe...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello Geode devs and users,
>>>>>>>
>>>>>>> I have a set of files populated with data, fairly distributed. I want to
>>>>>>> put each file's data in a specific bucket,
>>>>>>> like: PutAll File-0 data into Geode bucket B0
>>>>>>>       PutAll File-1 data into Geode bucket B1
>>>>>>> and so on...
>>>>>>>
>>>>>>> How can I achieve this using the Geode client?
>>>>>>>
>>>>>>> Can I achieve this using a PartitionResolver or some other means?
>>>>>>>
>>>>>>> Thanks in advance
>>>>>>>
>>>>>>> -Steve M.
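A minimal sketch of the PartitionResolver approach Dan describes, assuming a hypothetical key class (FileEntryKey) that carries the source file name as the grouping value:

import java.io.Serializable;

import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.PartitionResolver;

// Routes all entries that came from the same source file to the same bucket by
// returning the file name as the routing object; Geode hashes that object to
// pick the bucket, so entries sharing a file name co-locate.
public class FilePartitionResolver implements PartitionResolver<FileEntryKey, Object> {

  @Override
  public Object getRoutingObject(EntryOperation<FileEntryKey, Object> opDetails) {
    // Returning an Integer here would target that bucket id directly, but
    // letting Geode hash the file name is usually the better choice.
    return opDetails.getKey().getFileName();
  }

  @Override
  public String getName() {
    return "FilePartitionResolver";
  }

  @Override
  public void close() {
    // no resources to release
  }
}

// Hypothetical key carrying the source file name alongside a row id.
class FileEntryKey implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String fileName; // e.g. "File-0"
  private final String rowId;

  FileEntryKey(String fileName, String rowId) {
    this.fileName = fileName;
    this.rowId = rowId;
  }

  String getFileName() {
    return fileName;
  }

  @Override
  public int hashCode() {
    return fileName.hashCode() * 31 + rowId.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FileEntryKey)) {
      return false;
    }
    FileEntryKey other = (FileEntryKey) o;
    return fileName.equals(other.fileName) && rowId.equals(other.rowId);
  }
}

The resolver would be registered through PartitionAttributesFactory.setPartitionResolver() when the partitioned region is defined, so every entry from a given file is grouped into the same bucket without the client ever naming a bucket id.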