一個典型的單線程伺服器範例如下:
while (true) { Socket socket = null; try { // 接收客户连接 socket = serverSocket.accept(); // 从socket中获得输入流与输出流,与客户通信 ... } catch(IOException e) { e.printStackTrace() } finally { try { if(socket != null) { // 断开连接 socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
服務端接收到一個客戶連接,就與客戶進行通信,通信完畢後斷開連接,然後接收下一個客戶連接,假如同時有多個客戶連線請求這些客戶就必須排隊等候。如果長時間讓客戶等待,就會使網站失去信譽,從而降低訪問量。
一般用並發效能來衡量一個伺服器同時回應多個客戶的能力,一個具有好的並發效能的伺服器,必須符合兩個條件:
能同時接收並處理多個客戶連線
對於每個客戶,都會快速給予回應
用多個執行緒來同時為多個客戶提供服務,這是提高伺服器並發效能的最常用的手段,一般有三種方式:
為每個客戶分配一個工作執行緒
建立一個執行緒池,由其中的工作執行緒來為客戶服務
利用Java 類別庫中現成的執行緒池,由它的工作執行緒來為客戶服務
伺服器的主線程負責接收客戶的連接,每次接收到一個客戶連接,都會創建一個工作線程,由它負責與客戶的通訊
public class EchoServer { private int port = 8000; private ServerSocket serverSocket; public EchoServer() throws IOException { serverSocket = new ServerSocket(port); System.out.println("服务器启动"); } public void service() { while(true) { Socket socket = null; try { // 接教客户连接 socket = serverSocket.accept(); // 创建一个工作线程 Thread workThread = new Thread(new Handler(socket)); // 启动工作线程 workThread.start(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws TOException { new EchoServer().service(); } // 负责与单个客户的通信 class Handler implements Runnable { private Socket socket; pub1ic Handler(Socket socket) { this.socket = socket; } private PrintWriter getWriter(Socket socket) throws IOException {...} private BufferedReader getReader(Socket socket) throws IOException {...} public String echo(String msg) {...} public void run() { try { System.out.println("New connection accepted" + socket.getInetAddress() + ":" + socket.getPort()); BufferedReader br = getReader(socket); PrintWriter pw = getWriter(socket); String msg = null; // 接收和发送数据,直到通信结束 while ((msg = br.readLine()) != null) { System.out.println("from "+ socket.getInetAddress() + ":" + socket.getPort() + ">" + msg); pw.println(echo(msg)); if (msg.equals("bye")) break; } } catch (IOException e) { e.printStackTrace(); } finally { try { // 断开连接 if(socket != nulll) socket.close(); } catch (IOException e) { e,printStackTrace(); } } } } }
上一種實作方式有以下不足之處:
public class ThreadPool extends ThreadGroup { // 线程池是否关闭 private boolean isClosed = false; // 表示工作队列 private LinkedList<Runnable> workQueue; // 表示线程池ID private static int threadPoolID; // 表示工作线程ID // poolSize 指定线程池中的工作线程数目 public ThreadPool(int poolSize) { super("ThreadPool-"+ (threadPoolID++)); setDaemon(true); // 创建工作队列 workQueue = new LinkedList<Runnable>(); for (int i = 0; i < poolSize; i++) { // 创建并启动工作线程 new WorkThread().start(); } } /** * 向工作队列中加入一个新任务,由工作线程去执行任务 */ public synchronized void execute(Runnable tank) { // 线程池被关则抛出IllegalStateException异常 if(isClosed) { throw new IllegalStateException(); } if(task != null) { workQueue.add(task); // 唤醒正在getTask()方法中等待任务的工作线限 notify(); } } /** * 从工作队列中取出一个任务,工作线程会调用此方法 */ protected synchronized Runnable getTask() throws InterruptedException { while(workQueue,size() == 0) { if (isClosed) return null; wait(); // 如果工作队列中没有任务,就等待任务 } return workQueue.removeFirst(); } /** * 关闭线程池 */ public synchronized void close() { if(!isClosed) { isClosed = true; // 清空工作队列 workQueue.clear(); // 中断所有的工作线程,该方法继承自ThreadGroup类 interrupt(); } } /** * 等待工作线程把所有任务执行完 */ public void join() { synchronized (this) { isClosed = true; // 唤醒还在getTask()方法中等待任务的工作线程 notifyAll(); } Thread[] threads = new Thread[activeCount()]; // enumerate()方法继承自ThreadGroup类获得线程组中当前所有活着的工作线程 int count = enumerate(threads); // 等待所有工作线程运行结束 for(int i = 0; i < count; i++) { try { // 等待工作线程运行结束 threads[i].join(); } catch((InterruptedException ex) {} } } /** * 内部类:工作线程 */ private class WorkThread extends Thread { public WorkThread() { // 加入当前 ThreadPool 线程组 super(ThreadPool.this, "WorkThread-" + (threadID++)); } public void run() { // isInterrupted()方法承自Thread类,判断线程是否被中断 while (!isInterrupted()) { Runnable task = null; try { // 取出任务 task = getTask(); } catch(InterruptedException ex) {} // 如果 getTask() 返回 nu11 或者线程执行 getTask() 时被中断,则结束此线程 if(task != null) return; // 运行任务,异常在catch代码块中被捕获 try { task.run(); } catch(Throwable t) { t.printStackTrace(); } } } } }
publlc class EchoServer { private int port = 8000; private ServerSocket serverSocket; private ThreadPool threadPool; // 线程港 private final int POOL_SIZE = 4; // 单个CPU时线程池中工作线程的数目 public EchoServer() throws IOException { serverSocket = new ServerSocket(port); // 创建线程池 // Runtime 的 availableProcessors() 方法返回当前系统的CPU的数目 // 系统的CPU越多,线程池中工作线程的数目也越多 threadPool= new ThreadPool( Runtime.getRuntime().availableProcessors() * POOL_SIZE); System.out.println("服务器启动"); } public void service() { while (true) { Socket socket = null; try { socket = serverSocket.accept(); // 把与客户通信的任务交给线程池 threadPool.execute(new Handler(socket)); } catch(IOException e) { e.printStackTrace(); } } } public static void main(String args[])throws TOException { new EchoServer().service(); } // 负责与单个客户的通信,与上例类似 class Handler implements Runnable {...} }
套件提供了現成的線程池的實現,更加健壯,功能也更強大,更多關於線程池的介紹可以這篇文章<div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">public class Echoserver {
private int port = 8000;
private ServerSocket serverSocket;
// 线程池
private ExecutorService executorService;
// 单个CPU时线程池中工作线程的数目
private final int POOL_SIZE = 4;
public EchoServer() throws IOException {
serverSocket = new ServerSocket(port);
// 创建线程池
// Runtime 的 availableProcessors() 方法返回当前系统的CPU的数目
// 系统的CPU越多,线程池中工作线程的数目也越多
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * POOL_SIZE);
System.out.println("服务器启动");
}
public void service() {
while(true) {
Socket socket = null;
try {
socket = serverSocket.accept();
executorService.execute(new Handler(socket));
} catch(IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[])throws TOException {
new EchoServer().service();
}
// 负责与单个客户的通信,与上例类似
class Handler implements Runnable {...}
}</pre><div class="contentsignin">登入後複製</div></div>使用線程池的注意事項<h3>雖然執行緒池能大幅提升伺服器的並發效能,但使用它也存在一定風險,容易引發下面的問題:</h3>
<p></p>
<ul class=" list-paddingleft-2"><li><p></p></li></ul> <p>##任何多執行緒應用程式都有死鎖風險。造成死鎖的最簡單的情形是:線程A 持有物件X 的鎖,並且在等待物件Y 的鎖,而線程B 持有物件Y 的鎖,並且在等待物件X 的鎖,線程A 與線程B都不釋放自己持有的鎖,並且等待對方的鎖,這就導致兩個線程永遠等待下去,死鎖就這樣產生了</p>
<p>任何多線程程式都有死鎖的風險,但線程池也會導致另外一種死鎖:假定執行緒池中的所有工作執行緒都在執行各自任務時被阻塞,它們都在等待某個任務A 的執行結果。而任務 A 依然在工作佇列中,由於沒有空閒線程,使得任務 A 一直無法執行。這使得線程池中的所有工作線程都永遠阻塞下去,死鎖就這樣產生了</p>
<ul class=" list-paddingleft-2"><li><p>#系統資源不足</p></li></ul>
<p>如果線程池中的執行緒數目非常多,這些執行緒就會消耗包括記憶體和其他系統資源在內的大量資源,從而嚴重影響系統效能</p>
<ul class=" list-paddingleft-2"><li><p>並發錯誤</p></li></ul>
<p>#執行緒池的工作佇列依靠 <code>wait()
和 notify()
方法來讓工作執行緒及時取得任務,但這兩個方法都難以使用。如果編碼不正確,就可能會遺失通知,導致工作執行緒保持空閒狀態,無視工作佇列中需要處理的任務
執行緒洩漏
#對於工作執行緒數目固定的執行緒池,如果工作執行緒在執行任務時拋出RuntimeException 或Error,而這些例外或錯誤沒有被捕獲,那麼這個工作執行緒就會異常終止,使得執行緒池永久地失去了一個工作線程。如果所有的工作執行緒都異常終止,則執行緒池變成空,沒有任何可用的工作執行緒來處理任務
另一個導致執行緒洩漏的情況是,工作執行緒在執行一個任務時被阻塞,例如等待使用者的輸入數據,但是由於使用者一直不輸入資料(可能是因為使用者走開了),導致這個工作線程一直被阻塞。這樣的工作執行緒名存實亡,它實際上不執行任何任務了。假如執行緒池中所有的工作執行緒都處於這樣的阻塞狀態,那麼執行緒池就無法處理新加入的任務了
任務過載
當工作佇列中有大量排隊等候執行的任務,這些任務本身可能會消耗太多的系統資源而引起系統資源缺乏
綜上所述,執行緒池可能會帶來種種風險,為了盡可能避免它們,使用執行緒池時需要遵循以下原則:
如果任務A 在執行過程中需要同步等待任務B 的執行結果,那麼任務A 不適合加入執行緒池的工作佇列中。如集把像任務A 一樣的需要等待其他任務執行結果的任務加入工作佇列中,就可能會導致執行緒池的死鎖
如果執行某個任務時可能會阻塞,並且是長時間的阻塞,則應該設定超時時間避免工作執行緒永久地阻塞下去而導致執行緒洩漏
了解任務的特點,分析任務是執行經常會阻塞的IO 操作,還是執行一直不會阻塞的運算操作。前者時斷時續地佔用 CPU,而後者對 CPU 有較高的使用率。根據任務的特點,將任務分類,然後把不同類型的任務分別加入不同執行緒池的工作佇列中,這樣可以根據任務的特點分別調整每個執行緒池
調整執行緒池的大小,執行緒池的最佳大小主要取決於系統的可用CPU 的數目以及工作佇列中任務的特性。假如在一個具有N 個CPU 的系統上只有一個工作佇列並且其中全部是運算性質的任務,那麼當執行緒池具有N 或N 1 個工作執行緒時,一般會得到最大的CPU 使用率
如果工作佇列中包含會執行IO 操作並經常阻塞的任務,則要讓執行緒池的大小超過可用CPU 的數目,因為並不是所有工作執行緒都一直在工作。選擇一個典型的任務,然後估計在執行這個任務的過程中,等待時間(WT)與實際佔用 CPU 進行運算的時間(ST)之間的比:WT/ST。對於一個具有N 個CPU 的系統,需要設定大約N(1 WT/ST) 個執行緒來確保CPU 得到充分利用
避免任務過載,伺服器應根據系統的承受能力,限制客戶的並發連接的數目。當客戶的同時連線的數量超過了限制值,伺服器可以拒絕連線要求,並給予客戶友善提示。
以上是Java怎麼創建多執行緒伺服器的詳細內容。更多資訊請關注PHP中文網其他相關文章!