@@ -64,9 +64,17 @@ public class Bucket extends PaginatedComponent {
6464 private static final long FIRST_CHUNK = -2L ; // USE -2 TO MARK THE FIRST CHUNK OF A BIG RECORD. FOLLOWS THE CHUNK SIZE AND THE POINTER TO THE NEXT CHUNK
6565 private static final long NEXT_CHUNK = -3L ; // USE -3 TO MARK THE SECOND AND FURTHER CHUNK THAT IS PART OF A BIG RECORD THAT DOES NOT FIT A PAGE. FOLLOWS THE CHUNK SIZE AND THE POINTER TO THE NEXT CHUNK OR 0 IF THE CURRENT CHUNK IS THE LAST (NO FURTHER CHUNKS)
6666 private static final long RECORD_PLACEHOLDER_CONTENT = MINIMUM_RECORD_SIZE * -1L ; // < -5 FOR SURROGATE RECORDS
67+ private static final long MINIMUM_SPACE_LEFT_IN_PAGE = 50L ;
6768 protected final int contentHeaderSize ;
6869 private final int maxRecordsInPage = DEF_MAX_RECORDS_IN_PAGE ;
6970
71+ private static class AvailableSpace {
72+ public BasePage page = null ;
73+ public int newPosition = -1 ;
74+ public int recordCountInPage = -1 ;
75+ public boolean createNewPage = false ;
76+ }
77+
7078 public static class PaginatedComponentFactoryHandler implements PaginatedComponentFactory .PaginatedComponentFactoryHandler {
7179 @ Override
7280 public PaginatedComponent createOnLoad (final DatabaseInternal database , final String name , final String filePath , final int id ,
@@ -525,58 +533,29 @@ private RID createRecordInternal(final Record record, final boolean isPlaceHolde
525533 while (buffer .size () < MINIMUM_RECORD_SIZE )
526534 buffer .putByte (buffer .size () - 1 , (byte ) 0 );
527535
528- final int bufferSize = buffer .size ();
536+ int bufferSize = buffer .size ();
529537
530538 try {
531539 int newPosition = -1 ;
532540 int recordCountInPage = -1 ;
533- boolean createNewPage = false ;
541+ boolean createNewPage ;
534542 BasePage lastPage = null ;
535543
536- final int txPageCounter = getTotalPages ();
544+ int txPageCounter = getTotalPages ();
537545
538546 if (txPageCounter > 0 ) {
539- lastPage = database .getTransaction ().getPage (new PageId (file .getFileId (), txPageCounter - 1 ), pageSize );
540- recordCountInPage = lastPage .readShort (PAGE_RECORD_COUNT_IN_PAGE_OFFSET );
541- if (recordCountInPage >= maxRecordsInPage )
542- // MAX NUMBER OF RECORDS IN A PAGE REACHED, USE A NEW PAGE
543- createNewPage = true ;
544- else if (recordCountInPage > 0 ) {
545- // GET FIRST EMPTY POSITION
546- final int lastRecordPositionInPage = (int ) lastPage .readUnsignedInt (PAGE_RECORD_TABLE_OFFSET + (recordCountInPage - 1 ) * INT_SERIALIZED_SIZE );
547- if (lastRecordPositionInPage == 0 )
548- // CLEANED CORRUPTED RECORD
549- createNewPage = true ;
550-
551- final long [] lastRecordSize = lastPage .readNumberAndSize (lastRecordPositionInPage );
552-
553- if (lastRecordSize [0 ] > 0 )
554- // RECORD PRESENT, CONSIDER THE RECORD SIZE + VARINT SIZE
555- newPosition = lastRecordPositionInPage + (int ) lastRecordSize [0 ] + (int ) lastRecordSize [1 ];
556- else if (lastRecordSize [0 ] == RECORD_PLACEHOLDER_POINTER )
557- // PLACEHOLDER, CONSIDER NEXT 9 BYTES
558- newPosition = lastRecordPositionInPage + LONG_SERIALIZED_SIZE + 1 ;
559- else if (lastRecordSize [0 ] == FIRST_CHUNK || lastRecordSize [0 ] == NEXT_CHUNK ) {
560- // 1ST CHUNK
561- final int chunkSize = lastPage .readInt ((int ) (lastRecordPositionInPage + lastRecordSize [1 ]));
562- newPosition = lastRecordPositionInPage + INT_SERIALIZED_SIZE + LONG_SERIALIZED_SIZE + chunkSize ;
563- } else
564- // PLACEHOLDER CONTENT, CONSIDER THE RECORD SIZE (CONVERTED FROM NEGATIVE NUMBER) + VARINT SIZE
565- newPosition = lastRecordPositionInPage + (int ) (-1 * lastRecordSize [0 ]) + (int ) lastRecordSize [1 ];
566-
567- final long spaceLeft = lastPage .getMaxContentSize () - newPosition ;
568- final int varIntSizeOfBuffer = Binary .getNumberSpace (isPlaceHolder ? (-1L * bufferSize ) : bufferSize );
569- if (spaceLeft < varIntSizeOfBuffer + bufferSize )
570- // RECORD TOO BIG FOR THIS PAGE, USE A NEW PAGE
571- createNewPage = true ;
547+ // CHECK IF THERE IS SPACE IN THE LAST PAGE
548+ final AvailableSpace availableSpace = checkForSpaceInPage (txPageCounter - 1 , isPlaceHolder , bufferSize );
549+
550+ lastPage = availableSpace .page ;
551+ newPosition = availableSpace .newPosition ;
552+ recordCountInPage = availableSpace .recordCountInPage ;
553+ createNewPage = availableSpace .createNewPage ;
572554
573- } else
574- // FIRST RECORD, START RIGHT AFTER THE HEADER
575- newPosition = contentHeaderSize ;
576555 } else
577556 createNewPage = true ;
578557
579- final MutablePage selectedPage ;
558+ MutablePage selectedPage ;
580559 if (createNewPage ) {
581560 selectedPage = database .getTransaction ().addPage (new PageId (file .getFileId (), txPageCounter ), pageSize );
582561 newPosition = contentHeaderSize ;
@@ -586,10 +565,22 @@ else if (lastRecordSize[0] == FIRST_CHUNK || lastRecordSize[0] == NEXT_CHUNK) {
586565
587566 final RID rid = new RID (database , file .getFileId (), ((long ) selectedPage .getPageId ().getPageNumber ()) * maxRecordsInPage + recordCountInPage );
588567
589- final int byteWritten = selectedPage .writeNumber (newPosition , isPlaceHolder ? (-1L * bufferSize ) : bufferSize );
590- selectedPage .writeByteArray (newPosition + byteWritten , buffer .getContent (), buffer .getContentBeginOffset (), bufferSize );
591- selectedPage .writeUnsignedInt (PAGE_RECORD_TABLE_OFFSET + recordCountInPage * INT_SERIALIZED_SIZE , newPosition );
592- selectedPage .writeShort (PAGE_RECORD_COUNT_IN_PAGE_OFFSET , (short ) ++recordCountInPage );
568+ final int spaceAvailableInCurrentPage = selectedPage .getMaxContentSize () - newPosition ;
569+ final int spaceNeeded = Binary .getNumberSpace (isPlaceHolder ? (-1L * bufferSize ) : bufferSize ) + bufferSize ;
570+
571+ if (spaceNeeded > spaceAvailableInCurrentPage ) {
572+ // MULTI-PAGE RECORD
573+ selectedPage .writeUnsignedInt (PAGE_RECORD_TABLE_OFFSET + recordCountInPage * INT_SERIALIZED_SIZE , newPosition );
574+ selectedPage .writeShort (PAGE_RECORD_COUNT_IN_PAGE_OFFSET , (short ) ++recordCountInPage );
575+
576+ writeMultiPageRecord (buffer , selectedPage , newPosition , spaceAvailableInCurrentPage );
577+
578+ } else {
579+ final int byteWritten = selectedPage .writeNumber (newPosition , isPlaceHolder ? (-1L * bufferSize ) : bufferSize );
580+ selectedPage .writeByteArray (newPosition + byteWritten , buffer .getContent (), buffer .getContentBeginOffset (), bufferSize );
581+ selectedPage .writeUnsignedInt (PAGE_RECORD_TABLE_OFFSET + recordCountInPage * INT_SERIALIZED_SIZE , newPosition );
582+ selectedPage .writeShort (PAGE_RECORD_COUNT_IN_PAGE_OFFSET , (short ) ++recordCountInPage );
583+ }
593584
594585 LogManager .instance ()
595586 .log (this , Level .FINE , "Created record %s (page=%s records=%d threadId=%d)" , rid , selectedPage , recordCountInPage , Thread .currentThread ().getId ());
@@ -665,7 +656,7 @@ private boolean updateRecordInternal(final Record record, final RID rid, final b
665656 recordSize [0 ] *= -1L ;
666657 }
667658
668- final int bufferSize = buffer .size ();
659+ int bufferSize = buffer .size ();
669660 if (bufferSize > recordSize [0 ]) {
670661 // UPDATED RECORD IS LARGER THAN THE PREVIOUS VERSION: MAKE ROOM IN THE PAGE IF POSSIBLE
671662
@@ -675,24 +666,27 @@ private boolean updateRecordInternal(final Record record, final RID rid, final b
675666 if (lastRecordSize [0 ] == RECORD_PLACEHOLDER_POINTER ) {
676667 lastRecordSize [0 ] = LONG_SERIALIZED_SIZE ;
677668 lastRecordSize [1 ] = 1L ;
669+ } else if (lastRecordSize [0 ] == FIRST_CHUNK || lastRecordSize [0 ] == NEXT_CHUNK ) {
670+ // CONSIDER THE CHUNK SIZE
671+ lastRecordSize [0 ] = page .readInt ((int ) (lastRecordPositionInPage + recordSize [1 ]));
672+ lastRecordSize [1 ] = INT_SERIALIZED_SIZE + LONG_SERIALIZED_SIZE ;
678673 } else if (lastRecordSize [0 ] < RECORD_PLACEHOLDER_CONTENT ) {
679674 lastRecordSize [0 ] *= -1L ;
680675 }
681676
682677 final int pageOccupied = (int ) (lastRecordPositionInPage + lastRecordSize [0 ] + lastRecordSize [1 ]);
683-
678+ int spaceAvailableInCurrentPage = page . getMaxContentSize () - pageOccupied ;
684679 final int bufferSizeLength = Binary .getNumberSpace (isPlaceHolder ? -1L * bufferSize : bufferSize );
680+ final int additionalSpaceNeeded = (int ) (bufferSize + bufferSizeLength - recordSize [0 ] - recordSize [1 ]);
685681
686- final int delta = (int ) (bufferSize + bufferSizeLength - recordSize [0 ] - recordSize [1 ]);
687-
688- if (page .getMaxContentSize () - pageOccupied > delta ) {
682+ if (additionalSpaceNeeded <= spaceAvailableInCurrentPage ) {
689683 // THERE IS SPACE LEFT IN THE PAGE, SHIFT ON THE RIGHT THE EXISTENT RECORDS
690684
691685 if (positionInPage < recordCountInPage - 1 ) {
692686 // NOT LAST RECORD IN PAGE, SHIFT NEXT RECORDS
693687 final int nextRecordPositionInPage = (int ) page .readUnsignedInt (PAGE_RECORD_TABLE_OFFSET + (positionInPage + 1 ) * INT_SERIALIZED_SIZE );
694688
695- final int newPos = nextRecordPositionInPage + delta ;
689+ final int newPos = nextRecordPositionInPage + additionalSpaceNeeded ;
696690
697691 page .move (nextRecordPositionInPage , newPos , pageOccupied - nextRecordPositionInPage );
698692
@@ -703,9 +697,9 @@ private boolean updateRecordInternal(final Record record, final RID rid, final b
703697 if (nextRecordPosInPage == 0 )
704698 page .writeUnsignedInt (PAGE_RECORD_TABLE_OFFSET + pos * INT_SERIALIZED_SIZE , 0 );
705699 else
706- page .writeUnsignedInt (PAGE_RECORD_TABLE_OFFSET + pos * INT_SERIALIZED_SIZE , nextRecordPosInPage + delta );
700+ page .writeUnsignedInt (PAGE_RECORD_TABLE_OFFSET + pos * INT_SERIALIZED_SIZE , nextRecordPosInPage + additionalSpaceNeeded );
707701
708- assert nextRecordPosInPage + delta < page .getMaxContentSize ();
702+ assert nextRecordPosInPage + additionalSpaceNeeded < page .getMaxContentSize ();
709703 }
710704 }
711705
@@ -722,13 +716,24 @@ private boolean updateRecordInternal(final Record record, final RID rid, final b
722716 // CANNOT CREATE A PLACEHOLDER OF PLACEHOLDER
723717 return false ;
724718
725- // STORE THE RECORD SOMEWHERE ELSE AND CREATE HERE A PLACEHOLDER THAT POINTS TO THE NEW POSITION. IN THIS WAY THE RID IS PRESERVED
726- final RID realRID = createRecordInternal (record , true , false );
719+ final int availableSpaceForChunk = (int ) (recordSize [0 ] + recordSize [1 ]);
727720
728- final int bytesWritten = page .writeNumber (recordPositionInPage , RECORD_PLACEHOLDER_POINTER );
729- page .writeLong (recordPositionInPage + bytesWritten , realRID .getPosition ());
730- LogManager .instance ().log (this , Level .FINE , "Updated record %s by allocating new space with a placeholder (page=%s threadId=%d)" , null , rid , page ,
731- Thread .currentThread ().getId ());
721+ if (availableSpaceForChunk < 2 + LONG_SERIALIZED_SIZE + INT_SERIALIZED_SIZE ) {
722+ final int bytesWritten = page .writeNumber (recordPositionInPage , RECORD_PLACEHOLDER_POINTER );
723+
724+ final RID realRID = createRecordInternal (record , true , false );
725+ page .writeLong (recordPositionInPage + bytesWritten , realRID .getPosition ());
726+
727+ LogManager .instance ().log (this , Level .FINE , "Updated record %s by allocating new space with a placeholder (page=%s threadId=%d)" , null , rid , page ,
728+ Thread .currentThread ().getId ());
729+ } else {
730+ // SPLIT THE RECORD IN CHUNKS AS LINKED LIST AND STORE THE FIRST PART ON CURRENT PAGE ISSUE https://github.com/ArcadeData/arcadedb/issues/332
731+ writeMultiPageRecord (buffer , page , recordPositionInPage , availableSpaceForChunk );
732+
733+ LogManager .instance ()
734+ .log (this , Level .FINE , "Updated record %s by splitting it in multiple chunks to be saved in multiple pages (page=%s threadId=%d)" , null , rid ,
735+ page , Thread .currentThread ().getId ());
736+ }
732737 }
733738 } else {
734739 // UPDATED RECORD CONTENT IS NOT LARGER THAN PREVIOUS VERSION: OVERWRITE THE CONTENT
@@ -795,8 +800,6 @@ private void deleteRecordInternal(final RID rid, final boolean deletePlaceholder
795800 // LAST CHUNK
796801 break ;
797802
798- deleteRecordInternal (new RID (database , id , nextChunkPointer ), false , true );
799-
800803 // READ THE NEXT CHUNK
801804 final int chunkPageId = (int ) (nextChunkPointer / maxRecordsInPage );
802805 final int chunkPositionInPage = (int ) (nextChunkPointer % maxRecordsInPage );
@@ -807,6 +810,8 @@ private void deleteRecordInternal(final RID rid, final boolean deletePlaceholder
807810
808811 if (recordSize [0 ] != NEXT_CHUNK )
809812 throw new DatabaseOperationException ("Error on fetching multi page record " + rid + " chunk " + chunkId );
813+
814+ deleteRecordInternal (new RID (database , id , nextChunkPointer ), false , true );
810815 }
811816
812817 } else if (recordSize [0 ] == NEXT_CHUNK ) {
@@ -881,4 +886,135 @@ private Binary loadMultiPageRecord(final RID originalRID, BasePage page, int rec
881886
882887 return record ;
883888 }
889+
890+ private void writeMultiPageRecord (final Binary buffer , MutablePage currentPage , int newPosition , final int availableSpaceForChunk ) throws IOException {
891+ int bufferSize = buffer .size ();
892+
893+ // WRITE THE 1ST CHUNK
894+ int byteWritten = currentPage .writeNumber (newPosition , FIRST_CHUNK );
895+
896+ newPosition += byteWritten ;
897+
898+ // WRITE CHUNK SIZE
899+ int chunkSize = availableSpaceForChunk - byteWritten - INT_SERIALIZED_SIZE - LONG_SERIALIZED_SIZE ;
900+ currentPage .writeInt (newPosition , chunkSize );
901+
902+ newPosition += INT_SERIALIZED_SIZE ;
903+
904+ // SAVE THE POSITION OF THE POINTER TO THE NEXT CHUNK
905+ int nextChunkPointerOffset = newPosition ;
906+
907+ newPosition += LONG_SERIALIZED_SIZE ;
908+
909+ final byte [] content = buffer .getContent ();
910+ int contentOffset = buffer .getContentBeginOffset ();
911+
912+ currentPage .writeByteArray (newPosition , content , contentOffset , chunkSize );
913+
914+ bufferSize -= chunkSize ;
915+ contentOffset += chunkSize ;
916+
917+ // WRITE ALL THE REMAINING CHUNKS IN NEW PAGES
918+ int txPageCounter = getTotalPages ();
919+ while (bufferSize > 0 ) {
920+ MutablePage nextPage = null ;
921+ int recordIdInPage = 0 ;
922+ if (currentPage .getPageId ().getPageNumber () < txPageCounter - 1 ) {
923+ // LOOK IN THE LAST PAGE FOR SPACE
924+ final AvailableSpace availableSpace = checkForSpaceInPage (txPageCounter - 1 , false , bufferSize );
925+ if (!availableSpace .createNewPage ) {
926+ nextPage = database .getTransaction ().getPageToModify (availableSpace .page .pageId , pageSize , false );
927+ newPosition = availableSpace .newPosition ;
928+ recordIdInPage = availableSpace .recordCountInPage ;
929+
930+ nextPage .writeUnsignedInt (PAGE_RECORD_TABLE_OFFSET + availableSpace .recordCountInPage * INT_SERIALIZED_SIZE , newPosition );
931+ nextPage .writeShort (PAGE_RECORD_COUNT_IN_PAGE_OFFSET , (short ) (recordIdInPage + 1 ));
932+ }
933+ }
934+
935+ if (nextPage == null ) {
936+ // CREATE A NEW PAGE
937+ nextPage = database .getTransaction ().addPage (new PageId (file .getFileId (), txPageCounter ++), pageSize );
938+ newPosition = contentHeaderSize ;
939+ nextPage .writeUnsignedInt (PAGE_RECORD_TABLE_OFFSET , newPosition );
940+ nextPage .writeShort (PAGE_RECORD_COUNT_IN_PAGE_OFFSET , (short ) 1 );
941+ }
942+
943+ // WRITE IN THE PREVIOUS PAGE POINTER THE CURRENT POSITION OF THE NEXT CHUNK
944+ currentPage .writeLong (nextChunkPointerOffset , (long ) nextPage .getPageId ().getPageNumber () * maxRecordsInPage + recordIdInPage );
945+
946+ int spaceAvailableInCurrentPage = nextPage .getMaxContentSize () - newPosition ;
947+
948+ byteWritten = nextPage .writeNumber (newPosition , NEXT_CHUNK );
949+ spaceAvailableInCurrentPage -= byteWritten ;
950+
951+ // WRITE CHUNK SIZE
952+ chunkSize = spaceAvailableInCurrentPage - INT_SERIALIZED_SIZE - LONG_SERIALIZED_SIZE ;
953+ final boolean lastChunk = bufferSize < chunkSize ;
954+ if (lastChunk )
955+ // LAST CHUNK
956+ chunkSize = bufferSize ;
957+
958+ newPosition += byteWritten ;
959+ nextPage .writeInt (newPosition , chunkSize );
960+
961+ // SAVE THE POSITION OF THE POINTER TO THE NEXT CHUNK
962+ nextChunkPointerOffset = newPosition + INT_SERIALIZED_SIZE ;
963+ if (lastChunk )
964+ nextPage .writeLong (nextChunkPointerOffset , 0L );
965+
966+ newPosition += INT_SERIALIZED_SIZE + LONG_SERIALIZED_SIZE ;
967+
968+ nextPage .writeByteArray (newPosition , content , contentOffset , chunkSize );
969+
970+ bufferSize -= chunkSize ;
971+ contentOffset += chunkSize ;
972+ currentPage = nextPage ;
973+ }
974+ }
975+
976+ private AvailableSpace checkForSpaceInPage (final int pageNumber , final boolean isPlaceHolder , final int bufferSize ) throws IOException {
977+ final AvailableSpace result = new AvailableSpace ();
978+
979+ result .page = database .getTransaction ().getPage (new PageId (file .getFileId (), pageNumber ), pageSize );
980+
981+ result .recordCountInPage = result .page .readShort (PAGE_RECORD_COUNT_IN_PAGE_OFFSET );
982+ if (result .recordCountInPage >= maxRecordsInPage )
983+ // MAX NUMBER OF RECORDS IN A PAGE REACHED, USE A NEW PAGE
984+ result .createNewPage = true ;
985+ else if (result .recordCountInPage > 0 ) {
986+ // GET FIRST EMPTY POSITION
987+ final int lastRecordPositionInPage = (int ) result .page .readUnsignedInt (PAGE_RECORD_TABLE_OFFSET + (result .recordCountInPage - 1 ) * INT_SERIALIZED_SIZE );
988+ if (lastRecordPositionInPage == 0 )
989+ // CLEANED CORRUPTED RECORD
990+ result .createNewPage = true ;
991+ else {
992+ final long [] lastRecordSize = result .page .readNumberAndSize (lastRecordPositionInPage );
993+
994+ if (lastRecordSize [0 ] > 0 )
995+ // RECORD PRESENT, CONSIDER THE RECORD SIZE + VARINT SIZE
996+ result .newPosition = lastRecordPositionInPage + (int ) lastRecordSize [0 ] + (int ) lastRecordSize [1 ];
997+ else if (lastRecordSize [0 ] == RECORD_PLACEHOLDER_POINTER )
998+ // PLACEHOLDER, CONSIDER NEXT 9 BYTES
999+ result .newPosition = lastRecordPositionInPage + LONG_SERIALIZED_SIZE + 1 ;
1000+ else if (lastRecordSize [0 ] == FIRST_CHUNK || lastRecordSize [0 ] == NEXT_CHUNK ) {
1001+ // CHUNK
1002+ final int chunkSize = result .page .readInt ((int ) (lastRecordPositionInPage + lastRecordSize [1 ]));
1003+ result .newPosition = lastRecordPositionInPage + (int ) lastRecordSize [1 ] + INT_SERIALIZED_SIZE + LONG_SERIALIZED_SIZE + chunkSize ;
1004+ } else
1005+ // PLACEHOLDER CONTENT, CONSIDER THE RECORD SIZE (CONVERTED FROM NEGATIVE NUMBER) + VARINT SIZE
1006+ result .newPosition = lastRecordPositionInPage + (int ) (-1 * lastRecordSize [0 ]) + (int ) lastRecordSize [1 ];
1007+
1008+ final long spaceAvailableInCurrentPage = result .page .getMaxContentSize () - result .newPosition ;
1009+ int spaceNeeded = Binary .getNumberSpace (isPlaceHolder ? (-1L * bufferSize ) : bufferSize ) + bufferSize ;
1010+ if (spaceNeeded > spaceAvailableInCurrentPage && spaceAvailableInCurrentPage < MINIMUM_SPACE_LEFT_IN_PAGE )
1011+ // RECORD TOO BIG FOR THIS PAGE, USE A NEW PAGE
1012+ result .createNewPage = true ;
1013+ }
1014+ } else
1015+ // FIRST RECORD, START RIGHT AFTER THE HEADER
1016+ result .newPosition = contentHeaderSize ;
1017+
1018+ return result ;
1019+ }
8841020}
0 commit comments