1 21 22 package org.continuent.sequoia.common.protocol; 23 24 import java.io.ByteArrayInputStream ; 25 import java.io.ByteArrayOutputStream ; 26 import java.io.IOException ; 27 import java.io.ObjectInputStream ; 28 import java.io.ObjectOutputStream ; 29 import java.io.Serializable ; 30 import java.math.BigDecimal ; 31 import java.math.BigInteger ; 32 import java.sql.Clob ; 33 import java.sql.SQLException ; 34 import java.sql.Timestamp ; 35 36 import org.continuent.sequoia.common.exceptions.NotImplementedException; 37 import org.continuent.sequoia.common.stream.LongUTFDataInputStream; 38 import org.continuent.sequoia.common.stream.LongUTFDataOutputStream; 39 40 49 public final class SQLDataSerialization 50 { 51 52 56 60 61 private static final Serializer JAVA_STRING = new StringSerializer(); 62 private static final Serializer MATH_BIGDECIMAL = new BigDecimalBytesSerializer(); 63 private static final Serializer JAVA_BOOLEAN = new BooleanSerializer(); 64 private static final Serializer JAVA_INTEGER = new IntegerSerializer(); 65 private static final Serializer JAVA_LONG = new LongSerializer(); 66 private static final Serializer JAVA_FLOAT = new FloatSerializer(); 67 private static final Serializer JAVA_DOUBLE = new DoubleSerializer(); 68 private static final Serializer JAVA_BYTES = new BytesSerializer(); 69 70 private static final Serializer SQL_DATE = new DateSerializer(); 71 private static final Serializer SQL_TIME = new TimeSerializer(); 72 private static final Serializer SQL_TIMESTAMP = new TimestampSerializer(); 73 74 private static final Serializer SQL_BLOB = new BlobSerializer(); 76 77 private static final Serializer JAVA_SERIALIZABLE = new JavaSerializableSerializer(); 78 79 80 private static final Serializer UNKNOWN_TYPE = new UndefinedSerializer(); 81 82 private static final int STREAM_BUF_SIZE = 65536; 83 84 86 87 public abstract static class Serializer 88 { 89 protected TypeTag typeTag; 90 91 96 public boolean isUndefined() 97 { 98 return this == UNKNOWN_TYPE; 99 } 100 101 104 public TypeTag getTypeTag() 105 { 106 return typeTag; 107 } 108 109 118 119 public abstract void sendToStream(Object obj, LongUTFDataOutputStream output) 120 throws IOException , ClassCastException ; 121 122 130 public abstract Object receiveFromStream(LongUTFDataInputStream input) 131 throws IOException ; 132 133 } 134 135 150 public static Serializer getSerializer(Object sqlObjOrTypeTag) 151 throws NotImplementedException, IllegalArgumentException 152 { 153 return getSerializerImpl(sqlObjOrTypeTag); 154 } 155 156 157 public static Serializer getSerializer(TypeTag t) 158 throws IllegalArgumentException 159 { 160 try 161 { 162 return getSerializerImpl(t); 163 } 164 catch (NotImplementedException nie) 165 { 166 IllegalArgumentException ipe = new IllegalArgumentException ( 168 "Internal bug: there should be a serializer available for every" 169 + " existing TypeTag, including:" + t); 170 ipe.initCause(nie); 171 throw ipe; 172 } 173 } 174 175 private static Serializer getSerializerImpl(Object sqlObjOrTypeTag) 176 throws NotImplementedException, IllegalArgumentException 177 { 178 182 TypeTag tag = TypeTag.CONTROLLER_READY; 183 Object obj = JAVA_STRING; 184 185 189 if (sqlObjOrTypeTag instanceof TypeTag) 190 tag = (TypeTag) sqlObjOrTypeTag; 191 else 192 obj = sqlObjOrTypeTag; 193 194 if (obj == null || TypeTag.UNDEFINED.equals(tag)) 195 return UNKNOWN_TYPE; 196 197 205 if (obj instanceof String || TypeTag.STRING.equals(tag)) 207 return JAVA_STRING; 208 209 if (obj instanceof BigDecimal || TypeTag.BIGDECIMAL.equals(tag)) 211 return MATH_BIGDECIMAL; 212 213 if (obj instanceof Boolean || TypeTag.BOOLEAN.equals(tag)) 215 return JAVA_BOOLEAN; 216 217 if (obj instanceof Integer || TypeTag.INTEGER.equals(tag)) 219 return JAVA_INTEGER; 220 221 if (obj instanceof Long || TypeTag.LONG.equals(tag)) 223 return JAVA_LONG; 224 225 if (obj instanceof Float || TypeTag.FLOAT.equals(tag)) 227 return JAVA_FLOAT; 228 229 if (obj instanceof Double || TypeTag.DOUBLE.equals(tag)) 231 return JAVA_DOUBLE; 232 233 if (obj instanceof byte[] || TypeTag.BYTE_ARRAY.equals(tag)) 235 return JAVA_BYTES; 236 237 if (obj instanceof java.sql.Date || TypeTag.SQL_DATE.equals(tag)) 239 return SQL_DATE; 240 241 if (obj instanceof java.sql.Time || TypeTag.SQL_TIME.equals(tag)) 243 return SQL_TIME; 244 245 if (obj instanceof Timestamp || TypeTag.SQL_TIMESTAMP.equals(tag)) 247 return SQL_TIMESTAMP; 248 249 if (obj instanceof Clob || TypeTag.CLOB.equals(tag)) 251 throw new NotImplementedException( 252 "Clob serialization not yet implemented"); 253 254 if (obj instanceof java.sql.Blob || TypeTag.BLOB.equals(tag)) 256 return SQL_BLOB; 257 258 260 if (sqlObjOrTypeTag instanceof Serializable 262 || TypeTag.JAVA_SERIALIZABLE.equals(tag)) 263 return JAVA_SERIALIZABLE; 264 265 if (sqlObjOrTypeTag instanceof TypeTag) 266 throw new IllegalArgumentException ( 267 "Internal error: getSerializer() misused with unknown TypeTag argument:" 268 + tag); 269 270 273 throw new NotImplementedException("Unable to serialize unknown type " 275 + sqlObjOrTypeTag.getClass() + " of object " + sqlObjOrTypeTag); 276 } 277 278 281 282 private static final class StringSerializer extends Serializer 284 { 285 { 286 typeTag = TypeTag.STRING; 287 } 288 289 public void sendToStream(Object obj, LongUTFDataOutputStream output) 290 throws IOException 291 { 292 output.writeLongUTF((String ) obj); 293 } 294 295 public Object receiveFromStream(LongUTFDataInputStream input) 296 throws IOException 297 { 298 return input.readLongUTF(); 299 300 } 301 } 302 303 private static final class BigDecimalBytesSerializer extends Serializer 306 { 307 { 308 typeTag = TypeTag.BIGDECIMAL; 309 } 310 311 public void sendToStream(Object obj, LongUTFDataOutputStream output) 312 throws IOException 313 { 314 321 327 334 BigDecimal toBeSerialized = (BigDecimal ) obj; 336 337 byte[] byteArray = toBeSerialized.unscaledValue().abs().toByteArray(); 340 output.writeInt(byteArray.length); 341 342 int idx = 0, word = 0; 346 int padding = byteArray.length % 4; 347 if (padding > 0) 348 { 349 for (idx = 0; idx < padding; idx++) 356 { 357 word |= (byteArray[idx] & 0xFF) << (8 * (padding - idx - 1)); 359 } 360 output.writeInt(word); 362 } 363 for (; idx < byteArray.length; idx += 4) 366 { 367 word = (byteArray[idx] & 0xFF) << 24 368 | (byteArray[idx + 1] & 0xFF) << 16 369 | (byteArray[idx + 2] & 0xFF) << 8 | byteArray[idx + 3] & 0xFF; 370 output.writeInt(word); 371 } 372 373 output.writeInt(toBeSerialized.signum()); 375 376 output.writeInt(toBeSerialized.scale()); 378 } 379 380 public Object receiveFromStream(LongUTFDataInputStream input) 381 throws IOException 382 { 383 int byteArrayLength = input.readInt(); 387 byte[] byteArray = new byte[byteArrayLength]; 388 int idx = 0, wordRead = 0; 390 int padding = byteArrayLength % 4; 391 if (padding > 0) 394 { 395 wordRead = input.readInt(); 396 for (idx = 0; idx < padding; idx++) 397 { 398 byteArray[idx] = (byte) ((wordRead >> (8 * (padding - idx - 1))) & 0xFF); 399 } 400 } 401 for (; idx < byteArrayLength; idx += 4) 404 { 405 wordRead = input.readInt(); 406 byteArray[idx] = (byte) ((wordRead >> 24) & 0xFF); 407 byteArray[idx + 1] = (byte) ((wordRead >> 16) & 0xFF); 408 byteArray[idx + 2] = (byte) ((wordRead >> 8) & 0xFF); 409 byteArray[idx + 3] = (byte) (wordRead & 0xFF); 410 } 411 BigInteger intVal = new BigInteger (byteArray); 412 413 if (input.readInt() < 0) 415 intVal = intVal.negate(); 416 417 int scale = input.readInt(); 419 420 return new BigDecimal (intVal, scale); 421 } 422 } 423 424 private static final class BooleanSerializer extends Serializer 426 { 427 { 428 typeTag = TypeTag.BOOLEAN; 429 } 430 431 public void sendToStream(Object obj, LongUTFDataOutputStream output) 432 throws IOException 433 { 434 output.writeBoolean(((Boolean ) obj).booleanValue()); 435 } 436 437 public Object receiveFromStream(LongUTFDataInputStream input) 438 throws IOException 439 { 440 return new Boolean (input.readBoolean()); 441 } 442 } 443 444 private static final class IntegerSerializer extends Serializer 446 { 447 { 448 typeTag = TypeTag.INTEGER; 449 } 450 451 public void sendToStream(Object obj, LongUTFDataOutputStream output) 452 throws IOException 453 { 454 459 output.writeInt(((Number ) obj).intValue()); 460 } 461 462 public Object receiveFromStream(LongUTFDataInputStream input) 463 throws IOException 464 { 465 return new Integer (input.readInt()); 466 } 467 } 468 469 private static final class LongSerializer extends Serializer 471 { 472 { 473 typeTag = TypeTag.LONG; 474 } 475 476 public void sendToStream(Object obj, LongUTFDataOutputStream output) 477 throws IOException 478 { 479 output.writeLong(((Long ) obj).longValue()); 480 } 481 482 public Object receiveFromStream(LongUTFDataInputStream input) 483 throws IOException 484 { 485 return new Long (input.readLong()); 486 } 487 } 488 489 private static final class FloatSerializer extends Serializer 491 { 492 { 493 typeTag = TypeTag.FLOAT; 494 } 495 496 public void sendToStream(Object obj, LongUTFDataOutputStream output) 497 throws IOException 498 { 499 output.writeFloat(((Float ) obj).floatValue()); 500 } 501 502 public Object receiveFromStream(LongUTFDataInputStream input) 503 throws IOException 504 { 505 return new Float (input.readFloat()); 506 } 507 } 508 509 private static final class DoubleSerializer extends Serializer 511 { 512 { 513 typeTag = TypeTag.DOUBLE; 514 } 515 516 public void sendToStream(Object obj, LongUTFDataOutputStream output) 517 throws IOException 518 { 519 output.writeDouble(((Double ) obj).doubleValue()); 520 } 521 522 public Object receiveFromStream(LongUTFDataInputStream input) 523 throws IOException 524 { 525 return new Double (input.readDouble()); 526 } 527 } 528 529 private static final class BytesSerializer extends Serializer 531 { 532 { 533 typeTag = TypeTag.BYTE_ARRAY; 534 } 535 536 public void sendToStream(Object obj, LongUTFDataOutputStream output) 537 throws IOException 538 { 539 byte[] b = (byte[]) obj; 540 output.writeInt(b.length); 541 output.write(b); 542 } 543 544 public Object receiveFromStream(LongUTFDataInputStream input) 545 throws IOException 546 { 547 int len = input.readInt(); 548 byte[] b = new byte[len]; 549 input.readFully(b); 550 return b; 551 } 552 } 553 554 private static final class DateSerializer extends Serializer 556 { 557 { 558 typeTag = TypeTag.SQL_DATE; 559 } 560 561 public void sendToStream(Object obj, LongUTFDataOutputStream output) 562 throws IOException 563 { 564 output.writeLong(((java.sql.Date ) obj).getTime()); 565 } 566 567 public Object receiveFromStream(LongUTFDataInputStream input) 568 throws IOException 569 { 570 return new java.sql.Date (input.readLong()); 571 } 572 } 573 574 private static final class TimeSerializer extends Serializer 576 { 577 { 578 typeTag = TypeTag.SQL_TIME; 579 } 580 581 public void sendToStream(Object obj, LongUTFDataOutputStream output) 582 throws IOException 583 { 584 output.writeInt((int) ((java.sql.Time ) obj).getTime()); 585 } 586 587 public Object receiveFromStream(LongUTFDataInputStream input) 588 throws IOException 589 { 590 return new java.sql.Time (input.readInt()); 591 } 592 } 593 594 private static final class TimestampSerializer extends Serializer 596 { 597 { 598 typeTag = TypeTag.SQL_TIMESTAMP; 599 } 600 601 public void sendToStream(Object obj, LongUTFDataOutputStream output) 602 throws IOException 603 { 604 Timestamp ts = (Timestamp ) obj; 605 output.writeLong(ts.getTime()); 607 output.writeInt(ts.getNanos()); 608 } 609 610 public Object receiveFromStream(LongUTFDataInputStream input) 611 throws IOException 612 { 613 long tsWithMilli = input.readLong(); 614 Timestamp ts = new Timestamp ((tsWithMilli / 1000) * 1000); 616 ts.setNanos(input.readInt()); 617 return ts; 618 } 619 } 620 621 623 private static final class BlobSerializer extends Serializer 625 { 626 { 627 typeTag = TypeTag.BLOB; 628 } 629 630 public void sendToStream(Object obj, LongUTFDataOutputStream output) 631 throws IOException 632 { 633 java.sql.Blob blob = (java.sql.Blob ) obj; 634 try 635 { 636 640 if (blob.length() > Integer.MAX_VALUE) 641 throw new IOException ("Blobs bigger than " + Integer.MAX_VALUE 643 + " are not supported"); 644 645 output.writeInt((int) blob.length()); 647 648 byte[] tempBuffer = new byte[STREAM_BUF_SIZE]; 649 java.io.InputStream input = blob.getBinaryStream(); 650 int nbRead; 651 while (true) 652 { 653 nbRead = input.read(tempBuffer); 654 if (-1 == nbRead) 655 break; 656 output.write(tempBuffer, 0, nbRead); 657 } 658 } 659 catch (SQLException e) 660 { 661 throw (IOException ) new IOException (e.getLocalizedMessage()) 664 .initCause(e); 665 } 666 } 667 668 public Object receiveFromStream(LongUTFDataInputStream input) 669 throws IOException 670 { 671 byte[] b = (byte[]) JAVA_BYTES.receiveFromStream(input); 672 return new org.continuent.sequoia.common.protocol.ByteArrayBlob(b); 673 } 674 } 675 676 private static final class JavaSerializableSerializer extends Serializer 678 { 679 { 680 typeTag = TypeTag.JAVA_SERIALIZABLE; 681 } 682 683 public void sendToStream(Object obj, LongUTFDataOutputStream output) 684 throws IOException 685 { 686 ByteArrayOutputStream baos = new ByteArrayOutputStream (); 687 ObjectOutputStream oos = new ObjectOutputStream (baos); 688 oos.writeObject(obj); 689 oos.close(); 690 691 output.writeInt(baos.size()); 693 baos.writeTo(output); 694 baos.close(); 695 } 696 697 public Object receiveFromStream(LongUTFDataInputStream input) 698 throws IOException 699 { 700 byte[] b = (byte[]) JAVA_BYTES.receiveFromStream(input); 701 ObjectInputStream ois = new ObjectInputStream (new ByteArrayInputStream (b)); 702 try 703 { 704 Object obj = ois.readObject(); 705 return obj; 706 } 707 catch (ClassNotFoundException e) 708 { 709 ClassCastException ioe = new ClassCastException ( 710 "Class of deserialized object not found"); 711 ioe.initCause(e); 712 throw ioe; 713 } 714 } 715 } 716 717 private static final class UndefinedSerializer extends Serializer 719 { 720 { 721 typeTag = TypeTag.UNDEFINED; 722 } 723 724 public void sendToStream(Object obj, LongUTFDataOutputStream output) 725 { 726 throw new RuntimeException ( 727 "Internal bug: tried to send using the UNDEFINED serializer"); 728 } 729 730 public Object receiveFromStream(LongUTFDataInputStream input) 731 throws ClassCastException 732 { 733 throw new RuntimeException ( 734 "Internal bug: tried to receive using the UNDEFINED deserializer"); 735 } 736 } 737 738 } 739 | Popular Tags |