Author: ecn Date: Fri Oct 5 19:47:32 2012 New Revision: 1394755 URL: http://svn.apache.org/viewvc?rev=1394755&view=rev Log: ACCUMULO-786: squeeze bytes out of the serialized form of Mutation
Added: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/OldMutation.java (with props) Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/ColumnUpdate.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/thrift/TMutation.java accumulo/trunk/core/src/main/thrift/data.thrift accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/ColumnUpdate.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/ColumnUpdate.java?rev=1394755&r1=1394754&r2=1394755&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/ColumnUpdate.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/ColumnUpdate.java Fri Oct 5 19:47:32 2012 @@ -31,11 +31,10 @@ public class ColumnUpdate { private long timestamp; private boolean hasTimestamp; private byte[] val; - private byte[] data; - private int tsOffset; private boolean deleted; + private Mutation parent; - public ColumnUpdate(byte[] cf, byte[] cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val, byte[] data, int tsOffset) { + public ColumnUpdate(byte[] cf, byte[] cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val, Mutation m) { this.columnFamily = cf; this.columnQualifier = cq; this.columnVisibility = cv; @@ -43,25 +42,14 @@ public class ColumnUpdate { this.timestamp = ts; this.deleted = deleted; this.val = val; - this.data = data; - this.tsOffset = tsOffset; + this.parent = m; } + // @Deprecated use org.apache.accumulo.data.Mutation#setSystemTimestamp(long); public void setSystemTimestamp(long v) { if (hasTimestamp) throw 
new IllegalStateException("Cannot set system timestamp when user set a timestamp"); - - int tso = this.tsOffset; - data[tso++] = (byte) (v >>> 56); - data[tso++] = (byte) (v >>> 48); - data[tso++] = (byte) (v >>> 40); - data[tso++] = (byte) (v >>> 32); - data[tso++] = (byte) (v >>> 24); - data[tso++] = (byte) (v >>> 16); - data[tso++] = (byte) (v >>> 8); - data[tso++] = (byte) (v >>> 0); - - this.timestamp = v; + parent.setSystemTimestamp(v); } public boolean hasTimestamp() { @@ -85,7 +73,9 @@ public class ColumnUpdate { } public long getTimestamp() { - return this.timestamp; + if (hasTimestamp) + return this.timestamp; + return parent.systemTime; } public boolean isDeleted() { Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java?rev=1394755&r1=1394754&r2=1394755&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java Fri Oct 5 19:47:32 2012 @@ -30,6 +30,7 @@ import org.apache.accumulo.core.util.Byt import org.apache.accumulo.core.util.TextUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; /** * <p> @@ -55,10 +56,12 @@ public class Mutation implements Writabl static final int VALUE_SIZE_COPY_CUTOFF = 1 << 15; - private byte[] row; - private byte[] data; - private int entries; - private List<byte[]> values; + boolean useOldDeserialize = false; + byte[] row; + byte[] data; + int entries; + List<byte[]> values; + long systemTime = 0l; // created this little class instead of using ByteArrayOutput stream and DataOutputStream // because both are synchronized... 
lots of small syncs slow things down @@ -100,32 +103,41 @@ public class Mutation implements Writabl data[offset++] = 0; } - void add(long v) { - reserve(8); - data[offset++] = (byte) (v >>> 56); - data[offset++] = (byte) (v >>> 48); - data[offset++] = (byte) (v >>> 40); - data[offset++] = (byte) (v >>> 32); - data[offset++] = (byte) (v >>> 24); - data[offset++] = (byte) (v >>> 16); - data[offset++] = (byte) (v >>> 8); - data[offset++] = (byte) (v >>> 0); - } - - void add(int i) { - reserve(4); - data[offset++] = (byte) (i >>> 24); - data[offset++] = (byte) (i >>> 16); - data[offset++] = (byte) (i >>> 8); - data[offset++] = (byte) (i >>> 0); - } - public byte[] toArray() { byte ret[] = new byte[offset]; System.arraycopy(data, 0, ret, 0, offset); return ret; } + public void writeVLong(long i) { + reserve(9); + if (i >= -112 && i <= 127) { + data[offset++] = (byte)i; + return; + } + + int len = -112; + if (i < 0) { + i ^= -1L; // take one's complement' + len = -120; + } + + long tmp = i; + while (tmp != 0) { + tmp = tmp >> 8; + len--; + } + + data[offset++] = (byte)len; + + len = (len < -120) ? 
-(len + 120) : -(len + 112); + + for (int idx = len; idx != 0; idx--) { + int shiftbits = (idx - 1) * 8; + long mask = 0xFFL << shiftbits; + data[offset++] = (byte)((i & mask) >> shiftbits); + } + } } private static class SimpleReader { @@ -135,10 +147,9 @@ public class Mutation implements Writabl SimpleReader(byte b[]) { this.data = b; } - + int readInt() { return (data[offset++] << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 8) + ((data[offset++] & 255) << 0); - } long readLong() { @@ -155,6 +166,20 @@ public class Mutation implements Writabl return (data[offset++] == 1); } + long readVLong() { + byte firstByte = data[offset++]; + int len = WritableUtils.decodeVIntSize(firstByte); + if (len == 1) { + return firstByte; + } + long i = 0; + for (int idx = 0; idx < len-1; idx++) { + byte b = data[offset++]; + i = i << 8; + i = i | (b & 0xFF); + } + return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i); + } } private ByteBuffer buffer; @@ -202,12 +227,12 @@ public class Mutation implements Writabl } private void put(byte b[]) { - buffer.add(b.length); + buffer.writeVLong(b.length); buffer.add(b); } private void put(Text t) { - buffer.add(t.getLength()); + buffer.writeVLong(t.getLength()); buffer.add(t.getBytes(), 0, t.getLength()); } @@ -216,11 +241,11 @@ public class Mutation implements Writabl } private void put(int i) { - buffer.add(i); + buffer.writeVLong(i); } private void put(long l) { - buffer.add(l); + buffer.writeVLong(l); } private void put(Text cf, Text cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val) { @@ -232,7 +257,8 @@ public class Mutation implements Writabl put(cq); put(cv); put(hasts); - put(ts); + if (hasts) + put(ts); put(deleted); if (val.length < VALUE_SIZE_COPY_CUTOFF) { @@ -337,7 +363,7 @@ public class Mutation implements Writabl put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, false, value); } - private byte[] readBytes(SimpleReader in) { + private byte[] 
oldReadBytes(SimpleReader in) { int len = in.readInt(); if (len == 0) return EMPTY_BYTES; @@ -347,6 +373,16 @@ public class Mutation implements Writabl return bytes; } + private byte[] readBytes(SimpleReader in) { + int len = (int)in.readVLong(); + if (len == 0) + return EMPTY_BYTES; + + byte bytes[] = new byte[len]; + in.readBytes(bytes); + return bytes; + } + public List<ColumnUpdate> getUpdates() { serialize(); @@ -369,16 +405,48 @@ public class Mutation implements Writabl } private ColumnUpdate deserializeColumnUpdate(SimpleReader in) { + if (useOldDeserialize) + return oldDeserializeColumnUpdate(in); + return newDeserializeColumnUpdate(in); + } + + private ColumnUpdate oldDeserializeColumnUpdate(SimpleReader in) { + byte[] cf = oldReadBytes(in); + byte[] cq = oldReadBytes(in); + byte[] cv = oldReadBytes(in); + boolean hasts = in.readBoolean(); + long ts = in.readLong(); + if (!hasts && ts != 0) + this.systemTime = ts; + boolean deleted = in.readBoolean(); + + byte[] val; + int valLen = in.readInt(); + + if (valLen < 0) { + val = values.get((-1 * valLen) - 1); + } else if (valLen == 0) { + val = EMPTY_BYTES; + } else { + val = new byte[valLen]; + in.readBytes(val); + } + + return new ColumnUpdate(cf, cq, cv, hasts, ts, deleted, val, this); + } + + private ColumnUpdate newDeserializeColumnUpdate(SimpleReader in) { byte[] cf = readBytes(in); byte[] cq = readBytes(in); byte[] cv = readBytes(in); boolean hasts = in.readBoolean(); - int tso = in.offset; - long ts = in.readLong(); + long ts = 0; + if (hasts) + ts = in.readVLong(); boolean deleted = in.readBoolean(); byte[] val; - int valLen = in.readInt(); + int valLen = (int)in.readVLong(); if (valLen < 0) { val = values.get((-1 * valLen) - 1); @@ -389,7 +457,7 @@ public class Mutation implements Writabl in.readBytes(val); } - return new ColumnUpdate(cf, cq, cv, hasts, ts, deleted, val, data, tso); + return new ColumnUpdate(cf, cq, cv, hasts, ts, deleted, val, this); } private int cachedValLens = -1; @@ -416,7 
+484,7 @@ public class Mutation implements Writabl } public long estimatedMemoryUsed() { - return numBytes() + 230; + return numBytes() + 238; } /** @@ -428,6 +496,12 @@ public class Mutation implements Writabl @Override public void readFields(DataInput in) throws IOException { + byte first = in.readByte(); + if ((first & 0x80) != 0x80) { + oldReadFields(first, in); + return; + } + // Clear out cached column updates and value lengths so // that we recalculate them based on the (potentially) new // data we are about to read in. @@ -435,7 +509,44 @@ public class Mutation implements Writabl cachedValLens = -1; buffer = null; - int len = in.readInt(); + int len = WritableUtils.readVInt(in); + row = new byte[len]; + in.readFully(row); + len = WritableUtils.readVInt(in); + data = new byte[len]; + in.readFully(data); + entries = WritableUtils.readVInt(in); + + boolean valuesPresent = (first & 0x01) == 0x01; + if (!valuesPresent) { + values = null; + } else { + values = new ArrayList<byte[]>(); + int numValues = WritableUtils.readVInt(in); + for (int i = 0; i < numValues; i++) { + len = WritableUtils.readVInt(in); + byte val[] = new byte[len]; + in.readFully(val); + values.add(val); + } + } + systemTime = WritableUtils.readVLong(in); + } + + public void oldReadFields(byte first, DataInput in) throws IOException { + // Clear out cached column updates and value lengths so + // that we recalculate them based on the (potentially) new + // data we are about to read in. 
+ useOldDeserialize = true; + updates = null; + cachedValLens = -1; + buffer = null; + byte b = (byte)in.readByte(); + byte c = (byte)in.readByte(); + byte d = (byte)in.readByte(); + + int len = (((first & 0xff) << 24) | ((b & 0xff) << 16) | + ((c & 0xff) << 8) | (d & 0xff)); row = new byte[len]; in.readFully(row); len = in.readInt(); @@ -458,27 +569,30 @@ public class Mutation implements Writabl } } + + + @Override public void write(DataOutput out) throws IOException { serialize(); - out.writeInt(row.length); + byte hasValues = (values == null) ? 0 : (byte)1; + out.write((byte)(0x80 | hasValues)); + + WritableUtils.writeVInt(out, row.length); out.write(row); - out.writeInt(data.length); + WritableUtils.writeVInt(out, data.length); out.write(data); - out.writeInt(entries); + WritableUtils.writeVInt(out, entries); - if (values == null) - out.writeBoolean(false); - else { - out.writeBoolean(true); - out.writeInt(values.size()); + if (hasValues > 0) { + WritableUtils.writeVInt(out, values.size()); for (int i = 0; i < values.size(); i++) { byte val[] = values.get(i); - out.writeInt(val.length); + WritableUtils.writeVInt(out, val.length); out.write(val); } } - + WritableUtils.writeVLong(out, systemTime); } @Override @@ -515,7 +629,11 @@ public class Mutation implements Writabl public TMutation toThrift() { serialize(); - return new TMutation(java.nio.ByteBuffer.wrap(row), java.nio.ByteBuffer.wrap(data), ByteBufferUtil.toByteBuffers(values), entries); + return new TMutation(java.nio.ByteBuffer.wrap(row), java.nio.ByteBuffer.wrap(data), ByteBufferUtil.toByteBuffers(values), entries, systemTime); + } + + public void setSystemTimestamp(long v) { + this.systemTime = v; } } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/thrift/TMutation.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/thrift/TMutation.java?rev=1394755&r1=1394754&r2=1394755&view=diff 
============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/thrift/TMutation.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/thrift/TMutation.java Fri Oct 5 19:47:32 2012 @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField VALUES_FIELD_DESC = new org.apache.thrift.protocol.TField("values", org.apache.thrift.protocol.TType.LIST, (short)3); private static final org.apache.thrift.protocol.TField ENTRIES_FIELD_DESC = new org.apache.thrift.protocol.TField("entries", org.apache.thrift.protocol.TType.I32, (short)4); + private static final org.apache.thrift.protocol.TField SYSTEM_TIME_FIELD_DESC = new org.apache.thrift.protocol.TField("systemTime", org.apache.thrift.protocol.TType.I64, (short)5); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -45,13 +46,15 @@ import org.slf4j.LoggerFactory; public ByteBuffer data; // required public List<ByteBuffer> values; // required public int entries; // required + public long systemTime; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { ROW((short)1, "row"), DATA((short)2, "data"), VALUES((short)3, "values"), - ENTRIES((short)4, "entries"); + ENTRIES((short)4, "entries"), + SYSTEM_TIME((short)5, "systemTime"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -74,6 +77,8 @@ import org.slf4j.LoggerFactory; return VALUES; case 4: // ENTRIES return ENTRIES; + case 5: // SYSTEM_TIME + return SYSTEM_TIME; default: return null; } @@ -115,7 +120,8 @@ import org.slf4j.LoggerFactory; // isset id assignments private static final int __ENTRIES_ISSET_ID = 0; - private BitSet __isset_bit_vector = new BitSet(1); + private static final int __SYSTEMTIME_ISSET_ID = 1; + private BitSet __isset_bit_vector = new BitSet(2); public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -128,6 +134,8 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true)))); tmpMap.put(_Fields.ENTRIES, new org.apache.thrift.meta_data.FieldMetaData("entries", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + tmpMap.put(_Fields.SYSTEM_TIME, new org.apache.thrift.meta_data.FieldMetaData("systemTime", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TMutation.class, metaDataMap); } @@ -139,7 +147,8 @@ import org.slf4j.LoggerFactory; ByteBuffer row, ByteBuffer data, List<ByteBuffer> values, - int entries) + int entries, + long systemTime) { this(); this.row = 
row; @@ -147,6 +156,8 @@ import org.slf4j.LoggerFactory; this.values = values; this.entries = entries; setEntriesIsSet(true); + this.systemTime = systemTime; + setSystemTimeIsSet(true); } /** @@ -173,6 +184,7 @@ import org.slf4j.LoggerFactory; this.values = __this__values; } this.entries = other.entries; + this.systemTime = other.systemTime; } public TMutation deepCopy() { @@ -186,6 +198,8 @@ import org.slf4j.LoggerFactory; this.values = null; setEntriesIsSet(false); this.entries = 0; + setSystemTimeIsSet(false); + this.systemTime = 0; } public byte[] getRow() { @@ -318,6 +332,29 @@ import org.slf4j.LoggerFactory; __isset_bit_vector.set(__ENTRIES_ISSET_ID, value); } + public long getSystemTime() { + return this.systemTime; + } + + public TMutation setSystemTime(long systemTime) { + this.systemTime = systemTime; + setSystemTimeIsSet(true); + return this; + } + + public void unsetSystemTime() { + __isset_bit_vector.clear(__SYSTEMTIME_ISSET_ID); + } + + /** Returns true if field systemTime is set (has been assigned a value) and false otherwise */ + public boolean isSetSystemTime() { + return __isset_bit_vector.get(__SYSTEMTIME_ISSET_ID); + } + + public void setSystemTimeIsSet(boolean value) { + __isset_bit_vector.set(__SYSTEMTIME_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case ROW: @@ -352,6 +389,14 @@ import org.slf4j.LoggerFactory; } break; + case SYSTEM_TIME: + if (value == null) { + unsetSystemTime(); + } else { + setSystemTime((Long)value); + } + break; + } } @@ -369,6 +414,9 @@ import org.slf4j.LoggerFactory; case ENTRIES: return Integer.valueOf(getEntries()); + case SYSTEM_TIME: + return Long.valueOf(getSystemTime()); + } throw new IllegalStateException(); } @@ -388,6 +436,8 @@ import org.slf4j.LoggerFactory; return isSetValues(); case ENTRIES: return isSetEntries(); + case SYSTEM_TIME: + return isSetSystemTime(); } throw new IllegalStateException(); } @@ -441,6 +491,15 @@ import org.slf4j.LoggerFactory; 
return false; } + boolean this_present_systemTime = true; + boolean that_present_systemTime = true; + if (this_present_systemTime || that_present_systemTime) { + if (!(this_present_systemTime && that_present_systemTime)) + return false; + if (this.systemTime != that.systemTime) + return false; + } + return true; } @@ -497,6 +556,16 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetSystemTime()).compareTo(typedOther.isSetSystemTime()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSystemTime()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.systemTime, typedOther.systemTime); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -544,6 +613,10 @@ import org.slf4j.LoggerFactory; sb.append("entries:"); sb.append(this.entries); first = false; + if (!first) sb.append(", "); + sb.append("systemTime:"); + sb.append(this.systemTime); + first = false; sb.append(")"); return sb.toString(); } @@ -630,6 +703,14 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 5: // SYSTEM_TIME + if (schemeField.type == org.apache.thrift.protocol.TType.I64) { + struct.systemTime = iprot.readI64(); + struct.setSystemTimeIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -670,6 +751,9 @@ import org.slf4j.LoggerFactory; oprot.writeFieldBegin(ENTRIES_FIELD_DESC); oprot.writeI32(struct.entries); oprot.writeFieldEnd(); + oprot.writeFieldBegin(SYSTEM_TIME_FIELD_DESC); + oprot.writeI64(struct.systemTime); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -700,7 +784,10 @@ import org.slf4j.LoggerFactory; if (struct.isSetEntries()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.isSetSystemTime()) { + optionals.set(4); + } + 
oprot.writeBitSet(optionals, 5); if (struct.isSetRow()) { oprot.writeBinary(struct.row); } @@ -719,12 +806,15 @@ import org.slf4j.LoggerFactory; if (struct.isSetEntries()) { oprot.writeI32(struct.entries); } + if (struct.isSetSystemTime()) { + oprot.writeI64(struct.systemTime); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, TMutation struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(4); + BitSet incoming = iprot.readBitSet(5); if (incoming.get(0)) { struct.row = iprot.readBinary(); struct.setRowIsSet(true); @@ -750,6 +840,10 @@ import org.slf4j.LoggerFactory; struct.entries = iprot.readI32(); struct.setEntriesIsSet(true); } + if (incoming.get(4)) { + struct.systemTime = iprot.readI64(); + struct.setSystemTimeIsSet(true); + } } } Modified: accumulo/trunk/core/src/main/thrift/data.thrift URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/thrift/data.thrift?rev=1394755&r1=1394754&r2=1394755&view=diff ============================================================================== --- accumulo/trunk/core/src/main/thrift/data.thrift (original) +++ accumulo/trunk/core/src/main/thrift/data.thrift Fri Oct 5 19:47:32 2012 @@ -38,6 +38,7 @@ struct TMutation { 2:binary data, 3:list<binary> values 4:i32 entries + 5:i64 systemTime } struct TKeyExtent { Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java?rev=1394755&r1=1394754&r2=1394755&view=diff ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java (original) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java Fri Oct 5 19:47:32 2012 @@ -345,6 +345,81 @@ public class MutationTest extends TestCa assertEquals(2, 
m2.size()); assertEquals(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1"); assertEquals(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2"); + } + + Mutation convert(OldMutation old) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + old.write(dos); + dos.close(); + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + DataInputStream dis = new DataInputStream(bis); + Mutation m = new Mutation(); + m.readFields(dis); + dis.close(); + return m; + } + + + public void testNewSerialization() throws Exception { + Mutation m1 = new Mutation("row"); + ColumnVisibility vis = new ColumnVisibility("vis"); + m1.put("cf", "cq", vis, "value"); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + m1.write(dos); + dos.close(); + + // write an old mutation + OldMutation m2 = new OldMutation("r1"); + m2.put("cf1", "cq1", "v1"); + m2.put("cf2", "cq2", new ColumnVisibility("cv2"), "v2"); + bos = new ByteArrayOutputStream(); + dos = new DataOutputStream(bos); + m2.write(dos); + dos.close(); + long oldSize = dos.size(); + ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray()); + DataInputStream dis = new DataInputStream(bis); + m2.readFields(dis); + dis.close(); + + // check it + assertEquals("r1", new String(m2.getRow())); + assertEquals(2, m2.getUpdates().size()); + assertEquals(2, m2.size()); + assertEquals(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1"); + assertEquals(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2"); + + m1 = convert(m2); + + assertEquals("r1", new String(m1.getRow())); + assertEquals(2, m1.getUpdates().size()); + assertEquals(2, m1.size()); + assertEquals(m1.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1"); + assertEquals(m1.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2"); + + Text 
exampleRow = new Text(" 123456789 123456789 123456789 123456789 123456789"); + int exampleLen = exampleRow.getLength(); + m1 = new Mutation(exampleRow); + m1.put("", "", ""); + + bos = new ByteArrayOutputStream(); + dos = new DataOutputStream(bos); + m1.write(dos); + dos.close(); + long newSize = dos.size(); + assertTrue(newSize < oldSize); + System.out.println(String.format("%d %d %.2f%%", newSize - exampleLen, oldSize - exampleLen, (newSize-exampleLen) * 100. / (oldSize - exampleLen))); + byte[] ba = bos.toByteArray(); + for (int i = 0; i < bos.size(); i += 4) { + for (int j = i; j < bos.size() && j < i + 4; j++) { + System.out.append(String.format("%02x", ba[j])); + } + System.out.append(" "); + } + System.out.println(); } + } Added: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/OldMutation.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/OldMutation.java?rev=1394755&view=auto ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/OldMutation.java (added) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/OldMutation.java Fri Oct 5 19:47:32 2012 @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.accumulo.core.data.thrift.TMutation; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.util.ByteBufferUtil; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + * Will read/write old mutations. + */ +public class OldMutation implements Writable { + + static final int VALUE_SIZE_COPY_CUTOFF = 1 << 15; + + private byte[] row; + private byte[] data; + private int entries; + private List<byte[]> values; + + // created this little class instead of using ByteArrayOutput stream and DataOutputStream + // because both are synchronized... 
lots of small syncs slow things down + private static class ByteBuffer { + + int offset; + byte data[] = new byte[64]; + + private void reserve(int l) { + if (offset + l > data.length) { + int newSize = data.length * 2; + while (newSize <= offset + l) + newSize = newSize * 2; + + byte[] newData = new byte[newSize]; + System.arraycopy(data, 0, newData, 0, offset); + data = newData; + } + + } + + void add(byte[] b) { + reserve(b.length); + System.arraycopy(b, 0, data, offset, b.length); + offset += b.length; + } + + public void add(byte[] bytes, int off, int length) { + reserve(length); + System.arraycopy(bytes, off, data, offset, length); + offset += length; + } + + void add(boolean b) { + reserve(1); + if (b) + data[offset++] = 1; + else + data[offset++] = 0; + } + + void add(long v) { + reserve(8); + data[offset++] = (byte) (v >>> 56); + data[offset++] = (byte) (v >>> 48); + data[offset++] = (byte) (v >>> 40); + data[offset++] = (byte) (v >>> 32); + data[offset++] = (byte) (v >>> 24); + data[offset++] = (byte) (v >>> 16); + data[offset++] = (byte) (v >>> 8); + data[offset++] = (byte) (v >>> 0); + } + + void add(int i) { + reserve(4); + data[offset++] = (byte) (i >>> 24); + data[offset++] = (byte) (i >>> 16); + data[offset++] = (byte) (i >>> 8); + data[offset++] = (byte) (i >>> 0); + } + + public byte[] toArray() { + byte ret[] = new byte[offset]; + System.arraycopy(data, 0, ret, 0, offset); + return ret; + } + + } + + private static class SimpleReader { + int offset; + byte data[]; + + SimpleReader(byte b[]) { + this.data = b; + } + + int readInt() { + return (data[offset++] << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 8) + ((data[offset++] & 255) << 0); + + } + + long readLong() { + return (((long) data[offset++] << 56) + ((long) (data[offset++] & 255) << 48) + ((long) (data[offset++] & 255) << 40) + + ((long) (data[offset++] & 255) << 32) + ((long) (data[offset++] & 255) << 24) + ((data[offset++] & 255) << 16) + ((data[offset++] & 255) << 
8) + ((data[offset++] & 255) << 0)); + } + + /* Copies b.length bytes out of data starting at offset into b, then advances offset by b.length. */ void readBytes(byte b[]) { + System.arraycopy(data, offset, b, 0, b.length); + offset += b.length; + } + + /* Consumes one byte of data; the result is true only when that byte equals 1. */ boolean readBoolean() { + return (data[offset++] == 1); + } + + } + + private ByteBuffer buffer; + + private List<ColumnUpdate> updates; + + private static final byte[] EMPTY_BYTES = new byte[0]; + + /* Moves whatever buffer holds into data and sets buffer to null; does nothing when buffer is already null. */ private void serialize() { + if (buffer != null) { + data = buffer.toArray(); + buffer = null; + } + } + + /* Copies the first row.getLength() bytes of row into this.row and creates a fresh write buffer. */ public OldMutation(Text row) { + this.row = new byte[row.getLength()]; + System.arraycopy(row.getBytes(), 0, this.row, 0, row.getLength()); + buffer = new ByteBuffer(); + } + + public OldMutation(CharSequence row) { + this(new Text(row.toString())); + } + + public OldMutation() {} + + /* Populates row, data, entries and values from a thrift TMutation; buffer is not assigned here. */ public OldMutation(TMutation tmutation) { + this.row = ByteBufferUtil.toBytes(tmutation.row); + this.data = ByteBufferUtil.toBytes(tmutation.data); + this.entries = tmutation.entries; + this.values = ByteBufferUtil.toBytesList(tmutation.values); + } + + public byte[] getRow() { + return row; + } + + /* Adds b.length and then b itself to the buffer. */ private void put(byte b[]) { + buffer.add(b.length); + buffer.add(b); + } + + /* Adds t.getLength() and then the first t.getLength() bytes of t to the buffer. */ private void put(Text t) { + buffer.add(t.getLength()); + buffer.add(t.getBytes(), 0, t.getLength()); + } + + private void put(boolean b) { + buffer.add(b); + } + + private void put(int i) { + buffer.add(i); + } + + private void put(long l) { + buffer.add(l); + } + + /* Appends one column update to the buffer in the order cf, cq, cv, hasts, ts, deleted, value, and increments entries. A value shorter than VALUE_SIZE_COPY_CUTOFF is written inline; otherwise a copy is appended to the values list and the negated list size (i.e. the negated 1-based position of that copy) is written in place of the value. Throws IllegalStateException when buffer is null. */ private void put(Text cf, Text cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val) { + + if (buffer == null) + throw new IllegalStateException("Can not add to mutation after serializing it"); + + put(cf); + put(cq); + put(cv); + put(hasts); + put(ts); + put(deleted); + + if (val.length < VALUE_SIZE_COPY_CUTOFF) { + put(val); + } else { + if (values == null) + values = new ArrayList<byte[]>(); + byte copy[] = new byte[val.length]; + System.arraycopy(val, 0, copy, 0, val.length); + values.add(copy); + put(-1 * values.size()); + } + + entries++; + } + + private void put(CharSequence cf, 
CharSequence cq, byte[] cv, boolean hasts, long ts, boolean deleted, byte[] val) { + put(new Text(cf.toString()), new Text(cq.toString()), cv, hasts, ts, deleted, val); + } + + private void put(CharSequence cf, CharSequence cq, byte[] cv, boolean hasts, long ts, boolean deleted, CharSequence val) { + put(cf, cq, cv, hasts, ts, deleted, TextUtil.getBytes(new Text(val.toString()))); + } + + /* Public put/putDelete overloads taking Text column names. Each forwards to the private put: EMPTY_BYTES is passed when no visibility is given, hasts=false with ts=0 when no timestamp is given, and putDelete passes deleted=true with EMPTY_BYTES as the value. */ public void put(Text columnFamily, Text columnQualifier, Value value) { + put(columnFamily, columnQualifier, EMPTY_BYTES, false, 0l, false, value.get()); + } + + public void put(Text columnFamily, Text columnQualifier, ColumnVisibility columnVisibility, Value value) { + put(columnFamily, columnQualifier, columnVisibility.getExpression(), false, 0l, false, value.get()); + } + + public void put(Text columnFamily, Text columnQualifier, long timestamp, Value value) { + put(columnFamily, columnQualifier, EMPTY_BYTES, true, timestamp, false, value.get()); + } + + public void put(Text columnFamily, Text columnQualifier, ColumnVisibility columnVisibility, long timestamp, Value value) { + put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, false, value.get()); + } + + public void putDelete(Text columnFamily, Text columnQualifier) { + put(columnFamily, columnQualifier, EMPTY_BYTES, false, 0l, true, EMPTY_BYTES); + } + + public void putDelete(Text columnFamily, Text columnQualifier, ColumnVisibility columnVisibility) { + put(columnFamily, columnQualifier, columnVisibility.getExpression(), false, 0l, true, EMPTY_BYTES); + } + + public void putDelete(Text columnFamily, Text columnQualifier, long timestamp) { + put(columnFamily, columnQualifier, EMPTY_BYTES, true, timestamp, true, EMPTY_BYTES); + } + + public void putDelete(Text columnFamily, Text columnQualifier, ColumnVisibility columnVisibility, long timestamp) { + put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, true, EMPTY_BYTES); + } + + public void put(CharSequence 
columnFamily, CharSequence columnQualifier, Value value) { + put(columnFamily, columnQualifier, EMPTY_BYTES, false, 0l, false, value.get()); + } + + public void put(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility, Value value) { + put(columnFamily, columnQualifier, columnVisibility.getExpression(), false, 0l, false, value.get()); + } + + public void put(CharSequence columnFamily, CharSequence columnQualifier, long timestamp, Value value) { + put(columnFamily, columnQualifier, EMPTY_BYTES, true, timestamp, false, value.get()); + } + + public void put(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility, long timestamp, Value value) { + put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, false, value.get()); + } + + /* putDelete overloads taking CharSequence column names; each passes deleted=true and EMPTY_BYTES as the value. */ public void putDelete(CharSequence columnFamily, CharSequence columnQualifier) { + put(columnFamily, columnQualifier, EMPTY_BYTES, false, 0l, true, EMPTY_BYTES); + } + + public void putDelete(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility) { + put(columnFamily, columnQualifier, columnVisibility.getExpression(), false, 0l, true, EMPTY_BYTES); + } + + public void putDelete(CharSequence columnFamily, CharSequence columnQualifier, long timestamp) { + put(columnFamily, columnQualifier, EMPTY_BYTES, true, timestamp, true, EMPTY_BYTES); + } + + public void putDelete(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility, long timestamp) { + put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, true, EMPTY_BYTES); + } + + /* put overloads whose value is a CharSequence; the value is passed through unchanged to the private put that accepts a CharSequence value. */ public void put(CharSequence columnFamily, CharSequence columnQualifier, CharSequence value) { + put(columnFamily, columnQualifier, EMPTY_BYTES, false, 0l, false, value); + } + + public void put(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility, CharSequence value) { + 
put(columnFamily, columnQualifier, columnVisibility.getExpression(), false, 0l, false, value); + } + + public void put(CharSequence columnFamily, CharSequence columnQualifier, long timestamp, CharSequence value) { + put(columnFamily, columnQualifier, EMPTY_BYTES, true, timestamp, false, value); + } + + public void put(CharSequence columnFamily, CharSequence columnQualifier, ColumnVisibility columnVisibility, long timestamp, CharSequence value) { + put(columnFamily, columnQualifier, columnVisibility.getExpression(), true, timestamp, false, value); + } + + /* Reads an int length from in, then that many bytes; a length of 0 returns the shared EMPTY_BYTES array instead of allocating. */ private byte[] readBytes(SimpleReader in) { + int len = in.readInt(); + if (len == 0) + return EMPTY_BYTES; + + byte bytes[] = new byte[len]; + in.readBytes(bytes); + return bytes; + } + + /* Calls serialize(), then on the first call decodes `entries` ColumnUpdates from data and caches the list in updates; later calls return the cached list. A single entry is wrapped with Collections.singletonList, otherwise Arrays.asList over an array. NOTE(review): the SimpleReader is constructed on every call, even when updates is already cached. */ public List<ColumnUpdate> getUpdates() { + serialize(); + + SimpleReader in = new SimpleReader(data); + + if (updates == null) { + if (entries == 1) { + updates = Collections.singletonList(deserializeColumnUpdate(in)); + } else { + ColumnUpdate[] tmpUpdates = new ColumnUpdate[entries]; + + for (int i = 0; i < entries; i++) + tmpUpdates[i] = deserializeColumnUpdate(in); + + updates = Arrays.asList(tmpUpdates); + } + } + + return updates; + } + + /* Decodes one update in the order the private put writes it: cf, cq, cv, hasts, ts, deleted, value length. A negative length selects values.get(-len - 1), zero yields EMPTY_BYTES, and a positive length reads that many inline bytes. The ColumnUpdate is constructed with null as its last argument. */ private ColumnUpdate deserializeColumnUpdate(SimpleReader in) { + byte[] cf = readBytes(in); + byte[] cq = readBytes(in); + byte[] cv = readBytes(in); + boolean hasts = in.readBoolean(); + long ts = in.readLong(); + boolean deleted = in.readBoolean(); + + byte[] val; + int valLen = in.readInt(); + + if (valLen < 0) { + val = values.get((-1 * valLen) - 1); + } else if (valLen == 0) { + val = EMPTY_BYTES; + } else { + val = new byte[valLen]; + in.readBytes(val); + } + + return new ColumnUpdate(cf, cq, cv, hasts, ts, deleted, val, null); + } + + /* Cached sum of the lengths in values; -1 means not yet computed. */ private int cachedValLens = -1; + + /* Returns 0 when values is null; otherwise sums the length of every array in values once and caches the total in cachedValLens. NOTE(review): the total is accumulated and cached as an int although the method returns long - confirm the combined size cannot exceed Integer.MAX_VALUE. */ long getValueLengths() { + if (values == null) + return 0; + + if (cachedValLens == -1) { + int tmpCVL = 0; + for (byte[] val : values) + tmpCVL += val.length; + + cachedValLens = tmpCVL; + } + + return 
cachedValLens; + + } + + /* Calls serialize(), then returns row.length + data.length + getValueLengths(). */ public long numBytes() { + serialize(); + return row.length + data.length + getValueLengths(); + } + + /* numBytes() plus a fixed 230; the source does not say what the constant accounts for. */ public long estimatedMemoryUsed() { + return numBytes() + 230; + } + + /** + * @return the number of column value pairs added to the mutation + */ + public int size() { + return entries; + } + + /* Reads, in order: int row length, row bytes, int data length, data bytes, int entries, a boolean flag, and - only when the flag is true - an int count followed by that many length-prefixed byte arrays for values. buffer is set to null. */ @Override + public void readFields(DataInput in) throws IOException { + // Clear out cached column updates and value lengths so + // that we recalculate them based on the (potentially) new + // data we are about to read in. + updates = null; + cachedValLens = -1; + buffer = null; + + int len = in.readInt(); + row = new byte[len]; + in.readFully(row); + len = in.readInt(); + data = new byte[len]; + in.readFully(data); + entries = in.readInt(); + + boolean valuesPresent = in.readBoolean(); + if (!valuesPresent) { + values = null; + } else { + values = new ArrayList<byte[]>(); + int numValues = in.readInt(); + for (int i = 0; i < numValues; i++) { + len = in.readInt(); + byte val[] = new byte[len]; + in.readFully(val); + values.add(val); + } + } + } + + /* Calls serialize(), then writes the fields in the same order readFields consumes them; a false flag is written when values is null. */ @Override + public void write(DataOutput out) throws IOException { + serialize(); + out.writeInt(row.length); + out.write(row); + out.writeInt(data.length); + out.write(data); + out.writeInt(entries); + + if (values == null) + out.writeBoolean(false); + else { + out.writeBoolean(true); + out.writeInt(values.size()); + for (int i = 0; i < values.size(); i++) { + byte val[] = values.get(i); + out.writeInt(val.length); + out.write(val); + } + } + + } + + /* NOTE(review): this tests instanceof Mutation and delegates to equals(Mutation), although the constructors above show this class is OldMutation - confirm that comparing against Mutation is intended. */ @Override + public boolean equals(Object o) { + if (o instanceof Mutation) + return equals((Mutation) o); + return false; + } + + /* Hash code of the thrift form produced by toThrift(). */ @Override + public int hashCode() { + return toThrift().hashCode(); + } + + /* Calls serialize() on this object, then compares row, entries and data, and finally the values lists element by element (two null lists count as equal). */ public boolean equals(Mutation m) { + serialize(); + if (Arrays.equals(row, m.row) && entries == m.entries && Arrays.equals(data, m.data)) { + if (values == null && m.values == null) + return true; + + if (values != null && m.values != null && 
values.size() == m.values.size()) { + for (int i = 0; i < values.size(); i++) { + if (!Arrays.equals(values.get(i), m.values.get(i))) + return false; + } + + return true; + } + + } + + return false; + } + + /* Calls serialize(), then builds a TMutation from wrapped row and data, the values converted via ByteBufferUtil.toByteBuffers, entries, and a literal 0 as the final constructor argument. */ public TMutation toThrift() { + serialize(); + return new TMutation(java.nio.ByteBuffer.wrap(row), java.nio.ByteBuffer.wrap(data), ByteBufferUtil.toByteBuffers(values), entries, 0); + } + +} Propchange: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/OldMutation.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java?rev=1394755&r1=1394754&r2=1394755&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java Fri Oct 5 19:47:32 2012 @@ -19,12 +19,10 @@ */ package org.apache.accumulo.server.tabletserver; -import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.client.admin.TimeType; -import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.server.util.time.RelativeTime; @@ -57,13 +55,7 @@ public abstract class TabletTime { abstract long getAndUpdateTime(); protected void setSystemTimes(Mutation mutation, long lastCommitTime) { - Collection<ColumnUpdate> updates = mutation.getUpdates(); - for (ColumnUpdate cvp : updates) { - if (!cvp.hasTimestamp()) { - cvp.setSystemTimestamp(lastCommitTime); - - } - } + mutation.setSystemTimestamp(lastCommitTime); } static TabletTime getInstance(String metadataValue) {