diff --git a/flexible/pubsub/README.md b/flexible/pubsub/README.md new file mode 100644 index 00000000000..a1706e14aad --- /dev/null +++ b/flexible/pubsub/README.md @@ -0,0 +1,59 @@ +# App Engine Flexible Environment - Pub/Sub Sample + +## Clone the sample app + +Copy the sample apps to your local machine, and cd to the pubsub directory: + +``` +git clone https://github.com/GoogleCloudPlatform/java-docs-samples +cd java-docs-samples/flexible/pubsub +``` + +## Setup + +Make sure [`gcloud`](https://cloud.google.com/sdk/docs/) is installed and authenticated. + +Create a topic +``` +gcloud beta pubsub topics create +``` + +Create a push subscription, to send messages to a Google Cloud Project URL + such as https://.appspot.com/push. +``` +gcloud beta pubsub subscriptions create \ + --topic \ + --push-endpoint \ + https://.appspot.com/pubsub/push?token= \ + --ack-deadline 30 +``` +## Run + +Set the following environment variables and run using shown Maven command. You can then +direct your browser to `http://localhost:8080/` + +``` +export PUBSUB_TOPIC= +export PUBSUB_VERIFICATION_TOKEN= +mvn jetty:run +``` + + +### Send fake subscription push messages with: + +``` +curl -H "Content-Type: application/json" -i --data @sample_message.json +"localhost:8080/pubsub/push?token=" +``` + +## Deploy + +Update the environment variables `PUBSUB_TOPIC` and `PUBSUB_VERIFICATION_TOKEN` in [`app.yaml`](src/main/appengine/app.yaml), +then: + +``` +mvn appengine:deploy +``` + +The home page of this application provides a form to publish messages and also provides a view of the most recent messages +received over the push endpoint and persisted in storage. \ No newline at end of file diff --git a/flexible/pubsub/pom.xml b/flexible/pubsub/pom.xml new file mode 100644 index 00000000000..c0ec8748ab8 --- /dev/null +++ b/flexible/pubsub/pom.xml @@ -0,0 +1,116 @@ + + + + 4.0.0 + war + 1.0-SNAPSHOT + com.example.flexible + flexible-pubsub + + + doc-samples + com.google.cloud + 1.0.0 + ../.. + + + + 1.3.0 + 1.8 + 1.8 + false + 9.3.8.v20160314 + + + + + javax.servlet + javax.servlet-api + 3.1.0 + jar + provided + + + + + com.google.cloud + google-cloud-pubsub + 0.13.0-alpha + + + com.google.cloud + google-cloud-datastore + 0.13.0-beta + + + + + + com.google.appengine + appengine-api-stubs + 1.9.38 + test + + + com.google.appengine + appengine-tools-sdk + 1.9.38 + test + + + org.eclipse.jetty + jetty-server + 9.4.3.v20170317 + + + junit + junit + test + + + org.mockito + mockito-all + 1.10.19 + test + + + org.eclipse.jetty + jetty-servlet + 9.3.14.v20161028 + + + + + ${project.build.directory}/${project.build.finalName}/WEB-INF/classes + + + com.google.cloud.tools + appengine-maven-plugin + ${appengine.maven.plugin} + + + + + + org.eclipse.jetty + jetty-maven-plugin + ${jetty.maven.plugin} + + + + + diff --git a/flexible/pubsub/sample_message.json b/flexible/pubsub/sample_message.json new file mode 100644 index 00000000000..1c0e04caa1a --- /dev/null +++ b/flexible/pubsub/sample_message.json @@ -0,0 +1 @@ +{"message":{"data":"dGVzdA==","attributes":{},"messageId":"91010751788941","publishTime":"2017-04-05T23:16:42.302Z"}} diff --git a/flexible/pubsub/src/main/appengine/app.yaml b/flexible/pubsub/src/main/appengine/app.yaml new file mode 100644 index 00000000000..5233b58b21c --- /dev/null +++ b/flexible/pubsub/src/main/appengine/app.yaml @@ -0,0 +1,26 @@ +# Copyright 2017 Google Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# [START appyaml] +runtime: java +env: flex + +handlers: +- url: /.* + script: this field is required, but ignored + +# [START env_variables] +env_variables: + PUBSUB_TOPIC: + PUBSUB_VERIFICATION_TOKEN: +# [END env_variables] +# [END appyaml] diff --git a/flexible/pubsub/src/main/java/com/example/flexible/pubsub/Message.java b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/Message.java new file mode 100644 index 00000000000..de2c7e9d85a --- /dev/null +++ b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/Message.java @@ -0,0 +1,52 @@ +/** + * Copyright 2017 Google Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.flexible.pubsub; + +/** + * A message captures information from the Pubsub message received over the push endpoint and is + * persisted in storage. + */ +public class Message { + private String messageId; + private String publishTime; + private String data; + + public Message(String messageId) { + this.messageId = messageId; + } + + public String getMessageId() { + return messageId; + } + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + public String getPublishTime() { + return publishTime; + } + + public void setPublishTime(String publishTime) { + this.publishTime = publishTime; + } + + public String getData() { + return data; + } + + public void setData(String data) { + this.data = data; + } +} diff --git a/flexible/pubsub/src/main/java/com/example/flexible/pubsub/MessageRepository.java b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/MessageRepository.java new file mode 100644 index 00000000000..2ec79d71c9a --- /dev/null +++ b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/MessageRepository.java @@ -0,0 +1,29 @@ +/** + * Copyright 2017 Google Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.flexible.pubsub; + +import java.util.List; + +public interface MessageRepository { + + /** Save message to persistent storage. */ + void save(Message message); + + /** + * Retrieve most recent stored messages. + * @param limit number of messages + * @return list of messages + */ + List retrieve(int limit); +} diff --git a/flexible/pubsub/src/main/java/com/example/flexible/pubsub/MessageRepositoryImpl.java b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/MessageRepositoryImpl.java new file mode 100644 index 00000000000..ff6e7eab137 --- /dev/null +++ b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/MessageRepositoryImpl.java @@ -0,0 +1,98 @@ +/** + * Copyright 2017 Google Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.flexible.pubsub; + +import com.google.cloud.datastore.Datastore; +import com.google.cloud.datastore.DatastoreOptions; +import com.google.cloud.datastore.Entity; +import com.google.cloud.datastore.Key; +import com.google.cloud.datastore.KeyFactory; +import com.google.cloud.datastore.Query; +import com.google.cloud.datastore.QueryResults; +import com.google.cloud.datastore.StructuredQuery; + +import java.util.ArrayList; +import java.util.List; + +/** Storage for Message objects using Cloud Datastore. */ +public class MessageRepositoryImpl implements MessageRepository { + + private static MessageRepositoryImpl instance; + + private String messagesKind = "messages"; + private KeyFactory keyFactory = getDatastoreInstance().newKeyFactory().setKind(messagesKind); + + @Override + public void save(Message message) { + // Save message to "messages" + Datastore datastore = getDatastoreInstance(); + Key key = datastore.allocateId(keyFactory.newKey()); + + Entity.Builder messageEntityBuilder = Entity.newBuilder(key) + .set("messageId", message.getMessageId()); + + if (message.getData() != null) { + messageEntityBuilder = messageEntityBuilder.set("data", message.getData()); + } + + if (message.getPublishTime() != null) { + messageEntityBuilder = messageEntityBuilder.set("publishTime", message.getPublishTime()); + } + datastore.put(messageEntityBuilder.build()); + } + + @Override + public List retrieve(int limit) { + // Get Message saved in Datastore + Datastore datastore = getDatastoreInstance(); + Query query = + Query.newEntityQueryBuilder() + .setKind(messagesKind) + .setLimit(limit) + .addOrderBy(StructuredQuery.OrderBy.desc("publishTime")) + .build(); + QueryResults results = datastore.run(query); + + List messages = new ArrayList<>(); + while (results.hasNext()) { + Entity entity = results.next(); + Message message = new Message(entity.getString("messageId")); + String data = entity.getString("data"); + if (data != null) { + message.setData(data); + } + String publishTime = entity.getString("publishTime"); + if (publishTime != null) { + message.setPublishTime(publishTime); + } + messages.add(message); + } + return messages; + } + + private Datastore getDatastoreInstance() { + return DatastoreOptions.getDefaultInstance().getService(); + } + + private MessageRepositoryImpl() { + } + + // retrieve a singleton instance + public static synchronized MessageRepositoryImpl getInstance() { + if (instance == null) { + instance = new MessageRepositoryImpl(); + } + return instance; + } +} diff --git a/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubHome.java b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubHome.java new file mode 100644 index 00000000000..c65e3ed9e00 --- /dev/null +++ b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubHome.java @@ -0,0 +1,47 @@ +/** + * Copyright 2017 Google Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.flexible.pubsub; + +import java.util.List; + +public class PubSubHome { + + private static MessageRepository messageRepository = MessageRepositoryImpl.getInstance(); + private static int MAX_MESSAGES = 10; + + /** + * Retrieve received messages in html. + * + * @return html representation of messages (one per row) + */ + public static String getReceivedMessages() { + List messageList = messageRepository.retrieve(MAX_MESSAGES); + return convertToHtmlTable(messageList); + } + + private static String convertToHtmlTable(List messages) { + StringBuilder sb = new StringBuilder(); + for (Message message : messages) { + sb.append(""); + sb.append("" + message.getMessageId() + ""); + sb.append("" + message.getData() + ""); + sb.append("" + message.getPublishTime() + ""); + sb.append(""); + } + return sb.toString(); + } + + private PubSubHome() { } +} diff --git a/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubPublish.java b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubPublish.java new file mode 100644 index 00000000000..f4706faaf0e --- /dev/null +++ b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubPublish.java @@ -0,0 +1,68 @@ +/** + * Copyright 2017 Google Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.flexible.pubsub; + +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.spi.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; +import org.apache.http.HttpStatus; + +import java.io.IOException; +import javax.servlet.ServletException; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +// [START publish] +@WebServlet(name = "Publish with PubSub", value = "/pubsub/publish") +public class PubSubPublish extends HttpServlet { + + @Override + public void doPost(HttpServletRequest req, HttpServletResponse resp) + throws IOException, ServletException { + Publisher publisher = this.publisher; + try { + String topicId = System.getenv("PUBSUB_TOPIC"); + // create a publisher on the topic + if (publisher == null) { + publisher = Publisher.defaultBuilder( + TopicName.create(ServiceOptions.getDefaultProjectId(), topicId)) + .build(); + } + // construct a pubsub message from the payload + final String payload = req.getParameter("payload"); + PubsubMessage pubsubMessage = + PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build(); + + publisher.publish(pubsubMessage); + // redirect to home page + resp.sendRedirect("/"); + // [END publish] + } catch (Exception e) { + resp.sendError(HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage()); + } + } +// [END publish] + + private Publisher publisher; + + public PubSubPublish() { } + + PubSubPublish(Publisher publisher) { + this.publisher = publisher; + } +} diff --git a/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubPush.java b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubPush.java new file mode 100644 index 00000000000..ce47254e7aa --- /dev/null +++ b/flexible/pubsub/src/main/java/com/example/flexible/pubsub/PubSubPush.java @@ -0,0 +1,81 @@ +/** + * Copyright 2017 Google Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.flexible.pubsub; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; + +import java.io.IOException; +import java.util.Base64; +import java.util.stream.Collectors; +import javax.servlet.ServletException; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +// [START push] +@WebServlet(value = "/pubsub/push") +public class PubSubPush extends HttpServlet { + + @Override + public void doPost(HttpServletRequest req, HttpServletResponse resp) + throws IOException, ServletException { + String pubsubVerificationToken = System.getenv("PUBSUB_VERIFICATION_TOKEN"); + // Do not process message if request token does not match pubsubVerificationToken + if (req.getParameter("token").compareTo(pubsubVerificationToken) != 0) { + resp.setStatus(HttpServletResponse.SC_BAD_REQUEST); + return; + } + // parse message object from "message" field in the request body json + // decode message data from base64 + Message message = getMessage(req); + try { + messageRepository.save(message); + // 200, 201, 204, 102 status codes are interpreted as success by the Pub/Sub system + resp.setStatus(HttpServletResponse.SC_OK); + } catch (Exception e) { + resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + } + } +// [END push] + + private Message getMessage(HttpServletRequest request) throws IOException { + String requestBody = request.getReader().lines().collect(Collectors.joining("\n")); + JsonElement jsonRoot = jsonParser.parse(requestBody); + String messageStr = jsonRoot.getAsJsonObject().get("message").toString(); + Message message = gson.fromJson(messageStr, Message.class); + // decode from base64 + String decoded = decode(message.getData()); + message.setData(decoded); + return message; + } + + private String decode(String data) { + return new String(Base64.getDecoder().decode(data)); + } + + private final Gson gson = new Gson(); + private final JsonParser jsonParser = new JsonParser(); + private MessageRepository messageRepository; + + PubSubPush(MessageRepository messageRepository) { + this.messageRepository = messageRepository; + } + + public PubSubPush() { + this.messageRepository = MessageRepositoryImpl.getInstance(); + } +} diff --git a/flexible/pubsub/src/main/test/com/example/flexible/pubsub/PubSubPublishTest.java b/flexible/pubsub/src/main/test/com/example/flexible/pubsub/PubSubPublishTest.java new file mode 100644 index 00000000000..6b7f06016e9 --- /dev/null +++ b/flexible/pubsub/src/main/test/com/example/flexible/pubsub/PubSubPublishTest.java @@ -0,0 +1,50 @@ +/** + * Copyright 2017 Google Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.example.flexible.pubsub; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.gax.core.SettableApiFuture; +import com.google.cloud.pubsub.spi.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import org.junit.Test; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +public class PubSubPublishTest { + + @Test + public void servletPublishesPayloadMessage() throws Exception { + assertNotNull(System.getenv("PUBSUB_TOPIC")); + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getParameter("payload")).thenReturn("test-message"); + + HttpServletResponse response = mock(HttpServletResponse.class); + Publisher publisher = mock(Publisher.class); + PubsubMessage message = PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("test-message")).build(); + when(publisher.publish(eq(message))).thenReturn(SettableApiFuture.create()); + PubSubPublish pubSubPublish = new PubSubPublish(publisher); + // verify content of published test message + pubSubPublish.doPost(request, response); + verify(publisher, times(1)).publish(eq(message)); + } +} diff --git a/flexible/pubsub/src/main/test/com/example/flexible/pubsub/PubSubPushTest.java b/flexible/pubsub/src/main/test/com/example/flexible/pubsub/PubSubPushTest.java new file mode 100644 index 00000000000..ebb84c3156c --- /dev/null +++ b/flexible/pubsub/src/main/test/com/example/flexible/pubsub/PubSubPushTest.java @@ -0,0 +1,63 @@ +package com.example.flexible.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; + +/** + * Copyright 2017 Google Inc. + * + *

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +import java.io.BufferedReader; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +public class PubSubPushTest { + + @Test + public void messageReceivedOverPushEndPointIsSaved() throws Exception { + MessageRepository messageRepository = mock(MessageRepository.class); + List messages = new ArrayList<>(); + doAnswer((invocation) -> { + messages.add((Message)invocation.getArguments()[0]); + return null; + } + ).when(messageRepository).save(any(Message.class)); + HttpServletRequest request = mock(HttpServletRequest.class); + assertNotNull(System.getenv("PUBSUB_VERIFICATION_TOKEN")); + when(request.getParameter("token")) + .thenReturn(System.getenv("PUBSUB_VERIFICATION_TOKEN")); + + HttpServletResponse response = mock(HttpServletResponse.class); + BufferedReader reader = mock(BufferedReader.class); + when (request.getReader()).thenReturn(reader); + Stream requestBody = Stream.of( + "{\"message\":{\"data\":\"dGVzdA==\",\"attributes\":{}," + + "\"messageId\":\"91010751788941\",\"publishTime\":\"2017-04-05T23:16:42.302Z\"}}"); + when(reader.lines()).thenReturn(requestBody); + PubSubPush servlet = new PubSubPush(messageRepository); + assertEquals(messages.size(), 0); + servlet.doPost(request, response); + assertEquals(messages.size(), 1); + } +} + diff --git a/flexible/pubsub/src/main/webapp/index.jsp b/flexible/pubsub/src/main/webapp/index.jsp new file mode 100644 index 00000000000..fa12f02a14d --- /dev/null +++ b/flexible/pubsub/src/main/webapp/index.jsp @@ -0,0 +1,24 @@ +<%@ page import="com.example.flexible.pubsub.PubSubHome" %> + + + + + An example of using PubSub on App Engine Flex + +

Publish a message

+
+ + + +
+

Last received messages

+ + + + + + + <%= PubSubHome.getReceivedMessages() %> +
IdDataPublishTime
+ + diff --git a/pom.xml b/pom.xml index 0a59c6fe3c2..cd46ab51242 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,7 @@ flexible/mailgun flexible/mailjet flexible/memcache + flexible/pubsub flexible/sendgrid flexible/sparkjava flexible/static-files