exceptionfactory commented on code in PR #11032: URL: https://github.com/apache/nifi/pull/11032#discussion_r3034178500
########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.cassandra.CassandraSessionProviderService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * AbstractCassandraProcessor is a base class for Cassandra processors and contains logic and variables common to most + * processors integrating with Apache Cassandra. + */ +public abstract class AbstractCassandraProcessor extends AbstractProcessor { + + static final PropertyDescriptor CONNECTION_PROVIDER_SERVICE = new PropertyDescriptor.Builder() + .name("Cassandra Connection Provider") + .description("Specifies the Cassandra connection providing controller service to be used to connect to Cassandra cluster.") + .required(true) + .identifiesControllerService(CassandraSessionProviderService.class) + .build(); + + static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the record data.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship if the operation completed 
successfully.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is transferred to this relationship if the operation failed.") + .build(); + + static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting " + + "it again may succeed.") + .build(); + + protected static final List<PropertyDescriptor> COMMON_PROPERTY_DESCRIPTORS = List.of( + CONNECTION_PROVIDER_SERVICE, + CHARSET + ); + + protected final AtomicReference<CqlSession> cassandraSession = new AtomicReference<>(null); Review Comment: Instead of using a shared member variable, prefer a get method. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor: ########## @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.cassandra.QueryCassandra Review Comment: Trailing new lines should be added where applicable. 
########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.cassandra.CassandraSessionProviderService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * AbstractCassandraProcessor is a base class for Cassandra processors and contains logic and variables common to most + * processors integrating with Apache Cassandra. + */ +public abstract class AbstractCassandraProcessor extends AbstractProcessor { + + static final PropertyDescriptor CONNECTION_PROVIDER_SERVICE = new PropertyDescriptor.Builder() Review Comment: Property Descriptor variables should align closely with the property name, such as `CASSANDRA_CONNECTION_PROVIDER`. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; +import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException; +import com.datastax.oss.driver.api.core.servererrors.QueryValidationException; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.ListType; +import com.datastax.oss.driver.api.core.type.MapType; +import com.datastax.oss.driver.api.core.type.SetType; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.commons.text.StringEscapeUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; 
+import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.cassandra.converter.CassandraTypeConverter; +import org.apache.nifi.processors.cassandra.converter.StandardCassandraTypeConverter; +import org.apache.nifi.util.StopWatch; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +@Tags({"cassandra", "cql", "select"}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) select query on a Cassandra cluster. Query result " + + "may be converted to Avro or JSON format. 
Streaming is used so arbitrarily large result sets are supported. This processor can be " + + "scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query. FlowFile attribute 'executecql.row.count' indicates how many rows were selected.") +@WritesAttributes({ + @WritesAttribute(attribute = "executecql.row.count", description = "The number of rows returned by the CQL query"), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. 
This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced") +}) +public class QueryCassandra extends AbstractCassandraProcessor { + + private static final CassandraTypeConverter typeConverter = new StandardCassandraTypeConverter(); + + public static final String AVRO_FORMAT = "Avro"; + public static final String JSON_FORMAT = "Json"; + + public static final String RESULT_ROW_COUNT = "executecql.row.count"; + + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); + + private static final byte[] OPEN_OBJ = "{".getBytes(StandardCharsets.UTF_8); + private static final byte[] CLOSE_OBJ = "}".getBytes(StandardCharsets.UTF_8); + private static final byte[] COMMA = ",".getBytes(StandardCharsets.UTF_8); + private static final byte[] COLON = ":".getBytes(StandardCharsets.UTF_8); + private static final byte[] QUOTE = "\"".getBytes(StandardCharsets.UTF_8); + + private static final byte[] RESULTS_PREFIX = "{\"results\":[".getBytes(StandardCharsets.UTF_8); + private static final byte[] EMPTY_RESULTS = "{\"results\":[]}".getBytes(StandardCharsets.UTF_8); + private static final byte[] RESULTS_SUFFIX = "]}".getBytes(StandardCharsets.UTF_8); + public static final PropertyDescriptor CQL_SELECT_QUERY = new PropertyDescriptor.Builder() + .name("CQL Select Query") + .description("CQL select query") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() + .name("Max Wait Time") + .description("The maximum amount of time allowed for a running CQL select query. 
Must be of format " + + "<duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported " + + "Time Unit, such as: millis, secs, mins, hrs, days. A value of zero means there is no limit. ") + .defaultValue("0 seconds") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() + .name("Fetch Size") + .description("The number of result rows to be fetched from the result set at a time. Zero is the default " + + "and means there is no limit.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("Max Rows Per Flow File") + .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large " + + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Output Batch Size") + .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows " + + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles " + + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will " + + "be committed, thus releasing the FlowFiles to the downstream relationship. 
NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this " + + "property is set.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor OUTPUT_FORMAT = new PropertyDescriptor.Builder() + .name("Output Format") + .description("The format to which the result rows will be converted. If JSON is selected, the output will " + + "contain an object with field 'results' containing an array of result rows. Each row in the array is a " + + "map of the named column to its value. For example: { \"results\": [{\"userid\":1, \"name\":\"Joe Smith\"}]}") + .required(true) + .allowableValues(AVRO_FORMAT, JSON_FORMAT) + .defaultValue(AVRO_FORMAT) + .build(); + + public static final PropertyDescriptor TIMESTAMP_FORMAT_PATTERN = new PropertyDescriptor.Builder() + .name("Timestamp Format Pattern for JSON output") + .description("Pattern to use when converting timestamp fields to JSON. 
Note: the formatted timestamp will be in UTC timezone.") + .required(true) + .defaultValue("yyyy-MM-dd HH:mm:ssZ") + .addValidator((subject, input, context) -> { + final ValidationResult.Builder vrb = new ValidationResult.Builder().subject(subject).input(input); + try { + new SimpleDateFormat(input).format(new Date()); + vrb.valid(true).explanation("Valid date format pattern"); + } catch (Exception ex) { + vrb.valid(false).explanation("the pattern is invalid: " + ex.getMessage()); + } + return vrb.build(); + }) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = + Stream.concat( + COMMON_PROPERTY_DESCRIPTORS.stream(), + Stream.of( + CQL_SELECT_QUERY, + QUERY_TIMEOUT, + FETCH_SIZE, + MAX_ROWS_PER_FLOW_FILE, + OUTPUT_BATCH_SIZE, + OUTPUT_FORMAT, + TIMESTAMP_FORMAT_PATTERN + ) + ).toList(); + + private static final Set<Relationship> RELATIONSHIPS = Set.of( + REL_SUCCESS, + REL_FAILURE, + REL_RETRY + ); + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @OnScheduled + @Override + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + getLogger().info("DEBUG: NEW TIMEOUT LOGIC ACTIVE"); Review Comment: This kind of debug logging is not appropriate and should be removed. ########## nifi-assembly/pom.xml: ########## @@ -747,6 +747,39 @@ language governing permissions and limitations under the License. 
--> <version>2.9.0-SNAPSHOT</version> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cassandra-processors</artifactId> + <version>2.9.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cassandra-services</artifactId> + <version>2.9.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cassandra-services-api</artifactId> + <version>2.9.0-SNAPSHOT</version> + </dependency> Review Comment: Component JAR dependencies do not belong in the `nifi-assembly` module. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.cassandra.CassandraSessionProviderService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * AbstractCassandraProcessor is a base class for Cassandra processors and contains logic and variables common to most + * processors integrating with Apache Cassandra. + */ +public abstract class AbstractCassandraProcessor extends AbstractProcessor { + + static final PropertyDescriptor CONNECTION_PROVIDER_SERVICE = new PropertyDescriptor.Builder() + .name("Cassandra Connection Provider") + .description("Specifies the Cassandra connection providing controller service to be used to connect to Cassandra cluster.") + .required(true) + .identifiesControllerService(CassandraSessionProviderService.class) + .build(); + + static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the record data.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship if the operation completed 
successfully.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is transferred to this relationship if the operation failed.") + .build(); + + static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting " + + "it again may succeed.") Review Comment: Descriptions should use multi-line strings instead of concatenation. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/pom.xml: ########## @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cassandra-bundle</artifactId> + <version>2.9.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> Review Comment: This relative path is not needed and should be removed. 
########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.cassandra.CassandraSessionProviderService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * AbstractCassandraProcessor is a base class for Cassandra processors and contains logic and variables common to most + * processors integrating with Apache Cassandra. + */ +public abstract class AbstractCassandraProcessor extends AbstractProcessor { + + static final PropertyDescriptor CONNECTION_PROVIDER_SERVICE = new PropertyDescriptor.Builder() + .name("Cassandra Connection Provider") + .description("Specifies the Cassandra connection providing controller service to be used to connect to Cassandra cluster.") + .required(true) + .identifiesControllerService(CassandraSessionProviderService.class) + .build(); + + static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the record data.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship if the operation completed 
successfully.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is transferred to this relationship if the operation failed.") + .build(); + + static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting " + + "it again may succeed.") + .build(); + + protected static final List<PropertyDescriptor> COMMON_PROPERTY_DESCRIPTORS = List.of( + CONNECTION_PROVIDER_SERVICE, + CHARSET + ); + + protected final AtomicReference<CqlSession> cassandraSession = new AtomicReference<>(null); + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + Set<ValidationResult> results = new HashSet<>(); + + if (!validationContext.getProperty(CONNECTION_PROVIDER_SERVICE).isSet()) { + results.add(new ValidationResult.Builder() + .subject(CONNECTION_PROVIDER_SERVICE.getDisplayName()) + .valid(false) + .explanation("Cassandra Connection Provider must be specified.") + .build()); + } + + return results; + } Review Comment: This method is not needed and should be removed. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-nar/src/main/resources/META-INF/NOTICE: ########## @@ -0,0 +1,323 @@ +nifi-cassandra-nar Review Comment: Many of the references in this file are not accurate, because they are instead included in shared NAR bundles. Please review each library listed against the actual NAR contents. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; +import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException; +import com.datastax.oss.driver.api.core.servererrors.QueryValidationException; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.ListType; +import com.datastax.oss.driver.api.core.type.MapType; +import com.datastax.oss.driver.api.core.type.SetType; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.commons.text.StringEscapeUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import 
org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.cassandra.converter.CassandraTypeConverter; +import org.apache.nifi.processors.cassandra.converter.StandardCassandraTypeConverter; +import org.apache.nifi.util.StopWatch; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +@Tags({"cassandra", "cql", "select"}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) 
+@CapabilityDescription("Execute provided Cassandra Query Language (CQL) select query on a Cassandra cluster. Query result " + + "may be converted to Avro or JSON format. Streaming is used so arbitrarily large result sets are supported. This processor can be " + + "scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query. FlowFile attribute 'executecql.row.count' indicates how many rows were selected.") +@WritesAttributes({ + @WritesAttribute(attribute = "executecql.row.count", description = "The number of rows returned by the CQL query"), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. 
This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced") +}) +public class QueryCassandra extends AbstractCassandraProcessor { + + private static final CassandraTypeConverter typeConverter = new StandardCassandraTypeConverter(); + + public static final String AVRO_FORMAT = "Avro"; + public static final String JSON_FORMAT = "Json"; Review Comment: This is an artifact of the historical implementation that should be redesigned. Record Writers support configurable output strategies. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.cassandra; +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; +import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException; +import com.datastax.oss.driver.api.core.servererrors.QueryValidationException; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.ListType; +import com.datastax.oss.driver.api.core.type.MapType; +import com.datastax.oss.driver.api.core.type.SetType; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.commons.text.StringEscapeUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import 
org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.cassandra.converter.CassandraTypeConverter; +import org.apache.nifi.processors.cassandra.converter.StandardCassandraTypeConverter; +import org.apache.nifi.util.StopWatch; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +@Tags({"cassandra", "cql", "select"}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) select query on a Cassandra cluster. Query result " + + "may be converted to Avro or JSON format. Streaming is used so arbitrarily large result sets are supported. This processor can be " + + "scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query. 
FlowFile attribute 'executecql.row.count' indicates how many rows were selected.") +@WritesAttributes({ + @WritesAttribute(attribute = "executecql.row.count", description = "The number of rows returned by the CQL query"), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced") Review Comment: All long strings for documentation should use multiline strings instead of concatenation. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; +import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException; +import com.datastax.oss.driver.api.core.servererrors.QueryValidationException; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.ListType; +import com.datastax.oss.driver.api.core.type.MapType; +import com.datastax.oss.driver.api.core.type.SetType; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.commons.text.StringEscapeUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import 
org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.cassandra.converter.CassandraTypeConverter; +import org.apache.nifi.processors.cassandra.converter.StandardCassandraTypeConverter; +import org.apache.nifi.util.StopWatch; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +@Tags({"cassandra", "cql", "select"}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) 
+@CapabilityDescription("Execute provided Cassandra Query Language (CQL) select query on a Cassandra cluster. Query result " + + "may be converted to Avro or JSON format. Streaming is used so arbitrarily large result sets are supported. This processor can be " + + "scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query. FlowFile attribute 'executecql.row.count' indicates how many rows were selected.") +@WritesAttributes({ + @WritesAttribute(attribute = "executecql.row.count", description = "The number of rows returned by the CQL query"), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. 
This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced") +}) +public class QueryCassandra extends AbstractCassandraProcessor { + + private static final CassandraTypeConverter typeConverter = new StandardCassandraTypeConverter(); + + public static final String AVRO_FORMAT = "Avro"; + public static final String JSON_FORMAT = "Json"; + + public static final String RESULT_ROW_COUNT = "executecql.row.count"; + + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); + + private static final byte[] OPEN_OBJ = "{".getBytes(StandardCharsets.UTF_8); + private static final byte[] CLOSE_OBJ = "}".getBytes(StandardCharsets.UTF_8); + private static final byte[] COMMA = ",".getBytes(StandardCharsets.UTF_8); + private static final byte[] COLON = ":".getBytes(StandardCharsets.UTF_8); + private static final byte[] QUOTE = "\"".getBytes(StandardCharsets.UTF_8); Review Comment: This getBytes approach appears wrong; these should be individual characters, not byte arrays. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/converter/StandardCassandraTypeConverter.java: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra.converter; + +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.DataTypes; +import com.datastax.oss.driver.api.core.type.ListType; +import com.datastax.oss.driver.api.core.type.MapType; +import com.datastax.oss.driver.api.core.type.SetType; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.List; + +public class StandardCassandraTypeConverter implements CassandraTypeConverter { + + @Override + public Object getCassandraObject(final Row row, final int index) { + final DataType dataType = row.getColumnDefinitions().get(index).getType(); + + try { + if (dataType.equals(DataTypes.BLOB)) { + return row.getByteBuffer(index); + + } else if (dataType.equals(DataTypes.VARINT) || dataType.equals(DataTypes.DECIMAL)) { + Object obj = row.getObject(index); + return obj != null ? 
obj.toString() : null; + + } else if (dataType.equals(DataTypes.BOOLEAN)) { + return row.getBoolean(index); + + } else if (dataType.equals(DataTypes.INT)) { + return row.getInt(index); + + } else if (dataType.equals(DataTypes.BIGINT) || dataType.equals(DataTypes.COUNTER)) { + return row.getLong(index); + + } else if (dataType.equals(DataTypes.ASCII) || dataType.equals(DataTypes.TEXT)) { + return row.getString(index); + + } else if (dataType.equals(DataTypes.FLOAT)) { + return row.getFloat(index); + + } else if (dataType.equals(DataTypes.DOUBLE)) { + return row.getDouble(index); + + } else if (dataType.equals(DataTypes.TIMESTAMP)) { + Instant instant = row.getInstant(index); + return instant != null ? instant.toString() : null; + } else if (dataType.equals(DataTypes.DATE)) { + LocalDate localDate = row.getLocalDate(index); + return localDate != null ? localDate.toString() : null; + } else if (dataType.equals(DataTypes.TIME)) { + LocalTime time = row.getLocalTime(index); + return time != null ? time.toString() : null; + } else if (dataType instanceof ListType) { + ListType listType = (ListType) dataType; + Class<?> elementClass = getJavaClassForCassandraType(listType.getElementType()); + return row.getList(index, elementClass); + + } else if (dataType instanceof SetType) { + SetType setType = (SetType) dataType; + Class<?> elementClass = getJavaClassForCassandraType(setType.getElementType()); + return row.getSet(index, elementClass); + + } else if (dataType instanceof MapType) { + MapType mapType = (MapType) dataType; + Class<?> keyClass = getJavaClassForCassandraType(mapType.getKeyType()); + Class<?> valueClass = getJavaClassForCassandraType(mapType.getValueType()); + return row.getMap(index, keyClass, valueClass); + } + + // Fallback for any other types + Object value = row.getObject(index); + return value != null ? value.toString() : null; + + } catch (Exception e) { + return row.getObject(index) != null ? 
row.getObject(index).toString() : null; + } + } + + @Override + public Class<?> getJavaClassForCassandraType(final DataType type) { + if (type.equals(DataTypes.ASCII) || type.equals(DataTypes.TEXT)) { + return String.class; + } else if (type.equals(DataTypes.INT)) { + return Integer.class; + } else if (type.equals(DataTypes.BIGINT) || type.equals(DataTypes.COUNTER)) { + return Long.class; + } else if (type.equals(DataTypes.BOOLEAN)) { + return Boolean.class; + } else if (type.equals(DataTypes.FLOAT)) { + return Float.class; + } else if (type.equals(DataTypes.DOUBLE)) { + return Double.class; + } else if (type.equals(DataTypes.VARINT) || type.equals(DataTypes.DECIMAL)) { + return String.class; + } else if (type.equals(DataTypes.UUID) || type.equals(DataTypes.TIMEUUID)) { + return java.util.UUID.class; + } else if (type.equals(DataTypes.TIMESTAMP)) { + return java.time.Instant.class; + } else if (type.equals(DataTypes.DATE)) { + return java.time.LocalDate.class; + } else if (type.equals(DataTypes.TIME)) { + return Long.class; + } else if (type.equals(DataTypes.BLOB)) { + return java.nio.ByteBuffer.class; Review Comment: Qualified class name references should be avoided in favor of standard imports. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.cassandra.CassandraSessionProviderService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * AbstractCassandraProcessor is a base class for Cassandra processors and contains logic and variables common to most + * processors integrating with Apache Cassandra. 
+ */ +public abstract class AbstractCassandraProcessor extends AbstractProcessor { + + static final PropertyDescriptor CONNECTION_PROVIDER_SERVICE = new PropertyDescriptor.Builder() + .name("Cassandra Connection Provider") + .description("Specifies the Cassandra connection providing controller service to be used to connect to Cassandra cluster.") + .required(true) + .identifiesControllerService(CassandraSessionProviderService.class) + .build(); + + static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the record data.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship if the operation completed successfully.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is transferred to this relationship if the operation failed.") + .build(); + + static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting " + + "it again may succeed.") + .build(); + + protected static final List<PropertyDescriptor> COMMON_PROPERTY_DESCRIPTORS = List.of( + CONNECTION_PROVIDER_SERVICE, + CHARSET + ); + + protected final AtomicReference<CqlSession> cassandraSession = new AtomicReference<>(null); + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + Set<ValidationResult> results = new HashSet<>(); + + if (!validationContext.getProperty(CONNECTION_PROVIDER_SERVICE).isSet()) { + results.add(new ValidationResult.Builder() + 
.subject(CONNECTION_PROVIDER_SERVICE.getDisplayName()) + .valid(false) + .explanation("Cassandra Connection Provider must be specified.") + .build()); + } + + return results; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + final CassandraSessionProviderService sessionProvider = context.getProperty(CONNECTION_PROVIDER_SERVICE) + .asControllerService(CassandraSessionProviderService.class); + cassandraSession.set(sessionProvider.getCassandraSession()); + } + + public void stop(ProcessContext context) { Review Comment: This method should be annotated if it is supposed to be used for lifecycle operations, such as OnStopped. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-services-api/src/main/java/org/apache/nifi/cassandra/CassandraSessionProviderService.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import org.apache.nifi.controller.ControllerService; + +public interface CassandraSessionProviderService extends ControllerService { + /** + * Obtains a Cassandra session instance + * @return {@link CqlSession} + */ + CqlSession getCassandraSession(); Review Comment: This design approach links the Controller Service interface to the specific DataStax Driver. A different approach should be considered, with the potential for avoiding this coupling. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.DataTypes; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.cassandra.converter.StandardCassandraTypeConverter; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for the AbstractCassandraProcessor class + */ +public class AbstractCassandraProcessorTest { Review Comment: Test classes and methods do not need the `public` modifier. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; +import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException; +import com.datastax.oss.driver.api.core.servererrors.QueryValidationException; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.ListType; +import com.datastax.oss.driver.api.core.type.MapType; +import com.datastax.oss.driver.api.core.type.SetType; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.commons.text.StringEscapeUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; 
+import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.cassandra.converter.CassandraTypeConverter; +import org.apache.nifi.processors.cassandra.converter.StandardCassandraTypeConverter; +import org.apache.nifi.util.StopWatch; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +@Tags({"cassandra", "cql", "select"}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) select query on a Cassandra cluster. Query result " + + "may be converted to Avro or JSON format. 
Streaming is used so arbitrarily large result sets are supported. This processor can be " + + "scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + + "select query. FlowFile attribute 'executecql.row.count' indicates how many rows were selected.") +@WritesAttributes({ + @WritesAttribute(attribute = "executecql.row.count", description = "The number of rows returned by the CQL query"), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " + + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. 
This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + + "FlowFiles were produced") +}) +public class QueryCassandra extends AbstractCassandraProcessor { + + private static final CassandraTypeConverter typeConverter = new StandardCassandraTypeConverter(); + + public static final String AVRO_FORMAT = "Avro"; + public static final String JSON_FORMAT = "Json"; + + public static final String RESULT_ROW_COUNT = "executecql.row.count"; + + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); + + private static final byte[] OPEN_OBJ = "{".getBytes(StandardCharsets.UTF_8); + private static final byte[] CLOSE_OBJ = "}".getBytes(StandardCharsets.UTF_8); + private static final byte[] COMMA = ",".getBytes(StandardCharsets.UTF_8); + private static final byte[] COLON = ":".getBytes(StandardCharsets.UTF_8); + private static final byte[] QUOTE = "\"".getBytes(StandardCharsets.UTF_8); + + private static final byte[] RESULTS_PREFIX = "{\"results\":[".getBytes(StandardCharsets.UTF_8); + private static final byte[] EMPTY_RESULTS = "{\"results\":[]}".getBytes(StandardCharsets.UTF_8); + private static final byte[] RESULTS_SUFFIX = "]}".getBytes(StandardCharsets.UTF_8); + public static final PropertyDescriptor CQL_SELECT_QUERY = new PropertyDescriptor.Builder() + .name("CQL Select Query") + .description("CQL select query") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() + .name("Max Wait Time") + .description("The maximum amount of time allowed for a running CQL select query. 
Must be of format " + + "<duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported " + + "Time Unit, such as: millis, secs, mins, hrs, days. A value of zero means there is no limit. ") + .defaultValue("0 seconds") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() + .name("Fetch Size") + .description("The number of result rows to be fetched from the result set at a time. Zero is the default " + + "and means there is no limit.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("Max Rows Per Flow File") + .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large " + + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Output Batch Size") + .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows " + + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles " + + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will " + + "be committed, thus releasing the FlowFiles to the downstream relationship. 
NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this " + + "property is set.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor OUTPUT_FORMAT = new PropertyDescriptor.Builder() + .name("Output Format") + .description("The format to which the result rows will be converted. If JSON is selected, the output will " + + "contain an object with field 'results' containing an array of result rows. Each row in the array is a " + + "map of the named column to its value. For example: { \"results\": [{\"userid\":1, \"name\":\"Joe Smith\"}]}") + .required(true) + .allowableValues(AVRO_FORMAT, JSON_FORMAT) + .defaultValue(AVRO_FORMAT) + .build(); + + public static final PropertyDescriptor TIMESTAMP_FORMAT_PATTERN = new PropertyDescriptor.Builder() + .name("Timestamp Format Pattern for JSON output") + .description("Pattern to use when converting timestamp fields to JSON. 
Note: the formatted timestamp will be in UTC timezone.") + .required(true) + .defaultValue("yyyy-MM-dd HH:mm:ssZ") + .addValidator((subject, input, context) -> { + final ValidationResult.Builder vrb = new ValidationResult.Builder().subject(subject).input(input); + try { + new SimpleDateFormat(input).format(new Date()); + vrb.valid(true).explanation("Valid date format pattern"); + } catch (Exception ex) { + vrb.valid(false).explanation("the pattern is invalid: " + ex.getMessage()); + } + return vrb.build(); + }) + .build(); + + private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = + Stream.concat( + COMMON_PROPERTY_DESCRIPTORS.stream(), + Stream.of( + CQL_SELECT_QUERY, + QUERY_TIMEOUT, + FETCH_SIZE, + MAX_ROWS_PER_FLOW_FILE, + OUTPUT_BATCH_SIZE, + OUTPUT_FORMAT, + TIMESTAMP_FORMAT_PATTERN + ) + ).toList(); + + private static final Set<Relationship> RELATIONSHIPS = Set.of( + REL_SUCCESS, + REL_FAILURE, + REL_RETRY + ); + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @OnScheduled + @Override + public void onScheduled(final ProcessContext context) { + super.onScheduled(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { Review Comment: This is a very large and complicated method that needs to be refactored significantly ########## pom.xml: ########## @@ -496,7 +501,17 @@ <dependency> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> - <version>1.1.10.8</version> + <version>${snappy-java.version}</version> + </dependency> + <dependency> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> + <version>${lz4-java.version}</version> + </dependency> Review Comment: This dependency is banned. 
########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-services-api/pom.xml: ########## @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cassandra-bundle</artifactId> + <version>2.9.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>nifi-cassandra-services-api</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.cassandra</groupId> + <artifactId>java-driver-core</artifactId> + <version>${datastax.driver.version}</version> + </dependency> + <dependency> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> + </dependency> Review Comment: This dependency is banned. 
########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; +import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException; +import com.datastax.oss.driver.api.core.servererrors.QueryValidationException; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.ListType; +import com.datastax.oss.driver.api.core.type.MapType; +import com.datastax.oss.driver.api.core.type.SetType; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileWriter; +import 
org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.commons.text.StringEscapeUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.cassandra.converter.CassandraTypeConverter; +import org.apache.nifi.processors.cassandra.converter.StandardCassandraTypeConverter; +import org.apache.nifi.util.StopWatch; +import org.jetbrains.annotations.NotNull; Review Comment: JetBrains annotations should be avoided. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java: ########## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.service; + +import com.datastax.oss.driver.api.core.ConsistencyLevel; +import com.datastax.oss.driver.api.core.CqlIdentifier; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.cassandra.CassandraSessionProviderService; +import org.apache.nifi.cassandra.CompressionType; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; 
+import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; + +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import javax.net.ssl.SSLContext; + +@Tags({"cassandra", "dbcp", "database", "connection", "pooling"}) +@CapabilityDescription("Provides connection session for Cassandra processors to work with Apache Cassandra.") +public class CassandraSessionProvider extends AbstractControllerService implements CassandraSessionProviderService { + + public static final int DEFAULT_CASSANDRA_PORT = 9042; + + private static final String[] CONSISTENCY_LEVELS = { + ConsistencyLevel.ANY.name(), + ConsistencyLevel.ONE.name(), + ConsistencyLevel.TWO.name(), + ConsistencyLevel.THREE.name(), + ConsistencyLevel.QUORUM.name(), + ConsistencyLevel.ALL.name(), + ConsistencyLevel.LOCAL_ONE.name(), + ConsistencyLevel.LOCAL_QUORUM.name(), + ConsistencyLevel.EACH_QUORUM.name(), + ConsistencyLevel.SERIAL.name(), + ConsistencyLevel.LOCAL_SERIAL.name() + }; + + // Common descriptors + public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder() + .name("Cassandra Contact Points") + .description("Contact points are addresses of Cassandra nodes. The list of contact points should be " + + "comma-separated and in hostname:port format. Example node1:port,node2:port,...." + + " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.") + .required(true) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .build(); + + public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder() + .name("Keyspace") + .description("The Cassandra Keyspace to connect to. 
If no keyspace is specified, the query will need to " + + "include the keyspace name before any table reference, in case of 'query' native processors or " + + "if the processor supports the 'Table' property, the keyspace name has to be provided with the " + + "table name in the form of <KEYSPACE>.<TABLE>") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL " + + "connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("Username") + .description("Username to access the Cassandra cluster") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("Password to access the Cassandra cluster") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder() + .name("Consistency Level") + .description("The strategy for how many replicas must respond before results are returned.") + .required(true) + .allowableValues(CONSISTENCY_LEVELS) + .defaultValue(ConsistencyLevel.QUORUM.name()) + .build(); + + static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder() + .name("Compression Type") + .description("Enable compression at transport-level requests and responses. 
Snappy compression is not supported in Protocol V5.") + .required(false) + .allowableValues(CompressionType.class) + .defaultValue(CompressionType.NONE.getValue()) + .build(); + + static final PropertyDescriptor READ_TIMEOUT_MS = new PropertyDescriptor.Builder() + .name("Read Timeout (ms)") Review Comment: This convention is not correct; duration validators and syntax should be used. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java: ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.DataTypes; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.cassandra.converter.StandardCassandraTypeConverter; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for the AbstractCassandraProcessor class + */ +public class AbstractCassandraProcessorTest { + + MockAbstractCassandraProcessor processor; + private TestRunner testRunner; + private final StandardCassandraTypeConverter typeConverter = new StandardCassandraTypeConverter(); + + @BeforeEach + public void setUp() throws Exception { + processor = new MockAbstractCassandraProcessor(); + testRunner = TestRunners.newTestRunner(processor); + } + + @Test + public void testCustomValidate() throws Exception { + testRunner.assertNotValid(); + + final CqlSession mockSession = mock(CqlSession.class); + final MockCassandraSessionProviderService sessionProvider = new MockCassandraSessionProviderService(mockSession); + testRunner.addControllerService("cassandra-session-provider", sessionProvider); + 
testRunner.enableControllerService(sessionProvider); + testRunner.setProperty(AbstractCassandraProcessor.CONNECTION_PROVIDER_SERVICE, "cassandra-session-provider"); Review Comment: Strings such as the service identifier should be declared once and reused across methods. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-services-nar/src/main/resources/META-INF/LICENSE: ########## @@ -0,0 +1,352 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +APACHE NIFI SUBCOMPONENTS: Review Comment: This license file is incorrect and contains references that are not applicable. ########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService: ########## @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.service.CassandraSessionProvider Review Comment: This should be changed to a `cassandra` subpackage. 
########## nifi-extension-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/MockCassandraProcessor.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.service; + +import org.apache.nifi.cassandra.CassandraSessionProviderService; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Collections; +import java.util.List; + +/** + * Mock Cassandra processor for testing CassandraSessionProvider + */ +public class MockCassandraProcessor extends AbstractProcessor { Review Comment: What is the reason for this class, as opposed to just using `Mockito`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
