all may be done
This commit is contained in:
@@ -19,11 +19,11 @@ public class ChatHandler {
|
||||
private String userId, corespondent;
|
||||
private Connection connection;
|
||||
private Session session;
|
||||
int status = 0; // 0: offline, 1: online
|
||||
private final String TAG = "Task2_queue_";
|
||||
private final String BROKER_URL = "tcp://localhost:61616";
|
||||
|
||||
// Initialize connection and session
|
||||
public void initializeSession() throws JMSException {
|
||||
String BROKER_URL = "tcp://localhost:61616";
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
|
||||
this.connection = factory.createConnection();
|
||||
this.connection.start();
|
||||
@@ -67,14 +67,14 @@ public class ChatHandler {
|
||||
|
||||
// Create producer queue for sending messages
|
||||
private MessageProducer createProducer() throws JMSException {
|
||||
String from = "queue_" + userId;
|
||||
String from = TAG + userId;
|
||||
Queue prod = session.createQueue(from);
|
||||
return session.createProducer(prod);
|
||||
}
|
||||
|
||||
// Create consumer queue for receiving messages
|
||||
private MessageConsumer createConsumer() throws JMSException {
|
||||
String to = "queue_" + getCorespondent();
|
||||
String to = TAG+ getCorespondent();
|
||||
Queue cons = session.createQueue(to);
|
||||
return session.createConsumer(cons);
|
||||
}
|
||||
@@ -105,20 +105,21 @@ public class ChatHandler {
|
||||
Message msg = consumer.receive(300);
|
||||
if (msg instanceof TextMessage tm) {
|
||||
|
||||
String sender = tm.getStringProperty("sender");
|
||||
// Status handling for online/offline
|
||||
if(tm.getText().isBlank() || tm.getText().isEmpty()) {
|
||||
setStatus(1);
|
||||
System.out.println("> " + sender + " is online.");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Exit condition because correspondent went offline
|
||||
if(tm.getText().equalsIgnoreCase("exit")) {
|
||||
setStatus(0);
|
||||
System.out.println("> " + sender + " is offline.");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Display received message
|
||||
String sender = tm.getStringProperty("sender");
|
||||
// Display received message can be from anyone
|
||||
//String sender = tm.getStringProperty("sender");
|
||||
String body = tm.getText();
|
||||
System.out.println("[" + sender + "]: " + body);
|
||||
}
|
||||
@@ -189,18 +190,4 @@ public class ChatHandler {
|
||||
this.corespondent = corespondent;
|
||||
}
|
||||
|
||||
// update status of correspondent and print status message
|
||||
private void setStatus(int status) {
|
||||
|
||||
// just incase of duplicate status messages, if status is same, return
|
||||
if(this.status == status) {
|
||||
return;
|
||||
}
|
||||
this.status = status;
|
||||
if(status == 0) {
|
||||
System.out.println(getCorespondent()+" is now offline.");
|
||||
} else {
|
||||
System.out.println(getCorespondent()+" is now online.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,242 @@
|
||||
package org.distributed;
|
||||
import java.util.Scanner;
|
||||
import javax.jms.*;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
|
||||
public class ChatHandler2 {
|
||||
|
||||
private String userId, corespondent;
|
||||
private Connection connection;
|
||||
private Session senderSession, receiverSession;
|
||||
private final String TAG = "Task2_queue_";
|
||||
private final String BROKER_URL = "tcp://localhost:61616";
|
||||
/**
|
||||
* Initialize JMS session
|
||||
* @throws JMSException
|
||||
*/
|
||||
public void initializeSession() throws JMSException {
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
|
||||
this.connection = factory.createConnection();
|
||||
this.connection.start();
|
||||
this.senderSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
this.receiverSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start conversation loop
|
||||
* @param sc
|
||||
* @throws JMSException
|
||||
*/
|
||||
public void startConversation(Scanner sc) throws JMSException {
|
||||
|
||||
// init receiving thread
|
||||
Thread t = setupReceiver(userId);
|
||||
|
||||
// Continuous input loop
|
||||
try {
|
||||
while (sc.hasNextLine()){
|
||||
|
||||
// handle user input and send messages
|
||||
// or if exit command, break loop
|
||||
if(!respond(sc)){
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error in continuous conversation: " + e.getMessage());
|
||||
} finally {
|
||||
t.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle user input and send messages
|
||||
* @param sc
|
||||
* @throws JMSException
|
||||
*/
|
||||
private boolean respond(Scanner sc) throws JMSException {
|
||||
String line = sc.nextLine();
|
||||
|
||||
// exit condition
|
||||
if (line.equalsIgnoreCase("exit")) {
|
||||
System.out.println("Exiting conversation...");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Send message to receiver
|
||||
sendMessage(createProducer(corespondent), line);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
// Setup producer and consumer for a receiver
|
||||
private Thread setupReceiver(String receiver) throws JMSException {
|
||||
MessageConsumer contact = createConsumer();
|
||||
|
||||
// add consumer to contacts map
|
||||
return getThread(contact);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a producer for a given contact ID so we can send messages
|
||||
* @param id
|
||||
* @return MessageProducer
|
||||
* @throws JMSException
|
||||
*/
|
||||
private MessageProducer createProducer(String id) throws JMSException {
|
||||
String from = TAG + id;
|
||||
Queue prod = senderSession.createQueue(from);
|
||||
return senderSession.createProducer(prod);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Subscribe to topics meant for this user
|
||||
* @return MessageConsumer
|
||||
* @throws JMSException
|
||||
*/
|
||||
private MessageConsumer createConsumer() throws JMSException {
|
||||
String to = TAG + userId;
|
||||
Queue cons = receiverSession.createQueue(to);
|
||||
return receiverSession.createConsumer(cons);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a messag
|
||||
* @param producer
|
||||
* @param line
|
||||
* @throws JMSException
|
||||
*/
|
||||
private void sendMessage(MessageProducer producer, String line) throws JMSException {
|
||||
|
||||
TextMessage m = senderSession.createTextMessage(line);
|
||||
// add sender identity to message
|
||||
m.setStringProperty("sender", userId);
|
||||
producer.send(m);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close session and connection
|
||||
* @throws JMSException
|
||||
*/
|
||||
public void closeSession() throws JMSException {
|
||||
if (senderSession != null) {
|
||||
senderSession.close();
|
||||
}
|
||||
if( receiverSession != null) {
|
||||
receiverSession.close();
|
||||
}
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a thread object to handle receiving messages
|
||||
* @param consumer
|
||||
* @return Thread
|
||||
*/
|
||||
private Thread getThread(MessageConsumer consumer) {
|
||||
|
||||
Thread receivingThread = new Thread(() -> {
|
||||
try {
|
||||
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
|
||||
// receive message with timeout
|
||||
Message msg = consumer.receive(100); // 100ms timeout
|
||||
if (msg instanceof TextMessage tm) {
|
||||
|
||||
// message contents
|
||||
String sender = tm.getStringProperty("sender");
|
||||
String body = tm.getText();
|
||||
|
||||
boolean isEmptyMessage = body.isBlank() || body.isEmpty();
|
||||
|
||||
// display only if not blocked and not empty
|
||||
if (!isEmptyMessage) {
|
||||
System.out.println("[" + sender + "]: " + body);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
} catch (javax.jms.JMSException e) {
|
||||
// Silently handle interruption - normal shutdown behavior
|
||||
if (!Thread.currentThread().isInterrupted()) {
|
||||
System.err.println("Error receiving message: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
receivingThread.start();
|
||||
// we are returning this so we can interrupt it later
|
||||
return receivingThread;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Prompt for user ID
|
||||
* @param sc
|
||||
*/
|
||||
public void promptForUser(Scanner sc){
|
||||
|
||||
System.out.print("Enter your user ID: ");
|
||||
setUserId(prompt(sc));
|
||||
// Instructions
|
||||
System.out.print("Welcome "+getUserId());
|
||||
System.out.println(" -> Type 'exit' to quit.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Prompt for correspondent ID
|
||||
* @param sc
|
||||
*/
|
||||
public void promptForCorespondent(Scanner sc){
|
||||
|
||||
System.out.print("Who do you want to chat with? ");
|
||||
String contact = prompt(sc);
|
||||
setCorespondent(contact);
|
||||
System.out.println("You can start with: " + getCorespondent());
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic prompt method prompts for whatever we need
|
||||
* @param sc Scanner object
|
||||
* @return String
|
||||
*/
|
||||
private String prompt(Scanner sc) {
|
||||
|
||||
String user = "";
|
||||
|
||||
while (user.isEmpty()) {
|
||||
try {
|
||||
user = sc.next();
|
||||
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error reading correspondent ID: " + e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return user;
|
||||
}
|
||||
|
||||
// Setters and getters
|
||||
// user
|
||||
public void setUserId(String userId) {
|
||||
this.userId = userId;
|
||||
}
|
||||
|
||||
public String getUserId() {
|
||||
return userId;
|
||||
}
|
||||
|
||||
// correspondent
|
||||
public String getCorespondent() {
|
||||
return corespondent;
|
||||
}
|
||||
|
||||
public void setCorespondent(String corespondent) {
|
||||
this.corespondent = corespondent;
|
||||
}
|
||||
}
|
||||
@@ -9,8 +9,7 @@ public class ChatHandler {
|
||||
|
||||
private String userId, corespondent;
|
||||
private Connection connection;
|
||||
private Session /*session,*/ senderSession, receiverSession;
|
||||
private int status = 0; // 0: offline, 1: online
|
||||
private Session senderSession, receiverSession;
|
||||
private final Map<String, MessageProducer> blocked = new ConcurrentHashMap<>();
|
||||
private final String TAG = "Task3_queue_";
|
||||
private final String BROKER_URL = "tcp://localhost:61616";
|
||||
@@ -22,7 +21,6 @@ public class ChatHandler {
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
|
||||
this.connection = factory.createConnection();
|
||||
this.connection.start();
|
||||
//this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
this.senderSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
this.receiverSession = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
@@ -65,17 +63,18 @@ public class ChatHandler {
|
||||
// commands to block or unblock contact
|
||||
if(line.startsWith("-")){
|
||||
delegateBlockingManagement(sc,line);
|
||||
//return true;
|
||||
// handling edge case where instead of user, exit comand is given
|
||||
return !corespondent.equalsIgnoreCase("exit");
|
||||
}
|
||||
|
||||
if (line.equalsIgnoreCase("exit")) {
|
||||
System.out.println("Exiting conversation...");
|
||||
//return false;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Send message to receiver
|
||||
sendMessage(createProducer(corespondent), line);
|
||||
return !line.equalsIgnoreCase("exit");//true;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -88,8 +87,17 @@ public class ChatHandler {
|
||||
|
||||
// block command is -b followed by contact id
|
||||
if(line.startsWith("-b")){
|
||||
blockContact(line);
|
||||
promptForCorespondent(sc);
|
||||
try {
|
||||
boolean wasCorespondent = blockContact(line);
|
||||
// if we blocked current correspondent, prompt for new one
|
||||
if(wasCorespondent){
|
||||
System.out.println("You have blocked your current correspondent. Please choose a new one.");
|
||||
promptForCorespondent(sc);
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
System.err.println(e.getMessage());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// unblock command is -u followed by contact id
|
||||
@@ -116,7 +124,6 @@ public class ChatHandler {
|
||||
MessageConsumer contact = createConsumer();
|
||||
|
||||
// add consumer to contacts map
|
||||
//contacts.put(receiver, contact);
|
||||
return getThread(contact);
|
||||
}
|
||||
|
||||
@@ -136,21 +143,32 @@ public class ChatHandler {
|
||||
* Block a contact
|
||||
* @param command
|
||||
*/
|
||||
private void blockContact(String command) {
|
||||
private boolean blockContact(String command) throws JMSException {
|
||||
// remove the command part to get the contact id
|
||||
String contact = command.replace("-b", "").trim();
|
||||
try {
|
||||
MessageProducer ignored = createProducer(contact);
|
||||
// remove from contacts map
|
||||
// and close his ass
|
||||
if(ignored != null){
|
||||
ignored.close();
|
||||
blocked.put(contact, ignored);
|
||||
System.out.println(contact + " has been blocked.");
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
System.err.println("Error unblocking consumer: " + e.getMessage());
|
||||
if(contact.isEmpty()){
|
||||
throw new JMSException("No contact specified to block.");
|
||||
}
|
||||
|
||||
boolean isCurrentCorespondent = contact.equals(getCorespondent());
|
||||
MessageProducer ignored = createProducer(contact);
|
||||
|
||||
// check if already blocked
|
||||
if(blocked.containsKey(contact)){
|
||||
System.out.println(contact + " is already blocked.");
|
||||
return isCurrentCorespondent;
|
||||
}
|
||||
// remove from contacts map
|
||||
// and close his ass
|
||||
if(ignored != null){
|
||||
ignored.close();
|
||||
blocked.put(contact, ignored);
|
||||
System.out.println(contact + " has been blocked.");
|
||||
return isCurrentCorespondent;
|
||||
}else{
|
||||
throw new JMSException("Could not block contact: " + contact);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -219,6 +237,7 @@ public class ChatHandler {
|
||||
|
||||
/**
|
||||
* Returns a thread object to handle receiving messages
|
||||
* checks if sender is blocked before displaying
|
||||
* @param consumer
|
||||
* @return Thread
|
||||
*/
|
||||
@@ -232,13 +251,18 @@ public class ChatHandler {
|
||||
// receive message with timeout
|
||||
Message msg = consumer.receive(100); // 100ms timeout
|
||||
if (msg instanceof TextMessage tm) {
|
||||
// handle received message
|
||||
String sender = tm.getStringProperty("sender");
|
||||
|
||||
// display message if sender is not blocked
|
||||
if (!isBlocked(sender)) {
|
||||
String body = tm.getText();
|
||||
displayMessage(sender, body);
|
||||
// message contents
|
||||
String sender = tm.getStringProperty("sender");
|
||||
String body = tm.getText();
|
||||
|
||||
// display conditions
|
||||
boolean isBlocked = blocked.containsKey(sender);
|
||||
boolean isEmptyMessage = body.isBlank() || body.isEmpty();
|
||||
|
||||
// display only if not blocked and not empty
|
||||
if (!isBlocked && !isEmptyMessage) {
|
||||
System.out.println("[" + sender + "]: " + body);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -258,47 +282,6 @@ public class ChatHandler {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Display received message
|
||||
* Checks if sender is blocked
|
||||
* checks if message is empty for status update
|
||||
* checks for exit condition
|
||||
* @param tm
|
||||
* @throws JMSException
|
||||
*/
|
||||
|
||||
private void displayMessage(String sender, String body) {
|
||||
|
||||
|
||||
// Exit condition because correspondent went offline
|
||||
if(body.equalsIgnoreCase("exit")) {
|
||||
setStatus(0);
|
||||
return;
|
||||
}
|
||||
|
||||
// display only if not empty
|
||||
if(!body.isBlank() || !body.isEmpty()) {
|
||||
// Status handling for online/offline
|
||||
System.out.println("[" + sender + "]: " + body);
|
||||
}
|
||||
setStatus(1);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if sender is blocked
|
||||
* if sender is blocked, print info message
|
||||
* @param sender
|
||||
* @return
|
||||
*/
|
||||
private boolean isBlocked(String sender) {
|
||||
boolean isBlocked = blocked.containsKey(sender);
|
||||
if(isBlocked){
|
||||
System.out.println(" >> Message from '"+sender+"' ignored. because user is blocked.");
|
||||
}
|
||||
return isBlocked;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prompt for user ID
|
||||
* @param sc
|
||||
@@ -306,7 +289,6 @@ public class ChatHandler {
|
||||
public void promptForUser(Scanner sc){
|
||||
|
||||
System.out.print("Enter your user ID: ");
|
||||
//userId = prompt(sc);
|
||||
setUserId(prompt(sc));
|
||||
// Instructions
|
||||
System.out.print("Welcome "+getUserId());
|
||||
@@ -327,6 +309,7 @@ public class ChatHandler {
|
||||
System.out.println(contact + " is currently blocked. Unblock to chat.");
|
||||
return;
|
||||
}
|
||||
System.out.println("You can start with: " + getCorespondent());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -367,30 +350,5 @@ public class ChatHandler {
|
||||
|
||||
public void setCorespondent(String corespondent) {
|
||||
this.corespondent = corespondent;
|
||||
System.out.println("You can chat with: " + corespondent);
|
||||
// reset old status when new corepondent is set
|
||||
setStatus(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates corespondents status
|
||||
* @param status
|
||||
*/
|
||||
private void setStatus(int status) {
|
||||
|
||||
// just in case of duplicate status messages, if status is same, return
|
||||
if(this.status == status) {
|
||||
return;
|
||||
}
|
||||
|
||||
// update status
|
||||
this.status = status;
|
||||
|
||||
// and print accordingly
|
||||
if(status == 0) {
|
||||
System.out.println(getCorespondent()+" is now offline.");
|
||||
} else {
|
||||
System.out.println(getCorespondent()+" is now online.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user