Vectorrent opened a new issue, #43275:
URL: https://github.com/apache/arrow/issues/43275

   ### Describe the bug, including details regarding any error messages, 
version, and platform.
   
   ## Versions
   apache-arrow: ^16.1.0
   parquet-wasm: ^0.6.1
   Node.js: v18.20.3
   
   ## Problem
   I am building a training data sampler in JavaScript, which fetches Parquet 
files from Huggingface, imports them with `parquet-wasm`, and loads them into 
Arrow. In general, this pipeline works well.
   
   My code downloads and loads one Parquet file every few minutes. The very 
first attempt to do this will ALWAYS succeed.
   
   However, subsequent attempts have high probability of failing with a very 
strange bug:
   
   In normal operation, I will read data from Arrow by using 
`column.get(rowIdx)`. When I do this, large strings of text are returned 
successfully. That said, when loading and reading from subsequent shards, there 
is a VERY high chance that `column.get(rowIdx)` will not return text; it will 
return a `BigInt` like `899n` instead. This is unexpected, and thus - my code 
will fail (because I cannot read from the data). Every batch, column and row 
simply returns a BigInt, when it should be returning a string.
   
   I can resolve this problem by repeatedly fetching and loading shards, until 
one of them finally loads successfully. Of course, this is not a solution.
   
   I can confirm that the data itself is not the problem. Upon re-running a 
script, shards that were previously failing can run correctly. I have also 
tried reaching out to [the author of 
parquet-wasm](https://github.com/kylebarron/parquet-wasm/issues/568), and they 
believe the bug must lie in Arrow itself, not in the Parquet Rust library.
   
   **TL;DR - Row data will unexpectedly return BigInts, instead of strings.** 
   
   Any help would be greatly appreciated.
   
   Please see below for a reproducible script. Run this code for a few minutes, 
and you will see some shards begin to return BigInt data, instead of strings.
   ```js
   import { tableFromIPC } from 'apache-arrow'
   import { readParquet } from 'parquet-wasm'
   
   class CosmopediaDataset {
       constructor() {
           this.dataset = 'HuggingFaceTB/cosmopedia'
           this.slices = [
               { slice: 'auto_math_text', shards: 18 },
               { slice: 'khanacademy', shards: 1 },
               { slice: 'openstax', shards: 2 },
               { slice: 'stanford', shards: 13 },
               { slice: 'stories', shards: 43 },
               { slice: 'web_samples_v1', shards: 139 },
               { slice: 'web_samples_v2', shards: 118 },
               { slice: 'wikihow', shards: 2 }
           ]
           this.split = 'train'
           this.delimiter = '\n\n'
           this.eosToken = '<|eos|>'
           this.cacheSize = 20000
           this.cachedText = ''
           this.cycleShardInterval = 10000 // batches
           this.batches = 0
           this.arrowWasmTable = null
       }
   
       async init() {
           await this.fetchRandomShard()
       }
   
       async fetchRandomShard() {
           const { slice, shards } = this.getWeightedRandomSlice(this.slices)
           const shardIndices = generatePaddedNumbers(0, shards, 5)
           const numShards = shardIndices.slice(-1)
           const allShards = shardIndices.slice(0, -1)
           const shard = randomValueFromArray(allShards)
           console.log('fetching shard:', `${shard}/${numShards}`, 'slice:', 
slice)
           const path = 
`data/${slice}/${this.split}-${shard}-of-${numShards}.parquet`
           this.url = 
`https://huggingface.co/datasets/${this.dataset}/resolve/main/${path}`
           console.log(this.url)
           try {
               const response = await fetch(this.url)
               this.buffer = new Uint8Array(await response.arrayBuffer())
               this.moveDataIntoTable()
               console.log('moved shard to table:', shard)
           } catch (err) {
               console.error(err)
               console.warn(
                   `Failed to fetch shard (${shard}) from HuggingFace! We will 
continue using the old shard for now...`
               )
           }
       }
   
       moveDataIntoTable() {
           // Read Parquet buffer to Arrow Table
           this.arrowWasmTable = readParquet(this.buffer)
           // Convert to JS Arrow Table
           this.table = tableFromIPC(this.arrowWasmTable.intoIPCStream())
           this.buffer = null
       }
   
       getWeightedRandomSlice(slices) {
           // Calculate the total number of shards
           const totalShards = slices.reduce((sum, slice) => sum + 
slice.shards, 0)
   
           // Generate a random number between 0 and the total number of shards
           const randomShard = Math.floor(Math.random() * totalShards)
   
           // Find the slice that corresponds to the random shard
           let accumulatedShards = 0
           for (const slice of slices) {
               accumulatedShards += slice.shards
               if (randomShard < accumulatedShards) {
                   return slice
               }
           }
       }
   
       viewSchema() {
           console.log(table.schema.toString())
       }
   
       loadSchema(array = [{ prompt: 'INPUT: ' }, { text: 'OUTPUT: ' }]) {
           this.schema = []
           array.map((obj) => {
               Object.entries(obj).forEach(([key, value]) => {
                   const idx = this.table.schema.fields.findIndex(
                       (field) => field.name === key
                   )
                   console.assert(
                       idx !== -1,
                       `the key of ${key} does not exist in this dataset`
                   )
                   this.schema.push({ idx, value })
               })
           })
       }
   
       async fillCache() {
           while (this.cachedText.length < this.cacheSize) {
               let shouldSkip = false
               let batchIdx = randomBetween(0, this.table.batches.length - 1)
   
               const text = []
   
               let rowIdx = null
               for (const obj of this.schema) {
                   let column
                   try {
                       column = this.table.batches[batchIdx].getChildAt(obj.idx)
                   } catch (err) {
                       console.error(err)
                       await this.fetchRandomShard()
                       return await this.fillCache()
                   }
                   if (rowIdx === null) {
                       rowIdx = randomBetween(0, column.length - 1)
                   }
                   const prefix = obj.value
                   const data = column.get(rowIdx)
                   // Some 'data' values appear to be random integers, with no 
other information.
                   // We try to skip them here.
                   if (/^-?\d+$/.test(data)) {
                       console.log(
                           'FAILED TO PARSE SHARD: Received a BigInt instead of 
text.'
                       )
                       console.log(data)
                       console.log('prefix was:', prefix)
                       console.log('batchIdx was:', batchIdx)
                       console.log('rowIdx was:', rowIdx)
                       console.log('data was:', data)
                       shouldSkip = true
                       // throw 'data was invalid' // this is temporary, for 
debugging, so we don't spam the terminal
                       await this.fetchRandomShard()
                   }
                   text.push(prefix + data)
               }
               if (shouldSkip) continue
               this.cachedText += text.join(this.delimiter) + this.eosToken
           }
       }
   
       async getSample(size = 512) {
           this.batches++
           if (this.batches % this.cycleShardInterval === 0) {
               await this.fetchRandomShard()
           }
           await this.fillCache()
           const sample = this.cachedText.slice(0, size)
           this.cachedText = this.cachedText.slice(size)
           return sample
       }
   }
   
   function randomBetween(min, max) {
       return Math.floor(Math.random() * (max - min + 1) + min)
   }
   
   function randomValueFromArray(array) {
       const randomIndex = Math.floor(Math.random() * array.length)
       return array[randomIndex]
   }
   
   function generatePaddedNumbers(start, end, totalDigits) {
       const numbers = []
       for (let i = start; i <= end; i++) {
           numbers.push(String(i).padStart(totalDigits, '0'))
       }
       return numbers
   }
   
   const numIterations = 1_000_000_000_000
   async function main() {
       const sampler = new CosmopediaDataset()
       await sampler.init()
       sampler.loadSchema([{ prompt: 'INPUT: ' }, { text: 'OUTPUT: ' }])
       for (let i = 0; i < numIterations; i++) {
           if (i % 1000 === 0) console.log('iterations:', i)
           await sampler.getSample()
       }
   }
   
   main()
   ```
   
   ### Component(s)
   
   JavaScript, Parquet


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@arrow.apache.org.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to