Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-3 5fcb07292 -> 8803e1798


# ignite-694


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/db754aba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/db754aba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/db754aba

Branch: refs/heads/ignite-sprint-3
Commit: db754aba1fcf9d762dba9b96e5333afde7efa0dd
Parents: 6553549
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Apr 7 17:17:08 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Apr 7 17:17:08 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |  77 ++++++------
 .../GridCachePartitionExchangeManager.java      | 121 ++++++++++++++++++-
 .../IgniteCacheAtomicNodeJoinTest.java          |  44 +++++++
 .../IgniteCacheNodeJoinAbstractTest.java        | 109 +++++++++++++++++
 .../distributed/IgniteCacheTxNodeJoinTest.java  |  38 ++++++
 .../IgniteCacheFailoverTestSuite.java           |   4 +
 6 files changed, 356 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 7bd6726..8805853 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -76,7 +77,6 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
     /** Message listener. */
     private GridMessageListener lsnr = new GridMessageListener() {
-        @SuppressWarnings("unchecked")
         @Override public void onMessage(final UUID nodeId, Object msg) {
             if (log.isDebugEnabled())
                 log.debug("Received unordered cache communication message 
[nodeId=" + nodeId +
@@ -84,44 +84,20 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
             final GridCacheMessage cacheMsg = (GridCacheMessage)msg;
 
-            int msgIdx = cacheMsg.lookupIndex();
+            AffinityTopologyVersion locAffVer = 
cctx.exchange().topologyVersion();
+            AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion();
 
-            IgniteBiInClosure<UUID, GridCacheMessage> c = null;
-
-            if (msgIdx >= 0) {
-                IgniteBiInClosure[] cacheClsHandlers = 
idxClsHandlers.get(cacheMsg.cacheId());
-
-                if (cacheClsHandlers != null)
-                    c = cacheClsHandlers[msgIdx];
-            }
-
-            if (c == null)
-                c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), 
cacheMsg.getClass()));
-
-            if (c == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Received message without registered handler 
(will ignore) [msg=" + msg +
-                        ", nodeId=" + nodeId + ']');
-
-                return;
-            }
-
-            long locTopVer = cctx.discovery().topologyVersion();
-            long rmtTopVer = cacheMsg.topologyVersion().topologyVersion();
-
-            if (locTopVer < rmtTopVer) {
+            if (locAffVer.compareTo(rmtAffVer) < 0) {
                 if (log.isDebugEnabled())
                     log.debug("Received message has higher topology version 
[msg=" + msg +
-                        ", locTopVer=" + locTopVer + ", rmtTopVer=" + 
rmtTopVer + ']');
-
-                IgniteInternalFuture<Long> topFut = 
cctx.discovery().topologyFuture(rmtTopVer);
+                        ", locTopVer=" + locAffVer + ", rmtTopVer=" + 
rmtAffVer + ']');
 
-                if (!topFut.isDone()) {
-                    final IgniteBiInClosure<UUID, GridCacheMessage> c0 = c;
+                IgniteInternalFuture<?> topFut = 
cctx.exchange().affinityReadyFuture(rmtAffVer);
 
-                    topFut.listen(new CI1<IgniteInternalFuture<Long>>() {
-                        @Override public void apply(IgniteInternalFuture<Long> 
t) {
-                            onMessage0(nodeId, cacheMsg, c0);
+                if (topFut != null && !topFut.isDone()) {
+                    topFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> t) 
{
+                            handleMessage(nodeId, cacheMsg);
                         }
                     });
 
@@ -129,10 +105,41 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
                 }
             }
 
-            onMessage0(nodeId, cacheMsg, c);
+            handleMessage(nodeId, cacheMsg);
         }
     };
 
+    /**
+     * @param nodeId Sender node ID.
+     * @param cacheMsg Message.
+     */
+    @SuppressWarnings("unchecked")
+    private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) {
+        int msgIdx = cacheMsg.lookupIndex();
+
+        IgniteBiInClosure<UUID, GridCacheMessage> c = null;
+
+        if (msgIdx >= 0) {
+            IgniteBiInClosure[] cacheClsHandlers = 
idxClsHandlers.get(cacheMsg.cacheId());
+
+            if (cacheClsHandlers != null)
+                c = cacheClsHandlers[msgIdx];
+        }
+
+        if (c == null)
+            c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), 
cacheMsg.getClass()));
+
+        if (c == null) {
+            if (log.isDebugEnabled())
+                log.debug("Received message without registered handler (will 
ignore) [msg=" + cacheMsg +
+                    ", nodeId=" + nodeId + ']');
+
+            return;
+        }
+
+        onMessage0(nodeId, cacheMsg, c);
+    }
+
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
         retryDelay = cctx.gridConfig().getNetworkSendRetryDelay();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d2d3531..e8e3ea1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -90,6 +90,14 @@ public class GridCachePartitionExchangeManager<K, V> extends 
GridCacheSharedMana
     /** */
     private volatile GridDhtPartitionsExchangeFuture lastInitializedFut;
 
+    /** */
+    private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> 
readyFuts = new ConcurrentHashMap8<>();
+
+    /** */
+    private final AtomicReference<AffinityTopologyVersion> readyTopVer =
+        new AtomicReference<>(AffinityTopologyVersion.NONE);
+
+
     /**
      * Partition map futures.
      * This set also contains already completed exchange futures to address 
race conditions when coordinator
@@ -311,6 +319,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
             f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: 
" + cctx.gridName()));
 
+        for (AffinityReadyFuture f : readyFuts.values())
+            f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: 
" + cctx.gridName()));
+
         U.cancel(exchWorker);
 
         if (log.isDebugEnabled())
@@ -372,7 +383,10 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return Topology version.
      */
     public AffinityTopologyVersion topologyVersion() {
-        return lastInitializedFut.exchangeId().topologyVersion();
+        GridDhtPartitionsExchangeFuture lastInitializedFut0 = 
lastInitializedFut;
+
+        return lastInitializedFut0 != null
+            ? lastInitializedFut0.exchangeId().topologyVersion() : 
AffinityTopologyVersion.NONE;
     }
 
     /**
@@ -383,6 +397,49 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
+     * @param ver Topology version.
+     * @return Future or {@code null} is future is already completed.
+     */
+    @Nullable IgniteInternalFuture<?> 
affinityReadyFuture(AffinityTopologyVersion ver) {
+        GridDhtPartitionsExchangeFuture lastInitializedFut0 = 
lastInitializedFut;
+
+        if (lastInitializedFut0 != null && 
lastInitializedFut0.topologyVersion().compareTo(ver) >= 0) {
+            if (log.isDebugEnabled())
+                log.debug("Return lastInitializedFut for topology ready future 
" +
+                    "[ver=" + ver + ", fut=" + lastInitializedFut0 + ']');
+
+            return lastInitializedFut0;
+        }
+
+        AffinityTopologyVersion topVer = readyTopVer.get();
+
+        if (topVer.compareTo(ver) >= 0) {
+            if (log.isDebugEnabled())
+                log.debug("Return finished future for topology ready future 
[ver=" + ver + ", topVer=" + topVer + ']');
+
+            return null;
+        }
+
+        GridFutureAdapter<AffinityTopologyVersion> fut = 
F.addIfAbsent(readyFuts, ver,
+            new AffinityReadyFuture(ver));
+
+        if (log.isDebugEnabled())
+            log.debug("Created topology ready future [ver=" + ver + ", fut=" + 
fut + ']');
+
+        topVer = readyTopVer.get();
+
+        if (topVer.compareTo(ver) >= 0) {
+            if (log.isDebugEnabled())
+                log.debug("Completing created topology ready future " +
+                    "[ver=" + topVer + ", topVer=" + topVer + ", fut=" + fut + 
']');
+
+            fut.onDone(topVer);
+        }
+
+        return fut;
+    }
+
+    /**
      * @return {@code true} if entered to busy state.
      */
     private boolean enterBusy() {
@@ -642,8 +699,38 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
     /**
      * @param exchFut Exchange.
+     * @param err Error.
      */
-    public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, 
Throwable err) {
+    public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, 
@Nullable Throwable err) {
+        if (err == null) {
+            AffinityTopologyVersion topVer = exchFut.topologyVersion();
+
+            if (log.isDebugEnabled())
+                log.debug("Exchange done [topVer=" + topVer + ", fut=" + 
exchFut + ']');
+
+            while (true) {
+                AffinityTopologyVersion readyVer = readyTopVer.get();
+
+                if (readyVer.compareTo(topVer) >= 0)
+                    break;
+
+                if (readyTopVer.compareAndSet(readyVer, topVer))
+                    break;
+            }
+
+            for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry 
: readyFuts.entrySet()) {
+                if (entry.getKey().compareTo(topVer) <= 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Completing created topology ready future " +
+                            "[ver=" + topVer + ", fut=" + entry.getValue() + 
']');
+
+                    entry.getValue().onDone(topVer);
+                }
+            }
+        }
+        else if (log.isDebugEnabled())
+            log.debug("Exchange done with error [fut=" + exchFut + ", err=" + 
err + ']');
+
         ExchangeFutureSet exchFuts0 = exchFuts;
 
         if (exchFuts0 != null) {
@@ -1130,4 +1217,34 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
          */
         protected abstract void onMessage(ClusterNode node, M msg);
     }
+
+    /**
+     * Affinity ready future.
+     */
+    private class AffinityReadyFuture extends 
GridFutureAdapter<AffinityTopologyVersion> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private AffinityTopologyVersion topVer;
+
+        /**
+         * @param topVer Topology version.
+         */
+        private AffinityReadyFuture(AffinityTopologyVersion topVer) {
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(AffinityTopologyVersion res, @Nullable 
Throwable err) {
+            assert res != null || err != null;
+
+            boolean done = super.onDone(res, err);
+
+            if (done)
+                readyFuts.remove(topVer, this);
+
+            return done;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeJoinTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeJoinTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeJoinTest.java
new file mode 100644
index 0000000..c0818bd
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeJoinTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheAtomicNodeJoinTest extends 
IgniteCacheNodeJoinAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return PRIMARY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
new file mode 100644
index 0000000..bed83ec
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public abstract class IgniteCacheNodeJoinAbstractTest extends 
IgniteCacheAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) 
throws Exception {
+        CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+        cfg.setReadFromBackup(false); // Force remote 'get'.
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGet() throws Exception {
+        final IgniteCache<Integer, Integer> cache = jcache(0);
+
+        final int KEYS = 1000;
+
+        Map<Integer, Integer> map = new HashMap<>();
+
+        for (int i = 0; i < KEYS; i++)
+            map.put(i, i);
+
+        for (int i = 0; i < 10; i++) {
+            log.info("Iteration: " + i);
+
+            cache.putAll(map);
+
+            final IgniteInternalFuture fut = GridTestUtils.runAsync(new 
Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    startGrid(1);
+
+                    return null;
+                }
+            });
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    while (!stop.get() && !fut.isDone()) {
+                        for (int key = 0; key < KEYS; key++) {
+                            assertNotNull(cache.get(key));
+
+                            if (key % 100 == 0 && fut.isDone())
+                                break;
+                        }
+                    }
+
+                    return null;
+                }
+            }, 10, "test-get");
+
+            try {
+                fut.get(60_000);
+            }
+            finally {
+                stop.set(true);
+            }
+
+            stopGrid(1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxNodeJoinTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxNodeJoinTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxNodeJoinTest.java
new file mode 100644
index 0000000..d250cd6
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxNodeJoinTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheTxNodeJoinTest extends IgniteCacheNodeJoinAbstractTest 
{
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index 9556235..da79c55 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -56,6 +57,9 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
         //suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class); TODO 
IGNITE-631.
         //suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class); TODO 
IGNITE-631.
 
+        suite.addTestSuite(IgniteCacheAtomicNodeJoinTest.class);
+        suite.addTestSuite(IgniteCacheTxNodeJoinTest.class);
+
         return suite;
     }
 }

Reply via email to