拿來測試 UDP 的 UI
public class TestMain {
private static final int MAX_PACKET_SIZE = 8192;
public static void main(String[] args) {
TestMain test = new TestMain();
test.test();
}
private void test() {
new UDPReceiverThread(1234).start();
new UDPSenderThread(1234).start();
}
class UDPReceiverThread extends Thread {
private int listenport;
public UDPReceiverThread(int listenport) {
this.listenport = listenport;
}
@Override
public void run() {
try {
final AtomicInteger receivedPackets = new AtomicInteger();
final JFrame f = new JFrame("Receiver : listen to port " + listenport);
f.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
f.getContentPane().setLayout(new BorderLayout());
final JPanel p = new JPanel(new BorderLayout());
f.getContentPane().add( p, BorderLayout.CENTER );
final JTextArea status = new JTextArea(30, 30);
p.add(new JScrollPane(status), BorderLayout.CENTER);
JButton resetBtn = new JButton("Reset");
resetBtn.addActionListener(new ActionListener(){
public void actionPerformed(ActionEvent e) {
status.setText("");
receivedPackets.set(0);
}
});
p.add(resetBtn, BorderLayout.NORTH);
f.pack();
f.setVisible(true);
byte[] data = new byte[MAX_PACKET_SIZE];
final DatagramSocket receiver = new DatagramSocket(listenport);
DatagramPacket packet = new DatagramPacket(data, MAX_PACKET_SIZE);
f.addWindowListener(new WindowAdapter() {
@Override
public void windowClosing(WindowEvent e) {
receiver.close();
}
});
while (true) {
receiver.receive(packet);
status.insert("Receive the " + receivedPackets.incrementAndGet() + " packet :" + new String(packet.getData(), 0, packet.getLength()) + " \n", 0);
packet.setLength( MAX_PACKET_SIZE );
}
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
class UDPSenderThread extends Thread {
private int targetport;
public UDPSenderThread(int targetport) {
this.targetport = targetport;
}
@Override
public void run() {
try {
final DatagramSocket sender = new DatagramSocket();
final JFrame f = new JFrame("sender: send to port " + targetport);
f.getContentPane().setLayout(new BorderLayout());
f.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
JPanel p = new JPanel(new BorderLayout());
final JFormattedTextField field = new JFormattedTextField(NumberFormat.getNumberInstance());
field.setColumns(10);
final JTextArea status = new JTextArea(30, 30);
field.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent e) {
int packets = ((Number) field.getValue()).intValue();
if ( packets < 0 ) {
JOptionPane.showMessageDialog(f, "The input number must be positive!");
return;
}
try {
status.setText("");
InetAddress receiverHost = InetAddress.getByName("localhost");
DatagramPacket packet = new DatagramPacket(new byte[0], 0, receiverHost, targetport);
for ( int i = 0; i < packets; i++ ) {
byte[] data = String.valueOf(i+1).getBytes();
packet.setData( data );
packet.setLength( data.length );
status.append("Send the " + (i+1) + " packet \n");
sender.send(packet);
}
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
JOptionPane.showMessageDialog(f, ex.getMessage());
}
}
});
f.getContentPane().add(p, BorderLayout.CENTER);
p.add( field, BorderLayout.NORTH );
p.add( new JScrollPane(status), BorderLayout.CENTER );
f.addWindowListener(new WindowAdapter() {
@Override
public void windowClosing(WindowEvent e) {
System.out.println("Close sender..");
sender.close();
}
});
f.pack();
f.setVisible(true);
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
Posted at 07:40下午 八月 29, 2010 by shooeugenesea in Java | 迴響[0]
傳送與接收 UDP 封包
public class TestMain {
private static final int MAX_PACKET_SIZE = 8192;
public static void main(String[] args) {
TestMain test = new TestMain();
test.test();
}
private void test() {
new UDPReceiverThread().start();
new UDPSenderThread().start();
}
class UDPReceiverThread extends Thread {
@Override
public void run() {
try {
DatagramSocket receiver = new DatagramSocket(1234);
DatagramPacket packet = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE);
receiver.receive(packet);
System.out.println("Receive data : " + new String(packet.getData(), 0, packet.getLength()));
receiver.close();
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
class UDPSenderThread extends Thread {
@Override
public void run() {
try {
DatagramSocket sender = new DatagramSocket();
byte[] data = "Hello".getBytes();
sender.send(new DatagramPacket(data, data.length, InetAddress.getByName("localhost"), 1234));
sender.close();
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
Posted at 04:35下午 八月 29, 2010 by shooeugenesea in Java | 迴響[0]
JMS Consumer and Producer, with swing
public class TestMain {
public static final String QUEUE_NAME = "TestQueue";
private ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
public static void main(String[] args) {
try {
TestMain test = new TestMain();
test.test();
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
private void test() throws Exception {
try {
new ProducerThread().start();
new ProducerThread().start();
new ProducerThread().start();
new ConsumerThread().start();
new ConsumerThread().start();
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
private void startAndCloseResourcesFinally(String title, JComponent c, final Session session, final Connection conn) {
try {
JFrame f = new JFrame(title);
conn.start();
f.addWindowStateListener(new WindowAdapter() {
@Override
public void windowClosed(WindowEvent e) {
try {
session.close();
conn.close();
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
});
f.setDefaultCloseOperation(WindowConstants.EXIT_ON_CLOSE);
f.getContentPane().setLayout(new BorderLayout());
f.getContentPane().add(c, BorderLayout.CENTER);
f.pack();
f.setVisible(true);
} catch (JMSException ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
private class ConsumerThread extends Thread {
@Override
public void run() {
try {
Connection conn = factory.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(dest);
final JTextArea t = new JTextArea(15, 40);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
if (TextMessage.class.isInstance(msg)) {
TextMessage txtMsg = (TextMessage) msg;
t.insert(txtMsg.getText() + "\n", 0);
}
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
});
startAndCloseResourcesFinally("Consumer", new JScrollPane(t), session, conn);
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
private class ProducerThread extends Thread {
@Override
public void run() {
try {
Connection conn = factory.createConnection();
final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = session.createQueue(QUEUE_NAME);
final MessageProducer producer = session.createProducer(dest);
final JTextField t = new JTextField(20);
JPanel p = new JPanel(new FlowLayout());
p.add(t);
t.addKeyListener(new KeyAdapter() {
@Override
public void keyPressed(KeyEvent e) {
if (e.getKeyCode() == KeyEvent.VK_ENTER) {
try {
producer.send(session.createTextMessage(t.getText()));
} catch (JMSException ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
} finally {
t.setText("");
}
}
}
});
startAndCloseResourcesFinally("Producer", p, session, conn);
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
Posted at 03:08上午 八月 10, 2010 by shooeugenesea in Java | 迴響[0]
JMS 的 Hello World
private void test() throws Exception {
ConnectionFactory f = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
Connection conn = f.createConnection();
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
Topic dest = session.createTopic("TestTopic"); // create topic
MessageProducer producer = session.createProducer(dest);
MessageConsumer consumer = session.createConsumer(dest);
final CountDownLatch l = new CountDownLatch(1);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
if ( TextMessage.class.isInstance(msg) ) {
TextMessage textMsg = (TextMessage) msg;
System.out.println(textMsg.getText());
} else {
System.out.println(msg);
}
} catch (Exception ex) {
Logger.getLogger(TestMain.class.getName()).log(Level.SEVERE, null, ex);
} finally {
l.countDown();
}
}
});
producer.send(session.createTextMessage("Hello World"));
conn.start();
session.commit();
l.await(1, TimeUnit.MINUTES);
session.close();
conn.stop();
conn.close();
}
Posted at 02:39上午 八月 09, 2010 by shooeugenesea in Java | 迴響[0]
nio selector

ServerSocketChannel selectableChannel = ServerSocketChannel.open();
ServerSocket serverSocket = selectableChannel.socket();
serverSocket.bind( new InetSocketAddress(1234) );
selectableChannel.configureBlocking(false);
Selector testSelector = Selector.open();
boolean validOpsContainsAccept = (selectableChannel.validOps() & SelectionKey.OP_ACCEPT) != 0;
if ( validOpsContainsAccept ) {
SelectionKey createdKey = selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT);
// 這裡可以顯示 SelectionKey 保存了 SelectableChannel 與 Selector 之間的關係
System.out.println( createdKey.channel() == selectableChannel ); // true
System.out.println( createdKey.selector() == testSelector ); // true
}
ServerSocketChannel selectableChannel = ServerSocketChannel.open();
ServerSocket serverSocket = selectableChannel.socket();
serverSocket.bind( new InetSocketAddress(1234) );
selectableChannel.configureBlocking(false);
Selector testSelector = Selector.open();
selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT);
testSelector.select();
Set<SelectionKey> selectedKeys = testSelector.selectedKeys();
for ( Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
SelectionKey key = i.next();
key.cancel();
// 如果打算在這重新註冊一次就會有 exception
selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT); // throws exception
}
ServerSocketChannel selectableChannel = ServerSocketChannel.open();
ServerSocket serverSocket = selectableChannel.socket();
serverSocket.bind( new InetSocketAddress(1234) );
selectableChannel.configureBlocking(false);
Selector testSelector = Selector.open();
selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT);
testSelector.select();
Set<SelectionKey> selectedKeys = testSelector.selectedKeys();
for ( Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
SelectionKey key = i.next();
System.out.println( key.isValid() ); //true
key.cancel();
System.out.println( key.isValid() ); //false
}
Posted at 08:59上午 八月 04, 2010 by shooeugenesea in Java | 迴響[0]
nio 使用 selector 時, read 之後要判斷 EOF
ByteBuffer buffer = ByteBuffer.allocate(2);
ServerSocketChannel selectableChannel = ServerSocketChannel.open();
ServerSocket serverSocket = selectableChannel.socket();
serverSocket.bind( new InetSocketAddress(1234) );
selectableChannel.configureBlocking(false);
Selector testSelector = Selector.open();
selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("selected key number : " + testSelector.select());
for ( Iterator<SelectionKey> i = testSelector.selectedKeys().iterator(); i.hasNext(); ) {
SelectionKey key = i.next();
if ( key.isAcceptable() ) {
System.out.println("accept");
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(testSelector, SelectionKey.OP_READ);
}
if ( key.isReadable() ) {
System.out.println("read");
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.clear();
socketChannel.read(buffer);
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
}
i.remove();
}
}
後來就看一下到底是為什麼, 結果很單純就只是沒判斷 socketChannel.read 回傳的數字是多少而已.
ByteBuffer buffer = ByteBuffer.allocate(2);
ServerSocketChannel selectableChannel = ServerSocketChannel.open();
ServerSocket serverSocket = selectableChannel.socket();
serverSocket.bind( new InetSocketAddress(1234) );
selectableChannel.configureBlocking(false);
Selector testSelector = Selector.open();
selectableChannel.register(testSelector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println("selected key number : " + testSelector.select());
for ( Iterator<SelectionKey> i = testSelector.selectedKeys().iterator(); i.hasNext(); ) {
SelectionKey key = i.next();
if ( key.isAcceptable() ) {
System.out.println("accept");
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(testSelector, SelectionKey.OP_READ);
}
if ( key.isReadable() ) {
System.out.println("read");
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.clear();
if (socketChannel.read(buffer) != -1) {
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
} else {
System.out.println("client socket is closed");
socketChannel.close();
}
}
i.remove();
}
}
重點就是這段
if (socketChannel.read(buffer) != -1) {
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
} else {
System.out.println("client socket is closed");
socketChannel.close();
}
判斷 -1 就是 client close 了, 很單純, 但其實疑惑了我好一陣子.Posted at 02:46上午 七月 29, 2010 by shooeugenesea in Java | 迴響[0]
載入大檔不 OutOfMemory - nio MappedByteBuffer
public class TestNIO {
public static void main(String[] params) {
TestNIO test = new TestNIO();
try {
test.test();
} catch (Exception e) {
e.printStackTrace();
}
}
private int kb = 1024;
private int mb = 1024 * kb;
private int filesize = 10 * mb;
private void test() throws Exception {
System.out.println( "new way:" + testMappedByteBuffer() ); // spned 2938 millis to write 10 MB
System.out.println( "old way1:" + testRandomAccessFile() ); // 5984 millis to write 10 MB
System.out.println( "old way2:" + testFileOutputStream() ); // 8875 millis to write 10 MB
}
private long testFileOutputStream() throws Exception {
long start = System.currentTimeMillis();
FileOutputStream fout = new FileOutputStream("C:/testOutputStream.txt");
long writesize = 0;
byte[] toWrite = null;
while ( (writesize += (toWrite = (UUID.randomUUID().toString() + "\r\n").getBytes()).length) < filesize ) {
fout.write( toWrite );
}
fout.close();
System.out.println("finish");
return System.currentTimeMillis() - start;
}
private long testRandomAccessFile() throws Exception {
long start = System.currentTimeMillis();
RandomAccessFile fout = new RandomAccessFile("C:/test.txt", "rw");
long writesize = 0;
byte[] toWrite = null;
while ( (writesize += (toWrite = (UUID.randomUUID().toString() + "\r\n").getBytes()).length) < filesize ) {
fout.write( toWrite );
}
fout.close();
System.out.println("finish");
return System.currentTimeMillis() - start;
}
private long testMappedByteBuffer() throws Exception {
long start = System.currentTimeMillis();
RandomAccessFile file = new RandomAccessFile("C:/test.txt", "rw");
FileChannel fc = file.getChannel();
MappedByteBuffer buffer = fc.map(MapMode.READ_WRITE, 0, filesize);
byte[] toWrite = null;
while (buffer.remaining() > (toWrite = (UUID.randomUUID().toString() + "\r\n").getBytes()).length) {
buffer.put( toWrite );
}
fc.close();
file.close();
System.out.println("finish");
return System.currentTimeMillis() - start;
}
}
Posted at 02:59上午 六月 24, 2010 by shooeugenesea in Java | 迴響[0]
nio 在檔案讀寫文字 header
public class TestNIO {
private static final int HEADER_SIZE = 1000;
public static void main(String[] params) {
TestNIO test = new TestNIO();
try {
test.test();
} catch (Exception e) {
e.printStackTrace();
}
}
private void test() throws Exception {
prepareImageFile();
InputStream imageInputStream = readImageInputStreamFromSomewhere();
writeImageWithOwnerInfo( imageInputStream );
decodeImage();
}
private void decodeImage() throws Exception {
FileChannel fc = new FileInputStream("C:/test.ejpg").getChannel();
ByteBuffer ownerInfo = ByteBuffer.allocate( HEADER_SIZE );
fc.read( ownerInfo );
// rewind to print header
ownerInfo.rewind();
System.out.println( "owner:" + Charset.forName("UTF-8").decode( ownerInfo ) );
FileChannel fout = new FileOutputStream("C:/testdecode.jpg").getChannel();
fc.transferTo(HEADER_SIZE, fc.size()-HEADER_SIZE, fout);
// close resources
fc.close();
fout.close();
}
private void writeImageWithOwnerInfo(InputStream in) throws Exception {
FileChannel fc = new FileOutputStream("C:/test.ejpg").getChannel();
// Can write anything in the header
ByteBuffer ownerInfo = ByteBuffer.allocate( HEADER_SIZE );
ownerInfo.put( "中文名".getBytes("UTF-8") );
// rewind to write in FileChannel
ownerInfo.rewind();
fc.write(ownerInfo);
// Channels can convert InputStream into ReadableByteChannel
ReadableByteChannel channel = Channels.newChannel(in);
// Write stream after header
fc.transferFrom( channel, HEADER_SIZE, in.available());
// close resources
channel.close();
fc.close();
}
private InputStream readImageInputStreamFromSomewhere() throws Exception {
return new FileInputStream("C:/test.jpg");
}
private void prepareImageFile() throws Exception {
BufferedImage bimg = new BufferedImage( 100, 100, BufferedImage.TYPE_INT_ARGB);
Graphics2D g2 = bimg.createGraphics();
g2.setColor( Color.YELLOW );
g2.drawString("測試中文", 10, 10);
g2.dispose();
ImageIO.write(bimg, "jpg", new FileOutputStream("C:/test.jpg"));
}
}
Posted at 02:33上午 六月 22, 2010 by shooeugenesea in Java | 迴響[0]
simple JAXB example
this is a very simple example for recall
1. prepare class Person
@XmlRootElement
public class Person {
private String name = "";
// getter and setter
}
2. use JAXBContext
private void test() throws JAXBException {
Person inputP = new Person();
inputP.setName("TEST");
JAXBContext c = JAXBContext.newInstance(Person.class);
// will print <?xml version="1.0" encoding="UTF-8" standalone="yes"?><person><name>TEST</name></person>
c.createMarshaller().marshal(inputP, System.out);
Person outputP = (Person) c.createUnmarshaller().unmarshal( IOUtils.toInputStream( "<person><name>TEST</name></person>" ) );
System.out.println(outputP.getName()); // print TEST
}
Posted at 02:46下午 六月 18, 2010 by shooeugenesea in Java | 迴響[0]
用 FileChannel 分割與合併檔案
public class TestNIO {
public static void main(String[] params) {
TestNIO test = new TestNIO();
try {
test.test();
} catch (Exception e) {
e.printStackTrace();
}
}
private void test() throws Exception {
separateToTmpFiles( 10 );
combineSeparatedTmpFiles( 10 );
}
private void combineSeparatedTmpFiles(int tmpFileCount) throws Exception {
FileChannel fout = new FileOutputStream("D:/testnio/target.rmvb").getChannel();
for ( int i = 0; i <= tmpFileCount; i++ ) {
FileChannel fin = new FileInputStream( tmpFilePath(i) ).getChannel();
fout.transferFrom(fin, fout.size(), fin.size());
fin.close();
}
fout.close();
}
private void separateToTmpFiles(int tmpFileCount) throws Exception {
FileChannel fin = new FileInputStream("D:/testnio/source.rmvb").getChannel();
long sourceFileSize = fin.size();
long perTmpFileSize = (long) (sourceFileSize / tmpFileCount);
long transferredSize = 0;
for ( int i = 0; i <= tmpFileCount; i++ ) {
FileChannel fout = new FileOutputStream(tmpFilePath(i)).getChannel();
long notTransferredSize = sourceFileSize - perTmpFileSize;
boolean finalLoop = notTransferredSize < perTmpFileSize;
if ( finalLoop ) {
fin.transferTo(transferredSize, notTransferredSize, fout);
} else {
fin.transferTo(transferredSize, perTmpFileSize, fout);
transferredSize += perTmpFileSize;
}
fout.close();
}
fin.close();
}
private String tmpFilePath(int index) {
return "D:/testnio/testfile.tmp" + index;
}
}
Posted at 12:49上午 六月 14, 2010 by shooeugenesea in Java | 迴響[0]
nio simple echo server
public class TestNIO {
public static void main(String[] params) {
TestNIO test = new TestNIO();
try {
test.test();
} catch (Exception e) {
e.printStackTrace();
}
}
private void test() throws Exception {
new EchoServerThread().start();
TimeUnit.SECONDS.sleep(1);
for ( int i = 0; i < 100; i++ ) {
new EchoClientThread(i).start();
}
}
private static class EchoClientThread extends Thread {
public EchoClientThread(int index) {
super("client-" + index);
}
@Override
public void run() {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final Socket socket = new Socket("localhost", 1234);
new Thread() {
public void run() {
try {
InputStream in = socket.getInputStream();
int readCount = 0;
byte[] data = new byte[1024];
while ( (readCount = in.read(data)) != -1 ) {
System.out.println( getName() + " get response : " + new String(data, 0, readCount) );
}
in.close();
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
PrintStream out = new PrintStream(socket.getOutputStream());
out.println("ABCDEFGHIJKLMNOPQRSTUVWXYZ\n");
out.flush();
countDownLatch.await();
System.out.println(getName() + " close connection");
out.close();
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static class EchoServerThread extends Thread {
public EchoServerThread() {
super("[server]");
}
@Override
public void run() {
try {
int closeCounts = 0;
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind( new InetSocketAddress(1234) );
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
System.out.println( "rest keys " + selector.select() );
Set<SelectionKey> keys = selector.selectedKeys();
for ( Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
SelectionKey key = i.next();
i.remove();
if ( key.isAcceptable() ) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
SelectionKey readKey = channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
readKey.attach( ByteBuffer.allocate(10) );
}
if ( key.isReadable() ) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
channel.read( buffer );
}
if ( key.isWritable() ) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.flip();
channel.write( buffer );
buffer.compact();
if ( new String(buffer.array()).contains("\n") ) {
key.cancel();
key.channel().close();
System.out.println("close count = " + closeCounts++);
}
}
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
Posted at 07:14下午 六月 13, 2010 by shooeugenesea in Java | 迴響[0]
nio simple time server
public class TestNIO {
public static void main(String[] params) {
TestNIO test = new TestNIO();
test.test();
}
private void test() {
long start = System.currentTimeMillis();
new NIOTimeServer(1234).start();
// new OldTimeServer(1234).start();
int clientCount = 10000;
CountDownLatch latch = new CountDownLatch(clientCount);
for ( int i = 0; i < clientCount; i++ ) {
new TimeClient(i, "localhost", 1234, latch).start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long stop = System.currentTimeMillis();
System.out.println( "spent " + ((stop - start) / 1000) + " seconds." );
}
private static class TimeClient extends Thread {
private final CountDownLatch latch;
private final String timeServerHost;
private final int timeServerPort;
public TimeClient(int clientNumber, String timeServerHost, int timeServerPort, CountDownLatch latch) {
super( "[client-" + clientNumber + "]" );
this.timeServerHost = timeServerHost;
this.timeServerPort = timeServerPort;
this.latch = latch;
}
@Override
public void run() {
Socket socket = null;
BufferedReader reader = null;
try {
socket = new Socket(timeServerHost, timeServerPort);
reader = new BufferedReader( new InputStreamReader(socket.getInputStream()) );
println( reader.readLine() );
} catch (Exception ex) {
ex.printStackTrace();
} finally {
try {
println( "close" );
if ( socket != null ) {
socket.close();
}
if ( reader != null ) {
reader.close();
}
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void println(String msg) {
System.out.println( getName() + ":" + msg );
}
}
private static class OldTimeServer extends Thread {
private final int port;
public OldTimeServer(int port) {
super("[server-" + port + "]");
this.port = port;
}
@Override
public void run() {
ServerSocket server = null;
try {
server = new ServerSocket(port);
while (true) {
Socket socket = server.accept();
println( "connection established " + socket );
OutputStream out = new BufferedOutputStream(socket.getOutputStream());
out.write( (new Date().toString() + "\r\n").getBytes() );
out.flush();
out.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if ( server != null ) {
server.close();
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
private void println(String msg) {
System.out.println( getName() + ":" + msg );
}
}
private static class NIOTimeServer extends Thread {
private final int port;
public NIOTimeServer(int port) {
super("[server-" + port + "]");
this.port = port;
}
@Override
public void run() {
try {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket serverSocket = serverChannel.socket();
serverSocket.bind( new InetSocketAddress(port) );
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
for ( Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
SelectionKey key = i.next();
try {
if ( key.isAcceptable() ) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
println("connection established " + channel);
SelectionKey writeKey = channel.register(selector, SelectionKey.OP_WRITE);
ByteBuffer buffer = ByteBuffer.wrap( (String.valueOf(new Date()) + "\r\n").getBytes() );
writeKey.attach( buffer );
} else if ( key.isWritable() ) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
channel.write( buffer );
channel.close();
}
i.remove();
} catch (Exception ex) {
ex.printStackTrace();
key.cancel();
}
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
private void println(String msg) {
System.out.println( getName() + ":" + msg );
}
}
}
Posted at 05:10下午 五月 30, 2010 by shooeugenesea in Java | 迴響[2]
使能先作別的事情, 等需要時再取計算結果: FutureTask
public class TestFuture {
public static void main(String[] args) {
TestFuture test = new TestFuture();
test.test();
}
private void test() {
FutureTask task = new FutureTask(new Callable() {
@Override
public Result call() throws Exception {
System.out.println("call..");
TimeUnit.SECONDS.sleep(3);
Result result = new Result();
result.setName( String.valueOf(System.currentTimeMillis()) );
System.out.println("call finished");
return result;
}
});
ExecutorService service = Executors.newFixedThreadPool(1);
service.execute(task);
try {
// do something
String prefix = "test-";
TimeUnit.SECONDS.sleep(2);
// finally we need Future result
System.out.println(prefix + task.get().getName());
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println("main finished");
service.shutdown();
}
private static class Result {
private String name = "";
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
Posted at 07:02下午 五月 11, 2010 by shooeugenesea in Java | 迴響[0]
等所有 thread 事情做完再繼續: CountDownLatch
public class TestCountDownLatch {
public static void main(String[] args) {
TestCountDownLatch test = new TestCountDownLatch();
test.test();
}
private void test() {
CountDownLatch latch = new CountDownLatch(5);
new SleepThread("1", latch, 1000).start();
new SleepThread("2", latch, 2000).start();
new SleepThread("3", latch, 3000).start();
new SleepThread("4", latch, 4000).start();
new SleepThread("5", latch, 5000).start();
try {
latch.await();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("all waked up");
}
private static class SleepThread extends Thread {
private long sleepTime = 0;
private final CountDownLatch latch;
public SleepThread(String name, CountDownLatch latch, long sleepTime) {
super(name);
this.latch = latch;
this.sleepTime = sleepTime;
}
@Override
public void run() {
try {
System.out.println(getName() + " sleep...");
Thread.sleep(sleepTime);
System.out.println(getName() + " awake");
latch.countDown();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
Posted at 01:10上午 五月 10, 2010 by shooeugenesea in Java | 迴響[0]
非同步化交換兩個 thread 的物件: Exchanger
public class TestExchanger {
public static void main(String[] args) {
TestExchanger test = new TestExchanger();
test.test();
}
private void test() {
final Exchanger<IntValue> exchanger = new Exchanger<IntValue>();
new Thread() {
@Override
public void run() {
try {
System.out.println("exchange min");
IntValue val = exchanger.exchange(new IntValue(Integer.MIN_VALUE));
System.out.println(val.getVal() == Integer.MAX_VALUE);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}.start();
new Thread() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("exchange max");
IntValue val = exchanger.exchange(new IntValue(Integer.MAX_VALUE));
System.out.println(val.getVal() == Integer.MIN_VALUE);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}.start();
}
private static class IntValue {
private final int val;
public IntValue(int val) {
this.val = val;
}
public int getVal() {
return val;
}
}
}
Posted at 02:40上午 五月 07, 2010 by shooeugenesea in Java | 迴響[0]