This is an automated email from the ASF dual-hosted git repository. billblough pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/axis-axis2-java-savan.git
commit d0220b0bade8933beeb96a7c36d8c0611f6f54f5 Author: Hemapani Srinath Perera <hemap...@apache.org> AuthorDate: Sat Jun 9 09:21:41 2007 +0000 integrate derby data source --- .../java/org/apache/savan/atom/AtomConstants.java | 6 +- .../java/org/apache/savan/atom/AtomDataSource.java | 190 ++++++++++++++++ .../org/apache/savan/atom/AtomMessageReceiver.java | 102 +++++---- .../java/org/apache/savan/atom/AtomSubscriber.java | 94 ++++---- .../savan/atom/AtomSubscriptionProcessor.java | 15 +- .../src/main/java/org/apache/savan/atom/Feed.java | 20 +- .../apache/savan/eventing/EventingConstants.java | 4 +- .../messagereceiver/PublishingMessageReceiver.java | 67 +++++- .../org/apache/axis2/savan/atom/DerbyTest.java | 246 +++++++++++++++++++++ modules/mar/module.xml | 2 +- 10 files changed, 655 insertions(+), 91 deletions(-) diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java b/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java index 851dcc3..60feee7 100644 --- a/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java +++ b/modules/core/src/main/java/org/apache/savan/atom/AtomConstants.java @@ -40,7 +40,7 @@ public class AtomConstants { String SUBSCRIBER_UUID = "SAVAN_EVENTING_SUBSCRIBER_UUID"; } - interface ElementNames { + public interface ElementNames { String Subscribe = "Subscribe"; String EndTo = "EndTo"; String Delivery = "Delivery"; @@ -58,13 +58,15 @@ public class AtomConstants { String GetStatus = "GetStatus"; String GetStatusResponse = "GetStatusResponse"; String FeedUrl = "FeedUrl"; - + String Entry = "entry"; + String Content = "content"; String deleteFeedResponse = "DeleteFeedResponse"; } interface Properties { String SOAPVersion = "SOAPVersion"; String feedUrl = "feedUrl"; + String DataSource = "DataSource"; } } diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java b/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java new file mode 100644 index 0000000..9cced07 --- /dev/null +++ b/modules/core/src/main/java/org/apache/savan/atom/AtomDataSource.java @@ -0,0 +1,190 @@ +package org.apache.savan.atom; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.StringWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Properties; + +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import org.apache.axiom.om.OMAbstractFactory; +import org.apache.axiom.om.OMDocument; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMFactory; +import org.apache.axiom.om.OMNamespace; +import org.apache.axiom.om.impl.builder.StAXBuilder; +import org.apache.axiom.om.impl.builder.StAXOMBuilder; +import org.apache.axiom.om.util.StAXUtils; +import org.apache.axis2.context.MessageContext; +import org.apache.savan.SavanException; + +public class AtomDataSource { + public static final String SQL_CREATE_FEEDS = "CREATE TABLE FEEDS(id CHAR(250) NOT NULL, " + + "title CHAR(250), updated TIMESTAMP, author CHAR(250), PRIMARY KEY(id))"; + public static final String SQL_CREATE_ENTRIES = "CREATE TABLE ENTIES(feed CHAR(250), content VARCHAR(2000))"; + + public static final String SQL_ADD_FEED = "INSERT INTO FEEDS(id,title, updated,author) VALUES(?,?,?,?)"; + public static final String SQL_ADD_ENTRY = "INSERT INTO ENTIES(feed, content) VALUES(?,?)"; + public static final String SQL_GET_ENTRIES_4_FEED = "SELECT content from ENTIES WHERE feed=?"; + public static final String SQL_GET_FEED_DATA = "SELECT id,title,updated,author from FEEDS WHERE id=?"; + + + + + public String framework = "embedded"; + public String driver = "org.apache.derby.jdbc.EmbeddedDriver"; + public String protocol = "jdbc:derby:"; + private Properties props; + + public AtomDataSource() throws SavanException{ + try { + Class.forName(driver).newInstance(); + System.out.println("Loaded the appropriate driver."); + + props = new Properties(); + props.put("user", "user1"); + props.put("password", "user1"); + + Connection connection = getConnection(); + Statement statement = connection.createStatement(); + + ResultSet feedtable = connection.getMetaData().getTables(null, null, "FEEDS", null); + if(!feedtable.next()){ + statement.execute(SQL_CREATE_FEEDS); + } + ResultSet entirestable = connection.getMetaData().getTables(null, null, "ENTIES", null); + if(!entirestable.next()){ + statement.execute(SQL_CREATE_ENTRIES); + } + connection.close(); + } catch (InstantiationException e) { + throw new SavanException(e); + } catch (IllegalAccessException e) { + throw new SavanException(e); + } catch (ClassNotFoundException e) { + throw new SavanException(e); + } catch (SQLException e) { + throw new SavanException(e); + } + + } + + public Connection getConnection() throws SavanException{ + try { + /* + The connection specifies create=true to cause + the database to be created. To remove the database, + remove the directory derbyDB and its contents. + The directory derbyDB will be created under + the directory that the system property + derby.system.home points to, or the current + directory if derby.system.home is not set. + */ + return DriverManager.getConnection(protocol + + "derbyDB;create=true", props); + } catch (SQLException e) { + throw new SavanException(e); + } + + } + + + public void addFeed(String id,String title,Date lastEditedtime,String author) throws SavanException{ + + try { + Connection connection = getConnection(); + try{ + PreparedStatement statement = connection.prepareStatement(SQL_ADD_FEED); + statement.setString(1,id ); + statement.setString(2,title ); + Timestamp t = new Timestamp(lastEditedtime.getTime()); + statement.setTimestamp(3, t); + statement.setString(4, author); + statement.executeUpdate(); + }finally{ + connection.close(); + } + } catch (SQLException e) { + throw new SavanException(e); + } + + + } + + public void addEntry(String id,OMElement entry) throws SavanException{ + try { + StringWriter w = new StringWriter(); + entry.serialize(w); + Connection connection = getConnection(); + try{ + PreparedStatement statement = connection.prepareStatement(SQL_ADD_ENTRY); + statement.setString(1,id ); + statement.setString(2,w.getBuffer().toString() ); + statement.executeUpdate(); + }finally{ + connection.close(); + } + } catch (SQLException e) { + throw new SavanException(e); + } catch (XMLStreamException e) { + throw new SavanException(e); + } + } + + + public OMElement getFeedAsXml(String feedId) throws SavanException{ + + try { + Connection connection = getConnection(); + try{ + PreparedStatement statement = connection.prepareStatement(SQL_GET_FEED_DATA); + statement.setString(1,feedId ); + ResultSet results = statement.executeQuery(); + if(results.next()){ + String title = results.getString("title"); + Timestamp updatedTime = results.getTimestamp("updated"); + String author = results.getString("author"); + + Feed feed = new Feed(title,feedId,author,updatedTime); + + statement.close(); + + statement = connection.prepareStatement(SQL_GET_ENTRIES_4_FEED); + statement.setString(1,feedId ); + results = statement.executeQuery(); + while(results.next()){ + String entryAsStr = results.getString("content"); + InputStream atomIn = new ByteArrayInputStream(entryAsStr.getBytes()); + XMLStreamReader xmlreader = StAXUtils.createXMLStreamReader(atomIn, MessageContext.DEFAULT_CHAR_SET_ENCODING); + StAXBuilder builder = new StAXOMBuilder(feed.getFactory(),xmlreader); + feed.addEntry(builder.getDocumentElement()); + } + return feed.getFeedAsXml(); + }else{ + throw new SavanException("No such feed "+feedId); + } + }finally{ + connection.close(); + } + } catch (SQLException e) { + throw new SavanException(e); + } catch (XMLStreamException e) { + throw new SavanException(e); + } + } + + + + + +} diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java b/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java index 181abc6..8d817fb 100644 --- a/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java +++ b/modules/core/src/main/java/org/apache/savan/atom/AtomMessageReceiver.java @@ -49,44 +49,64 @@ public class AtomMessageReceiver implements MessageReceiver{ OMElement bodyContent = envlope.getBody().getFirstElement(); OMElement feedID = bodyContent.getFirstElement(); - String pathWRTRepository = "atom/"+feedID.getText(); - File atomFile = messageCtx.getConfigurationContext().getRealPath(pathWRTRepository); - if(pathWRTRepository.equals("atom/all.atom") && !atomFile.exists()){ - AtomSubscriber atomSubscriber = new AtomSubscriber(); - atomSubscriber.setId(new URI("All")); - atomSubscriber.setAtomFile(atomFile); - atomSubscriber.setAuthor("DefaultUser"); - atomSubscriber.setTitle("default Feed"); - - String serviceAddress = messageCtx.getTo().getAddress(); - int cutIndex = serviceAddress.indexOf("services"); - if(cutIndex > 0){ - serviceAddress = serviceAddress.substring(0,cutIndex-1); - } - atomSubscriber.setFeedUrl(serviceAddress+"/services/"+messageCtx.getServiceContext().getAxisService().getName() +"/atom?feed=all.atom"); - - - SubscriberStore store = CommonUtil.getSubscriberStore(messageCtx.getAxisService()); - if (store == null) - throw new AxisFault ("Cant find the Savan subscriber store"); - store.store(atomSubscriber); - } - - if(!atomFile.exists()){ - throw new AxisFault("no feed exisits for "+feedID.getText() + " no file found "+ atomFile.getAbsolutePath()); - } - FileInputStream atomIn = new FileInputStream(atomFile); - + + + String feedIDAsUrn = feedID.getText().replaceAll("_", ":").replaceAll(".atom", ""); + + SubscriberStore store = CommonUtil.getSubscriberStore(messageCtx.getAxisService()); + if (store == null) + throw new AxisFault ("Cant find the Savan subscriber store"); + + + AtomSubscriber subscriber = (AtomSubscriber)store.retrieve(feedIDAsUrn); + SOAPFactory fac = getSOAPFactory(messageCtx); SOAPEnvelope envelope = fac.getDefaultEnvelope(); + + OMElement result = subscriber.getFeedAsXml(); + + + + + +// String pathWRTRepository = "atom/"+feedID.getText(); +// +// File atomFile = messageCtx.getConfigurationContext().getRealPath(pathWRTRepository); +// if(pathWRTRepository.equals("atom/all.atom") && !atomFile.exists()){ +// AtomSubscriber atomSubscriber = new AtomSubscriber(); +// atomSubscriber.setId(new URI("All")); +// atomSubscriber.setAtomFile(atomFile); +// atomSubscriber.setAuthor("DefaultUser"); +// atomSubscriber.setTitle("default Feed"); +// +// String serviceAddress = messageCtx.getTo().getAddress(); +// int cutIndex = serviceAddress.indexOf("services"); +// if(cutIndex > 0){ +// serviceAddress = serviceAddress.substring(0,cutIndex-1); +// } +// atomSubscriber.setFeedUrl(serviceAddress+"/services/"+messageCtx.getServiceContext().getAxisService().getName() +"/atom?feed=all.atom"); +// +// +// SubscriberStore store = CommonUtil.getSubscriberStore(messageCtx.getAxisService()); +// if (store == null) +// throw new AxisFault ("Cant find the Savan subscriber store"); +// store.store(atomSubscriber); +// } +// +// +// if(!atomFile.exists()){ +// throw new AxisFault("no feed exisits for "+feedID.getText() + " no file found "+ atomFile.getAbsolutePath()); +// } +// FileInputStream atomIn = new FileInputStream(atomFile); + //add the content of the file to the response - XMLStreamReader xmlreader = StAXUtils.createXMLStreamReader - (atomIn, MessageContext.DEFAULT_CHAR_SET_ENCODING); - StAXBuilder builder = new StAXOMBuilder(fac,xmlreader); - OMElement result = (OMElement) builder.getDocumentElement(); +// XMLStreamReader xmlreader = StAXUtils.createXMLStreamReader +// (atomIn, MessageContext.DEFAULT_CHAR_SET_ENCODING); +// StAXBuilder builder = new StAXOMBuilder(fac,xmlreader); +// OMElement result = (OMElement) builder.getDocumentElement(); envelope.getBody().addChild(result); //send beck the response @@ -125,15 +145,15 @@ public class AtomMessageReceiver implements MessageReceiver{ } catch (OMException e) { e.printStackTrace(); throw new AxisFault(e); - } catch (FileNotFoundException e) { - e.printStackTrace(); - throw new AxisFault(e); - } catch (XMLStreamException e) { - e.printStackTrace(); - throw new AxisFault(e); - } catch (URISyntaxException e) { - e.printStackTrace(); - throw new AxisFault(e); +// } catch (FileNotFoundException e) { +// e.printStackTrace(); +// throw new AxisFault(e); +// } catch (XMLStreamException e) { +// e.printStackTrace(); +// throw new AxisFault(e); +// } catch (URISyntaxException e) { +// e.printStackTrace(); +// throw new AxisFault(e); } } diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java index 77b6352..35adc57 100644 --- a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java +++ b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriber.java @@ -21,14 +21,20 @@ import org.w3.x2005.x08.addressing.EndpointReferenceType; public class AtomSubscriber implements Subscriber{ private static final Log log = LogFactory.getLog(AtomSubscriber.class); private Date subscriptionEndingTime = null; - private Feed feed; + //private Feed feed; private Filter filter = null; private File atomFile; private String feedUrl; - + private AtomDataSource atomDataSource; private URI id; - private String title; - private String author; + + public void init(AtomDataSource dataSource,URI id,String title,String author) throws SavanException{ + this.atomDataSource = dataSource; + atomDataSource.addFeed(id.toString(), title, new Date(), author); + } + + + public URI getId() { return id; } @@ -36,7 +42,7 @@ public class AtomSubscriber implements Subscriber{ throw new UnsupportedOperationException(); } public void sendEventData(OMElement eventData) throws SavanException { - try { +// try { Date date = new Date (); boolean expired = false; @@ -47,25 +53,28 @@ public class AtomSubscriber implements Subscriber{ String message = "Cant notify the listner since the subscription has been expired"; log.debug(message); } - if(feed == null){ - feed = new Feed(title,id.toString(),author); - } - feed.addEntry(eventData); - if(!atomFile.getParentFile().exists()){ - atomFile.getParentFile().mkdir(); - } - FileOutputStream out = new FileOutputStream(atomFile); - feed.write(out); - out.close(); - System.out.println("Atom file at "+ atomFile + " is updated"); - } catch (FileNotFoundException e) { - throw new SavanException(e); - } catch (XMLStreamException e) { - throw new SavanException(e); - } catch (IOException e) { - throw new SavanException(e); - } + atomDataSource.addEntry(id.toString(), eventData); +// +// if(feed == null){ +// feed = new Feed(title,id.toString(),author); +// } +// feed.addEntry(eventData); +// +// if(!atomFile.getParentFile().exists()){ +// atomFile.getParentFile().mkdir(); +// } +// FileOutputStream out = new FileOutputStream(atomFile); +// feed.write(out); +// out.close(); +// System.out.println("Atom file at "+ atomFile + " is updated"); +// } catch (FileNotFoundException e) { +// throw new SavanException(e); +// } catch (XMLStreamException e) { +// throw new SavanException(e); +// } catch (IOException e) { +// throw new SavanException(e); +// } } public void setId(URI id) { this.id = id; @@ -84,22 +93,22 @@ public class AtomSubscriber implements Subscriber{ throw new UnsupportedOperationException(); } - public String getAuthor() { - return author; - } - - public void setAuthor(String author) { - this.author = author; - } - +// public String getAuthor() { +// return author; +// } +// +// public void setAuthor(String author) { +// this.author = author; +// } +// +// +// public String getTitle() { +// return title; +// } - public String getTitle() { - return title; - } - - public void setTitle(String title) { - this.title = title; - } +// public void setTitle(String title) { +// this.title = title; +// } public String getFeedUrl(){ @@ -121,6 +130,15 @@ public class AtomSubscriber implements Subscriber{ public void setFeedUrl(String feedUrl) { this.feedUrl = feedUrl; } +// public Feed getFeed() { +// return feed; +// } + public OMElement getFeedAsXml() throws SavanException { + return atomDataSource.getFeedAsXml(id.toString()); + } +// public void setAtomDataSource(AtomDataSource atomDataSource) { +// this.atomDataSource = atomDataSource; +// } } diff --git a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java index 98203d6..a87c6f0 100644 --- a/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java +++ b/modules/core/src/main/java/org/apache/savan/atom/AtomSubscriptionProcessor.java @@ -31,6 +31,7 @@ import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axiom.soap.SOAPHeader; import org.apache.axis2.AxisFault; import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.ServiceContext; import org.apache.savan.SavanConstants; import org.apache.savan.SavanException; import org.apache.savan.SavanMessageContext; @@ -98,6 +99,13 @@ public class AtomSubscriptionProcessor extends SubscriptionProcessor { if (envelope==null) return null; + ServiceContext serviceContext = smc.getMessageContext().getServiceContext(); + AtomDataSource dataSource = (AtomDataSource)serviceContext.getProperty(AtomConstants.Properties.DataSource); + if(dataSource == null){ + dataSource = new AtomDataSource(); + serviceContext.setProperty(AtomConstants.Properties.DataSource, dataSource); + } + String subscriberName = protocol.getDefaultSubscriber(); Subscriber subscriber = configurationManager.getSubscriberInstance(subscriberName); @@ -105,6 +113,8 @@ public class AtomSubscriptionProcessor extends SubscriptionProcessor { String message = "Savan only support implementations of Atom subscriber as Subscribers"; throw new SavanException (message); } + + //find the real path for atom feeds File repositoryPath = smc.getConfigurationContext().getRealPath("/"); @@ -118,6 +128,7 @@ public class AtomSubscriptionProcessor extends SubscriptionProcessor { } AtomSubscriber atomSubscriber = (AtomSubscriber) subscriber; + String id = UUIDGenerator.getUUID(); smc.setProperty(AtomConstants.TransferedProperties.SUBSCRIBER_UUID,id); atomSubscriber.setId(new URI(id)); @@ -151,8 +162,8 @@ public class AtomSubscriptionProcessor extends SubscriptionProcessor { } atomSubscriber.setFilter(filter); } - atomSubscriber.setAuthor(createFeed.getAuthor()); - atomSubscriber.setTitle(createFeed.getTitle()); + + atomSubscriber.init(dataSource, new URI(id), createFeed.getTitle(), createFeed.getAuthor()); smc.setProperty(AtomConstants.Properties.feedUrl, atomSubscriber.getFeedUrl()); return atomSubscriber; } catch (AxisFault e) { diff --git a/modules/core/src/main/java/org/apache/savan/atom/Feed.java b/modules/core/src/main/java/org/apache/savan/atom/Feed.java index 0bdebfb..aea30a4 100644 --- a/modules/core/src/main/java/org/apache/savan/atom/Feed.java +++ b/modules/core/src/main/java/org/apache/savan/atom/Feed.java @@ -56,11 +56,20 @@ public class Feed { // </feed> - public Feed(String title, String id, String author) { + public Feed(String title, String id, String author,Date lastUpdated) { this.title = title; + if(title != null){ + title = title.trim(); + } + if(author != null){ + author = author.trim(); + } + this.id = id; this.author = author; - lastUpdated = new Date(); + if(lastUpdated == null){ + lastUpdated = new Date(); + } factory = OMAbstractFactory.getOMFactory(); document = factory.createOMDocument(); atomNs = factory.createOMNamespace(AtomConstants.ATOM_NAMESPACE,AtomConstants.ATOM_PREFIX); @@ -107,4 +116,11 @@ public class Feed { // // } + public OMElement getFeedAsXml(){ + return document.getOMDocumentElement(); + } + public OMFactory getFactory() { + return factory; + } + } diff --git a/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java b/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java index 793de80..49e3027 100644 --- a/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java +++ b/modules/core/src/main/java/org/apache/savan/eventing/EventingConstants.java @@ -20,6 +20,7 @@ package org.apache.savan.eventing; public interface EventingConstants { String EVENTING_NAMESPACE = "http://schemas.xmlsoap.org/ws/2004/08/eventing"; + String EXTENDED_EVENTING_NAMESPACE = "http://ws.apache.org/ws/2007/05/eventing-extended"; String EVENTING_PREFIX = "wse"; String DEFAULT_DELIVERY_MODE = "http://schemas.xmlsoap.org/ws/2004/08/eventing/DeliveryModes/Push"; String DEFAULT_FILTER_IDENTIFIER = FilterDialects.XPath; @@ -46,6 +47,7 @@ public interface EventingConstants { String Unsubscribe = "Unsubscribe"; String GetStatus = "GetStatus"; String GetStatusResponse = "GetStatusResponse"; + String Topic = "topic"; } interface Actions { @@ -57,7 +59,7 @@ public interface EventingConstants { String UnsubscribeResponse = "http://schemas.xmlsoap.org/ws/2004/08/eventing/UnsubscribeResponse"; String GetStatus = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus"; String GetStatusResponse = "http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse"; - String Publish = "http://wso2.com/ws/2007/05/eventing/Publish"; + String Publish = "http://ws.apache.org/ws/2007/05/eventing-extended/Publish"; } interface Properties { diff --git a/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java b/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java index fe4ebba..020c0dd 100644 --- a/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java +++ b/modules/core/src/main/java/org/apache/savan/messagereceiver/PublishingMessageReceiver.java @@ -1,17 +1,76 @@ package org.apache.savan.messagereceiver; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.StringTokenizer; + +import javax.xml.namespace.QName; + +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMException; import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axis2.AxisFault; import org.apache.axis2.context.MessageContext; import org.apache.axis2.context.ServiceContext; import org.apache.axis2.engine.MessageReceiver; +import org.apache.savan.atom.AtomConstants; +import org.apache.savan.eventing.EventingConstants; import org.apache.savan.publication.client.PublicationClient; +/** + * This Message reciver handles the publish requests. It will received all messages sent to SOAP/WS action + * http://ws.apache.org/ws/2007/05/eventing-extended/Publish, or request URL http://<host>:port//services/<service-name>/publish. + * It will search for topic in URL query parameter "topic" or + * Soap Header <eevt::topic xmlns="http://ws.apache.org/ws/2007/05/eventing-extended">...</topic> + * @author Srinath Perera (hemap...@apache.org) + */ public class PublishingMessageReceiver implements MessageReceiver{ + public void receive(MessageContext messageCtx) throws AxisFault { - SOAPEnvelope requestEnvelope = messageCtx.getEnvelope(); - ServiceContext serviceContext = messageCtx.getServiceContext(); - PublicationClient client = new PublicationClient(serviceContext.getConfigurationContext()); - client.sendPublication(requestEnvelope.getBody().getFirstElement(),serviceContext.getAxisService(),null); + try { + String toAddress = messageCtx.getTo().getAddress(); + //Here we try to locate the topic. It can be either a query parameter of the input address or a header + //in the SOAP evvelope + URI topic = null; + + SOAPEnvelope requestEnvelope = messageCtx.getEnvelope(); + int querySeperatorIndex = toAddress.indexOf('?'); + if(querySeperatorIndex > 0){ + String queryString = toAddress.substring(querySeperatorIndex+1); + HashMap map = new HashMap(); + StringTokenizer t = new StringTokenizer(queryString,"=&"); + while(t.hasMoreTokens()){ + map.put(t.nextToken(), t.nextToken()); + } + if(map.containsKey(EventingConstants.ElementNames.Topic)){ + topic = new URI((String)map.get(EventingConstants.ElementNames.Topic)); + } + }else{ + OMElement topicHeader = requestEnvelope.getHeader().getFirstChildWithName(new QName(EventingConstants.EXTENDED_EVENTING_NAMESPACE, + EventingConstants.ElementNames.Topic)); + if(topicHeader != null){ + topic = new URI(topicHeader.getText()); + } + } + + //Here we locate the content of the Event. If this is APP we unwrap APP wrapping elements. + OMElement eventData = requestEnvelope.getBody().getFirstElement(); + if(AtomConstants.ATOM_NAMESPACE.equals(eventData.getNamespace().getNamespaceURI()) && + AtomConstants.ElementNames.Entry.equals(eventData.getLocalName())){ + OMElement content = eventData.getFirstChildWithName(new QName(AtomConstants.ATOM_NAMESPACE,AtomConstants.ElementNames.Content)); + if(content != null && content.getFirstElement() != null){ + eventData.getFirstElement(); + } + } + //Use in memory API to publish the event + ServiceContext serviceContext = messageCtx.getServiceContext(); + PublicationClient client = new PublicationClient(serviceContext.getConfigurationContext()); + client.sendPublication(eventData,serviceContext.getAxisService(),topic); + } catch (OMException e) { + throw new AxisFault(e); + } catch (URISyntaxException e) { + throw new AxisFault(e); + } } } diff --git a/modules/core/src/test/java/org/apache/axis2/savan/atom/DerbyTest.java b/modules/core/src/test/java/org/apache/axis2/savan/atom/DerbyTest.java new file mode 100644 index 0000000..e24a658 --- /dev/null +++ b/modules/core/src/test/java/org/apache/axis2/savan/atom/DerbyTest.java @@ -0,0 +1,246 @@ +package org.apache.axis2.savan.atom; + +import java.io.StringWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Date; +import java.util.Iterator; +import java.util.Properties; +import java.util.Random; + +import org.apache.axiom.om.OMAbstractFactory; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.OMFactory; +import org.apache.axiom.om.OMNamespace; +import org.apache.savan.SavanException; +import org.apache.savan.atom.AtomDataSource; + +import junit.framework.TestCase; + +public class DerbyTest extends TestCase{ + + /* the default framework is embedded*/ + public String framework = "embedded"; + public String driver = "org.apache.derby.jdbc.EmbeddedDriver"; + public String protocol = "jdbc:derby:"; + + public void testDataSource() throws Exception{ + AtomDataSource dataSource = new AtomDataSource(); + String id = "id"+new Random().nextDouble(); + dataSource.addFeed(id, "foo", new Date(),"Srinath"); + dataSource.addEntry(id,getDummyMethodRequestElement()); + StringWriter w = new StringWriter(); + + OMElement result = dataSource.getFeedAsXml(id); +// Iterator it = result.getChildElements(); +// while(it.hasNext()){ +// System.out.println(it.next()); +// } + + result.serialize(w); + System.out.println(w.getBuffer().toString()); + } + +// public static void main(String[] args) +// { +// new DerbyTest().go(args); +// } + +// void go(String[] args) +// { +// /* parse the arguments to determine which framework is desired*/ +// parseArguments(args); +// +// System.out.println("SimpleApp starting in " + framework + " mode."); +// +// try +// { +// /* +// The driver is installed by loading its class. +// In an embedded environment, this will start up Derby, since it is not already running. +// */ +// Class.forName(driver).newInstance(); +// System.out.println("Loaded the appropriate driver."); +// +// Connection conn = null; +// Properties props = new Properties(); +// props.put("user", "user1"); +// props.put("password", "user1"); +// +// /* +// The connection specifies create=true to cause +// the database to be created. To remove the database, +// remove the directory derbyDB and its contents. +// The directory derbyDB will be created under +// the directory that the system property +// derby.system.home points to, or the current +// directory if derby.system.home is not set. +// */ +// conn = DriverManager.getConnection(protocol + +// "derbyDB;create=true", props); +// +// System.out.println("Connected to and created database derbyDB"); +// +// conn.setAutoCommit(false); +// +// /* +// Creating a statement lets us issue commands against +// the connection. +// */ +// Statement s = conn.createStatement(); +// +// /* +// We create a table, add a few rows, and update one. +// */ +// s.execute("create table derbyDB(num int, addr varchar(40))"); +// System.out.println("Created table derbyDB"); +// s.execute("insert into derbyDB values (1956,'Webster St.')"); +// System.out.println("Inserted 1956 Webster"); +// s.execute("insert into derbyDB values (1910,'Union St.')"); +// System.out.println("Inserted 1910 Union"); +// s.execute( +// "update derbyDB set num=180, addr='Grand Ave.' where num=1956"); +// System.out.println("Updated 1956 Webster to 180 Grand"); +// +// s.execute( +// "update derbyDB set num=300, addr='Lakeshore Ave.' where num=180"); +// System.out.println("Updated 180 Grand to 300 Lakeshore"); +// +// /* +// We select the rows and verify the results. +// */ +// ResultSet rs = s.executeQuery( +// "SELECT num, addr FROM derbyDB ORDER BY num"); +// +// if (!rs.next()) +// { +// throw new Exception("Wrong number of rows"); +// } +// +// if (rs.getInt(1) != 300) +// { +// throw new Exception("Wrong row returned"); +// } +// +// if (!rs.next()) +// { +// throw new Exception("Wrong number of rows"); +// } +// +// if (rs.getInt(1) != 1910) +// { +// throw new Exception("Wrong row returned"); +// } +// +// if (rs.next()) +// { +// throw new Exception("Wrong number of rows"); +// } +// +// System.out.println("Verified the rows"); +// +// s.execute("drop table derbyDB"); +// System.out.println("Dropped table derbyDB"); +// +// /* +// We release the result and statement resources. +// */ +// rs.close(); +// s.close(); +// System.out.println("Closed result set and statement"); +// +// /* +// We end the transaction and the connection. +// */ +// conn.commit(); +// conn.close(); +// System.out.println("Committed transaction and closed connection"); +// +// /* +// In embedded mode, an application should shut down Derby. +// If the application fails to shut down Derby explicitly, +// the Derby does not perform a checkpoint when the JVM shuts down, which means +// that the next connection will be slower. +// Explicitly shutting down Derby with the URL is preferred. +// This style of shutdown will always throw an "exception". +// */ +// boolean gotSQLExc = false; +// +// if (framework.equals("embedded")) +// { +// try +// { +// DriverManager.getConnection("jdbc:derby:;shutdown=true"); +// } +// catch (SQLException se) +// { +// gotSQLExc = true; +// } +// +// if (!gotSQLExc) +// { +// System.out.println("Database did not shut down normally"); +// } +// else +// { +// System.out.println("Database shut down normally"); +// } +// } +// } +// catch (Throwable e) +// { +// System.out.println("exception thrown:"); +// +// if (e instanceof SQLException) +// { +// printSQLError((SQLException) e); +// } +// else +// { +// e.printStackTrace(); +// } +// } +// +// System.out.println("SimpleApp finished"); +// } +// +// static void printSQLError(SQLException e) +// { +// while (e != null) +// { +// System.out.println(e.toString()); +// e = e.getNextException(); +// } +// } +// +// private void parseArguments(String[] args) +// { +// int length = args.length; +// +// for (int index = 0; index < length; index++) +// { +// if (args[index].equalsIgnoreCase("jccjdbcclient")) +// { +// framework = "jccjdbc"; +// driver = "com.ibm.db2.jcc.DB2Driver"; +// protocol = "jdbc:derby:net://localhost:1527/"; +// } +// if (args[index].equalsIgnoreCase("derbyclient")) +// { +// framework = "derbyclient"; +// driver = "org.apache.derby.jdbc.ClientDriver"; +// protocol = "jdbc:derby://localhost:1527/"; +// } +// } +// } + private final String applicationNamespaceName = "http://tempuri.org/"; + private final String dummyMethod = "dummyMethod"; + private OMElement getDummyMethodRequestElement() { + OMFactory fac = OMAbstractFactory.getOMFactory(); + OMNamespace namespace = fac.createOMNamespace(applicationNamespaceName,"ns1"); + return fac.createOMElement(dummyMethod, namespace); + } + } diff --git a/modules/mar/module.xml b/modules/mar/module.xml index 65a7bcf..d5b2b4b 100644 --- a/modules/mar/module.xml +++ b/modules/mar/module.xml @@ -18,7 +18,7 @@ <operation name="publish" mep="http://www.w3.org/2004/08/wsdl/in-out"> <messageReceiver class="org.apache.savan.messagereceiver.PublishingMessageReceiver"/> - <actionMapping>http://wso2.com/ws/2007/05/eventing/Publish</actionMapping> + <actionMapping>http://ws.apache.org/ws/2007/05/eventing-extended/Publish</actionMapping> </operation> </module>