CMPSC 473 - Project #2 - Multi-Threaded Sorting

Due Date: March 3, 2009. 100 points

Single person project. Do your own work!

In this project, you will write a program that takes an unsorted input and writes it to a file in the proper order. You will then take this program and enable it to work with multiple receiving threads. The threads have their own local store of buffers that they have read, but otherwise share access to the source pipe, destination file, the index of the current entry to write, and the index of the last entry.

Use an IPC subsystem (a pipe, Linux IPC, or shared memory -- I used mypipe) from Project 1 to send a file to a multi-threaded receiver process. The sender obtains a file consisting of a sequence of numbered lines (i.e., num line_text), but the lines are out of order. The sender forwards them to a receiver process that must print them to a file in order.

The receiver program will consist of six receiver threads and one main thread that creates the receivers. Each receiver thread runs the same code (rcv_file). When a thread receives an input line, it will store the input line and determine whether it has the next line to be sent to the output file.

The program works as follows: type cse473-p2 input-file output-file at the prompt. The input files will be provided. The output file will be a sorted version of the input file (i.e., you can see that the output has been sequenced properly).

The project will consist of the following tasks:

  1. Download the following tarball Project 2 Code to your CSE account file space. You should have one file p2.tgz. There is a Makefile, which makes cse473-p2. I used the mypipe library from Project 1, but you may use the Linux IPC code instead. You can also use pipes, but they will require some reworking as you want to read only one line at a time.

  2. The sender process will be the same as in Project 1, although the interface in the main function is slightly modified. Every receiver must do a non-blocking read. In Project 1, mypipe did a blocking read, but this may cause a thread to block there forever. Let Manu or me know if you need IPC code (it will cost you though).

  3. The main function in the receiver will call two functions that create and manage threads for this project: thread_pool_init, which initializes the tpool_t data structure, and run_pool_threads, which creates the threads that run the function rcv_file. Initially, create one such thread, but ultimately six threads must be run.

  4. The receiver will consist of three major functions, rcv_file, read_entry, and write_entry. Each thread will run rcv_file. Then, rcv_file will call read_entry to read and store an input line and then, write_entry to write a line to the output file. The pseudocode for single-threaded versions of these functions is provided below.

  5. When converting to a multi-threaded version, the following synchronization requirements must be met:

    1. No data shared between threads may be accessed without mutual exclusion. One of these values is next_entry, the index of the next line to be written to the file. All threads must maintain this value, and if a thread has the next_entry, it must write it to the output file.

    2. A thread is allowed to read at most one entry before giving another thread a chance.

    3. If a thread does not have the next_entry in its store after reading one entry, it must wake all waiting threads and then wait itself (if it still does not have the next_entry).

    4. If a thread does have next_entry in its store, it will write that entry and keep writing entries until it finds a next_entry it does not have. Then, it will wait.

    5. If a thread is about to wait, it must check whether it has the next_entry right before it waits. Otherwise, a thread could wait when it can really write.

    6. When a thread reads a line, the line is placed in a store. These stores are of limited size, so you must ensure that a store does not overflow. If a store becomes full, then you must cause the thread to wait until it has the next_entry line before it can proceed. I will make sure that the input file does not cause all stores to fill.
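
The wait-and-wake rules in requirements 3 through 5 can be sketched with a pthreads mutex and condition variable. This is only an illustration under simplifying assumptions, not the project solution: the names lock, turn, out, writer, and run_ordered_writers are hypothetical, each thread's "store" is faked by giving it a fixed set of entry numbers, and an array stands in for the output file. In the project, the mutex and condition variable come from tpool_t.

```c
#include <pthread.h>

#define NUM_THREADS 3   /* hypothetical; the project ultimately uses six */
#define LAST_ENTRY  9   /* hypothetical index of the final line */

/* Hypothetical shared state; the project's real fields live in tpool_t. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  turn = PTHREAD_COND_INITIALIZER;
static int next_entry = 1;
static int out[LAST_ENTRY];   /* stands in for the output file */
static int out_len = 0;

/* Thread id "holds" entries id, id+NUM_THREADS, ... as its local store. */
static void *writer(void *arg)
{
    int id = *(int *)arg;
    for (int mine = id; mine <= LAST_ENTRY; mine += NUM_THREADS) {
        pthread_mutex_lock(&lock);
        /* Re-check the condition right before every wait (requirement 5). */
        while (next_entry != mine)
            pthread_cond_wait(&turn, &lock);
        out[out_len++] = mine;          /* "write" the entry */
        next_entry++;
        pthread_cond_broadcast(&turn);  /* wake all waiters (requirement 3) */
        pthread_mutex_unlock(&lock);
    }
    return NULL;
}

/* Runs the writers; returns 1 if every entry came out in order, else 0. */
int run_ordered_writers(void)
{
    pthread_t tid[NUM_THREADS];
    int ids[NUM_THREADS];

    next_entry = 1;
    out_len = 0;
    for (int i = 0; i < NUM_THREADS; i++) {
        ids[i] = i + 1;
        pthread_create(&tid[i], NULL, writer, &ids[i]);
    }
    for (int i = 0; i < NUM_THREADS; i++)
        pthread_join(tid[i], NULL);

    for (int i = 0; i < out_len; i++)
        if (out[i] != i + 1)
            return 0;
    return out_len == LAST_ENTRY;
}
```

The while loop around pthread_cond_wait is the important part: a woken thread re-tests whether it holds next_entry while holding the mutex, so it never waits when it could write and never writes out of turn.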

  6. There are also a few hints for doing the project.

    1. Get the project pseudocode to work with only one thread (besides the main thread). Get thread_pool_init and run_pool_threads working to initiate a single thread. This will involve implementing the pseudocode (below) and the pthreads functionality to initialize, join, and terminate a thread. An interim submission of this code is required by 2/24 (see calendar).

    2. With a single thread, you can put the mutual exclusion code in to protect the shared data structures (requirement #1 above). You must identify these shared data structures and add the appropriate pthreads code for mutual exclusion. The variables for mutual exclusion are provided in the data structure tpool_t in cse473-pthread.h.

    3. Then, try two threads at a time. In this case, you will need to use a condition variable for a thread to wait and be awoken. The condition variable is also defined in tpool_t. With this condition, you should be able to handle requirements #2 and #3 above. Use a small test file to avoid running into space problems with the store (requirement #6).

      For debugging, it is useful to use printfs for thread decision points, and to use GDB. GDB is thread-aware. This helps a lot, but you will need the printfs too.

      Also, it is helpful to use printfs to show us that your program is working (switching among threads). Use the following code to print when you read and write entries:

        printf("=== read_entry[0x%x]: after read: buf: %s; bytes: %d\n",
               (unsigned)pthread_self(), buf, bytes);
        printf("=== write_entry[0x%x]: gonna write: buf: %s; bytes: %d\n",
               (unsigned)pthread_self(), buf, bytes);

      Only use "===" for these lines, so we can find them.

    4. Ultimately, you will need to run the receiver with six threads. Gradually increase the number of threads and run on a small test case. The variable RECEIVER_THREADS determines the number of threads.
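
As a starting point for hint 1, creating and joining a group of threads might look like the sketch below. Everything named demo_* is hypothetical: the real tpool_t is defined in cse473-pthread.h and its fields are not reproduced here, and demo_worker only stands in for rcv_file. Treat it as an outline of the pthreads calls involved, not as the required design of thread_pool_init or run_pool_threads.

```c
#include <pthread.h>

#define DEMO_THREADS 6  /* plays the role of RECEIVER_THREADS */

/* Hypothetical pool type; the real tpool_t is in cse473-pthread.h. */
typedef struct {
    pthread_t       threads[DEMO_THREADS];
    pthread_mutex_t lock;
    int             started;   /* shared counter, protected by lock */
} demo_pool_t;

/* Stand-in for rcv_file: each thread just records that it ran. */
static void *demo_worker(void *arg)
{
    demo_pool_t *pool = arg;
    pthread_mutex_lock(&pool->lock);
    pool->started++;
    pthread_mutex_unlock(&pool->lock);
    return NULL;
}

/* Initialize the pool's shared state; returns 0 on success, -1 on error. */
int demo_pool_init(demo_pool_t *pool)
{
    pool->started = 0;
    return pthread_mutex_init(&pool->lock, NULL) == 0 ? 0 : -1;
}

/* Create up to n threads, join them all, and return how many ran. */
int demo_pool_run(demo_pool_t *pool, int n)
{
    int created = 0;
    for (int i = 0; i < n && i < DEMO_THREADS; i++) {
        if (pthread_create(&pool->threads[i], NULL, demo_worker, pool) != 0)
            break;
        created++;
    }
    /* Join only the threads that were actually created. */
    for (int i = 0; i < created; i++)
        pthread_join(pool->threads[i], NULL);
    return pool->started;
}
```

Starting with demo_pool_run(&pool, 1) and raising the count toward six mirrors the gradual approach suggested in the hints.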

    The pseudocode for the rcv_file, read_entry, and write_entry functions (single-threaded) is below. You will need to add code for mutual exclusion and condition variables to this. You are on your own for thread_pool_init and run_pool_threads.

         rcv_file( input_ipc, outfile )
           initialize( entry_store );
           initialize( next_entry = 1 );
           while ( more_to_receive OR more_to_write ) {
             if ( more_to_receive ) {
               read_entry( input_ipc, entry_store );
               determine if more_to_receive;
             }
             get lowest_read from entry_store;
             while ( TRUE ) {
               if ( lowest_read == next_entry ) {
                 write_entry( outfile, entry_store, lowest_read );
                 incr next_entry;
                 get lowest_read;
                 determine if more_to_write;
                 if ( !more_to_write ) break;
               }
               else break;
             }
           }

         read_entry( input_ipc, entry_store )
           initialize( buf, max_size );
           bytes = mypipe_read( input_ipc, buf, max_size );
           if ( buf has bytes ) {
             get_index( buf, index );
             check_for_eof( buf );
             store_entry( buf, index, bytes, entry_store );
             return index;
           }
           else {
             dealloc( buf );
             return unusable index;
           }

         write_entry( outfile, entry_store, lowest_read )
           buf = get_entry_from_store( entry_store, lowest_read );
           write( outfile, buf );
           dealloc( buf );

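
The "buf has bytes" test in read_entry is where a non-blocking read matters: when no data is ready, the call must return immediately instead of blocking the thread. The sketch below shows that idea on a plain POSIX pipe descriptor, which is an assumption made for illustration; mypipe_read's actual interface is not shown in this handout and may differ. The helper names set_nonblocking, try_read, and parse_index are hypothetical, and parse_index is only one possible way to implement get_index for lines of the form num line_text.

```c
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>

/* Put a descriptor into non-blocking mode; returns 0 on success. */
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Returns the number of bytes read, 0 at end of input, or -1 on failure.
 * *would_block is set to 1 when the -1 only means "no data yet". */
ssize_t try_read(int fd, char *buf, size_t max_size, int *would_block)
{
    ssize_t n = read(fd, buf, max_size);
    *would_block = (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK));
    return n;
}

/* Parse the leading line number of "num line_text".
 * Returns -1 (an unusable index) if no positive number is present. */
long parse_index(const char *line)
{
    char *end;
    long idx = strtol(line, &end, 10);
    return (end == line || idx <= 0) ? -1 : idx;
}
```

A caller that sees would_block set can treat the read as "no entry this time" and move on to checking whether it holds next_entry, rather than sitting in the read.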
  7. Please use the following test files. No program results need to be submitted.

  8. Please answer the following questions regarding the project:

    1. The fastest sorting algorithms take O(n log n) time given n entries, but in this approach, we only need to read the entries and get the appropriate sequence of thread prints. What thread operation accounts for the additional time?

    2. Compare your use of condition variables and mutual exclusion to the monitor definition. What features does a monitor have, and which does your implementation satisfy?

    3. If we could expand the store to arbitrary size and it was shared among all the threads, can you think of another way to implement the sorting of the lines? What kind of concurrency protections would be necessary to use this data structure?

  9. Your submission will consist of two things: (1) A tarball of your code, made from make tar and (2) answers to the project questions. We will run your code and look for correct solutions to the key parts of the program.

  10. Grading:

Trent Jaeger