http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/Person.hbm.xml ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/Person.hbm.xml b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/Person.hbm.xml new file mode 100644 index 0000000..ba07a84 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/Person.hbm.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + ~ /* + ~ * Licensed to the Apache Software Foundation (ASF) under one or more + ~ * contributor license agreements. See the NOTICE file distributed with + ~ * this work for additional information regarding copyright ownership. + ~ * The ASF licenses this file to You under the Apache License, Version 2.0 + ~ * (the "License"); you may not use this file except in compliance with + ~ * the License. You may obtain a copy of the License at + ~ * + ~ * http://www.apache.org/licenses/LICENSE-2.0 + ~ * + ~ * Unless required by applicable law or agreed to in writing, software + ~ * distributed under the License is distributed on an "AS IS" BASIS, + ~ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ * See the License for the specific language governing permissions and + ~ * limitations under the License. + ~ */ + --> + + +<!DOCTYPE hibernate-mapping PUBLIC + "-//Hibernate/Hibernate Mapping DTD 3.0//EN" + "http://www.hibernate.org/dtd/hibernate-mapping-3.0.dtd"> + +<hibernate-mapping default-access="field"> + <class name="org.apache.ignite.scalar.examples.datagrid.store.Person" table="PERSONS"> + <!-- ID. --> + <id name="id"/> + + <!-- We only map data we are interested in. --> + <property name="firstName"/> + <property name="lastName"/> + </class> +</hibernate-mapping>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/ScalarCacheHibernatePersonStore.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/ScalarCacheHibernatePersonStore.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/ScalarCacheHibernatePersonStore.scala
new file mode 100644
index 0000000..91c2c07
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/ScalarCacheHibernatePersonStore.scala
@@ -0,0 +1,268 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.scalar.examples.datagrid.store.hibernate
+
+import org.apache.ignite.cache.store.{CacheStore, CacheStoreAdapter, CacheStoreSession}
+import org.apache.ignite.lang.IgniteBiInClosure
+import org.apache.ignite.resources.CacheStoreSessionResource
+import org.apache.ignite.scalar.examples.datagrid.store._
+import org.apache.ignite.transactions.Transaction
+
+import org.hibernate.cfg.Configuration
+import org.hibernate.{FlushMode, HibernateException, Session}
+import org.jetbrains.annotations.Nullable
+
+import javax.cache.Cache
+import javax.cache.integration.{CacheLoaderException, CacheWriterException}
+import java.lang.{Long => JavaLong}
+import java.util.UUID
+
+/**
+ * Example of [[CacheStore]] implementation that uses Hibernate
+ * and maps [[java.lang.Long]] keys to [[Person]] values.
+ * <p>
+ * While a cache transaction is in progress, one Hibernate session is kept in the
+ * store session properties (under `ATTR_SES`) and is committed or rolled back in
+ * `sessionEnd`. Outside of a cache transaction every operation opens its own
+ * Hibernate session and commits and closes it when the operation ends.
+ */
+class ScalarCacheHibernatePersonStore extends CacheStoreAdapter[JavaLong, Person]{
+    /** Default hibernate configuration resource path. */
+    private val DFLT_HIBERNATE_CFG = "/org/apache/ignite/scalar/examples/datagrid/store/hibernate/hibernate.cfg.xml"
+
+    /** Session attribute name. */
+    private val ATTR_SES = "HIBERNATE_STORE_SESSION"
+
+    /** Session factory. */
+    private val sesFactory = new Configuration().configure(DFLT_HIBERNATE_CFG).buildSessionFactory
+
+    /** Auto-injected store session. */
+    @CacheStoreSessionResource private var ses: CacheStoreSession = null
+
+    /**
+     * Loads a person by key through `Session.get`.
+     *
+     * @param key Person ID.
+     * @return Whatever `Session.get` returns for the key.
+     * @throws CacheLoaderException If Hibernate fails to load the value.
+     */
+    def load(key: JavaLong): Person = {
+        val tx = transaction
+
+        println(">>> Store load [key=" + key + ", xid=" + (if (tx == null) null else tx.xid) + ']')
+
+        val ses = session(tx)
+
+        try {
+            ses.get(classOf[Person], key).asInstanceOf[Person]
+        }
+        catch {
+            case e: HibernateException =>
+                rollback(ses, tx)
+
+                throw new CacheLoaderException("Failed to load value from cache store with key: " + key, e)
+        }
+        finally {
+            end(ses, tx)
+        }
+    }
+
+    /**
+     * Saves or updates the entry value. An entry with a `null` value is treated as a removal.
+     *
+     * @param entry Cache entry to persist.
+     * @throws CacheWriterException If Hibernate fails to store the value.
+     */
+    def write(entry: Cache.Entry[_ <: JavaLong, _ <: Person]) {
+        val tx = transaction
+
+        val key = entry.getKey
+
+        val value = entry.getValue
+
+        println(">>> Store put [key=" + key + ", val=" + value + ", xid=" + (if (tx == null) null else tx.xid) + ']')
+
+        if (value == null)
+            delete(key)
+        else {
+            val ses = session(tx)
+
+            try {
+                ses.saveOrUpdate(value)
+            }
+            catch {
+                case e: HibernateException =>
+                    rollback(ses, tx)
+
+                    throw new CacheWriterException("Failed to put value to cache store [key=" + key + ", val=" + value + "]", e)
+            }
+            finally {
+                end(ses, tx)
+            }
+        }
+    }
+
+    /**
+     * Removes the person with the given key.
+     *
+     * @param key Person ID.
+     * @throws CacheWriterException If Hibernate fails to remove the value.
+     */
+    @SuppressWarnings(Array("JpaQueryApiInspection")) def delete(key: Object) {
+        val tx = transaction
+
+        println(">>> Store remove [key=" + key + ", xid=" + (if (tx == null) null else tx.xid) + ']')
+
+        val ses = session(tx)
+
+        try {
+            // Person.hbm.xml maps only 'id', 'firstName' and 'lastName', so the filter
+            // has to use the identifier property 'id'; ':key' is just the parameter name.
+            ses.createQuery("delete " + classOf[Person].getSimpleName + " where id = :key")
+                .setParameter("key", key).setFlushMode(FlushMode.ALWAYS).executeUpdate
+        }
+        catch {
+            case e: HibernateException =>
+                rollback(ses, tx)
+
+                throw new CacheWriterException("Failed to remove value from cache store with key: " + key, e)
+        }
+        finally {
+            end(ses, tx)
+        }
+    }
+
+    /**
+     * Reads persons from the database and passes at most `args(0)` of them to the closure.
+     *
+     * @param clo Closure that receives every loaded key-value pair.
+     * @param args Loader arguments; the first one must be the entry count as an `Integer`.
+     * @throws CacheLoaderException If the entry count is missing or Hibernate fails.
+     */
+    override def loadCache(clo: IgniteBiInClosure[JavaLong, Person], args: AnyRef*) {
+        if (args == null || args.length == 0 || args(0) == null)
+            throw new CacheLoaderException("Expected entry count parameter is not provided.")
+
+        val entryCnt = args(0).asInstanceOf[Integer]
+
+        // Always runs in its own Hibernate session, independent of any cache transaction.
+        val ses = session(null)
+
+        try {
+            var cnt = 0
+
+            val res = ses.createCriteria(classOf[Person]).list
+
+            if (res != null) {
+                val iter = res.iterator
+
+                while (cnt < entryCnt && iter.hasNext) {
+                    val person = iter.next.asInstanceOf[Person]
+
+                    clo.apply(person.getId, person)
+
+                    cnt += 1
+                }
+            }
+            println(">>> Loaded " + cnt + " values into cache.")
+        }
+        catch {
+            case e: HibernateException =>
+                throw new CacheLoaderException("Failed to load values from cache store.", e)
+        }
+        finally {
+            end(ses, null)
+        }
+    }
+
+    /**
+     * Rolls back hibernate session. Does nothing while a cache transaction is in
+     * progress, because then the outcome is decided in `sessionEnd`.
+     *
+     * @param ses Hibernate session.
+     * @param tx Cache ongoing transaction.
+     */
+    private def rollback(ses: Session, tx: Transaction) {
+        if (tx == null) {
+            val hTx = ses.getTransaction
+
+            if (hTx != null && hTx.isActive)
+                hTx.rollback()
+        }
+    }
+
+    /**
+     * Ends hibernate session. Outside of a cache transaction the still active Hibernate
+     * transaction is committed and the session is closed; inside a cache transaction
+     * the session is left open for `sessionEnd`.
+     *
+     * @param ses Hibernate session.
+     * @param tx Cache ongoing transaction.
+     */
+    private def end(ses: Session, @Nullable tx: Transaction) {
+        if (tx == null) {
+            val hTx = ses.getTransaction
+
+            if (hTx != null && hTx.isActive)
+                hTx.commit()
+
+            ses.close
+        }
+    }
+
+    /**
+     * Finishes the Hibernate session that was registered for the current cache transaction, if any.
+     *
+     * @param commit `true` to flush and commit, `false` to roll back.
+     * @throws CacheWriterException If Hibernate fails to end the transaction.
+     */
+    override def sessionEnd(commit: Boolean) {
+        val tx = ses.transaction
+
+        // Explicit type arguments are required: without them Scala infers Nothing for
+        // the value type and the removed entry cannot be used as a Hibernate session.
+        val props = ses.properties[String, Session]()
+
+        val session: Session = props.remove(ATTR_SES)
+
+        if (session != null) {
+            val hTx = session.getTransaction
+
+            if (hTx != null) {
+                try {
+                    if (commit) {
+                        session.flush()
+
+                        hTx.commit()
+                    }
+                    else
+                        hTx.rollback()
+
+                    println("Transaction ended [xid=" + tx.xid + ", commit=" + commit + ']')
+                }
+                catch {
+                    case e: HibernateException =>
+                        throw new CacheWriterException("Failed to end transaction [xid=" + tx.xid + ", commit=" + commit + ']', e)
+                }
+                finally {
+                    session.close
+                }
+            }
+        }
+    }
+
+    /**
+     * Gets Hibernate session. Within a cache transaction the session is created once and
+     * then reused from the store session properties; otherwise a new session is opened.
+     * In both cases a Hibernate transaction is begun on a newly opened session.
+     *
+     * @param tx Cache transaction.
+     * @return Session.
+     */
+    private def session(@Nullable tx: Transaction): Session = {
+        var hbSes: Session = null
+
+        if (tx != null) {
+            val props = ses.properties[String, Session]()
+
+            hbSes = props.get(ATTR_SES)
+
+            if (hbSes == null) {
+                hbSes = sesFactory.openSession
+
+                hbSes.beginTransaction
+
+                props.put(ATTR_SES, hbSes)
+
+                println("Hibernate session open [ses=" + hbSes + ", tx=" + tx.xid + "]")
+            }
+        }
+        else {
+            hbSes = sesFactory.openSession
+
+            hbSes.beginTransaction
+        }
+
+        hbSes
+    }
+
+    /**
+     * @return Current transaction, or `null` if the store session has not been injected.
+     */
+    @Nullable private def transaction: Transaction = {
+        if (ses != null)
+            ses.transaction
+        else
+            null
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/ScalarCacheHibernateStoreExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/ScalarCacheHibernateStoreExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/ScalarCacheHibernateStoreExample.scala
new file mode 100644
index 0000000..cbf7335
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/ScalarCacheHibernateStoreExample.scala
@@ -0,0 +1,133 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License.
You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.scalar.examples.datagrid.store.hibernate
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.cache.CacheAtomicityMode._
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.examples.{ExampleNodeStartup, ExamplesUtils}
+import org.apache.ignite.scalar.examples.datagrid.store._
+import org.apache.ignite.scalar.scalar
+import org.apache.ignite.scalar.scalar._
+
+import javax.cache.configuration.FactoryBuilder
+import java.lang.{Long => JavaLong}
+import java.util.UUID
+
+/**
+ * Demonstrates usage of cache with underlying persistent store configured.
+ * <p>
+ * This example uses [[ScalarCacheHibernatePersonStore]] as a persistent store.
+ * <p>
+ * Remote nodes can be started with [[ExampleNodeStartup]] in another JVM which will
+ * start node with `examples/config/example-ignite.xml` configuration.
+ */
+object ScalarCacheHibernateStoreExample extends App {
+    /** Configuration file name. */
+    private val CONFIG = "examples/config/example-ignite.xml"
+
+    /** Cache name, taken from the simple class name of this example object. */
+    private val CACHE_NAME = ScalarCacheHibernateStoreExample.getClass.getSimpleName
+
+    /** Heap size required to run this example (1024 * 1024 * 1024; presumably bytes - confirm against `checkMinMemory`). */
+    val MIN_MEMORY = 1024 * 1024 * 1024
+
+    /** Number of entries to load; passed to `cache.loadCache` as the loader argument. */
+    private val ENTRY_COUNT = 100000
+
+    /**
+     * Global person ID to use across entire example. Derived from the absolute value
+     * of the least significant bits of a random UUID.
+     */
+    private val id = Math.abs(UUID.randomUUID.getLeastSignificantBits)
+
+    ExamplesUtils.checkMinMemory(MIN_MEMORY)
+
+    scalar(CONFIG) {
+        println()
+        println(">>> Cache store example started.")
+
+        val cacheCfg = new CacheConfiguration[JavaLong, Person](CACHE_NAME)
+
+        // Set atomicity as transaction, since we are showing transactions in example.
+        cacheCfg.setAtomicityMode(TRANSACTIONAL)
+
+        // Configure Hibernate store.
+        cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(classOf[ScalarCacheHibernatePersonStore]))
+
+        // Route cache reads and writes through the configured store.
+        cacheCfg.setReadThrough(true)
+        cacheCfg.setWriteThrough(true)
+
+        val cache = createCache$(cacheCfg)
+
+        try {
+            // Initial load from the store first, then the transactional read/write demo.
+            loadCache(cache)
+
+            executeTransaction(cache)
+        }
+        finally {
+            if (cache != null)
+                cache.close()
+        }
+    }
+
+    /**
+     * Makes initial cache loading and prints the resulting cache size together
+     * with the elapsed time in milliseconds.
+     *
+     * @param cache Cache to load.
+     */
+    private def loadCache(cache: IgniteCache[JavaLong, Person]) {
+        val start = System.currentTimeMillis
+
+        // No filter is given (null); presumably ENTRY_COUNT reaches the store's
+        // loadCache as its first argument - verify against the store implementation.
+        cache.loadCache(null, Integer.valueOf(ENTRY_COUNT))
+
+        val end = System.currentTimeMillis
+
+        println(">>> Loaded " + cache.size() + " keys with backups in " + (end - start) + "ms.")
+    }
+
+    /**
+     * Executes transaction with read/write-through to persistent store: reads the
+     * value for `id`, overwrites it with a new [[Person]], reads it again and commits.
+     * After the transaction is closed the value is read once more.
+     *
+     * @param cache Cache to execute transaction on.
+     */
+    private def executeTransaction(cache: IgniteCache[JavaLong, Person]) {
+        val tx = transaction$()
+
+        try {
+            var value = cache.get(id)
+
+            println("Read value: " + value)
+
+            value = cache.getAndPut(id, new Person(id, "Isaac", "Newton"))
+
+            println("Overwrote old value: " + value)
+
+            value = cache.get(id)
+
+            println("Read value: " + value)
+
+            tx.commit()
+        }
+        finally {
+            // The transaction is closed whether or not the commit was reached.
+            if (tx != null)
+                tx.close()
+        }
+
+        println("Read value after commit: " + cache.get(id))
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/hibernate.cfg.xml
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/hibernate.cfg.xml b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/hibernate.cfg.xml
new file mode 100644
index 0000000..86cfda4
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/hibernate/hibernate.cfg.xml
@@ -0,0 +1,43 @@
+<?xml version='1.0' encoding='utf-8'?>
+
+<!--
+ ~ /*
+ ~ * Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ * contributor license agreements. See the NOTICE file distributed with
+ ~ * this work for additional information regarding copyright ownership.
+ ~ * The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ * (the "License"); you may not use this file except in compliance with
+ ~ * the License. You may obtain a copy of the License at
+ ~ *
+ ~ * http://www.apache.org/licenses/LICENSE-2.0
+ ~ *
+ ~ * Unless required by applicable law or agreed to in writing, software
+ ~ * distributed under the License is distributed on an "AS IS" BASIS,
+ ~ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ * See the License for the specific language governing permissions and
+ ~ * limitations under the License.
+ ~ */
+ -->
+
+<!DOCTYPE hibernate-configuration PUBLIC
+        "-//Hibernate/Hibernate Configuration DTD 3.0//EN"
+        "http://www.hibernate.org/dtd/hibernate-configuration-3.0.dtd">
+
+<!--
+    Hibernate configuration.
+-->
+<hibernate-configuration>
+    <session-factory>
+        <!-- Database connection settings (private in-memory database). -->
+        <property name="connection.url">jdbc:h2:mem:example;DB_CLOSE_DELAY=-1</property>
+
+        <!-- Create or update the database schema on startup (a production setup would typically only validate it). -->
+        <property name="hbm2ddl.auto">update</property>
+
+        <!-- Do not output SQL. -->
+        <property name="show_sql">false</property>
+
+        <!-- Mappings. -->
+        <mapping resource="org/apache/ignite/scalar/examples/datagrid/store/hibernate/Person.hbm.xml"/>
+    </session-factory>
+</hibernate-configuration>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/jdbc/ScalarCacheJdbcPersonStore.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/jdbc/ScalarCacheJdbcPersonStore.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/jdbc/ScalarCacheJdbcPersonStore.scala
new file mode 100644
index 0000000..9dcf15f
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/jdbc/ScalarCacheJdbcPersonStore.scala
@@ -0,0 +1,311 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.scalar.examples.datagrid.store.jdbc
+
+import org.apache.ignite.IgniteException
+import org.apache.ignite.cache.store.{CacheStore, CacheStoreAdapter, CacheStoreSession}
+import org.apache.ignite.lang.IgniteBiInClosure
+import org.apache.ignite.resources.CacheStoreSessionResource
+import org.apache.ignite.scalar.examples.datagrid.store._
+
+import org.jetbrains.annotations.Nullable
+
+import javax.cache.Cache
+import javax.cache.integration.{CacheLoaderException, CacheWriterException}
+import java.lang.{Long => JavaLong}
+import java.sql._
+
+/**
+ * Example of [[CacheStore]] implementation that uses JDBC
+ * transaction with cache transactions and maps [[java.lang.Long]] to [[Person]].
+ * <p>
+ * Within a cache transaction one connection (autocommit off) is kept in the store
+ * session properties and is committed or rolled back in `sessionEnd`. Outside of a
+ * transaction every operation uses its own autocommit connection.
+ */
+class ScalarCacheJdbcPersonStore extends CacheStoreAdapter[JavaLong, Person] {
+    /** Transaction metadata attribute name. */
+    private val ATTR_NAME = "SIMPLE_STORE_CONNECTION"
+
+    /** Auto-injected store session. */
+    @CacheStoreSessionResource private var ses: CacheStoreSession = null
+
+    prepareDb()
+
+    /**
+     * Prepares database for example execution. This method will create a
+     * table called "PERSONS" so it can be used by store implementation.
+     *
+     * @throws IgniteException If the table cannot be created.
+     */
+    private def prepareDb() {
+        val conn = openConnection(false)
+
+        try {
+            val st = conn.createStatement
+
+            try {
+                st.execute("create table if not exists PERSONS (id number unique, firstName varchar(255), " + "lastName varchar(255))")
+
+                conn.commit()
+            }
+            finally {
+                // The statement is closed before the connection that owns it.
+                st.close()
+            }
+        }
+        catch {
+            case e: SQLException =>
+                throw new IgniteException("Failed to create database table.", e)
+        }
+        finally {
+            conn.close()
+        }
+    }
+
+    /**
+     * Commits or rolls back the connection registered for the current cache
+     * transaction (if there is one) and closes it.
+     *
+     * @param commit `true` to commit, `false` to roll back.
+     * @throws CacheWriterException If the connection fails to commit or roll back.
+     */
+    override def sessionEnd(commit: Boolean) {
+        val props = ses.properties[String, Connection]()
+
+        val conn: Connection = props.remove(ATTR_NAME)
+
+        try {
+            if (conn != null) {
+                if (commit)
+                    conn.commit()
+                else
+                    conn.rollback()
+            }
+
+            println(">>> Transaction ended [commit=" + commit + ']')
+        }
+        catch {
+            case e: SQLException =>
+                throw new CacheWriterException("Failed to end transaction: " + ses.transaction, e)
+        }
+        finally {
+            if (conn != null) conn.close()
+        }
+    }
+
+    /**
+     * Loads the person with the given ID from the PERSONS table.
+     *
+     * @param key Person ID.
+     * @return Loaded person or `null` if the query returns no row.
+     * @throws CacheLoaderException If the query fails.
+     */
+    override def load(key: JavaLong): Person = {
+        println(">>> Loading key: " + key)
+
+        var conn: Connection = null
+
+        try {
+            conn = connection()
+
+            val st = conn.prepareStatement("select * from PERSONS where id=?")
+
+            try {
+                // Bind the ID as a number, the same way 'write' and 'delete' do.
+                st.setLong(1, key)
+
+                val rs = st.executeQuery
+
+                if (rs.next)
+                    new Person(rs.getLong(1), rs.getString(2), rs.getString(3))
+                else
+                    null
+            }
+            finally {
+                if (st != null)
+                    st.close()
+            }
+        }
+        catch {
+            case e: SQLException =>
+                throw new CacheLoaderException("Failed to load object: " + key, e)
+        }
+        finally {
+            end(conn)
+        }
+    }
+
+    /**
+     * Stores the entry value: tries to update the row with the person's ID first and
+     * inserts a new row when the update did not touch any row.
+     *
+     * @param entry Cache entry to persist.
+     * @throws CacheWriterException If one of the statements fails.
+     */
+    def write(entry: Cache.Entry[_ <: JavaLong, _ <: Person]) {
+        val key = entry.getKey
+
+        val value = entry.getValue
+
+        println(">>> Putting [key=" + key + ", val=" + value + ']')
+
+        var conn: Connection = null
+
+        try {
+            conn = connection()
+
+            var updated = 0
+
+            val st = conn.prepareStatement("update PERSONS set firstName=?, lastName=? where id=?")
+
+            try {
+                st.setString(1, value.getFirstName)
+                st.setString(2, value.getLastName)
+                st.setLong(3, value.getId)
+
+                updated = st.executeUpdate
+            }
+            finally {
+                if (st != null)
+                    st.close()
+            }
+
+            if (updated == 0) {
+                val st = conn.prepareStatement("insert into PERSONS (id, firstName, lastName) values(?, ?, ?)")
+
+                try {
+                    st.setLong(1, value.getId)
+                    st.setString(2, value.getFirstName)
+                    st.setString(3, value.getLastName)
+
+                    st.executeUpdate
+                }
+                finally {
+                    if (st != null)
+                        st.close()
+                }
+            }
+        }
+        catch {
+            case e: SQLException =>
+                // A failed write is reported with the writer exception, like 'delete' does.
+                throw new CacheWriterException("Failed to put object [key=" + key + ", val=" + value + ']', e)
+        }
+        finally {
+            end(conn)
+        }
+    }
+
+    /**
+     * Deletes the row with the given ID from the PERSONS table.
+     *
+     * @param key Person ID; it is cast to [[java.lang.Long]].
+     * @throws CacheWriterException If the statement fails.
+     */
+    def delete(key: AnyRef) {
+        println(">>> Removing key: " + key)
+
+        var conn: Connection = null
+
+        try {
+            conn = connection()
+
+            val st = conn.prepareStatement("delete from PERSONS where id=?")
+
+            try {
+                st.setLong(1, key.asInstanceOf[JavaLong])
+
+                st.executeUpdate
+            }
+            finally {
+                if (st != null)
+                    st.close()
+            }
+        }
+        catch {
+            case e: SQLException =>
+                throw new CacheWriterException("Failed to remove object: " + key, e)
+        }
+        finally {
+            end(conn)
+        }
+    }
+
+    /**
+     * Reads rows from the PERSONS table and passes at most `args(0)` persons to the closure.
+     *
+     * @param clo Closure that receives every loaded key-value pair.
+     * @param args Loader arguments; the first one must be the entry count as an `Integer`.
+     * @throws CacheLoaderException If the entry count is missing or the query fails.
+     */
+    override def loadCache(clo: IgniteBiInClosure[JavaLong, Person], args: AnyRef*) {
+        if (args == null || args.length == 0 || args(0) == null)
+            throw new CacheLoaderException("Expected entry count parameter is not provided.")
+
+        val entryCnt = args(0).asInstanceOf[Integer]
+
+        val conn = connection()
+
+        try {
+            val st = conn.prepareStatement("select * from PERSONS")
+
+            try {
+                val rs = st.executeQuery
+
+                try {
+                    var cnt = 0
+
+                    while (cnt < entryCnt && rs.next) {
+                        val person = new Person(rs.getLong(1), rs.getString(2), rs.getString(3))
+
+                        clo.apply(person.getId, person)
+
+                        cnt += 1
+                    }
+
+                    println(">>> Loaded " + cnt + " values into cache.")
+                }
+                finally {
+                    if (rs != null)
+                        rs.close()
+                }
+            }
+            finally {
+                if (st != null)
+                    st.close()
+            }
+        }
+        catch {
+            case e: SQLException =>
+                throw new CacheLoaderException("Failed to load values from cache store.", e)
+        }
+        finally {
+            if (conn != null) conn.close()
+        }
+    }
+
+    /**
+     * Within a cache transaction returns the connection stored in the session
+     * properties, opening and registering it (autocommit off) on first use.
+     * Otherwise opens a new autocommit connection.
+     *
+     * @return Connection.
+     */
+    private def connection(): Connection = {
+        if (ses.isWithinTransaction) {
+            val props = ses.properties[AnyRef, AnyRef]()
+
+            var conn = props.get(ATTR_NAME).asInstanceOf[Connection]
+
+            if (conn == null) {
+                conn = openConnection(false)
+
+                props.put(ATTR_NAME, conn)
+            }
+
+            conn
+        }
+        else
+            openConnection(true)
+    }
+
+    /**
+     * Closes allocated resources depending on transaction status: the connection is
+     * closed only outside of a cache transaction, and a failure to close is ignored.
+     *
+     * @param conn Allocated connection.
+     */
+    private def end(@Nullable conn: Connection) {
+        if (!ses.isWithinTransaction && conn != null) {
+            try {
+                conn.close()
+            }
+            catch {
+                case ignored: SQLException =>
+            }
+        }
+    }
+
+    /**
+     * Opens a new connection to the in-memory H2 database through [[DriverManager]].
+     *
+     * @param autocommit `true` if connection should use autocommit mode.
+     * @return Opened connection.
+     */
+    private def openConnection(autocommit: Boolean): Connection = {
+        val conn = DriverManager.getConnection("jdbc:h2:mem:example;DB_CLOSE_DELAY=-1")
+
+        conn.setAutoCommit(autocommit)
+
+        conn
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/jdbc/ScalarCacheJdbcStoreExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/jdbc/ScalarCacheJdbcStoreExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/jdbc/ScalarCacheJdbcStoreExample.scala
new file mode 100644
index 0000000..baa32de
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datagrid/store/jdbc/ScalarCacheJdbcStoreExample.scala
@@ -0,0 +1,159 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements.
See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.scalar.examples.datagrid.store.jdbc
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.cache.CacheAtomicityMode._
+import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.examples.ExampleNodeStartup
+import org.apache.ignite.examples.datagrid.store.auto.DbH2ServerStartup
+import org.apache.ignite.scalar.examples.datagrid.store._
+import org.apache.ignite.scalar.scalar
+import org.apache.ignite.scalar.scalar._
+
+import javax.cache.configuration.FactoryBuilder
+import java.lang.{Long => JavaLong}
+import java.util.UUID
+
+/**
+ * Demonstrates usage of cache with underlying persistent store configured.
+ * <p>
+ * This example uses [[ScalarCacheJdbcPersonStore]] as a persistent store
+ * (it is the store class configured in `CacheConfig` below, not [[CacheJdbcPojoStore]]).
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ *  <li>Start H2 database TCP server using [[DbH2ServerStartup]].</li>
+ *  <li>Start a few nodes using [[ExampleNodeStartup]] or by starting remote nodes as specified below.</li>
+ *  <li>Start example using [[ScalarCacheJdbcStoreExample]].</li>
+ * </ul>
+ * NOTE(review): the store configured here is in the same package; if it connects to an
+ * in-memory H2 database, the H2 TCP server step is not needed - confirm and drop it.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`.
+ * <p>
+ * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will
+ * start node with `examples/config/example-ignite.xml` configuration.
+ */
+object ScalarCacheJdbcStoreExample extends App {
+    /** Configuration file name. */
+    private val CONFIG = "examples/config/example-ignite.xml"
+
+    /** Cache name. */
+    private val CACHE_NAME = ScalarCacheJdbcStoreExample.getClass.getSimpleName
+
+    /** Heap size required to run this example. */
+    val MIN_MEMORY = 1024 * 1024 * 1024
+
+    /** Number of entries to load. */
+    private val ENTRY_COUNT = 100000
+
+    /** Global person ID to use across entire example. */
+    private val id = Math.abs(UUID.randomUUID.getLeastSignificantBits)
+
+    scalar(CONFIG) {
+        println()
+        println(">>> Cache auto store example started.")
+
+        val cache = createCache$[JavaLong, Person](CacheConfig.jdbcPojoStoreCache(CACHE_NAME))
+
+        try {
+            // Make initial cache loading from persistent store. This is a
+            // distributed operation and will call CacheStore.loadCache(...)
+            // method on all nodes in topology.
+            loadCache(cache)
+
+            // Start transaction and execute several cache operations with
+            // read/write-through to persistent store.
+            executeTransaction(cache)
+        }
+        finally {
+            if (cache != null) cache.close()
+        }
+    }
+
+    /**
+     * Makes initial cache loading and prints the resulting cache size together
+     * with the elapsed time in milliseconds.
+     *
+     * @param cache Cache to load.
+     */
+    private def loadCache(cache: IgniteCache[JavaLong, Person]) {
+        val start = System.currentTimeMillis
+
+        cache.loadCache(null, ENTRY_COUNT.asInstanceOf[Object])
+
+        val end = System.currentTimeMillis
+
+        println(">>> Loaded " + cache.size() + " keys with backups in " + (end - start) + "ms.")
+    }
+
+    /**
+     * Executes transaction with read/write-through to persistent store: reads the
+     * value for `id`, overwrites it with a new [[Person]], reads it again and commits.
+     * After the transaction is closed the value is read once more.
+     *
+     * @param cache Cache to execute transaction on.
+     */
+    private def executeTransaction(cache: IgniteCache[JavaLong, Person]) {
+        val tx = transaction$()
+
+        try {
+            var value = cache.get(id)
+
+            println("Read value: " + value)
+
+            value = cache.getAndPut(id, new Person(id, "Isaac", "Newton"))
+
+            println("Overwrote old value: " + value)
+
+            value = cache.get(id)
+
+            println("Read value: " + value)
+
+            tx.commit()
+        }
+        finally {
+            if (tx != null)
+                tx.close()
+        }
+
+        println("Read value after commit: " + cache.get(id))
+    }
+
+    /**
+     * Predefined configuration for examples with [[ScalarCacheJdbcPersonStore]].
+     */
+    private object CacheConfig {
+        /**
+         * Configure cache with store: transactional atomicity mode, the JDBC person
+         * store as cache store factory and read-through/write-through enabled.
+         *
+         * @param name Name of the cache to configure.
+         * @return Cache configuration.
+         */
+        def jdbcPojoStoreCache(name: String): CacheConfiguration[JavaLong, Person] = {
+            // Use the name passed by the caller rather than the enclosing object's constant.
+            val cacheCfg = new CacheConfiguration[JavaLong, Person](name)
+
+            // Set atomicity as transaction, since we are showing transactions in example.
+            cacheCfg.setAtomicityMode(TRANSACTIONAL)
+
+            // Configure JDBC store.
+            cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(classOf[ScalarCacheJdbcPersonStore]))
+
+            cacheCfg.setReadThrough(true)
+            cacheCfg.setWriteThrough(true)
+
+            cacheCfg
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarExecutorServiceExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarExecutorServiceExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarExecutorServiceExample.scala
new file mode 100644
index 0000000..5292e19
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarExecutorServiceExample.scala
@@ -0,0 +1,64 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.scalar.examples.datastructures
+
+import org.apache.ignite.examples.ExampleNodeStartup
+import org.apache.ignite.scalar.scalar
+import org.apache.ignite.scalar.scalar._
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * Simple example to demonstrate usage of distributed executor service provided by Ignite.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`.
+ * <p>
+ * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start node
+ * with `examples/config/example-ignite.xml` configuration.
+ */
+object ScalarExecutorServiceExample extends App {
+    /** Configuration file name. */
+    private val CONFIG = "examples/config/example-ignite.xml"
+
+    scalar(CONFIG) {
+        println()
+        println(">>> Compute executor service example started.")
+
+        // Get ignite-enabled executor service.
+        val exec = executorService$
+
+        // Iterate through all words in the sentence and submit one runnable job per word.
+        "Print words using runnable".split(" ").foreach(word => {
+            exec.submit(toRunnable(() => {
+                println()
+                println(">>> Printing '" + word + "' on this node from ignite job.")
+            }))
+        })
+
+        // No further jobs are submitted after this point.
+        exec.shutdown()
+
+        // Wait for all jobs to complete (0 means no limit).
+        // NOTE(review): this relies on the Ignite executor service treating a zero timeout
+        // as unlimited; plain java.util.concurrent executors behave differently - confirm.
+        exec.awaitTermination(0, TimeUnit.MILLISECONDS)
+
+        println()
+        println(">>> Check all nodes for output (this node is also part of the cluster).")
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicLongExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicLongExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicLongExample.scala
new file mode 100644
index 0000000..bc08bb8
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicLongExample.scala
@@ -0,0 +1,74 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.scalar.examples.datastructures
+
+import org.apache.ignite.examples.ExampleNodeStartup
+import org.apache.ignite.scalar.scalar
+import org.apache.ignite.scalar.scalar._
+
+import java.util.UUID
+
+/**
+ * Demonstrates a simple usage of distributed atomic long.
+ * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +class ScalarIgniteAtomicLongExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Number of retries */ + private val RETRIES = 20 + + scalar(CONFIG) { + println() + println(">>> Atomic long example started.") + + // Make name for atomic long (by which it will be known in the cluster). + val atomicName = UUID.randomUUID.toString + + // Initialize atomic long. + val atomicLong = long$(atomicName, true) + + println() + println("Atomic long initial value : " + atomicLong.get + '.') + + val ignite = ignite$ + + // Try increment atomic long from all nodes. + // Note that this node is also part of the ignite cluster. + ignite.compute.broadcast(toRunnable(() => { + for (i <- 0 until RETRIES) + println("AtomicLong value has been incremented: " + atomicLong.incrementAndGet) + })) + + println() + println("Atomic long value after successful CAS: " + atomicLong.get) + } +} + +/** + * Object to run ScalarIgniteAtomicLongExample. 
+ */ +object ScalarIgniteAtomicLongExampleStartup extends ScalarIgniteAtomicLongExample http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicSequenceExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicSequenceExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicSequenceExample.scala new file mode 100644 index 0000000..7ab26d2 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicSequenceExample.scala @@ -0,0 +1,69 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.datastructures + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +/** + * Demonstrates a simple usage of distributed atomic sequence. 
+ * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +class ScalarIgniteAtomicSequenceExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Number of retries */ + private val RETRIES = 20 + + scalar(CONFIG) { + println() + println(">>> Cache atomic sequence example started.") + + val seqName = "example-sequence" + + // Try increment atomic sequence on all cluster nodes. Note that this node is also part of the cluster. + ignite$.compute.broadcast(toRunnable(() => { + val seq = atomicSequence$(seqName, 0, true) + + val firstVal = seq.get + + println("Sequence initial value on local node: " + firstVal) + + for (i <- 0 until RETRIES) + println("Sequence [currentValue=" + seq.get + ", afterIncrement=" + seq.incrementAndGet + ']') + + println("Sequence after incrementing [expected=" + (firstVal + RETRIES) + ", actual=" + seq.get + ']') + })) + + println() + println("Finished atomic sequence example...") + println("Check all nodes for output (this node is also part of the cluster).") + println() + } +} + +object ScalarIgniteAtomicSequenceExampleStartup extends ScalarIgniteAtomicSequenceExample http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicStampedExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicStampedExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicStampedExample.scala new file mode 100644 index 0000000..f9df697 --- 
/dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteAtomicStampedExample.scala @@ -0,0 +1,96 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.datastructures + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.lang.IgniteRunnable +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import java.util.UUID + +/** + * Demonstrates a simple usage of distributed atomic stamped. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarIgniteAtomicStampedExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println(">>> Atomic stamped example started.") + + // Make name of atomic stamped. 
+ val stampedName = UUID.randomUUID.toString + + // Make value of atomic stamped. + val value = UUID.randomUUID.toString + + // Make stamp of atomic stamped. + val stamp = UUID.randomUUID.toString + + // Initialize atomic stamped. + val stamped = atomicStamped$[String, String](stampedName, value, stamp, true) + + println("Atomic stamped initial [value=" + stamped.value + ", stamp=" + stamped.stamp + ']') + + // Make closure for checking atomic stamped. + val c: IgniteRunnable = () => { + val stamped = atomicStamped$[String, String](stampedName, null, null, true) + + println("Atomic stamped [value=" + stamped.value + ", stamp=" + stamped.stamp + ']') + } + + // Check atomic stamped on all cluster nodes. + ignite$.compute.broadcast(c) + + // Make new value of atomic stamped. + val newVal = UUID.randomUUID.toString + + // Make new stamp of atomic stamped. + val newStamp = UUID.randomUUID.toString + + println("Try to change value and stamp of atomic stamped with wrong expected value and stamp.") + + stamped.compareAndSet("WRONG EXPECTED VALUE", newVal, "WRONG EXPECTED STAMP", newStamp) + + // Check atomic stamped on all cluster nodes. + // Atomic stamped value and stamp shouldn't be changed. + ignite$.compute.run(c) + + println("Try to change value and stamp of atomic stamped with correct value and stamp.") + + stamped.compareAndSet(value, newVal, stamp, newStamp) + + // Check atomic stamped on all cluster nodes. + // Atomic stamped value and stamp should be changed. 
+ ignite$.compute.run(c) + + println() + println("Finished atomic stamped example...") + println("Check all nodes for output (this node is also part of the cluster).") + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteCountDownLatchExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteCountDownLatchExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteCountDownLatchExample.scala new file mode 100644 index 0000000..0a593ef --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteCountDownLatchExample.scala @@ -0,0 +1,75 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.ignite.scalar.examples.datastructures + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import java.util.UUID + +/** + * Demonstrates a simple usage of distributed count down latch. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarIgniteCountDownLatchExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Number of latch initial count */ + private val INITIAL_COUNT = 10 + + scalar(CONFIG) { + println() + println(">>> Cache atomic countdown latch example started.") + + // Make name of count down latch. + val latchName = UUID.randomUUID.toString + + // Initialize count down latch. + val latch = countDownLatch$(latchName, INITIAL_COUNT, false, true) + + println("Latch initial value: " + latch.count) + + // Start waiting on the latch on all cluster nodes. + for (i <- 0 until INITIAL_COUNT) + ignite$.compute.run(toRunnable(() => { + val latch = countDownLatch$(latchName, 1, false, true) + + val newCnt = latch.countDown + + println("Counted down [newCnt=" + newCnt + ", nodeId=" + cluster$.localNode.id + ']') + })) + + // Wait for latch to go down which essentially means that all remote closures completed. 
+ latch.await() + + println("All latch closures have completed.") + + println() + println("Finished count down latch example...") + println("Check all nodes for output (this node is also part of the cluster).") + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteQueueExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteQueueExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteQueueExample.scala new file mode 100644 index 0000000..fc0f29d --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteQueueExample.scala @@ -0,0 +1,180 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.ignite.scalar.examples.datastructures + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.internal.util.scala.impl +import org.apache.ignite.lang.IgniteRunnable +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.{Ignite, IgniteException, IgniteQueue} + +import java.util.UUID + +import scala.collection.JavaConversions._ + +/** + * Ignite cache distributed queue example. This example demonstrates `FIFO` unbounded + * cache queue. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +class ScalarIgniteQueueExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Number of retries */ + private val RETRIES = 20 + + /** Queue instance. */ + private var queue: IgniteQueue[String] = null + + scalar(CONFIG) { + println() + println(">>> Ignite queue example started.") + + val queueName = UUID.randomUUID.toString + + queue = initializeQueue(ignite$, queueName) + + readFromQueue(ignite$) + + writeToQueue(ignite$) + + clearAndRemoveQueue() + + println("Cache queue example finished.") + } + + /** + * Initialize queue. + * + * @param ignite Ignite. + * @param queueName Name of queue. + * @return Queue. + * @throws IgniteException If execution failed. 
+ */ + @throws(classOf[IgniteException]) + private def initializeQueue(ignite: Ignite, queueName: String): IgniteQueue[String] = { + val queue = queue$[String](queueName) + + for (i <- 0 until (ignite.cluster.nodes.size * RETRIES * 2)) + queue.put(Integer.toString(i)) + + println("Queue size after initializing: " + queue.size) + + queue + } + + /** + * Read items from head and tail of queue. + * + * @param ignite Ignite. + * @throws IgniteException If failed. + */ + @throws(classOf[IgniteException]) + private def readFromQueue(ignite: Ignite) { + val queueName = queue.name + + ignite.compute.broadcast(new QueueClosure(queueName, false)) + + println("Queue size after reading [expected=0, actual=" + queue.size + ']') + } + + /** + * Write items into queue. + * + * @param ignite Ignite. + * @throws IgniteException If failed. + */ + @throws(classOf[IgniteException]) + private def writeToQueue(ignite: Ignite) { + val queueName = queue.name + + ignite.compute.broadcast(new QueueClosure(queueName, true)) + + println("Queue size after writing [expected=" + ignite.cluster.nodes.size * RETRIES + ", actual=" + queue.size + ']') + + println("Iterate over queue.") + + for (item <- queue) + println("Queue item: " + item) + } + + /** + * Clear and remove queue. + * + * @throws IgniteException If execution failed. + */ + @throws(classOf[IgniteException]) + private def clearAndRemoveQueue() { + println("Queue size before clearing: " + queue.size) + + queue.clear() + + println("Queue size after clearing: " + queue.size) + + queue.close() + + try { + queue.poll + } + catch { + case expected: IllegalStateException => + println("Expected exception - " + expected.getMessage) + } + } + + /** + * Closure to populate or poll the queue. + * + * @param queueName Queue name. + * @param put Flag indicating whether to put or poll. 
+ */ + private class QueueClosure(queueName: String, put: Boolean) extends IgniteRunnable { + @impl def run() { + val queue = queue$[String](queueName) + + if (put) { + val locId = cluster$.localNode.id + for (i <- 0 until RETRIES) { + val item = locId + "_" + Integer.toString(i) + + queue.put(item) + + println("Queue item has been added: " + item) + } + } + else { + for (i <- 0 until RETRIES) + println("Queue item has been read from queue head: " + queue.take) + + for (i <- 0 until RETRIES) + println("Queue item has been read from queue head: " + queue.poll) + } + } + } +} + +object ScalarIgniteQueueExampleStartup extends ScalarIgniteQueueExample http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteSetExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteSetExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteSetExample.scala new file mode 100644 index 0000000..b68e337 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/datastructures/ScalarIgniteSetExample.scala @@ -0,0 +1,154 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. 
You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.datastructures + +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.{Ignite, IgniteException, IgniteSet} + +import java.util.UUID + +import scala.collection.JavaConversions._ + +/** + * Ignite cache distributed set example. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: `'ignite.{sh|bat} examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will + * start node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarIgniteSetExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + /** Set instance. */ + private var set: IgniteSet[String] = null + + scalar(CONFIG) { + println() + println(">>> Ignite set example started.") + + val setName = UUID.randomUUID.toString + + set = initializeSet(ignite$, setName) + + writeToSet(ignite$) + + clearAndRemoveSet() + + println("Ignite set example finished.") + } + + /** + * Initialize set. + * + * @param ignite Ignite. + * @param setName Name of set. + * @return Set. + * @throws IgniteException If execution failed. 
+ */ + @throws(classOf[IgniteException]) + private def initializeSet(ignite: Ignite, setName: String): IgniteSet[String] = { + val set = set$[String](setName) + + for (i <- 0 until 10) + set.add(Integer.toString(i)) + + println("Set size after initializing: " + set.size) + + set + } + + /** + * Write items into set. + * + * @param ignite Ignite. + * @throws IgniteException If failed. + */ + @throws(classOf[IgniteException]) + private def writeToSet(ignite: Ignite) { + val setName = set.name + + ignite.compute.broadcast(toRunnable(() => { + val set = set$[String](setName) + + val locId = cluster$.localNode.id + + for (i <- 0 until 5) { + val item = locId + "_" + Integer.toString(i) + + set.add(item) + + println("Set item has been added: " + item) + } + })) + + println("Set size after writing [expected=" + (10 + ignite.cluster.nodes.size * 5) + ", actual=" + set.size + ']') + + println("Iterate over set.") + + for (item <- set) + println("Set item: " + item) + + if (!set.contains("0")) + throw new RuntimeException("Set should contain '0' among its elements.") + + if (set.add("0")) + throw new RuntimeException("Set should not allow duplicates.") + + if (!set.remove("0")) + throw new RuntimeException("Set should correctly remove elements.") + + if (set.contains("0")) + throw new RuntimeException("Set should not contain '0' among its elements.") + + if (!set.add("0")) + throw new RuntimeException("Set should correctly add new elements.") + } + + /** + * Clear and remove set. + * + * @throws IgniteException If execution failed. 
+ */ + @throws(classOf[IgniteException]) + private def clearAndRemoveSet() { + println("Set size before clearing: " + set.size) + + set.clear() + + println("Set size after clearing: " + set.size) + + set.close() + + println("Set was removed: " + set.removed) + + try { + set.contains("1") + } + catch { + case expected: IllegalStateException => + println("Expected exception - " + expected.getMessage) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/events/ScalarEventsExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/events/ScalarEventsExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/events/ScalarEventsExample.scala new file mode 100644 index 0000000..ef77c5a --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/events/ScalarEventsExample.scala @@ -0,0 +1,123 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.ignite.scalar.examples.events + +import org.apache.ignite.IgniteException +import org.apache.ignite.compute.ComputeTaskSession +import org.apache.ignite.events.EventType._ +import org.apache.ignite.events.TaskEvent +import org.apache.ignite.examples.ExampleNodeStartup +import org.apache.ignite.lang.{IgniteBiPredicate, IgnitePredicate, IgniteRunnable} +import org.apache.ignite.resources.TaskSessionResource +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ + +import java.util.UUID + +/** + * Demonstrates event consume API that allows to register event listeners on remote nodes. + * Note that ignite events are disabled by default and must be specifically enabled, + * just like in `examples/config/example-ignite.xml` file. + * <p> + * Remote nodes should always be started with configuration: `'ignite.sh examples/config/example-ignite.xml'`. + * <p> + * Alternatively you can run [[ExampleNodeStartup]] in another JVM which will start + * node with `examples/config/example-ignite.xml` configuration. + */ +object ScalarEventsExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/example-ignite.xml" + + scalar(CONFIG) { + println() + println(">>> Events API example started.") + + localListen() + + remoteListen() + + Thread.sleep(1000) + } + + /** + * Listen to events that happen only on local node. + * + * @throws IgniteException If failed. 
+ */ + @throws(classOf[IgniteException]) + private def localListen() { + println() + println(">>> Local event listener example.") + + val lsnr = new IgnitePredicate[TaskEvent] { + def apply(evt: TaskEvent): Boolean = { + println("Received task event [evt=" + evt.name + ", taskName=" + evt.taskName + ']') + + true + } + } + + ignite$.events.localListen(lsnr, EVTS_TASK_EXECUTION: _*) + + ignite$.compute().withName("example-event-task").run(() => { println("Executing sample job.") }) + ignite$.events.stopLocalListen(lsnr) + } + + /** + * Listen to events coming from all cluster nodes. + * + * @throws IgniteException If failed. + */ + @throws(classOf[IgniteException]) + private def remoteListen() { + println() + println(">>> Remote event listener example.") + + val locLsnr = new IgniteBiPredicate[UUID, TaskEvent] { + def apply(nodeId: UUID, evt: TaskEvent): Boolean = { + assert(evt.taskName.startsWith("good-task")) + + println("Received task event [evt=" + evt.name + ", taskName=" + evt.taskName) + + true + } + } + + val rmtLsnr = new IgnitePredicate[TaskEvent] { + def apply(evt: TaskEvent): Boolean = { + evt.taskName.startsWith("good-task") + } + } + + val ignite = ignite$ + + ignite.events.remoteListen(locLsnr, rmtLsnr, EVTS_TASK_EXECUTION: _*) + + for (i <- 0 until 10) { + ignite.compute.withName(if (i < 5) "good-task-" + i else "bad-task-" + i).run(new IgniteRunnable { + @TaskSessionResource private val ses: ComputeTaskSession = null + + override def run() { + println("Executing sample job for task: " + ses.getTaskName) + } + }) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/igfs/ScalarIgfsExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/igfs/ScalarIgfsExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/igfs/ScalarIgfsExample.scala new file mode 
100644 index 0000000..8b58746 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/igfs/ScalarIgfsExample.scala @@ -0,0 +1,285 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.scalar.examples.igfs + +import org.apache.ignite.examples.igfs.IgfsNodeStartup +import org.apache.ignite.igfs.{IgfsException, IgfsPath} +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.{IgniteException, IgniteFileSystem} + +import org.jetbrains.annotations.Nullable + +import java.io.IOException +import java.util.{Collection => JavaCollection} + +import scala.collection.JavaConversions._ + +/** + * Example that shows usage of [[org.apache.ignite.IgniteFileSystem]] API. It starts a node with `IgniteFs` + * configured and performs several file system operations (create, write, append, read and delete + * files, create, list and delete directories). + * <p> + * Remote nodes should always be started with configuration file which includes + * IGFS: `'ignite.sh examples/config/filesystem/example-igfs.xml'`. 
+ * <p> + * Alternatively you can run [[IgfsNodeStartup]] in another JVM which will start + * node with `examples/config/filesystem/example-igfs.xml` configuration. + */ +object ScalarIgfsExample extends App { + /** Configuration file name. */ + private val CONFIG = "examples/config/filesystem/example-igfs.xml" + + scalar(CONFIG) { + println() + println(">>> IGFS example started.") + + val fs = ignite$.fileSystem("igfs") + + val workDir = new IgfsPath("/examples/fs") + + delete(fs, workDir) + + mkdirs(fs, workDir) + + printInfo(fs, workDir) + + val filePath = new IgfsPath(workDir, "file.txt") + + create(fs, filePath, Array[Byte](1, 2, 3)) + + printInfo(fs, filePath) + + append(fs, filePath, Array[Byte](4, 5)) + + printInfo(fs, filePath) + + read(fs, filePath) + + delete(fs, filePath) + + printInfo(fs, filePath) + + for (i <- 0 until 5) + create(fs, new IgfsPath(workDir, "file-" + i + ".txt"), null) + + list(fs, workDir) + } + + /** + * Deletes file or directory. If directory + * is not empty, it's deleted recursively. + * + * @param fs IGFS. + * @param path File or directory path. + * @throws IgniteException In case of error. + */ + @throws(classOf[IgniteException]) + private def delete(fs: IgniteFileSystem, path: IgfsPath) { + assert(fs != null) + assert(path != null) + + if (fs.exists(path)) { + val isFile = fs.info(path).isFile + + try { + fs.delete(path, true) + + println() + println(">>> Deleted " + (if (isFile) "file" else "directory") + ": " + path) + } + catch { + case e: IgfsException => + println() + println(">>> Failed to delete " + (if (isFile) "file" else "directory") + " [path=" + path + ", msg=" + e.getMessage + ']') + } + } + else { + println() + println(">>> Won't delete file or directory (doesn't exist): " + path) + } + } + + /** + * Creates directories. + * + * @param fs IGFS. + * @param path Directory path. + * @throws IgniteException In case of error. 
*/
    @throws(classOf[IgniteException])
    private def mkdirs(fs: IgniteFileSystem, path: IgfsPath): Unit = {
        assert(fs != null)
        assert(path != null)

        try {
            fs.mkdirs(path)

            println()
            println(s">>> Created directory: $path")
        }
        catch {
            // Reported rather than rethrown, so the caller continues after a failure.
            case err: IgfsException =>
                println()
                println(s">>> Failed to create a directory [path=$path, msg=${err.getMessage}]")
        }

        println()
    }

    /**
     * Creates a file and, when a payload is supplied, writes it to the new file.
     * The output stream is closed even if writing fails.
     *
     * @param fs IGFS.
     * @param path File path.
     * @param data Bytes to write; `null` means nothing is written.
     * @throws IgniteException If file can't be created.
     * @throws IOException If data can't be written.
     */
    @throws(classOf[IgniteException])
    @throws(classOf[IOException])
    private def create(fs: IgniteFileSystem, path: IgfsPath, @Nullable data: Array[Byte]): Unit = {
        assert(fs != null)
        assert(path != null)

        val stream = fs.create(path, true)

        try {
            println()
            println(s">>> Created file: $path")

            // Option(null) is None, so a missing payload simply skips the write.
            Option(data).foreach { bytes =>
                stream.write(bytes)

                println()
                println(s">>> Wrote data to file: $path")
            }
        }
        finally {
            if (stream != null)
                stream.close()
        }

        println()
    }

    /**
     * Opens an existing file for appending and writes the given bytes to it.
     * The output stream is closed even if writing fails.
     *
     * @param fs IGFS.
     * @param path File path; must denote a file.
     * @param data Bytes to append.
     * @throws IgniteException If file can't be opened for appending.
     * @throws IOException If data can't be written.
     */
    @throws(classOf[IgniteException])
    @throws(classOf[IOException])
    private def append(fs: IgniteFileSystem, path: IgfsPath, data: Array[Byte]): Unit = {
        assert(fs != null)
        assert(path != null)
        assert(data != null)
        assert(fs.info(path).isFile)

        val stream = fs.append(path, true)

        try {
            println()
            println(s">>> Opened file: $path")

            stream.write(data)
        }
        finally {
            if (stream != null)
                stream.close()
        }

        println()
        println(s">>> Appended data to file: $path")
    }

    /**
     * Opens file and reads it to byte array.
     *
     * @param fs IgniteFs.
     * @param path File path.
     * @throws IgniteException If file can't be opened.
* @throws IOException If data can't be read.
     */
    @throws(classOf[IgniteException])
    @throws(classOf[IOException])
    private def read(fs: IgniteFileSystem, path: IgfsPath): Unit = {
        assert(fs != null)
        assert(path != null)

        // Fetch the descriptor once; it supplies both the type check and the length.
        val info = fs.info(path)

        assert(info.isFile)

        // NOTE(review): the length is narrowed to Int, so this only suits files that fit
        // into a single array.
        val data = new Array[Byte](info.length.toInt)

        val in = fs.open(path)

        try {
            // A single read() may deliver fewer bytes than requested, so keep reading
            // until the buffer is full or the stream reports end of file (-1).
            var off = 0
            var cnt = 0

            while (off < data.length && cnt != -1) {
                cnt = in.read(data, off, data.length - off)

                if (cnt > 0)
                    off += cnt
            }
        }
        finally {
            if (in != null) in.close()
        }

        println()
        println(">>> Read data from " + path + ": " + data.mkString("[", ", ", "]"))
    }

    /**
     * Prints the names of the entries found directly under the given directory,
     * or a notice when the directory has no entries.
     *
     * @param fs IGFS.
     * @param path Directory path; must denote a directory.
     * @throws IgniteException In case of error.
     */
    @throws(classOf[IgniteException])
    private def list(fs: IgniteFileSystem, path: IgfsPath): Unit = {
        assert(fs != null)
        assert(path != null)
        assert(fs.info(path).isDirectory)

        val entries: JavaCollection[IgfsPath] = fs.listPaths(path)

        println()

        if (entries.isEmpty)
            println(">>> No files in directory: " + path)
        else {
            println(">>> List of files in directory: " + path)

            entries.foreach(entry => println(">>> " + entry.name))
        }

        println()
    }

    /**
     * Prints information for file or directory.
     *
     * @param fs IGFS.
     * @param path File or directory path.
     * @throws IgniteException In case of error.
*/
    @throws(classOf[IgniteException])
    private def printInfo(fs: IgniteFileSystem, path: IgfsPath): Unit = {
        // Blank separator line first, then whatever fs.info(path) returns for the path.
        println()
        println(s"Information for $path: ${fs.info(path)}")
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/41a5bf67/examples/src/main/scala/org/apache/ignite/scalar/examples/igfs/ScalarIgfsMapReduceExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/igfs/ScalarIgfsMapReduceExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/igfs/ScalarIgfsMapReduceExample.scala new file mode 100644 index 0000000..3afc1f0 --- /dev/null +++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/igfs/ScalarIgfsMapReduceExample.scala @@ -0,0 +1,231 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.ignite.scalar.examples.igfs + +import org.apache.ignite.compute.ComputeJobResult +import org.apache.ignite.examples.igfs.IgfsNodeStartup +import org.apache.ignite.igfs.IgfsPath +import org.apache.ignite.igfs.mapreduce._ +import org.apache.ignite.igfs.mapreduce.records.IgfsNewLineRecordResolver +import org.apache.ignite.internal.util.{IgniteUtils => U} +import org.apache.ignite.scalar.scalar +import org.apache.ignite.scalar.scalar._ +import org.apache.ignite.{IgniteException, IgniteFileSystem} + +import java.io._ +import java.util.{Collection => JavaCollection, Collections, Comparator, HashSet => JavaHashSet, List => JavaList, TreeSet => JavaTreeSet} + +import scala.collection.JavaConversions._ + +/** + * Example that shows how to use [[IgfsTask]] to find lines matching particular pattern in the file in pretty + * the same way as `grep` command does. + * <p> + * Remote nodes should always be started with configuration file which includes + * IGFS: `'ignite.sh examples/config/filesystem/example-igfs.xml'`. + * <p> + * Alternatively you can run [[IgfsNodeStartup]] in another JVM which will start + * node with `examples/config/filesystem/example-igfs.xml` configuration. + */ +object ScalarIgfsMapReduceExample extends App { + /** Configuration file name. 
*/
    private val CONFIG = "examples/config/filesystem/example-igfs.xml"

    // Expected arguments: <local file name> <regular expression>.
    if (args.length == 0)
        println("Please provide file name and regular expression.")
    else if (args.length == 1)
        println("Please provide regular expression.")
    else {
        scalar(CONFIG) {
            println()
            println(">>> IGFS map reduce example started.")

            val fileName = args(0)

            val file = new File(fileName)

            val regexStr = args(1)

            val fs = ignite$.fileSystem("igfs")

            // The local file is copied under the working directory, keeping its simple name.
            val workDir = new IgfsPath("/examples/fs")
            val fsPath = new IgfsPath(workDir, file.getName)

            writeFile(fs, fsPath, file)

            val lines: JavaCollection[Line] =
                fs.execute(new GrepTask, IgfsNewLineRecordResolver.NEW_LINE, Collections.singleton(fsPath), regexStr)

            if (lines.isEmpty) {
                println()
                println("No lines were found.")
            }
            else {
                for (line <- lines) print(line.fileLine)
            }
        }
    }

    /**
     * Copies a local file into the Ignite file system.
     *
     * The local file is opened before the IGFS file is created, so a missing or
     * unreadable local file fails without touching IGFS and without leaving an
     * output stream open. Both streams are closed quietly once copying ends,
     * whether it succeeded or not.
     *
     * @param fs Ignite file system.
     * @param fsPath Ignite file system path.
     * @param file File to write.
     * @throws Exception In case of exception.
     */
    @throws(classOf[Exception])
    private def writeFile(fs: IgniteFileSystem, fsPath: IgfsPath, file: File): Unit = {
        println()
        println("Copying file to IGFS: " + file)

        val fis = new FileInputStream(file)

        try {
            val os = fs.create(fsPath, true)

            try {
                val buf = new Array[Byte](2048)

                var read = fis.read(buf)

                // read() returns -1 at end of file; only the bytes actually read are written.
                while (read != -1) {
                    os.write(buf, 0, read)

                    read = fis.read(buf)
                }
            }
            finally {
                U.closeQuiet(os)
            }
        }
        finally {
            U.closeQuiet(fis)
        }
    }

    /**
     * Prints the given string prefixed with `>>> `.
     *
     * Within this object the name resolves to this method rather than to `Predef.print`.
     *
     * @param str String.
     */
    private def print(str: String): Unit = {
        println(">>> " + str)
    }
}

/**
 * Grep task.
*/
private class GrepTask extends IgfsTask[String, JavaCollection[Line]] {
    /** Creates a [[GrepJob]] that searches with the regex passed as the task's user argument. */
    def createJob(path: IgfsPath, range: IgfsFileRange, args: IgfsTaskArgs[String]): IgfsJob = {
        new GrepJob(args.userArgument())
    }

    /**
     * Merges the per-job results into one collection ordered by range position and,
     * within the same range, by matching line index.
     *
     * @param results Job results.
     * @return Ordered collection of all matching lines (empty when nothing matched).
     */
    override def reduce(results: JavaList[ComputeJobResult]): JavaCollection[Line] = {
        val lines: JavaCollection[Line] = new JavaTreeSet[Line](new Comparator[Line] {
            def compare(line1: Line, line2: Line): Int = {
                // compare() helpers instead of subtraction: no risk of integer overflow.
                val byRange = java.lang.Long.compare(line1.rangePosition, line2.rangePosition)

                if (byRange != 0)
                    byRange
                else
                    Integer.compare(line1.lineIndex, line2.lineIndex)
            }
        })

        for (res <- results) {
            // The first failed job aborts the whole reduction.
            if (res.getException != null)
                throw res.getException

            val found = res.getData[JavaCollection[Line]]

            // A job that matched nothing returns null rather than an empty collection.
            if (found != null)
                lines.addAll(found)
        }

        lines
    }
}

/**
 * Grep job.
 *
 * @param regex Regex string.
 */
private class GrepJob(private val regex: String) extends IgfsInputStreamJobAdapter {
    /**
     * Scans the given input stream line by line and collects the lines that contain
     * a match of the regex.
     *
     * @param igfs Ignite file system (not used by this job).
     * @param in Input stream to scan.
     * @return Collection of matching lines, or `null` when no line matched.
     */
    @throws(classOf[IgniteException])
    @throws(classOf[IOException])
    def execute(igfs: IgniteFileSystem, in: IgfsRangeInputStream): AnyRef = {
        var res: JavaCollection[Line] = null

        val start = in.startOffset

        // NOTE(review): decodes with the platform default charset - confirm this is intended.
        val br = new BufferedReader(new InputStreamReader(in))

        try {
            // Compiled once per job instead of on every String.matches call. The
            // surrounding ".*" keeps the "line contains a match" semantics. An
            // invalid regex now fails here, before any line is read.
            val pattern = java.util.regex.Pattern.compile(".*" + regex + ".*")

            // Counts matching lines only, which is what Line.lineIndex documents.
            var ctr = 0

            var line = br.readLine

            while (line != null) {
                if (pattern.matcher(line).matches()) {
                    if (res == null)
                        res = new JavaHashSet[Line]

                    res.add(new Line(start, ctr, line))

                    ctr += 1
                }

                line = br.readLine
            }
        }
        finally {
            U.closeQuiet(br)
        }

        res
    }
}

/**
 * Single file line with its position.
 *
 * @param rangePos Line start position in the file.
 * @param lineIdx Matching line index within the range.
 * @param line File line.
 */
private class Line(private val rangePos: Long, private val lineIdx: Int, private val line: String) {
    /**
     * @return Range position.
     */
    def rangePosition: Long = rangePos

    /**
     * @return Matching line index within the range.
     */
    def lineIndex: Int = lineIdx

    /**
     * @return File line.
     */
    def fileLine: String = line
}