1 24 25 package com.mckoi.database.jdbcserver; 26 27 import com.mckoi.debug.DebugLogger; 28 import com.mckoi.database.Database; 29 import com.mckoi.database.jdbc.ProtocolConstants; 30 import com.mckoi.database.jdbc.DatabaseInterface; 31 import com.mckoi.util.LengthMarkedBufferedInputStream; 32 import java.io.*; 33 34 40 41 abstract class StreamJDBCServerConnection extends JDBCProcessor 42 implements ServerConnection { 43 44 48 private static final int OUTPUT_BUFFER_SIZE = 32768; 49 50 54 private static final int INPUT_BUFFER_SIZE = 16384; 55 56 60 private LengthMarkedBufferedInputStream marked_input; 61 62 65 private DataOutputStream out; 66 67 70 StreamJDBCServerConnection(DatabaseInterface db_interface, 71 InputStream in, OutputStream out, DebugLogger logger) 72 throws IOException { 73 super(db_interface, logger); 74 75 this.marked_input = new LengthMarkedBufferedInputStream(in); 76 this.out = new DataOutputStream( 77 new BufferedOutputStream(out, OUTPUT_BUFFER_SIZE)); 78 79 } 80 81 83 public void sendEvent(byte[] event_msg) throws IOException { 86 synchronized (out) { 87 out.writeInt(4 + 4 + event_msg.length); 89 out.writeInt(-1); 91 out.writeInt(ProtocolConstants.DATABASE_EVENT); 93 out.write(event_msg, 0, event_msg.length); 95 out.flush(); 97 } 98 } 99 100 102 106 public boolean requestPending() throws IOException { 107 int state = getState(); 108 if (state == 100) { 109 return marked_input.pollForCommand(Integer.MAX_VALUE); 110 } 111 else { 112 return marked_input.pollForCommand(256); 113 } 114 } 115 116 119 public void processRequest() throws IOException { 120 int sequence_limit = 8; 124 125 int com_length = marked_input.available(); 128 while (com_length > 0) { 129 byte[] command = new byte[com_length]; 130 int read_index = 0; 131 while (read_index < com_length) { 132 read_index += 133 marked_input.read(command, read_index, (com_length - read_index)); 134 } 135 136 byte[] response = processJDBCCommand(command); 138 if (response != null) { 139 140 synchronized (out) { 141 out.writeInt(response.length); 143 out.write(response); 144 out.flush(); 145 } 146 147 } 148 149 com_length = 0; 151 if (sequence_limit > 0) { 152 if (requestPending()) { 153 com_length = marked_input.available(); 154 --sequence_limit; 155 } 156 } 157 158 } 160 } 163 164 167 public void blockForRequest() throws IOException { 168 marked_input.blockForCommand(); 169 } 170 171 174 public void ping() throws IOException { 175 synchronized (out) { 176 out.writeInt(8); 178 out.writeInt(-1); 180 out.writeInt(ProtocolConstants.PING); 182 out.flush(); 184 } 185 } 186 187 } 188
| Popular Tags
|