Author: fhanik Date: Fri Mar 10 16:58:26 2006 New Revision: 384974 URL: http://svn.apache.org/viewcvs?rev=384974&view=rev Log: Completed the LazyReplcatedMap and a demo that proves its concept
Added: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java?rev=384974&r1=384973&r2=384974&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/group/GroupChannel.java Fri Mar 10 16:58:26 2006 @@ -93,7 +93,8 @@ public void send(Member[] destination, Serializable msg) throws ChannelException { if ( msg == null ) return; try { - if ( destination == null ) destination = getMembers(); + if ( destination == null ) throw new ChannelException("No destination given"); + if ( destination.length == 0 ) return; int options = 0; ClusterData data = new ClusterData();//generates a unique Id data.setAddress(getLocalMember()); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=384974&r1=384973&r2=384974&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Mar 10 16:58:26 2006 @@ -58,6 +58,7 @@ private transient RpcChannel rpcChannel; private transient byte[] mapContextName; private transient boolean stateTransferred = false; + private transient Object stateMutex = new Object(); //------------------------------------------------------------------------------ @@ -184,6 +185,12 @@ ArrayList list = (ArrayList)msg.getValue(); for (int i=0; i<list.size(); i++ ) { MapMessage m = (MapMessage)list.get(i); + + //make sure we don't store that actual object as primary or backup + MapEntry local = (MapEntry)super.get(m.getKey()); + if ( local != null && (!local.isProxy() ) ) continue; + + //store the object MapEntry entry = new MapEntry(m.getKey(),m.getValue()); entry.setBackup(false); entry.setProxy(true); @@ -217,18 +224,20 @@ //state transfer request if ( mapmsg.getMsgType() == mapmsg.MSG_STATE ) { - ArrayList list = new ArrayList(); - Iterator i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry e = (Map.Entry) i.next(); - MapEntry entry = (MapEntry) e.getValue(); - MapMessage me = new MapMessage(mapContextName,MapMessage.MSG_PROXY, - false,(Serializable)entry.getKey(),(Serializable)entry.getValue(), - null,entry.getBackupNode()); - list.add(me); - } - mapmsg.setValue(list); - return mapmsg; + synchronized (stateMutex) { //make sure we dont do two things at the same time + ArrayList list = new ArrayList(); + Iterator i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = (Map.Entry) i.next(); + MapEntry entry = (MapEntry) e.getValue(); + MapMessage me = new MapMessage(mapContextName, MapMessage.MSG_PROXY, + false, (Serializable) entry.getKey(), (Serializable) entry.getValue(), + null, entry.getBackupNode()); + list.add(me); + } + mapmsg.setValue(list); + return mapmsg; + }//synchronized } return null; @@ -259,6 +268,11 @@ super.put(entry.getKey(),entry); } + if ( mapmsg.getMsgType() == MapMessage.MSG_REMOVE ) { + super.remove(mapmsg.getKey()); + } + + if ( mapmsg.getMsgType() == MapMessage.MSG_BACKUP ) { MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); if ( entry == null ) { @@ -268,6 +282,9 @@ entry.setBackupNode(mapmsg.getBackupNode()); super.put(entry.getKey(), entry); } else { + entry.setBackup(true); + entry.setProxy(false); + entry.setBackupNode(mapmsg.getBackupNode()); if ( entry.getValue() instanceof ReplicatedMapEntry ) { ReplicatedMapEntry diff = (ReplicatedMapEntry)entry.getValue(); if ( mapmsg.isDiff() ) { @@ -295,7 +312,23 @@ } public void memberAdded(Member member) { - //do nothing, we don't care + //select a backup node if we don't have one + synchronized (stateMutex) { + Iterator i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = (Map.Entry) i.next(); + MapEntry entry = (MapEntry) e.getValue(); + if (entry.isPrimary() && entry.getBackupNode() == null) { + try { + Member backup = publishEntryInfo(entry.getKey(), entry.getValue()); + entry.setBackupNode(backup); + } catch (ChannelException x) { + log.error("Unable to select backup node.", x); + }//catch + }//end if + } //while + }//synchronized + } public void memberDisappeared(Member member) { //todo move all sessions that are primary here to and have this member as @@ -342,6 +375,9 @@ protected Member publishEntryInfo(Object key, Object value) throws ChannelException { //select a backup node Member backup = getNextBackupNode(); + + if ( backup == null ) return null; + //publish the data out to all nodes MapMessage msg = new MapMessage(this.mapContextName, MapMessage.MSG_PROXY, false, (Serializable) key, null, null, backup); @@ -497,6 +533,10 @@ return set; } + public int sizeFull() { + return super.size(); + } + public int size() { //todo, implement a counter variable instead //only count active members in this node @@ -658,6 +698,8 @@ private byte[] diffvalue; private Member node; + public MapMessage(){} + public MapMessage(byte[] mapId, int msgtype, boolean diff, Serializable key,Serializable value, @@ -718,9 +760,16 @@ } else { value = (Serializable)in.readObject(); }//endif + byte[] d = new byte[in.readInt()]; + in.read(d); + if ( d.length > 0 ) node = McastMember.getMember(d); + break; + } + case MSG_RETRIEVE_BACKUP: { + key = (Serializable)in.readObject(); + value = (Serializable)in.readObject(); break; } - case MSG_RETRIEVE_BACKUP: case MSG_REMOVE : { key = (Serializable)in.readObject(); break; @@ -729,7 +778,7 @@ key = (Serializable)in.readObject(); byte[] d = new byte[in.readInt()]; in.read(d); - node = McastMember.getMember(d); + if ( d.length > 0 ) node = McastMember.getMember(d); break; } }//switch @@ -750,16 +799,23 @@ } else { out.writeObject(value); }//endif + byte[] d = node!=null?((McastMember)node).getData(false):new byte[0]; + out.writeInt(d.length); + out.write(d); + break; + } + case MSG_RETRIEVE_BACKUP:{ + out.writeObject(key); + out.writeObject(value); break; } - case MSG_RETRIEVE_BACKUP: case MSG_REMOVE : { out.writeObject(key); break; } case MSG_PROXY: { out.writeObject(key); - byte[] d = ((McastMember)node).getData(false); + byte[] d = node!=null?((McastMember)node).getData(false):new byte[0]; out.writeInt(d.length); out.write(d); break; Added: tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java?rev=384974&view=auto ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java (added) +++ tomcat/container/tc5.5.x/modules/groupcom/test/org/apache/catalina/tribes/demos/MapDemo.java Fri Mar 10 16:58:26 2006 @@ -0,0 +1,332 @@ +package org.apache.catalina.tribes.demos; + +import java.io.Serializable; + +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.tipis.RpcCallback; +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ManagedChannel; +import org.apache.catalina.tribes.tipis.RpcChannel; +import org.apache.catalina.tribes.tipis.Response; + +import javax.swing.JFrame; +import javax.swing.JPanel; +import javax.swing.JScrollPane; +import javax.swing.JTable; +import java.awt.Dimension; +import java.awt.GridLayout; +import java.awt.event.MouseAdapter; +import java.awt.event.MouseEvent; +import javax.swing.JButton; +import javax.swing.JTextField; +import java.awt.Panel; +import javax.swing.BoxLayout; +import java.awt.ComponentOrientation; +import javax.swing.table.TableModel; +import javax.swing.table.AbstractTableModel; +import javax.swing.table.TableColumnModel; +import javax.swing.table.DefaultTableColumnModel; +import javax.swing.table.TableColumn; +import java.awt.event.ActionEvent; +import java.awt.event.ActionListener; +import org.apache.catalina.tribes.tipis.LazyReplicatedMap; +import org.apache.catalina.tribes.MessageListener; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.MembershipListener; +import java.util.Map; +import java.awt.BorderLayout; + +/** + * <p>Title: </p> + * + * <p>Description: </p> + * + * <p>Copyright: Copyright (c) 2005</p> + * + * <p>Company: </p> + * + * @author not attributable + * @version 1.0 + */ +public class MapDemo implements ChannelListener, MembershipListener{ + + protected LazyReplicatedMap map; + protected SimpleTableDemo table; + + public MapDemo(Channel channel ) { + map = new LazyReplicatedMap(channel,"MapDemo"); + table = SimpleTableDemo.createAndShowGUI(map,channel.getLocalMember().getName()); + channel.addChannelListener(this); + channel.addMembershipListener(this); + } + + public boolean accept(Serializable msg, Member source) { + return true; + } + + public void messageReceived(Serializable msg, Member source) { + System.out.println("Recieved: "+msg); + table.dataModel.getValueAt(-1,-1); + } + + public void memberAdded(Member member) { + } + public void memberDisappeared(Member member) { + table.dataModel.getValueAt(-1,-1); + } + + public static void usage() { + System.out.println("Tribes MapDemo."); + System.out.println("Usage:\n\t" + + "java MapDemo [channel options]\n\t" + + "\tChannel options:" + + ChannelCreator.usage()); + } + + public static void main(String[] args) throws Exception { + ManagedChannel channel = (ManagedChannel) ChannelCreator.createChannel(args); + channel.start(channel.DEFAULT); + Runtime.getRuntime().addShutdownHook(new Shutdown(channel)); + MapDemo demo = new MapDemo(channel); + + System.out.println("System test complete, sleeping to let threads finish."); + Thread.sleep(60 * 1000 * 60); + } + + public static class Shutdown + extends Thread { + ManagedChannel channel = null; + public Shutdown(ManagedChannel channel) { + this.channel = channel; + } + + public void run() { + System.out.println("Shutting down..."); + SystemExit exit = new SystemExit(5000); + exit.setDaemon(true); + exit.start(); + try { + channel.stop(channel.DEFAULT); + + } catch (Exception x) { + x.printStackTrace(); + } + System.out.println("Channel stopped."); + } + } + + public static class SystemExit + extends Thread { + private long delay; + public SystemExit(long delay) { + this.delay = delay; + } + + public void run() { + try { + Thread.sleep(delay); + } catch (Exception x) { + x.printStackTrace(); + } + System.exit(0); + + } + } + + public static class SimpleTableDemo + extends JPanel implements ActionListener{ + private static int WIDTH = 500; + + private LazyReplicatedMap map; + private boolean DEBUG = false; + TableModel dataModel = new AbstractTableModel() { + + + String[] columnNames = { + "Key", + "Value", + "Backup Node", + "isPrimary", + "isProxy", + "isBackup"}; + + public int getColumnCount() { return columnNames.length; } + + public int getRowCount() {return map.sizeFull() +1; } + + public Object getValueAt(int row, int col) { + if ( row==-1 ) { + update(); + return ""; + } + if ( row == 0 ) return columnNames[col]; + Object[] entries = map.entrySetFull().toArray(); + + Map.Entry e = (Map.Entry)entries [row-1]; + LazyReplicatedMap.MapEntry entry = (LazyReplicatedMap.MapEntry)e.getValue(); + switch (col) { + case 0: return entry.getKey(); + case 1: return entry.getValue(); + case 2: return entry.getBackupNode()!=null?entry.getBackupNode().getName():""; + case 3: return new Boolean(entry.isPrimary()); + case 4: return new Boolean(entry.isProxy()); + case 5: return new Boolean(entry.isBackup()); + default: return ""; + } + } + + public void update() { + fireTableDataChanged(); + } + }; + + JTextField txtAddKey = new JTextField(20); + JTextField txtAddValue = new JTextField(20); + JTextField txtRemoveKey = new JTextField(20); + JTextField txtChangeKey = new JTextField(20); + JTextField txtChangeValue = new JTextField(20); + + + public SimpleTableDemo(LazyReplicatedMap map) { + super(); + this.map = map; + + this.setComponentOrientation(ComponentOrientation.LEFT_TO_RIGHT); + + //final JTable table = new JTable(data, columnNames); + final JTable table = new JTable(dataModel); + + table.setPreferredScrollableViewportSize(new Dimension(WIDTH, 150)); + + if (DEBUG) { + table.addMouseListener(new MouseAdapter() { + public void mouseClicked(MouseEvent e) { + printDebugData(table); + } + }); + } + + //setLayout(new GridLayout(5, 0)); + setLayout(new BoxLayout(this, BoxLayout.Y_AXIS)); + + //Create the scroll pane and add the table to it. + JScrollPane scrollPane = new JScrollPane(table); + + //Add the scroll pane to this panel. + add(scrollPane); + + //create a add value button + JPanel addpanel = new JPanel(); + addpanel.setPreferredSize(new Dimension(WIDTH,20)); + addpanel.add(createButton("Add","add")); + addpanel.add(txtAddKey); + addpanel.add(txtAddValue); + add(addpanel); + + //create a remove value button + JPanel removepanel = new JPanel( ); + removepanel.setPreferredSize(new Dimension(WIDTH,20)); + removepanel.add(createButton("Remove","remove")); + removepanel.add(txtRemoveKey); + + add(removepanel); + + //create a change value button + JPanel changepanel = new JPanel( ); + changepanel.add(createButton("Change","change")); + changepanel.add(txtChangeKey); + changepanel.add(txtChangeValue); + changepanel.setPreferredSize(new Dimension(WIDTH,20)); + + add(changepanel); + + //create sync button + JPanel syncpanel = new JPanel( ); + syncpanel.add(createButton("Synchronize","sync")); + syncpanel.add(createButton("Replicate","replicate")); + syncpanel.setPreferredSize(new Dimension(WIDTH,20)); + + add(syncpanel); + + + } + + public JButton createButton(String text, String command) { + JButton button = new JButton(text); + button.setActionCommand(command); + button.addActionListener(this); + return button; + } + + public void actionPerformed(ActionEvent e) { + System.out.println(e.getActionCommand()); + if ( "add".equals(e.getActionCommand()) ) { + System.out.println("Add key:"+txtAddKey.getText()+" value:"+txtAddValue.getText()); + map.put(txtAddKey.getText(),new StringBuffer(txtAddValue.getText())); + } + if ( "change".equals(e.getActionCommand()) ) { + System.out.println("Change key:"+txtChangeKey.getText()+" value:"+txtChangeValue.getText()); + StringBuffer buf = (StringBuffer)map.get(txtChangeKey.getText()); + if ( buf!=null ) { + buf.delete(0,buf.length()); + buf.append(txtChangeValue.getText()); + } + } + if ( "remove".equals(e.getActionCommand()) ) { + System.out.println("Remove key:"+txtRemoveKey.getText()); + map.remove(txtRemoveKey.getText()); + } + if ( "sync".equals(e.getActionCommand()) ) { + System.out.println("Syncing from another node."); + map.transferState(); + } + if ( "replicate".equals(e.getActionCommand()) ) { + System.out.println("Replicating out to the other nodes."); + map.replicate(true); + } + dataModel.getValueAt(-1,-1); + } + + private void printDebugData(JTable table) { + int numRows = table.getRowCount(); + int numCols = table.getColumnCount(); + javax.swing.table.TableModel model = table.getModel(); + + System.out.println("Value of data: "); + for (int i = 0; i < numRows; i++) { + System.out.print(" row " + i + ":"); + for (int j = 0; j < numCols; j++) { + System.out.print(" " + model.getValueAt(i, j)); + } + System.out.println(); + } + System.out.println("--------------------------"); + } + + /** + * Create the GUI and show it. For thread safety, + * this method should be invoked from the + * event-dispatching thread. + */ + public static SimpleTableDemo createAndShowGUI(LazyReplicatedMap map, String title) { + //Make sure we have nice window decorations. + JFrame.setDefaultLookAndFeelDecorated(true); + + //Create and set up the window. + JFrame frame = new JFrame("SimpleTableDemo - "+title); + frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + + //Create and set up the content pane. + SimpleTableDemo newContentPane = new SimpleTableDemo(map); + newContentPane.setOpaque(true); //content panes must be opaque + frame.setContentPane(newContentPane); + + //Display the window. + frame.setSize(450,250); + frame.pack(); + frame.setVisible(true); + return newContentPane; + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]