from socket import socket, create_server BUFFER_SIZE = 1024 ADDRESS = ("127.0.0.1", 12345) class Server: def __init__(self) -> None: try: print(f"Starting up at: {ADDRESS}") self.server_socket: socket = create_server(ADDRESS) except OSError: self.server_socket.close() print("\nServer stopped.") def accept(self) -> socket: conn, client_address = self.server_socket.accept() print(f"Connected to {client_address}") return conn def serve(self, conn: socket) -> None: try: while True: data = conn.recv(BUFFER_SIZE) if not data: break try: order = int(data.decode()) response = f"Thank you for ordering {order} pizzas!\n" except ValueError: response = "Wrong number of pizzas, please try again\n" print(f"Sending message to {conn.getpeername()}") conn.send(response.encode()) finally: print(f"Connection with {conn.getpeername()} has been closed") conn.close() def start(self) -> None: print("Server listening for incoming connections") try: while True: conn = self.accept() self.serve(conn) finally: self.server_socket.close() print("\nServer stopped.") if __name__ == "__main__": server = Server() server.start()
$ python3 pizza_server.py
Starting up at: (‘127.0.0.1’, 12345)
Server listening for incoming connections
Connected to (‘127.0.0.1’, 55292)
Sending message to (‘127.0.0.1’, 55292)
^CConnection with (‘127.0.0.1’, 55292) has been closed
$ nc 127.0.0.1 12345
10
Thank you for ordering 10 pizzas!
### サーバの並列化
from socket import socket, create_server from threading import Thread BUFFER_SIZE = 1024 ADDRESS = ("127.0.0.1", 12345) class Handler(Thread): def __init__(self, conn: socket): super().__init__() self.conn = conn def run(self) -> None: print(f"Connected to {self.conn.getpeername()}") try: while True: data = self.conn.recv(BUFFER_SIZE) if not data: break try: order = int(data.decode()) response = f"Thank you for ordering {order} pizzas!\n" except ValueError: response = "Wrong number of pizzas, please try again\n" print(f"Sending message to {self.conn.getpeername()}") self.conn.send(response.encode()) finally: print(f"Connection with {self.conn.getpeername()} has been closed") self.conn.close() class Server: def __init__(self) -> None: try: print(f"Starting up at: {ADDRESS}") self.server_socket: socket = create_server(ADDRESS) except OSError: self.server_socket.close() print("\nServer stopped.") def start(self) -> None: print("Server listening for incoming connections") try: while True: conn, address = self.server_socket.accept() print(f"Client connection request from {address}") thread = Handler(conn) thread.start() finally: self.server_socket.close() print("\nServer stopped.") if __name__ == "__main__": server = Server() server.start()
### ノンブロッキングモード
import typing as T from socket import socket, create_server BUFFER_SIZE = 1024 ADDRESS = ("127.0.0.1", 12345) class Server: clients: T.Set[socket] = set() def __init__(self) -> None: try: print(f"Starting up at: {ADDRESS}") self.server_socket: socket = create_server(ADDRESS) self.server_socket.setblocking(False) except OSError: self.server_socket.close() print("\nServer stopped.") def accept(self) -> socket: try: conn, address = self.server_socket.accept() print(f"Connected to {address}") conn.setblocking(False) self.clients.add(conn) except BlockingIOError: pass def serve(self, conn: socket) -> None: try: while True: data = conn.recv(BUFFER_SIZE) if not data: break try: order = int(data.decode()) response = f"Thank you for ordering {order} pizzas!\n" except ValueError: response = "Wrong number of pizzas, please try again\n" print(f"Sending message to {conn.getpeername()}") conn.send(response.encode()) except BlockingIOError: pass def start(self) -> None: print("Server listening for incoming connections") try: while True: self.accept() for conn in self.clients.copy(): self.serve(conn) finally: self.server_socket.close() print("\nServer stopped.") if __name__ == "__main__": server = Server() server.start()