Updated Branches: refs/heads/1.6.0-SNAPSHOT 2faafcce4 -> 739718253
ACCUMULO-1613 added documentation for conditional writer Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/73971825 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/73971825 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/73971825 Branch: refs/heads/1.6.0-SNAPSHOT Commit: 739718253bd49dbe5c74e041b5910cfd12edad29 Parents: 2faafcc Author: Keith Turner <ktur...@apache.org> Authored: Tue Nov 19 17:15:23 2013 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Tue Nov 19 17:15:23 2013 -0500 ---------------------------------------------------------------------- .../accumulo/core/client/ConditionalWriter.java | 3 + .../apache/accumulo/core/data/Condition.java | 54 ++++ .../accumulo/core/data/ConditionalMutation.java | 2 + .../accumulo_user_manual/chapters/clients.tex | 27 +- .../examples/simple/reservations/ARS.java | 308 +++++++++++++++++++ .../resources/docs/examples/README.reservations | 66 ++++ 6 files changed, 459 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java index 4ed4d31..2c24a2e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java @@ -23,6 +23,9 @@ import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; import org.apache.accumulo.core.data.ConditionalMutation; /** + * ConditionalWriter provides the ability to do efficient, atomic read-modify-write operations on rows. 
These operations are performed on the tablet server + * while a row lock is held. + * * @since 1.6.0 */ public interface ConditionalWriter { http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/core/src/main/java/org/apache/accumulo/core/data/Condition.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/data/Condition.java b/core/src/main/java/org/apache/accumulo/core/data/Condition.java index df20682..fc8f2bf 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/Condition.java +++ b/core/src/main/java/org/apache/accumulo/core/data/Condition.java @@ -26,6 +26,7 @@ import org.apache.accumulo.core.util.ArgumentChecker; import org.apache.hadoop.io.Text; /** + * Conditions that must be met on a particular column in a row. * * @since 1.6.0 */ @@ -76,6 +77,13 @@ public class Condition { return cq; } + /** + * Sets the version for the column to check. If this is not set then the latest column will be checked, unless iterators do something different. + * + * @param ts + * @return returns this + */ + public Condition setTimestamp(long ts) { this.ts = ts; return this; @@ -85,24 +93,53 @@ public class Condition { return ts; } + /** + * see {@link #setValue(byte[])} + * + * @param value + * @return returns this + */ + public Condition setValue(CharSequence value) { ArgumentChecker.notNull(value); this.val = new ArrayByteSequence(value.toString().getBytes(Constants.UTF8)); return this; } + /** + * This method sets the expected value of a column. Inorder for the condition to pass the column must exist and have this value. If a value is not set, then + * the column must be absent for the condition to pass. 
+ * + * @param value + * @return returns this + */ + public Condition setValue(byte[] value) { ArgumentChecker.notNull(value); this.val = new ArrayByteSequence(value); return this; } + /** + * see {@link #setValue(byte[])} + * + * @param value + * @return returns this + */ + public Condition setValue(Text value) { ArgumentChecker.notNull(value); this.val = new ArrayByteSequence(value.getBytes(), 0, value.getLength()); return this; } + /** + * see {@link #setValue(byte[])} + * + * @param value + * @return returns this + */ + public Condition setValue(ByteSequence value) { ArgumentChecker.notNull(value); this.val = value; @@ -113,6 +150,13 @@ public class Condition { return val; } + /** + * Sets the visibility for the column to check. If not set it defaults to empty visibility. + * + * @param cv + * @return returns this + */ + public Condition setVisibility(ColumnVisibility cv) { ArgumentChecker.notNull(cv); this.cv = new ArrayByteSequence(cv.getExpression()); @@ -123,6 +167,16 @@ public class Condition { return cv; } + /** + * Set iterators to use when reading the columns value. These iterators will be applied in addition to the iterators configured for the table. Using iterators + * its possible to test other conditions, besides equality and absence, like less than. On the server side the iterators will be seeked using a range that + * covers only the family, qualifier, and visibility (if the timestamp is set then it will be used to narrow the range). Value equality will be tested using + * the first entry returned by the iterator stack. + * + * @param iterators + * @return returns this + */ + public Condition setIterators(IteratorSetting... 
iterators) { ArgumentChecker.notNull(iterators); http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java b/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java index 510396d..c438f6d 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java +++ b/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java @@ -26,6 +26,8 @@ import org.apache.accumulo.core.util.ArgumentChecker; import org.apache.hadoop.io.Text; /** + * A Mutation that contains a list of conditions that must all be met before the mutation is applied. + * * @since 1.6.0 */ public class ConditionalMutation extends Mutation { http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex ---------------------------------------------------------------------- diff --git a/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex b/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex index 18fbafc..9b35d37 100644 --- a/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex +++ b/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex @@ -116,6 +116,31 @@ writer.close(); An example of using the batch writer can be found at\\ accumulo/docs/examples/README.batch +\subsection{ConditionalWriter} +The ConditionalWriter enables efficient, atomic read-modify-write operations on +rows. The ConditionalWriter writes special Mutations which have a list of per +column conditions that must all be met before the mutation is applied. The +conditions are checked in the tablet server while a row lock is +held\footnote{Mutations written by the BatchWriter will not obtain a row +lock.}. 
The conditions that can be checked for a column are equality and +absence. For example a conditional mutation can require that column A is +absent in order to be applied. Iterators can be applied when checking +conditions. Using iterators, many other operations besides equality and +absence can be checked. For example, using an iterator that converts values +less than 5 to 0 and everything else to 1, it is possible to only apply a +mutation when a column is less than 5. + +In the case when a tablet server dies after a client sent a conditional +mutation, it is not known if the mutation was applied or not. When this happens +the ConditionalWriter reports a status of UNKNOWN for the ConditionalMutation. +In many cases this situation can be dealt with by simply reading the row again +and possibly sending another conditional mutation. If this is not sufficient, +then a higher level of abstraction can be built by storing transactional +information within a row. + +An example of using the conditional writer can be found at\\ +accumulo/docs/examples/README.reservations + \section{Reading Data} Accumulo is optimized to quickly retrieve the value associated with a given key, and @@ -329,4 +354,4 @@ for(KeyValue keyValue : results.getResultsIterator()) { client.closeScanner(scanner); \end{verbatim} -\normalsize \ No newline at end of file +\normalsize http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java ---------------------------------------------------------------------- diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java new file mode 100644 index 0000000..0c51843 --- /dev/null +++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.examples.simple.reservations; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; + +import jline.console.ConsoleReader; + +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriter.Status; +import org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; + +/** + * Accumulo Reservation System : An example reservation system using Accumulo. Supports atomic reservations of a resource at a date. Wait list are also + * supported. Inorder to keep the example simple no checking is done of the date. 
Also the code is inefficient, if interested in improving it take a look at the + * EXCERCISE comments. + */ + +// EXCERCISE create a test that verifies correctness under concurrency. For example, have M threads making reservations against N resources. Each thread could +// randomly reserver and cancel resources for a single user. When each thread finishes it know what the state of its single user should be. When all threads +// finish collect their expected state and verify the status of all users and resources. For extra credit run the test on a IAAS provider using 10 nodes and +// 10 threads per node. + +public class ARS { + + private Connector conn; + private String rTable; + + public enum ReservationResult { + RESERVED, WAIT_LISTED + } + + public ARS(Connector conn, String rTable) { + this.conn = conn; + this.rTable = rTable; + } + + public List<String> setCapacity(String what, String when, int count) { + // EXCERCISE implement this method which atomically sets a capacity and returns anyone who was moved to the waitlist if the capacity was decreased + + throw new UnsupportedOperationException(); + } + + public ReservationResult reserve(String what, String when, String who) throws Exception { + + String row = what + ":" + when; + + // EXCERCISE This code assumes there is no reservation and tries to create one. If a reservation exist then the update will fail. This is a good strategy + // when its expected there are usually no reservations. Could modify the code to scan first. + + // The following mutation requires that the column tx:seq does not exist and will fail if it does. 
+ ConditionalMutation update = new ConditionalMutation(row, new Condition("tx", "seq")); + update.put("tx", "seq", "0"); + update.put("res", String.format("%04d", 0), who); + + ReservationResult result = ReservationResult.RESERVED; + + ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig()); + + try { + while (true) { + Status status = cwriter.write(update).getStatus(); + switch (status) { + case ACCEPTED: + return result; + case REJECTED: + case UNKNOWN: + // read the row and decide what to do + break; + default: + throw new RuntimeException("Unexpected status " + status); + } + + // EXCERCISE in the case of many threads trying to reserve a slot this approach of immediately retrying is inefficient. Exponential backoff is good + // general solution to solve contention problems like this. However in this particular case exponential backoff could penalize the earliest threads that + // attempted to make a reservation putting them later in the list. A more complex solution could involve having independent sub-queues within the row + // that approximately maintain arrival order and use exponential back off to fairly merge the sub-queues into the main queue. + + // its important to use an isolated scanner so that only whole mutations are seen + Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY)); + scanner.setRange(new Range(row)); + + int seq = -1; + int maxReservation = -1; + + for (Entry<Key,Value> entry : scanner) { + String cf = entry.getKey().getColumnFamilyData().toString(); + String cq = entry.getKey().getColumnQualifierData().toString(); + String val = entry.getValue().toString(); + + if (cf.equals("tx") && cq.equals("seq")) { + seq = Integer.parseInt(val); + } else if (cf.equals("res")) { + // EXCERCISE scanning the entire list to find if reserver is already in the list is inefficient. 
One possible way to solve this would be to sort the + // data differently in Accumulo so that finding the reserver could be done quickly. + if (val.equals(who)) + if (maxReservation == -1) + return ReservationResult.RESERVED; // already have the first reservation + else + return ReservationResult.WAIT_LISTED; // already on wait list + + // EXCERCISE the way this code finds the max reservation is very inefficient.... it would be better if it did not have to scan the entire row. + // One possibility is to just use the sequence number. Could also consider sorting the data in another way and/or using an iterator. + maxReservation = Integer.parseInt(cq); + } + } + + Condition condition = new Condition("tx", "seq"); + if (seq >= 0) + condition.setValue(seq + ""); // only expect a seq # if one was seen + + update = new ConditionalMutation(row, condition); + update.put("tx", "seq", (seq + 1) + ""); + update.put("res", String.format("%04d", maxReservation + 1), who); + + // EXCERCISE if set capacity is implemented, then result should take capacity into account + if (maxReservation == -1) + result = ReservationResult.RESERVED; // if successful, will be first reservation + else + result = ReservationResult.WAIT_LISTED; + } + } finally { + cwriter.close(); + } + + } + + public void cancel(String what, String when, String who) throws Exception { + + String row = what + ":" + when; + + // Even though this method is only deleting a column, its important to use a conditional writer. By updating the seq # when deleting a reservation it + // will cause any concurrent reservations to retry. If this delete were done using a batch writer, then a concurrent reservation could report WAIT_LISTED + // when it actually got the reservation. 
+ + ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig()); + + try { + while (true) { + + // its important to use an isolated scanner so that only whole mutations are seen + Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY)); + scanner.setRange(new Range(row)); + + int seq = -1; + String reservation = null; + + for (Entry<Key,Value> entry : scanner) { + String cf = entry.getKey().getColumnFamilyData().toString(); + String cq = entry.getKey().getColumnQualifierData().toString(); + String val = entry.getValue().toString(); + + // EXCERCISE avoid linear scan + + if (cf.equals("tx") && cq.equals("seq")) { + seq = Integer.parseInt(val); + } else if (cf.equals("res") && val.equals(who)) { + reservation = cq; + } + } + + if (reservation != null) { + ConditionalMutation update = new ConditionalMutation(row, new Condition("tx", "seq").setValue(seq + "")); + update.putDelete("res", reservation); + update.put("tx", "seq", (seq + 1) + ""); + + Status status = cwriter.write(update).getStatus(); + switch (status) { + case ACCEPTED: + // successfully canceled reservation + return; + case REJECTED: + case UNKNOWN: + // retry + // EXCERCISE exponential backoff could be used here + break; + default: + throw new RuntimeException("Unexpected status " + status); + } + + } else { + // not reserved, nothing to do + break; + } + + } + } finally { + cwriter.close(); + } + } + + public List<String> list(String what, String when) throws Exception { + String row = what + ":" + when; + + //its important to use an isolated scanner so that only whole mutations are seen + Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY)); + scanner.setRange(new Range(row)); + scanner.fetchColumnFamily(new Text("res")); + + List<String> reservations = new ArrayList<String>(); + + for (Entry<Key,Value> entry : scanner) { + String val = entry.getValue().toString(); + reservations.add(val); + } + + 
return reservations; + } + + public static void main(String[] args) throws Exception { + final ConsoleReader reader = new ConsoleReader(); + ARS ars = null; + + while (true) { + String line = reader.readLine(">"); + if (line == null) + break; + + final String[] tokens = line.split("\\s+"); + + if (tokens[0].equals("reserve") && tokens.length >= 4 && ars != null) { + // start up multiple threads all trying to reserve the same resource, no more than one should succeed + + final ARS fars = ars; + ArrayList<Thread> threads = new ArrayList<Thread>(); + for (int i = 3; i < tokens.length; i++) { + final int whoIndex = i; + Runnable reservationTask = new Runnable() { + @Override + public void run() { + try { + reader.println(" " + String.format("%20s", tokens[whoIndex]) + " : " + fars.reserve(tokens[1], tokens[2], tokens[whoIndex])); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + threads.add(new Thread(reservationTask)); + } + + for (Thread thread : threads) + thread.start(); + + for (Thread thread : threads) + thread.join(); + + } else if (tokens[0].equals("cancel") && tokens.length == 4 && ars != null) { + ars.cancel(tokens[1], tokens[2], tokens[3]); + } else if (tokens[0].equals("list") && tokens.length == 3 && ars != null) { + List<String> reservations = ars.list(tokens[1], tokens[2]); + if (reservations.size() > 0) { + reader.println(" Reservation holder : " + reservations.get(0)); + if (reservations.size() > 1) + reader.println(" Wait list : " + reservations.subList(1, reservations.size())); + } + } else if (tokens[0].equals("quit") && tokens.length == 1) { + break; + } else if (tokens[0].equals("connect") && tokens.length == 6 && ars == null) { + ZooKeeperInstance zki = new ZooKeeperInstance(tokens[1], tokens[2]); + Connector conn = zki.getConnector(tokens[3], new PasswordToken(tokens[4])); + if (conn.tableOperations().exists(tokens[5])) { + ars = new ARS(conn, tokens[5]); + reader.println(" connected"); + } else + reader.println(" No Such 
Table"); + } else { + System.out.println(" Commands : "); + if (ars == null) { + reader.println(" connect <instance> <zookeepers> <user> <pass> <table>"); + } else { + reader.println(" reserve <what> <when> <who> {who}"); + reader.println(" cancel <what> <when> <who>"); + reader.println(" list <what> <when>"); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/server/monitor/src/main/resources/docs/examples/README.reservations ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/resources/docs/examples/README.reservations b/server/monitor/src/main/resources/docs/examples/README.reservations new file mode 100644 index 0000000..7ad9cd3 --- /dev/null +++ b/server/monitor/src/main/resources/docs/examples/README.reservations @@ -0,0 +1,66 @@ +Title: Apache Accumulo Reservations Example +Notice: Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + . + http://www.apache.org/licenses/LICENSE-2.0 + . + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +This example shows running a simple reservation system implemented using +conditional mutations. This system guarantees that only one concurrent user can +reserve a resource. The example's reserve command allows multiple users to be +specified. When this is done it creates a separate reservation thread for each +user. 
In the example below threads are spun up for alice, bob, eve, mallory, +and trent to reserve room06 on 20140101. Bob ends up getting the reservation +and everyone else is put on a wait list. The example code will take any string +for what, when and who. + + $ ./bin/accumulo org.apache.accumulo.examples.simple.reservations.ARS + >connect test16 localhost root secret ars + connected + > + Commands : + reserve <what> <when> <who> {who} + cancel <what> <when> <who> + list <what> <when> + >reserve room06 20140101 alice bob eve mallory trent + bob : RESERVED + mallory : WAIT_LISTED + alice : WAIT_LISTED + trent : WAIT_LISTED + eve : WAIT_LISTED + >list room06 20140101 + Reservation holder : bob + Wait list : [mallory, alice, trent, eve] + >cancel room06 20140101 alice + >cancel room06 20140101 bob + >list room06 20140101 + Reservation holder : mallory + Wait list : [trent, eve] + >quit + +Scanning the table in the Accumulo shell after running the example shows the +following: + + root@test16> table ars + root@test16 ars> scan + room06:20140101 res:0001 [] mallory + room06:20140101 res:0003 [] trent + room06:20140101 res:0004 [] eve + room06:20140101 tx:seq [] 6 + +The tx:seq column is incremented for each update to the row allowing for +detection of concurrent changes. For an update to go through the sequence +number must not have changed since the data was read. If it does change then +the conditional mutation will fail and the example code will retry. +