Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.7 fcca919a1 -> 0e3c9b10b
[ZEPPELIN-1821] Add HTTP client to elasticsearch interpreter

### What is this PR for?
Add an HTTP client to the elasticsearch interpreter.

### What type of PR is it?
Feature

### Todos
* [X] - Source code
* [X] - Tests
* [X] - License
* [X] - Docs

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1821

### How should this be tested?
* Start an Elasticsearch node
* Configure the elasticsearch interpreter to use http
* Create queries in a note using elasticsearch

### Screenshots (if appropriate)

### Questions:
* Do the license files need updating? Yes
* Are there breaking changes for older versions? No
* Does this need documentation? Yes

Author: Bruno Bonnin <bbon...@gmail.com>
Author: Bruno Bonnin <bruno.bon...@myscript.com>

Closes #1902 from bbonnin/master and squashes the following commits:

f5a539e [Bruno Bonnin] Remove commented code lines
86153a8 [Bruno Bonnin] Merge remote-tracking branch 'upstream/master'
2e1bbbd [Bruno Bonnin] Merge remote-tracking branch 'upstream/master'
19e888e [Bruno Bonnin] Remove bad code in test
523d155 [Bruno Bonnin] Replace Java 8 methods
6bcf369 [Bruno Bonnin] Fix issue with id containing special chars (/, #)
4e9812e [Bruno Bonnin] Merge elasticsearch/pom.xml
5a96ae0 [Bruno Bonnin] Merge branch 'master' into master
e2365fb [Bruno Bonnin] Update elasticsearch/pom.xml
28b9805 [Bruno Bonnin] Update img
549db39 [Bruno Bonnin] Add HTTP client to elasticsearch interpreter
f4c5ac3 [Bruno Bonnin] HTTP-based Elasticsearch client

(cherry picked from commit e763b3bf3e8a26a2e2134bc615aac1bff59cd82d)
Signed-off-by: Mina Lee <mina...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/0e3c9b10
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/0e3c9b10
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/0e3c9b10

Branch: refs/heads/branch-0.7
Commit: 0e3c9b10b5ea848544b3605a4203abe4d82e903a
Parents: fcca919
Author: Bruno Bonnin
<bbon...@gmail.com>
Authored: Sat Jan 28 10:16:43 2017 +0100
Committer: Mina Lee <mina...@apache.org>
Committed: Tue Jan 31 14:54:14 2017 +0900

----------------------------------------------------------------------
 .../img/docs-img/elasticsearch-config.png       |  Bin 150536 -> 55656 bytes
 docs/interpreter/elasticsearch.md               |   17 +-
 elasticsearch/pom.xml                           |   15 +-
 .../elasticsearch/ElasticsearchInterpreter.java |  370 ++++++++++--------
 .../elasticsearch/action/ActionException.java   |   32 ++
 .../elasticsearch/action/ActionResponse.java    |   78 ++++
 .../elasticsearch/action/AggWrapper.java        |   43 +++
 .../elasticsearch/action/HitWrapper.java        |   67 ++++
 .../client/ElasticsearchClient.java             |   36 ++
 .../elasticsearch/client/HttpBasedClient.java   |  372 +++++++++++++++++++
 .../client/TransportBasedClient.java            |  235 ++++++++++++
 .../src/main/resources/interpreter-setting.json |   18 +
 .../ElasticsearchInterpreterTest.java           |  183 ++++++---
 zeppelin-distribution/src/bin_license/LICENSE   |    2 +
 14 files changed, 1251 insertions(+), 217 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png
----------------------------------------------------------------------
diff --git a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png
index b5f7dda..54a634a 100644
Binary files a/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png and b/docs/assets/themes/zeppelin/img/docs-img/elasticsearch-config.png differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/docs/interpreter/elasticsearch.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/elasticsearch.md b/docs/interpreter/elasticsearch.md
index 7f3fb27..165116b 100644
--- a/docs/interpreter/elasticsearch.md
+++ b/docs/interpreter/elasticsearch.md
@@
-46,7 +46,22 @@ limitations under the License. <tr> <td>elasticsearch.port</td> <td>9300</td> - <td>Connection port <b>( Important: this is not the HTTP port, but the transport port )</b></td> + <td>Connection port <b>( Important: it depends on the client type, transport or http)</b></td> + </tr> + <tr> + <td>elasticsearch.client.type</td> + <td>transport</td> + <td>The type of client for Elasticsearch (transport or http)<b>( Important: the port depends on this value)</b></td> + </tr> + <tr> + <td>elasticsearch.basicauth.username</td> + <td></td> + <td>Username for a basic authentication (http)</b></td> + </tr> + <tr> + <td>elasticsearch.basicauth.password</td> + <td></td> + <td>Password for a basic authentication (http)</b></td> </tr> <tr> <td>elasticsearch.result.size</td> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml index eb06a84..2073660 100644 --- a/elasticsearch/pom.xml +++ b/elasticsearch/pom.xml @@ -26,7 +26,6 @@ <relativePath>..</relativePath> </parent> - <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-elasticsearch</artifactId> <packaging>jar</packaging> <version>0.7.1-SNAPSHOT</version> @@ -34,8 +33,10 @@ <properties> <elasticsearch.version>2.4.3</elasticsearch.version> + <httpasyncclient.version>4.0.2</httpasyncclient.version> <guava.version>18.0</guava.version> <json-flattener.version>0.1.6</json-flattener.version> + <unirest.version>1.4.9</unirest.version> </properties> <dependencies> @@ -51,6 +52,12 @@ <artifactId>elasticsearch</artifactId> <version>${elasticsearch.version}</version> </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpasyncclient</artifactId> + <version>${httpasyncclient.version}</version> + </dependency> <dependency> <groupId>com.google.guava</groupId> @@ -65,6 +72,12 @@ </dependency> <dependency> + 
<groupId>com.mashape.unirest</groupId> + <artifactId>unirest-java</artifactId> + <version>${unirest.version}</version> + </dependency> + + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java index 549b5f2..e3918e4 100644 --- a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreter.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.elasticsearch; import java.io.IOException; -import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -33,27 +32,20 @@ import java.util.TreeSet; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.elasticsearch.action.ActionResponse; +import org.apache.zeppelin.elasticsearch.action.AggWrapper; +import org.apache.zeppelin.elasticsearch.action.HitWrapper; +import org.apache.zeppelin.elasticsearch.client.ElasticsearchClient; +import org.apache.zeppelin.elasticsearch.client.HttpBasedClient; +import org.apache.zeppelin.elasticsearch.client.TransportBasedClient; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.elasticsearch.action.delete.DeleteResponse; -import 
org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; @@ -66,7 +58,7 @@ import org.slf4j.LoggerFactory; import com.github.wnameless.json.flattener.JsonFlattener; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import com.google.gson.JsonParseException; +import com.google.gson.JsonObject; /** @@ -77,75 +69,82 @@ public class ElasticsearchInterpreter extends Interpreter { private static Logger logger = LoggerFactory.getLogger(ElasticsearchInterpreter.class); private static final String HELP = "Elasticsearch interpreter:\n" - + "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n" - + " - indices: list of indices separated by commas (depends on the command)\n" - + " - types: list of document types separated by commas (depends on the command)\n" - + "Commands:\n" - + " - search /indices/types <query>\n" - + " . indices and types can be omitted (at least, you have to provide '/')\n" - + " . a query is either a JSON-formatted query, nor a lucene query\n" - + " - size <value>\n" - + " . 
defines the size of the result set (default value is in the config)\n" - + " . if used, this command must be declared before a search command\n" - + " - count /indices/types <query>\n" - + " . same comments as for the search\n" - + " - get /index/type/id\n" - + " - delete /index/type/id\n" - + " - index /ndex/type/id <json-formatted document>\n" - + " . the id can be omitted, elasticsearch will generate one"; + + "General format: <command> /<indices>/<types>/<id> <option> <JSON>\n" + + " - indices: list of indices separated by commas (depends on the command)\n" + + " - types: list of document types separated by commas (depends on the command)\n" + + "Commands:\n" + + " - search /indices/types <query>\n" + + " . indices and types can be omitted (at least, you have to provide '/')\n" + + " . a query is either a JSON-formatted query, nor a lucene query\n" + + " - size <value>\n" + + " . defines the size of the result set (default value is in the config)\n" + + " . if used, this command must be declared before a search command\n" + + " - count /indices/types <query>\n" + + " . same comments as for the search\n" + + " - get /index/type/id\n" + + " - delete /index/type/id\n" + + " - index /ndex/type/id <json-formatted document>\n" + + " . 
the id can be omitted, elasticsearch will generate one"; protected static final List<String> COMMANDS = Arrays.asList( - "count", "delete", "get", "help", "index", "search"); + "count", "delete", "get", "help", "index", "search"); private static final Pattern FIELD_NAME_PATTERN = Pattern.compile("\\[\\\\\"(.+)\\\\\"\\](.*)"); public static final String ELASTICSEARCH_HOST = "elasticsearch.host"; public static final String ELASTICSEARCH_PORT = "elasticsearch.port"; + public static final String ELASTICSEARCH_CLIENT_TYPE = "elasticsearch.client.type"; public static final String ELASTICSEARCH_CLUSTER_NAME = "elasticsearch.cluster.name"; public static final String ELASTICSEARCH_RESULT_SIZE = "elasticsearch.result.size"; + public static final String ELASTICSEARCH_BASIC_AUTH_USERNAME = "elasticsearch.basicauth.username"; + public static final String ELASTICSEARCH_BASIC_AUTH_PASSWORD = "elasticsearch.basicauth.password"; private final Gson gson = new GsonBuilder().setPrettyPrinting().create(); - private Client client; - private String host = "localhost"; - private int port = 9300; - private String clusterName = "elasticsearch"; + private ElasticsearchClient elsClient; private int resultSize = 10; public ElasticsearchInterpreter(Properties property) { super(property); - this.host = getProperty(ELASTICSEARCH_HOST); - this.port = Integer.parseInt(getProperty(ELASTICSEARCH_PORT)); - this.clusterName = getProperty(ELASTICSEARCH_CLUSTER_NAME); + + } + + @Override + public void open() { + logger.info("Properties: {}", getProperty()); + + String clientType = getProperty(ELASTICSEARCH_CLIENT_TYPE); + clientType = clientType == null ? 
null : clientType.toLowerCase(); + try { this.resultSize = Integer.parseInt(getProperty(ELASTICSEARCH_RESULT_SIZE)); - } catch (NumberFormatException e) { + } + catch (final NumberFormatException e) { this.resultSize = 10; logger.error("Unable to parse " + ELASTICSEARCH_RESULT_SIZE + " : " + - property.get(ELASTICSEARCH_RESULT_SIZE), e); + property.get(ELASTICSEARCH_RESULT_SIZE), e); } - } - @Override - public void open() { try { - logger.info("prop={}", getProperty()); - final Settings settings = Settings.settingsBuilder() - .put("cluster.name", clusterName) - .put(getProperty()) - .build(); - client = TransportClient.builder().settings(settings).build() - .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); - } - catch (IOException e) { + if (StringUtils.isEmpty(clientType) || "transport".equals(clientType)) { + elsClient = new TransportBasedClient(getProperty()); + } + else if ("http".equals(clientType)) { + elsClient = new HttpBasedClient(getProperty()); + } + else { + logger.error("Unknown type of Elasticsearch client: " + clientType); + } + } + catch (final IOException e) { logger.error("Open connection with Elasticsearch", e); } } @Override public void close() { - if (client != null) { - client.close(); + if (elsClient != null) { + elsClient.close(); } } @@ -159,7 +158,7 @@ public class ElasticsearchInterpreter extends Interpreter { int currentResultSize = resultSize; - if (client == null) { + if (elsClient == null) { return new InterpreterResult(InterpreterResult.Code.ERROR, "Problem with the Elasticsearch client, please check your configuration (host, port,...)"); } @@ -178,7 +177,7 @@ public class ElasticsearchInterpreter extends Interpreter { if (lines.length < 2) { return processHelp(InterpreterResult.Code.ERROR, - "Size cmd must be followed by a search"); + "Size cmd must be followed by a search"); } final String[] sizeLine = StringUtils.split(lines[0], " ", 2); @@ -202,13 +201,13 @@ public class 
ElasticsearchInterpreter extends Interpreter { try { if ("get".equalsIgnoreCase(method)) { - return processGet(urlItems); + return processGet(urlItems, interpreterContext); } else if ("count".equalsIgnoreCase(method)) { - return processCount(urlItems, data); + return processCount(urlItems, data, interpreterContext); } else if ("search".equalsIgnoreCase(method)) { - return processSearch(urlItems, data, currentResultSize); + return processSearch(urlItems, data, currentResultSize, interpreterContext); } else if ("index".equalsIgnoreCase(method)) { return processIndex(urlItems, data); @@ -219,7 +218,7 @@ public class ElasticsearchInterpreter extends Interpreter { return processHelp(InterpreterResult.Code.ERROR, "Unknown command"); } - catch (Exception e) { + catch (final Exception e) { return new InterpreterResult(InterpreterResult.Code.ERROR, "Error : " + e.getMessage()); } } @@ -243,7 +242,7 @@ public class ElasticsearchInterpreter extends Interpreter { public List<InterpreterCompletion> completion(String s, int i) { final List suggestions = new ArrayList<>(); - for (String cmd : COMMANDS) { + for (final String cmd : COMMANDS) { if (cmd.toLowerCase().contains(s)) { suggestions.add(new InterpreterCompletion(cmd, cmd)); } @@ -251,6 +250,31 @@ public class ElasticsearchInterpreter extends Interpreter { return suggestions; } + private void addAngularObject(InterpreterContext interpreterContext, String prefix, Object obj) { + interpreterContext.getAngularObjectRegistry().add( + prefix + "_" + interpreterContext.getParagraphId().replace("-", "_"), + obj, null, null); + } + + private String[] getIndexTypeId(String[] urlItems) { + + if (urlItems.length < 3) { + return null; + } + + final String index = urlItems[0]; + final String type = urlItems[1]; + final String id = StringUtils.join(Arrays.copyOfRange(urlItems, 2, urlItems.length), '/'); + + if (StringUtils.isEmpty(index) + || StringUtils.isEmpty(type) + || StringUtils.isEmpty(id)) { + return null; + } + + return new 
String[] { index, type, id }; + } + private InterpreterResult processHelp(InterpreterResult.Code code, String additionalMessage) { final StringBuffer buffer = new StringBuffer(); @@ -267,28 +291,30 @@ public class ElasticsearchInterpreter extends Interpreter { * Processes a "get" request. * * @param urlItems Items of the URL + * @param interpreterContext Instance of the context * @return Result of the get request, it contains a JSON-formatted string */ - private InterpreterResult processGet(String[] urlItems) { + private InterpreterResult processGet(String[] urlItems, InterpreterContext interpreterContext) { + + final String[] indexTypeId = getIndexTypeId(urlItems); - if (urlItems.length != 3 - || StringUtils.isEmpty(urlItems[0]) - || StringUtils.isEmpty(urlItems[1]) - || StringUtils.isEmpty(urlItems[2])) { + if (indexTypeId == null) { return new InterpreterResult(InterpreterResult.Code.ERROR, - "Bad URL (it should be /index/type/id)"); + "Bad URL (it should be /index/type/id)"); } - final GetResponse response = client - .prepareGet(urlItems[0], urlItems[1], urlItems[2]) - .get(); - if (response.isExists()) { - final String json = gson.toJson(response.getSource()); + final ActionResponse response = elsClient.get(indexTypeId[0], indexTypeId[1], indexTypeId[2]); + + if (response.isSucceeded()) { + final JsonObject json = response.getHit().getSourceAsJsonObject(); + final String jsonStr = gson.toJson(json); + + addAngularObject(interpreterContext, "get", json); return new InterpreterResult( - InterpreterResult.Code.SUCCESS, - InterpreterResult.Type.TEXT, - json); + InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.TEXT, + jsonStr); } return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found"); @@ -299,21 +325,25 @@ public class ElasticsearchInterpreter extends Interpreter { * * @param urlItems Items of the URL * @param data May contains the JSON of the request + * @param interpreterContext Instance of the context * @return Result of the 
count request, it contains the total hits */ - private InterpreterResult processCount(String[] urlItems, String data) { + private InterpreterResult processCount(String[] urlItems, String data, + InterpreterContext interpreterContext) { if (urlItems.length > 2) { return new InterpreterResult(InterpreterResult.Code.ERROR, - "Bad URL (it should be /index1,index2,.../type1,type2,...)"); + "Bad URL (it should be /index1,index2,.../type1,type2,...)"); } - final SearchResponse response = searchData(urlItems, data, 0); + final ActionResponse response = searchData(urlItems, data, 0); + + addAngularObject(interpreterContext, "count", response.getTotalHits()); return new InterpreterResult( - InterpreterResult.Code.SUCCESS, - InterpreterResult.Type.TEXT, - "" + response.getHits().getTotalHits()); + InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.TEXT, + "" + response.getTotalHits()); } /** @@ -322,16 +352,22 @@ public class ElasticsearchInterpreter extends Interpreter { * @param urlItems Items of the URL * @param data May contains the JSON of the request * @param size Limit of result set + * @param interpreterContext Instance of the context * @return Result of the search request, it contains a tab-formatted string of the matching hits */ - private InterpreterResult processSearch(String[] urlItems, String data, int size) { + private InterpreterResult processSearch(String[] urlItems, String data, int size, + InterpreterContext interpreterContext) { if (urlItems.length > 2) { return new InterpreterResult(InterpreterResult.Code.ERROR, - "Bad URL (it should be /index1,index2,.../type1,type2,...)"); + "Bad URL (it should be /index1,index2,.../type1,type2,...)"); } - final SearchResponse response = searchData(urlItems, data, size); + final ActionResponse response = searchData(urlItems, data, size); + + addAngularObject(interpreterContext, "search", + (response.getAggregations() != null && response.getAggregations().size() > 0) ? 
+ response.getAggregations() : response.getHits()); return buildResponseMessage(response); } @@ -347,18 +383,16 @@ public class ElasticsearchInterpreter extends Interpreter { if (urlItems.length < 2 || urlItems.length > 3) { return new InterpreterResult(InterpreterResult.Code.ERROR, - "Bad URL (it should be /index/type or /index/type/id)"); + "Bad URL (it should be /index/type or /index/type/id)"); } - final IndexResponse response = client - .prepareIndex(urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2]) - .setSource(data) - .get(); + final ActionResponse response = elsClient.index( + urlItems[0], urlItems[1], urlItems.length == 2 ? null : urlItems[2], data); return new InterpreterResult( - InterpreterResult.Code.SUCCESS, - InterpreterResult.Type.TEXT, - response.getId()); + InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.TEXT, + response.getHit().getId()); } /** @@ -369,59 +403,39 @@ public class ElasticsearchInterpreter extends Interpreter { */ private InterpreterResult processDelete(String[] urlItems) { - if (urlItems.length != 3 - || StringUtils.isEmpty(urlItems[0]) - || StringUtils.isEmpty(urlItems[1]) - || StringUtils.isEmpty(urlItems[2])) { + final String[] indexTypeId = getIndexTypeId(urlItems); + + if (indexTypeId == null) { return new InterpreterResult(InterpreterResult.Code.ERROR, - "Bad URL (it should be /index/type/id)"); + "Bad URL (it should be /index/type/id)"); } - final DeleteResponse response = client - .prepareDelete(urlItems[0], urlItems[1], urlItems[2]) - .get(); + final ActionResponse response = + elsClient.delete(indexTypeId[0], indexTypeId[1], indexTypeId[2]); - if (response.isFound()) { + if (response.isSucceeded()) { return new InterpreterResult( - InterpreterResult.Code.SUCCESS, - InterpreterResult.Type.TEXT, - response.getId()); + InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.TEXT, + response.getHit().getId()); } return new InterpreterResult(InterpreterResult.Code.ERROR, "Document not found"); } - 
private SearchResponse searchData(String[] urlItems, String query, int size) { + private ActionResponse searchData(String[] urlItems, String query, int size) { - final SearchRequestBuilder reqBuilder = new SearchRequestBuilder( - client, SearchAction.INSTANCE); - reqBuilder.setIndices(); + String[] indices = null; + String[] types = null; if (urlItems.length >= 1) { - reqBuilder.setIndices(StringUtils.split(urlItems[0], ",")); + indices = StringUtils.split(urlItems[0], ","); } if (urlItems.length > 1) { - reqBuilder.setTypes(StringUtils.split(urlItems[1], ",")); + types = StringUtils.split(urlItems[1], ","); } - if (!StringUtils.isEmpty(query)) { - // The query can be either JSON-formatted, nor a Lucene query - // So, try to parse as a JSON => if there is an error, consider the query a Lucene one - try { - final Map source = gson.fromJson(query, Map.class); - reqBuilder.setExtraSource(source); - } - catch (JsonParseException e) { - // This is not a JSON (or maybe not well formatted...) - reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true)); - } - } - - reqBuilder.setSize(size); - - final SearchResponse response = reqBuilder.get(); - - return response; + return elsClient.search(indices, types, query, size); } private InterpreterResult buildAggResponseMessage(Aggregations aggregations) { @@ -442,8 +456,8 @@ public class ElasticsearchInterpreter extends Interpreter { final Set<String> headerKeys = new HashSet<>(); final List<Map<String, Object>> buckets = new LinkedList<>(); final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg; - - for (MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) { + + for (final MultiBucketsAggregation.Bucket bucket : multiBucketAgg.getBuckets()) { try { final XContentBuilder builder = XContentFactory.jsonBuilder(); bucket.toXContent(builder, null); @@ -451,22 +465,22 @@ public class ElasticsearchInterpreter extends Interpreter { 
headerKeys.addAll(bucketMap.keySet()); buckets.add(bucketMap); } - catch (IOException e) { + catch (final IOException e) { logger.error("Processing bucket: " + e.getMessage(), e); } } - + final StringBuffer buffer = new StringBuffer(); final String[] keys = headerKeys.toArray(new String[0]); - for (String key: keys) { + for (final String key: keys) { buffer.append("\t" + key); } buffer.deleteCharAt(0); - - for (Map<String, Object> bucket : buckets) { + + for (final Map<String, Object> bucket : buckets) { buffer.append("\n"); - - for (String key: keys) { + + for (final String key: keys) { buffer.append(bucket.get(key)).append("\t"); } buffer.deleteCharAt(buffer.length() - 1); @@ -479,38 +493,64 @@ public class ElasticsearchInterpreter extends Interpreter { return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg); } - private String buildSearchHitsResponseMessage(SearchHit[] hits) { + private InterpreterResult buildAggResponseMessage(List<AggWrapper> aggregations) { + + final InterpreterResult.Type resType = InterpreterResult.Type.TABLE; + String resMsg = ""; + + final Set<String> headerKeys = new HashSet<>(); + final List<Map<String, Object>> buckets = new LinkedList<>(); + + for (final AggWrapper aggregation: aggregations) { + final Map<String, Object> bucketMap = JsonFlattener.flattenAsMap(aggregation.getResult()); + headerKeys.addAll(bucketMap.keySet()); + buckets.add(bucketMap); + } + + final StringBuffer buffer = new StringBuffer(); + final String[] keys = headerKeys.toArray(new String[0]); + for (final String key: keys) { + buffer.append("\t" + key); + } + buffer.deleteCharAt(0); + + for (final Map<String, Object> bucket : buckets) { + buffer.append("\n"); + + for (final String key: keys) { + buffer.append(bucket.get(key)).append("\t"); + } + buffer.deleteCharAt(buffer.length() - 1); + } + + resMsg = buffer.toString(); + + return new InterpreterResult(InterpreterResult.Code.SUCCESS, resType, resMsg); + } + + private String 
buildSearchHitsResponseMessage(ActionResponse response) { - if (hits == null || hits.length == 0) { + if (response.getHits() == null || response.getHits().size() == 0) { return ""; } //First : get all the keys in order to build an ordered list of the values for each hit // - final Map<String, Object> hitFields = new HashMap<>(); final List<Map<String, Object>> flattenHits = new LinkedList<>(); final Set<String> keys = new TreeSet<>(); - for (SearchHit hit : hits) { - // Fields can be found either in _source, or in fields (it depends on the query) - // - String json = hit.getSourceAsString(); - if (json == null) { - hitFields.clear(); - for (SearchHitField hitField : hit.getFields().values()) { - hitFields.put(hitField.getName(), hitField.getValues()); - } - json = gson.toJson(hitFields); - } + for (final HitWrapper hit : response.getHits()) { + + final String json = hit.getSourceAsString(); final Map<String, Object> flattenJsonMap = JsonFlattener.flattenAsMap(json); final Map<String, Object> flattenMap = new HashMap<>(); - for (Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) { + for (final Iterator<String> iter = flattenJsonMap.keySet().iterator(); iter.hasNext(); ) { // Replace keys that match a format like that : [\"keyname\"][0] final String fieldName = iter.next(); final Matcher fieldNameMatcher = FIELD_NAME_PATTERN.matcher(fieldName); if (fieldNameMatcher.matches()) { flattenMap.put(fieldNameMatcher.group(1) + fieldNameMatcher.group(2), - flattenJsonMap.get(fieldName)); + flattenJsonMap.get(fieldName)); } else { flattenMap.put(fieldName, flattenJsonMap.get(fieldName)); @@ -518,7 +558,7 @@ public class ElasticsearchInterpreter extends Interpreter { } flattenHits.add(flattenMap); - for (String key : flattenMap.keySet()) { + for (final String key : flattenMap.keySet()) { keys.add(key); } } @@ -526,15 +566,15 @@ public class ElasticsearchInterpreter extends Interpreter { // Next : build the header of the table // final StringBuffer 
buffer = new StringBuffer(); - for (String key : keys) { + for (final String key : keys) { buffer.append(key).append('\t'); } buffer.replace(buffer.lastIndexOf("\t"), buffer.lastIndexOf("\t") + 1, "\n"); // Finally : build the result by using the key set // - for (Map<String, Object> hit : flattenHits) { - for (String key : keys) { + for (final Map<String, Object> hit : flattenHits) { + for (final String key : keys) { final Object val = hit.get(key); if (val != null) { buffer.append(val); @@ -547,17 +587,17 @@ public class ElasticsearchInterpreter extends Interpreter { return buffer.toString(); } - private InterpreterResult buildResponseMessage(SearchResponse response) { + private InterpreterResult buildResponseMessage(ActionResponse response) { - final Aggregations aggregations = response.getAggregations(); + final List<AggWrapper> aggregations = response.getAggregations(); - if (aggregations != null && aggregations.asList().size() > 0) { + if (aggregations != null && aggregations.size() > 0) { return buildAggResponseMessage(aggregations); } return new InterpreterResult( - InterpreterResult.Code.SUCCESS, - InterpreterResult.Type.TABLE, - buildSearchHitsResponseMessage(response.getHits().getHits())); + InterpreterResult.Code.SUCCESS, + InterpreterResult.Type.TABLE, + buildSearchHitsResponseMessage(response)); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java new file mode 100644 index 0000000..6846d0a --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.elasticsearch.action; + +/** + * Runtime exception thrown when there is a problem during an action (search, get, ...). + */ +public class ActionException extends RuntimeException { + + public ActionException(String message) { + super(message); + } + + public ActionException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java new file mode 100644 index 0000000..4141bce --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/ActionResponse.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.elasticsearch.action; + +import java.util.LinkedList; +import java.util.List; + +/** + * Contains the result of an action (hits, aggregations, ...). + */ +public class ActionResponse { + + private boolean succeeded; + private long totalHits; + private final List<HitWrapper> hits = new LinkedList<>(); + private final List<AggWrapper> aggregations = new LinkedList<>(); + + + public ActionResponse succeeded(boolean succeeded) { + this.succeeded = succeeded; + return this; + } + + public boolean isSucceeded() { + return succeeded; + } + + public ActionResponse totalHits(long totalHits) { + this.totalHits = totalHits; + return this; + } + + public long getTotalHits() { + return totalHits; + } + + public List<HitWrapper> getHits() { + return hits; + } + + public ActionResponse addHit(HitWrapper hit) { + this.hits.add(hit); + return this; + } + + public List<AggWrapper> getAggregations() { + return aggregations; + } + + public ActionResponse addAggregation(AggWrapper aggregation) { + this.aggregations.add(aggregation); + return this; + } + + public ActionResponse hit(HitWrapper hit) { + this.addHit(hit); + return this; + } + + public HitWrapper getHit() { + return this.hits.get(0); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java 
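The `ActionResponse` class above uses a fluent-builder style: every mutator returns `this` so results can be assembled in one chained expression. A minimal self-contained sketch of that pattern (with simplified stand-ins for `ActionResponse`/`HitWrapper` — the real classes live in `org.apache.zeppelin.elasticsearch.action` and carry more fields):

```java
import java.util.LinkedList;
import java.util.List;

// Illustrative stand-ins for Zeppelin's ActionResponse/HitWrapper,
// reduced to the fields needed to show the chaining style.
public class ActionResponseDemo {

    public static class Hit {
        public final String id;
        public final String source;
        public Hit(String id, String source) { this.id = id; this.source = source; }
    }

    public static class Response {
        private boolean succeeded;
        private long totalHits;
        private final List<Hit> hits = new LinkedList<>();

        // Each mutator returns 'this', which is what allows the chained
        // new ActionResponse().succeeded(true).hit(...) style used in the patch.
        public Response succeeded(boolean s) { this.succeeded = s; return this; }
        public Response totalHits(long t) { this.totalHits = t; return this; }
        public Response addHit(Hit h) { hits.add(h); return this; }

        public boolean isSucceeded() { return succeeded; }
        public long getTotalHits() { return totalHits; }
        public List<Hit> getHits() { return hits; }
    }

    public static String demo() {
        // Build a response the same way the HTTP and transport clients do.
        Response resp = new Response()
            .succeeded(true)
            .totalHits(2)
            .addHit(new Hit("1", "{\"status\":200}"))
            .addHit(new Hit("2", "{\"status\":404}"));
        return resp.isSucceeded() + " " + resp.getTotalHits() + " " + resp.getHits().size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true 2 2"
    }
}
```

The chaining keeps both `HttpBasedClient` and `TransportBasedClient` able to construct a full response in a single expression, which is why the patch adds both `addHit` and the single-hit alias `hit`.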
---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java new file mode 100644 index 0000000..14446db --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/AggWrapper.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.elasticsearch.action; + +/** + * Contains the result of an aggregation. 
+ */ +public class AggWrapper { + + /** Type of an aggregation (to know if there are buckets or not) */ + public enum AggregationType { SIMPLE, MULTI_BUCKETS }; + + private final AggregationType type; + private final String result; + + public AggWrapper(AggregationType type, String result) { + this.type = type; + this.result = result; + } + + public AggregationType getType() { + return type; + } + + public String getResult() { + return result; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java new file mode 100644 index 0000000..3be4514 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/action/HitWrapper.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.elasticsearch.action; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +/** + * Contains the data of a hit. + */ +public class HitWrapper { + + private final JsonParser parser = new JsonParser(); + + private final String index; + private final String type; + private final String id; + private final String source; + + public HitWrapper(String index, String type, String id, String source) { + this.index = index; + this.type = type; + this.id = id; + this.source = source; + } + + public HitWrapper(String source) { + this(null, null, null, source); + } + + public String getSourceAsString() { + return source; + } + + public JsonObject getSourceAsJsonObject() { + final JsonElement element = parser.parse(source); + return element.getAsJsonObject(); + } + + public String getIndex() { + return index; + } + + public String getType() { + return type; + } + + public String getId() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java new file mode 100644 index 0000000..48e1980 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/ElasticsearchClient.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.elasticsearch.client; + +import org.apache.zeppelin.elasticsearch.action.ActionResponse; + +/** + * Interface that must be implemented by any kind of Elasticsearch client (transport, ...). + */ +public interface ElasticsearchClient { + + ActionResponse get(String index, String type, String id); + + ActionResponse index(String index, String type, String id, String data); + + ActionResponse delete(String index, String type, String id); + + ActionResponse search(String[] indices, String[] types, String query, int size); + + void close(); +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java new file mode 100644 index 0000000..d691597 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/HttpBasedClient.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.elasticsearch.client; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter; +import org.apache.zeppelin.elasticsearch.action.ActionException; +import org.apache.zeppelin.elasticsearch.action.ActionResponse; +import org.apache.zeppelin.elasticsearch.action.AggWrapper; +import org.apache.zeppelin.elasticsearch.action.AggWrapper.AggregationType; +import org.apache.zeppelin.elasticsearch.action.HitWrapper; +import org.json.JSONArray; +import org.json.JSONObject; + +import com.google.common.base.Joiner; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonParseException; +import com.mashape.unirest.http.HttpResponse; +import com.mashape.unirest.http.JsonNode; +import com.mashape.unirest.http.Unirest; +import com.mashape.unirest.http.exceptions.UnirestException; +import com.mashape.unirest.request.HttpRequest; +import com.mashape.unirest.request.HttpRequestWithBody; + +/** + * Elasticsearch client using the HTTP API. 
+ */ +public class HttpBasedClient implements ElasticsearchClient { + + private static final String QUERY_STRING_TEMPLATE = + "{ \"query\": { \"query_string\": { \"query\": \"_Q_\", \"analyze_wildcard\": \"true\" } } }"; + + private final String host; + private final int port; + private final String username; + private final String password; + + private final Gson gson = new GsonBuilder().setPrettyPrinting().create(); + + public HttpBasedClient(Properties props) { + this.host = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST); + this.port = Integer.parseInt(props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT)); + this.username = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_BASIC_AUTH_USERNAME); + this.password = props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_BASIC_AUTH_PASSWORD); + } + + private boolean isSucceeded(HttpResponse response) { + return response.getStatus() >= 200 && response.getStatus() < 300; + } + + private JSONObject getParentField(JSONObject parent, String[] fields) { + JSONObject obj = parent; + for (int i = 0; i < fields.length - 1; i++) { + obj = obj.getJSONObject(fields[i]); + } + return obj; + } + + private JSONArray getFieldAsArray(JSONObject obj, String field) { + final String[] fields = field.split("/"); + final JSONObject parent = getParentField(obj, fields); + return parent.getJSONArray(fields[fields.length - 1]); + } + + private String getFieldAsString(HttpResponse<JsonNode> response, String field) { + return getFieldAsString(response.getBody(), field); + } + + private String getFieldAsString(JsonNode json, String field) { + return json.getObject().get(field).toString(); + } + + private long getFieldAsLong(HttpResponse<JsonNode> response, String field) { + final String[] fields = field.split("/"); + final JSONObject obj = getParentField(response.getBody().getObject(), fields); + return obj.getLong(fields[fields.length - 1]); + } + + private String getUrl(String index, String type, String id, 
boolean useSearch) { + try { + final StringBuilder buffer = new StringBuilder(); + buffer.append("http://").append(host).append(":").append(port).append("/"); + if (StringUtils.isNotEmpty(index)) { + buffer.append(index); + + if (StringUtils.isNotEmpty(type)) { + buffer.append("/").append(type); + + if (StringUtils.isNotEmpty(id)) { + if (useSearch) { + final String encodedId = URLEncoder.encode(id, "UTF-8"); + if (id.equals(encodedId)) { + // No difference, use directly the id + buffer.append("/").append(id); + } + else { + // There are differences: to avoid problems with some special characters + // such as / and # in id, use a "terms" query + buffer.append("/_search?source=") + .append(URLEncoder + .encode("{\"query\":{\"terms\":{\"_id\":[\"" + id + "\"]}}}", "UTF-8")); + } + } + else { + buffer.append("/").append(id); + } + } + } + } + return buffer.toString(); + } + catch (final UnsupportedEncodingException e) { + throw new ActionException(e); + } + } + + private String getUrl(String[] indices, String[] types) { + final String inds = indices == null ? null : Joiner.on(",").join(indices); + final String typs = types == null ? 
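The id handling in `getUrl` above hinges on one check: if URL-encoding the id changes it, the id contains characters such as `/` or `#` that would be misparsed in the URL path, so the client falls back to a `_search` with a `terms` query on `_id`. A self-contained sketch of that decision using only the JDK (class and method names here are illustrative, not from the Zeppelin code):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class IdPathDemo {

    // Returns the path fragment to append for a document id: the raw id
    // when it is path-safe, otherwise a _search fallback with a terms
    // query on _id (mirroring the logic in HttpBasedClient.getUrl).
    public static String pathForId(String id) {
        try {
            final String encoded = URLEncoder.encode(id, "UTF-8");
            if (id.equals(encoded)) {
                // No difference after encoding: the id is safe in the path.
                return "/" + id;
            }
            // The id contains special characters (e.g. / or #): query by _id
            // instead of putting the id in the path.
            return "/_search?source=" + URLEncoder.encode(
                "{\"query\":{\"terms\":{\"_id\":[\"" + id + "\"]}}}", "UTF-8");
        }
        catch (final UnsupportedEncodingException e) {
            throw new RuntimeException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        System.out.println(pathForId("42"));                 // "/42"
        System.out.println(pathForId("very/strange/id#1"));  // _search fallback
    }
}
```

This is also why the test class in this commit indexes documents with ids like `very/strange/id#1`: they exercise exactly this fallback branch.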
null : Joiner.on(",").join(types); + return getUrl(inds, typs, null, false); + } + + @Override + public ActionResponse get(String index, String type, String id) { + ActionResponse response = null; + try { + final HttpRequest request = Unirest.get(getUrl(index, type, id, true)); + if (StringUtils.isNotEmpty(username)) { + request.basicAuth(username, password); + } + + final HttpResponse<String> result = request.asString(); + final boolean isSucceeded = isSucceeded(result); + + if (isSucceeded) { + final JsonNode body = new JsonNode(result.getBody()); + if (body.getObject().has("_index")) { + response = new ActionResponse() + .succeeded(true) + .hit(new HitWrapper( + getFieldAsString(body, "_index"), + getFieldAsString(body, "_type"), + getFieldAsString(body, "_id"), + getFieldAsString(body, "_source"))); + } + else { + final JSONArray hits = getFieldAsArray(body.getObject(), "hits/hits"); + final JSONObject hit = (JSONObject) hits.iterator().next(); + response = new ActionResponse() + .succeeded(true) + .hit(new HitWrapper( + hit.getString("_index"), + hit.getString("_type"), + hit.getString("_id"), + hit.opt("_source").toString())); + } + } + else { + if (result.getStatus() == 404) { + response = new ActionResponse() + .succeeded(false); + } + else { + throw new ActionException(result.getBody()); + } + } + } + catch (final UnirestException e) { + throw new ActionException(e); + } + return response; + } + + @Override + public ActionResponse delete(String index, String type, String id) { + ActionResponse response = null; + try { + final HttpRequest request = Unirest.delete(getUrl(index, type, id, true)); + if (StringUtils.isNotEmpty(username)) { + request.basicAuth(username, password); + } + + final HttpResponse<String> result = request.asString(); + final boolean isSucceeded = isSucceeded(result); + + if (isSucceeded) { + final JsonNode body = new JsonNode(result.getBody()); + response = new ActionResponse() + .succeeded(true) + .hit(new HitWrapper( + 
getFieldAsString(body, "_index"), + getFieldAsString(body, "_type"), + getFieldAsString(body, "_id"), + null)); + } + else { + throw new ActionException(result.getBody()); + } + } + catch (final UnirestException e) { + throw new ActionException(e); + } + return response; + } + + @Override + public ActionResponse index(String index, String type, String id, String data) { + ActionResponse response = null; + try { + HttpRequestWithBody request = null; + if (StringUtils.isEmpty(id)) { + request = Unirest.post(getUrl(index, type, id, false)); + } + else { + request = Unirest.put(getUrl(index, type, id, false)); + } + request + .header("Accept", "application/json") + .header("Content-Type", "application/json") + .body(data).getHttpRequest(); + if (StringUtils.isNotEmpty(username)) { + request.basicAuth(username, password); + } + + final HttpResponse<JsonNode> result = request.asJson(); + final boolean isSucceeded = isSucceeded(result); + + if (isSucceeded) { + response = new ActionResponse() + .succeeded(true) + .hit(new HitWrapper( + getFieldAsString(result, "_index"), + getFieldAsString(result, "_type"), + getFieldAsString(result, "_id"), + null)); + } + else { + throw new ActionException(result.getBody().toString()); + } + } + catch (final UnirestException e) { + throw new ActionException(e); + } + return response; + } + + @Override + public ActionResponse search(String[] indices, String[] types, String query, int size) { + ActionResponse response = null; + + if (!StringUtils.isEmpty(query)) { + // The query can be either JSON-formatted or a Lucene query + // So, try to parse as JSON => if there is an error, consider the query a Lucene one + try { + gson.fromJson(query, Map.class); + } + catch (final JsonParseException e) { + // This is not JSON (or maybe not well formatted...)
+ query = QUERY_STRING_TEMPLATE.replace("_Q_", query); + } + } + + try { + final HttpRequestWithBody request = Unirest + .post(getUrl(indices, types) + "/_search?size=" + size) + .header("Content-Type", "application/json"); + + if (StringUtils.isNoneEmpty(query)) { + request.header("Accept", "application/json").body(query); + } + + if (StringUtils.isNotEmpty(username)) { + request.basicAuth(username, password); + } + + final HttpResponse<JsonNode> result = request.asJson(); + final JSONObject body = result.getBody() != null ? result.getBody().getObject() : null; + + if (isSucceeded(result)) { + final long total = getFieldAsLong(result, "hits/total"); + + response = new ActionResponse() + .succeeded(true) + .totalHits(total); + + if (containsAggs(result)) { + JSONObject aggregationsMap = body.getJSONObject("aggregations"); + if (aggregationsMap == null) { + aggregationsMap = body.getJSONObject("aggs"); + } + + for (final String key: aggregationsMap.keySet()) { + final JSONObject aggResult = aggregationsMap.getJSONObject(key); + if (aggResult.has("buckets")) { + // Multi-bucket aggregations + final Iterator<Object> buckets = aggResult.getJSONArray("buckets").iterator(); + while (buckets.hasNext()) { + response.addAggregation( + new AggWrapper(AggregationType.MULTI_BUCKETS, buckets.next().toString())); + } + } + else { + response.addAggregation( + new AggWrapper(AggregationType.SIMPLE, aggregationsMap.toString())); + } + break; // Keep only one aggregation + } + } + else if (size > 0 && total > 0) { + final JSONArray hits = getFieldAsArray(body, "hits/hits"); + final Iterator<Object> iter = hits.iterator(); + + while (iter.hasNext()) { + final JSONObject hit = (JSONObject) iter.next(); + final Object data = + hit.opt("_source") != null ? 
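Both clients in this patch apply the same query-type fallback: try to parse the paragraph text as JSON, and if parsing fails, treat it as Lucene query syntax and wrap it in the `query_string` template. A self-contained sketch of that fallback (using a deliberately naive JSON check in place of the Gson parse, so it runs without dependencies — the real code relies on `gson.fromJson` throwing `JsonParseException`):

```java
public class QueryFallbackDemo {

    // Same template as HttpBasedClient.QUERY_STRING_TEMPLATE in the patch.
    private static final String QUERY_STRING_TEMPLATE =
        "{ \"query\": { \"query_string\": { \"query\": \"_Q_\", "
        + "\"analyze_wildcard\": \"true\" } } }";

    // Naive stand-in for gson.fromJson(query, Map.class): treat the text as
    // JSON when it looks like an object literal. Illustrative only.
    public static boolean looksLikeJson(String query) {
        final String trimmed = query.trim();
        return trimmed.startsWith("{") && trimmed.endsWith("}");
    }

    // Returns the request body actually sent to Elasticsearch.
    public static String toEsBody(String query) {
        if (looksLikeJson(query)) {
            return query; // already a full ES query body, send as-is
        }
        // Lucene query syntax: wrap it in a query_string query
        return QUERY_STRING_TEMPLATE.replace("_Q_", query);
    }

    public static void main(String[] args) {
        System.out.println(toEsBody("status:404"));
        System.out.println(toEsBody("{\"query\":{\"match_all\":{}}}"));
    }
}
```

This lets a single `%elasticsearch` paragraph accept either a raw search DSL body or a bare Lucene expression like `status:404` without the user declaring which one it is.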
hit.opt("_source") : hit.opt("fields"); + response.addHit(new HitWrapper( + hit.getString("_index"), + hit.getString("_type"), + hit.getString("_id"), + data.toString())); + } + } + } + else { + throw new ActionException(body.get("error").toString()); + } + } + catch (final UnirestException e) { + throw new ActionException(e); + } + + return response; + } + + private boolean containsAggs(HttpResponse<JsonNode> result) { + return result.getBody() != null && + (result.getBody().getObject().has("aggregations") || + result.getBody().getObject().has("aggs")); + } + + @Override + public void close() { + } + + @Override + public String toString() { + return "HttpBasedClient [host=" + host + ", port=" + port + ", username=" + username + "]"; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java ---------------------------------------------------------------------- diff --git a/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java new file mode 100644 index 0000000..1451019 --- /dev/null +++ b/elasticsearch/src/main/java/org/apache/zeppelin/elasticsearch/client/TransportBasedClient.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.elasticsearch.client; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter; +import org.apache.zeppelin.elasticsearch.action.ActionResponse; +import org.apache.zeppelin.elasticsearch.action.AggWrapper; +import org.apache.zeppelin.elasticsearch.action.HitWrapper; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; +import 
org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.metrics.InternalMetricsAggregation; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonSyntaxException; + +/** + * Elasticsearch client using the transport protocol. + */ +public class TransportBasedClient implements ElasticsearchClient { + + private final Gson gson = new GsonBuilder().setPrettyPrinting().create(); + private final Client client; + + public TransportBasedClient(Properties props) throws UnknownHostException { + final String host = + props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_HOST); + final int port = Integer.parseInt( + props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_PORT)); + final String clusterName = + props.getProperty(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME); + + final Settings settings = Settings.settingsBuilder() + .put("cluster.name", clusterName) + .put(props) + .build(); + + client = TransportClient.builder().settings(settings).build() + .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port)); + } + + @Override + public ActionResponse get(String index, String type, String id) { + final GetResponse getResp = client + .prepareGet(index, type, id) + .get(); + + return new ActionResponse() + .succeeded(getResp.isExists()) + .hit(new HitWrapper( + getResp.getIndex(), + getResp.getType(), + getResp.getId(), + getResp.getSourceAsString())); + } + + @Override + public ActionResponse delete(String index, String type, String id) { + final DeleteResponse delResp = client + .prepareDelete(index, type, id) + .get(); + + return new ActionResponse() + 
.succeeded(delResp.isFound()) + .hit(new HitWrapper( + delResp.getIndex(), + delResp.getType(), + delResp.getId(), + null)); + } + + @Override + public ActionResponse index(String index, String type, String id, String data) { + final IndexResponse idxResp = client + .prepareIndex(index, type, id) + .setSource(data) + .get(); + + return new ActionResponse() + .succeeded(idxResp.isCreated()) + .hit(new HitWrapper( + idxResp.getIndex(), + idxResp.getType(), + idxResp.getId(), + null)); + } + + @Override + public ActionResponse search(String[] indices, String[] types, String query, int size) { + final SearchRequestBuilder reqBuilder = new SearchRequestBuilder( + client, SearchAction.INSTANCE); + reqBuilder.setIndices(); + + if (indices != null) { + reqBuilder.setIndices(indices); + } + if (types != null) { + reqBuilder.setTypes(types); + } + + if (!StringUtils.isEmpty(query)) { + // The query can be either JSON-formatted or a Lucene query + // So, try to parse as JSON => if there is an error, consider the query a Lucene one + try { + @SuppressWarnings("rawtypes") + final Map source = gson.fromJson(query, Map.class); + reqBuilder.setExtraSource(source); + } + catch (final JsonSyntaxException e) { + // This is not a JSON (or maybe not well formatted...)
+ reqBuilder.setQuery(QueryBuilders.queryStringQuery(query).analyzeWildcard(true)); + } + } + + reqBuilder.setSize(size); + + final SearchResponse searchResp = reqBuilder.get(); + + final ActionResponse actionResp = new ActionResponse() + .succeeded(true) + .totalHits(searchResp.getHits().getTotalHits()); + + if (searchResp.getAggregations() != null) { + setAggregations(searchResp.getAggregations(), actionResp); + } + else { + for (final SearchHit hit: searchResp.getHits()) { + // Fields can be found either in _source, or in fields (it depends on the query) + // => specific for elasticsearch's version < 5 + // + String src = hit.getSourceAsString(); + if (src == null) { + final Map<String, Object> hitFields = new HashMap<>(); + for (final SearchHitField hitField : hit.getFields().values()) { + hitFields.put(hitField.getName(), hitField.getValues()); + } + src = gson.toJson(hitFields); + } + actionResp.addHit(new HitWrapper(hit.getIndex(), hit.getType(), hit.getId(), src)); + } + } + + return actionResp; + } + + private void setAggregations(Aggregations aggregations, ActionResponse actionResp) { + // Only the result of the first aggregation is returned + // + final Aggregation agg = aggregations.asList().get(0); + + if (agg instanceof InternalMetricsAggregation) { + actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE, + XContentHelper.toString((InternalMetricsAggregation) agg).toString())); + } + else if (agg instanceof InternalSingleBucketAggregation) { + actionResp.addAggregation(new AggWrapper(AggWrapper.AggregationType.SIMPLE, + XContentHelper.toString((InternalSingleBucketAggregation) agg).toString())); + } + else if (agg instanceof InternalMultiBucketAggregation) { + final Set<String> headerKeys = new HashSet<>(); + final List<Map<String, Object>> buckets = new LinkedList<>(); + final InternalMultiBucketAggregation multiBucketAgg = (InternalMultiBucketAggregation) agg; + + for (final MultiBucketsAggregation.Bucket bucket : 
multiBucketAgg.getBuckets()) {
+        try {
+          final XContentBuilder builder = XContentFactory.jsonBuilder();
+          bucket.toXContent(builder, null);
+          actionResp.addAggregation(
+              new AggWrapper(AggWrapper.AggregationType.MULTI_BUCKETS, builder.string()));
+        }
+        catch (final IOException e) {
+          // Ignored
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    if (client != null) {
+      client.close();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "TransportBasedClient []";
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/resources/interpreter-setting.json b/elasticsearch/src/main/resources/interpreter-setting.json
index 089c811..18200ae 100644
--- a/elasticsearch/src/main/resources/interpreter-setting.json
+++ b/elasticsearch/src/main/resources/interpreter-setting.json
@@ -16,6 +16,12 @@
       "defaultValue": "9300",
       "description": "The port for Elasticsearch"
     },
+    "elasticsearch.client.type": {
+      "envName": "ELASTICSEARCH_CLIENT_TYPE",
+      "propertyName": "elasticsearch.client.type",
+      "defaultValue": "transport",
+      "description": "The type of client for Elasticsearch (transport or http)"
+    },
     "elasticsearch.cluster.name": {
       "envName": "ELASTICSEARCH_CLUSTER_NAME",
       "propertyName": "elasticsearch.cluster.name",
@@ -27,6 +33,18 @@
       "propertyName": "elasticsearch.result.size",
       "defaultValue": "10",
       "description": "The size of the result set of a search query"
+    },
+    "elasticsearch.basicauth.username": {
+      "envName": "ELASTICSEARCH_BASIC_AUTH_USERNAME",
+      "propertyName": "elasticsearch.basicauth.username",
+      "defaultValue": "",
+      "description": "Username for a basic authentication"
+    },
+    "elasticsearch.basicauth.password": {
+      "envName": "ELASTICSEARCH_BASIC_AUTH_PASSWORD",
+      "propertyName": "elasticsearch.basicauth.password",
+      "defaultValue": "",
+      "description": "Password for a basic authentication"
     }
   },
   "editor": {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
index 8d3a14b..aece163 100644
--- a/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
+++ b/elasticsearch/src/test/java/org/apache/zeppelin/elasticsearch/ElasticsearchInterpreterTest.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.elasticsearch;

 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,8 +28,11 @@ import java.util.Date;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.commons.lang.math.RandomUtils;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -40,13 +44,19 @@ import org.elasticsearch.node.NodeBuilder;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.experimental.theories.DataPoint;
+import org.junit.experimental.theories.Theories;
+import org.junit.experimental.theories.Theory;
+import org.junit.runner.RunWith;

+@RunWith(Theories.class)
 public class ElasticsearchInterpreterTest {

+  @DataPoint public static ElasticsearchInterpreter transportInterpreter;
+  @DataPoint public static ElasticsearchInterpreter httpInterpreter;
+
   private static Client elsClient;
   private static Node elsNode;
-  private static ElasticsearchInterpreter interpreter;

   private static final String[] METHODS = { "GET", "PUT", "DELETE", "POST" };
   private static final int[] STATUS = { 200, 404, 500, 403 };
@@ -57,6 +67,8 @@ public class ElasticsearchInterpreterTest {
   private static final String ELS_HTTP_PORT = "10200";
   private static final String ELS_PATH = "/tmp/els";

+  private static final AtomicInteger deleteId = new AtomicInteger(2);
+
   @BeforeClass
   public static void populate() throws IOException {
@@ -80,7 +92,7 @@ public class ElasticsearchInterpreterTest {
           .endObject()
         .endObject().endObject().endObject()).get();

-    for (int i = 0; i < 50; i++) {
+    for (int i = 0; i < 48; i++) {
       elsClient.prepareIndex("logs", "http", "" + i)
         .setRefresh(true)
         .setSource(jsonBuilder()
@@ -97,22 +109,50 @@ public class ElasticsearchInterpreterTest {
         .get();
     }

+    for (int i = 1; i < 3; i++) {
+      elsClient.prepareIndex("logs", "http", "very/strange/id#" + i)
+        .setRefresh(true)
+        .setSource(jsonBuilder()
+          .startObject()
+            .field("date", new Date())
+            .startObject("request")
+              .field("method", METHODS[RandomUtils.nextInt(METHODS.length)])
+              .field("url", "/zeppelin/" + UUID.randomUUID().toString())
+              .field("headers", Arrays.asList("Accept: *.*", "Host: apache.org"))
+            .endObject()
+            .field("status", STATUS[RandomUtils.nextInt(STATUS.length)])
+            .field("content_length", RandomUtils.nextInt(2000))
+          )
+        .get();
+    }
+
     final Properties props = new Properties();
     props.put(ElasticsearchInterpreter.ELASTICSEARCH_HOST, ELS_HOST);
-    props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
     props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLUSTER_NAME, ELS_CLUSTER_NAME);
-    interpreter = new ElasticsearchInterpreter(props);
-    interpreter.open();
+
+    props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_TRANSPORT_PORT);
+    props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLIENT_TYPE, "transport");
+    transportInterpreter = new ElasticsearchInterpreter(props);
+    transportInterpreter.open();
+
+    props.put(ElasticsearchInterpreter.ELASTICSEARCH_PORT, ELS_HTTP_PORT);
+    props.put(ElasticsearchInterpreter.ELASTICSEARCH_CLIENT_TYPE, "http");
+    httpInterpreter = new ElasticsearchInterpreter(props);
+    httpInterpreter.open();
   }

   @AfterClass
   public static void clean() {
-    if (interpreter != null) {
-      interpreter.close();
+    if (transportInterpreter != null) {
+      transportInterpreter.close();
+    }
+
+    if (httpInterpreter != null) {
+      httpInterpreter.close();
     }

     if (elsClient != null) {
-      elsClient.admin().indices().delete(new DeleteIndexRequest("logs")).actionGet();
+      elsClient.admin().indices().delete(new DeleteIndexRequest("*")).actionGet();
       elsClient.close();
     }
@@ -121,97 +161,140 @@ public class ElasticsearchInterpreterTest {
     }
   }

-  @Test
-  public void testCount() {
+  private InterpreterContext buildContext(String noteAndParagraphId) {
+    final AngularObjectRegistry angularObjReg = new AngularObjectRegistry("elasticsearch", null);
+    return new InterpreterContext(noteAndParagraphId, noteAndParagraphId, null, null, null, null, null,
+        null, angularObjReg , null, null, null);
+  }
+
+  @Theory
+  public void testCount(ElasticsearchInterpreter interpreter) {
+
+    final InterpreterContext ctx = buildContext("testCount");

-    InterpreterResult res = interpreter.interpret("count /unknown", null);
+    InterpreterResult res = interpreter.interpret("count /unknown", ctx);
     assertEquals(Code.ERROR, res.code());

-    res = interpreter.interpret("count /logs", null);
+    res = interpreter.interpret("count /logs", ctx);
+    assertEquals(Code.SUCCESS, res.code());
     assertEquals("50", res.message().get(0).getData());
+    assertNotNull(ctx.getAngularObjectRegistry().get("count_testCount", null, null));
+    assertEquals(50l, ctx.getAngularObjectRegistry().get("count_testCount", null, null).get());
+
+    res = interpreter.interpret("count /logs { \"query\": { \"match\": { \"status\": 500 } } }", ctx);
+    assertEquals(Code.SUCCESS, res.code());
   }

-  @Test
-  public void testGet() {
+  @Theory
+  public void testGet(ElasticsearchInterpreter interpreter) {
+
+    final InterpreterContext ctx = buildContext("get");

-    InterpreterResult res = interpreter.interpret("get /logs/http/unknown", null);
+    InterpreterResult res = interpreter.interpret("get /logs/http/unknown", ctx);
     assertEquals(Code.ERROR, res.code());

-    res = interpreter.interpret("get /logs/http/10", null);
+    res = interpreter.interpret("get /logs/http/unknown/unknown", ctx);
+    assertEquals(Code.ERROR, res.code());
+
+    res = interpreter.interpret("get /unknown/unknown/unknown", ctx);
+    assertEquals(Code.ERROR, res.code());
+
+    res = interpreter.interpret("get /logs/http/very/strange/id#1", ctx);
+    assertEquals(Code.SUCCESS, res.code());
+
+    res = interpreter.interpret("get /logs/http/4", ctx);
+    assertEquals(Code.SUCCESS, res.code());
+
+    res = interpreter.interpret("get /logs/_all/4", ctx);
     assertEquals(Code.SUCCESS, res.code());
   }

-  @Test
-  public void testSearch() {
+  @Theory
+  public void testSearch(ElasticsearchInterpreter interpreter) {
+
+    final InterpreterContext ctx = buildContext("search");

-    InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", null);
+    InterpreterResult res = interpreter.interpret("size 10\nsearch /logs *", ctx);
     assertEquals(Code.SUCCESS, res.code());

-    res = interpreter.interpret("search /logs {{{hello}}}", null);
+    res = interpreter.interpret("search /logs {{{hello}}}", ctx);
     assertEquals(Code.ERROR, res.code());

-    res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", null);
+    res = interpreter.interpret("search /logs { \"query\": { \"match\": { \"status\": 500 } } }", ctx);
    assertEquals(Code.SUCCESS, res.code());

-    res = interpreter.interpret("search /logs status:404", null);
+    res = interpreter.interpret("search /logs status:404", ctx);
     assertEquals(Code.SUCCESS, res.code());

-    res = interpreter.interpret("search /logs { \"fields\": [ \"date\", \"request.headers\" ], \"query\": { \"match\": { \"status\": 500 } } }", null);
+    res = interpreter.interpret("search /logs { \"fields\": [ \"date\", \"request.headers\" ], \"query\": { \"match\": { \"status\": 500 } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());
   }

-  @Test
-  public void testAgg() {
+  @Theory
+  public void testAgg(ElasticsearchInterpreter interpreter) {
+
+    final InterpreterContext ctx = buildContext("agg");

     // Single-value metric
     InterpreterResult res = interpreter.interpret("search /logs { \"aggs\" : { \"distinct_status_count\" : " +
-      " { \"cardinality\" : { \"field\" : \"status\" } } } }", null);
+      " { \"cardinality\" : { \"field\" : \"status\" } } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());

     // Multi-value metric
     res = interpreter.interpret("search /logs { \"aggs\" : { \"content_length_stats\" : " +
-      " { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", null);
+      " { \"extended_stats\" : { \"field\" : \"content_length\" } } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());

     // Single bucket
     res = interpreter.interpret("search /logs { \"aggs\" : { " +
       " \"200_OK\" : { \"filter\" : { \"term\": { \"status\": \"200\" } }, " +
-      " \"aggs\" : { \"avg_length\" : { \"avg\" : { \"field\" : \"content_length\" } } } } } }", null);
+      " \"aggs\" : { \"avg_length\" : { \"avg\" : { \"field\" : \"content_length\" } } } } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());

     // Multi-buckets
     res = interpreter.interpret("search /logs { \"aggs\" : { \"status_count\" : " +
-      " { \"terms\" : { \"field\" : \"status\" } } } }", null);
+      " { \"terms\" : { \"field\" : \"status\" } } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());
-
+
     res = interpreter.interpret("search /logs { \"aggs\" : { " +
       " \"length\" : { \"terms\": { \"field\": \"status\" }, " +
-      " \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", null);
+      " \"aggs\" : { \"sum_length\" : { \"sum\" : { \"field\" : \"content_length\" } }, \"sum_status\" : { \"sum\" : { \"field\" : \"status\" } } } } } }", ctx);
     assertEquals(Code.SUCCESS, res.code());
   }

-  @Test
-  public void testIndex() {
+  @Theory
+  public void testIndex(ElasticsearchInterpreter interpreter) {

     InterpreterResult res = interpreter.interpret("index /logs { \"date\": \"" + new Date() + "\", \"method\": \"PUT\", \"status\": \"500\" }", null);
     assertEquals(Code.ERROR, res.code());

+    res = interpreter.interpret("index /logs/http { bad ", null);
+    assertEquals(Code.ERROR, res.code());
+
     res = interpreter.interpret("index /logs/http { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
     assertEquals(Code.SUCCESS, res.code());
+
+    res = interpreter.interpret("index /logs/http/1000 { \"date\": \"2015-12-06T14:54:23.368Z\", \"method\": \"PUT\", \"status\": \"500\" }", null);
+    assertEquals(Code.SUCCESS, res.code());
   }

-  @Test
-  public void testDelete() {
+  @Theory
+  public void testDelete(ElasticsearchInterpreter interpreter) {

     InterpreterResult res = interpreter.interpret("delete /logs/http/unknown", null);
     assertEquals(Code.ERROR, res.code());

-    res = interpreter.interpret("delete /logs/http/11", null);
-    assertEquals("11", res.message().get(0).getData());
+    res = interpreter.interpret("delete /unknown/unknown/unknown", null);
+    assertEquals(Code.ERROR, res.code());
+
+    final int testDeleteId = deleteId.decrementAndGet();
+    res = interpreter.interpret("delete /logs/http/" + testDeleteId, null);
+    assertEquals(Code.SUCCESS, res.code());
+    assertEquals("" + testDeleteId, res.message().get(0).getData());
   }

-  @Test
-  public void testMisc() {
+  @Theory
+  public void testMisc(ElasticsearchInterpreter interpreter) {

     InterpreterResult res = interpreter.interpret(null, null);
     assertEquals(Code.SUCCESS, res.code());
@@ -220,23 +303,23 @@ public class ElasticsearchInterpreterTest {
     assertEquals(Code.SUCCESS, res.code());
   }

-  @Test
-  public void testCompletion() {
-    List expectedResultOne = Arrays.asList(new InterpreterCompletion("count", "count"));
-    List expectedResultTwo = Arrays.asList(new InterpreterCompletion("help", "help"));
+  @Theory
+  public void testCompletion(ElasticsearchInterpreter interpreter) {
+    final List<InterpreterCompletion> expectedResultOne = Arrays.asList(new InterpreterCompletion("count", "count"));
+    final List<InterpreterCompletion> expectedResultTwo = Arrays.asList(new InterpreterCompletion("help", "help"));

-    List<InterpreterCompletion> resultOne = interpreter.completion("co", 0);
-    List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0);
-    List<InterpreterCompletion> resultAll = interpreter.completion("", 0);
+    final List<InterpreterCompletion> resultOne = interpreter.completion("co", 0);
+    final List<InterpreterCompletion> resultTwo = interpreter.completion("he", 0);
+    final List<InterpreterCompletion> resultAll = interpreter.completion("", 0);

     Assert.assertEquals(expectedResultOne, resultOne);
     Assert.assertEquals(expectedResultTwo, resultTwo);

-    List allCompletionList = new ArrayList<>();
-    for (InterpreterCompletion ic : resultAll) {
+    final List<String> allCompletionList = new ArrayList<>();
+    for (final InterpreterCompletion ic : resultAll) {
       allCompletionList.add(ic.getName());
     }
-    Assert.assertEquals(interpreter.COMMANDS, allCompletionList);
+    Assert.assertEquals(ElasticsearchInterpreter.COMMANDS, allCompletionList);
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0e3c9b10/zeppelin-distribution/src/bin_license/LICENSE
----------------------------------------------------------------------
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index 1197ea7..ae3ea5f 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -13,6 +13,7 @@ The following components are provided under Apache License.
     (Apache 2.0) Apache Commons Exec (commons-exec:commons-exec:1.3 - http://commons.apache.org/exec/)
     (Apache 2.0) Http Components (org.apache.httpcomponents:httpcore:4.3.3 - https://github.com/apache/httpclient)
     (Apache 2.0) Http Components (org.apache.httpcomponents:httpclient:4.3.6 - https://github.com/apache/httpclient)
+    (Apache 2.0) Http Components (org.apache.httpcomponents:httpasyncclient:4.0.2 - https://github.com/apache/httpclient)
     (Apache 2.0) Apache Commons Lang (org.apache.commons:commons-lang:2.5 - http://commons.apache.org/proper/commons-lang/)
     (Apache 2.0) Apache Commons Lang 3 (org.apache.commons:commons-lang3:3.4 - http://commons.apache.org/proper/commons-lang/)
     (Apache 2.0) Apache Commons Math 3 (org.apache.commons:commons-math3:3.6.1 - http://commons.apache.org/proper/commons-math/)
@@ -267,6 +268,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
     (The MIT License) Java String Similarity 0.12 (info.debatty:java-string-similarity:0.12 - https://github.com/tdebatty/java-string-similarity)
     (The MIT License) Java LSH 0.10 (info.debatty:java-lsh:0.10 - https://github.com/tdebatty/java-LSH)
     (The MIT License) JSoup 1.6.1 (org.jsoup:jsoup:1.6.1 - https://github.com/jhy/jsoup/)
+    (The MIT License) Unirest 1.4.9 (com.mashape.unirest:unirest-java:1.4.9 - https://github.com/Mashape/unirest-java)
     (The MIT License) ngclipboard v1.1.1 (https://github.com/sachinchoolur/ngclipboard) - https://github.com/sachinchoolur/ngclipboard/blob/1.1.1/LICENSE)
========================================================================
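Note for reviewers: the interpreter-setting.json hunk above adds an `elasticsearch.client.type` property (defaulting to `transport`) plus two basic-auth properties. The sketch below shows how an interpreter configuration selecting the new HTTP client could look, mirroring the way the test's populate() method builds its Properties. The property names come from interpreter-setting.json in this patch; the host, port, and credential values are placeholders, not values from the patch.

```java
import java.util.Properties;

public class EsHttpInterpreterConfig {

  public static Properties httpClientProps() {
    final Properties props = new Properties();
    // Keys as declared in interpreter-setting.json; values are placeholders.
    props.put("elasticsearch.host", "localhost");
    props.put("elasticsearch.port", "9200");          // HTTP port (the shipped default, 9300, is the transport port)
    props.put("elasticsearch.client.type", "http");   // "transport" (default) or "http"
    props.put("elasticsearch.basicauth.username", "zeppelin"); // optional, empty by default
    props.put("elasticsearch.basicauth.password", "secret");   // optional, empty by default
    return props;
  }

  public static void main(String[] args) {
    System.out.println(httpClientProps());
  }
}
```

The same keys can also be set through the interpreter settings page in the Zeppelin UI, or via the corresponding `ELASTICSEARCH_*` environment variables listed in the JSON.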
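Also worth noting: the test refactor replaces plain `@Test` methods with JUnit's Theories runner, so each `@Theory` method runs once per `@DataPoint` — once against the transport-based interpreter and once against the HTTP-based one. Conceptually the runner does something like the following plain-Java sketch (no JUnit types; the `Greeter` interface and its two implementations are made up for illustration, standing in for the two interpreter instances):

```java
import java.util.Arrays;
import java.util.List;

public class TheoriesSketch {

  // Stand-in for ElasticsearchInterpreter; both "data points" must pass every check.
  interface Greeter {
    String greet(String name);
  }

  static final Greeter TRANSPORT_STYLE = new Greeter() {
    public String greet(String name) { return "Hello, " + name; }
  };

  static final Greeter HTTP_STYLE = new Greeter() {
    public String greet(String name) { return "Hi, " + name; }
  };

  // What @RunWith(Theories.class) does for each @Theory method:
  // invoke it once per @DataPoint, and the theory fails if any data point fails.
  static boolean runTheory(List<Greeter> dataPoints) {
    for (Greeter g : dataPoints) {
      if (!g.greet("Zeppelin").contains("Zeppelin")) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(runTheory(Arrays.asList(TRANSPORT_STYLE, HTTP_STYLE)));
  }
}
```

This is why the test class declares `transportInterpreter` and `httpInterpreter` as public static `@DataPoint` fields instead of a single private `interpreter` field: the Theories runner discovers the data points reflectively and feeds each one to every test method.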