SQOOP-2819: Use connector name instead of id in Repository.findJobsForConnector methods
authorColin Ma <colin@apache.org>
Sat, 6 Feb 2016 07:28:38 +0000 (15:28 +0800)
committerColin Ma <colin@apache.org>
Sat, 6 Feb 2016 07:28:38 +0000 (15:28 +0800)
 (Jarek Jarcec Cecho via Colin Ma)

core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
core/src/main/java/org/apache/sqoop/repository/Repository.java
core/src/test/java/org/apache/sqoop/repository/TestJdbcRepository.java
repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryHandler.java
repository/repository-common/src/main/java/org/apache/sqoop/repository/common/CommonRepositoryInsertUpdateDeleteSelectQuery.java
repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestJobHandling.java
repository/repository-mysql/src/test/java/org/apache/sqoop/integration/repository/mysql/TestJobHandling.java
repository/repository-postgresql/src/test/java/org/apache/sqoop/integration/repository/postgresql/TestJobHandling.java
server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java

index 1b7cd2e..5b70f95 100644 (file)
@@ -519,11 +519,11 @@ public class JdbcRepository extends Repository {
    */
   @SuppressWarnings("unchecked")
   @Override
-  public List<MJob> findJobsForConnectorUpgrade(final long connectorId) {
+  public List<MJob> findJobsForConnectorUpgrade(final String connectorName) {
     return (List<MJob>) doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        return handler.findJobsForConnectorUpgrade(connectorId, conn);
+        return handler.findJobsForConnectorUpgrade(connectorName, conn);
       }
     });
   }
@@ -533,11 +533,11 @@ public class JdbcRepository extends Repository {
    */
   @SuppressWarnings("unchecked")
   @Override
-  public List<MJob> findJobsForConnector(final long connectorId) {
+  public List<MJob> findJobsForConnector(final String connectorName) {
     return (List<MJob>) doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        return handler.findJobsForConnector(connectorId, conn);
+        return handler.findJobsForConnector(connectorName, conn);
       }
     });
   }
index 7047be9..feab2ad 100644 (file)
@@ -101,20 +101,20 @@ public abstract class JdbcRepositoryHandler {
    * Retrieve jobs which use the given link and derive their structure
    * entirely from the repository.
    *
-   * @param connectorId Connector ID whose jobs should be fetched
+   * @param connectorName Connector ID whose jobs should be fetched
    * @param conn JDBC link for querying repository
    * @return List of MJobs that use <code>linkID</code>.
    */
-  public abstract List<MJob> findJobsForConnectorUpgrade(long c, Connection conn);
+  public abstract List<MJob> findJobsForConnectorUpgrade(String connectorName, Connection conn);
 
   /**
    * Retrieve jobs which use the given link.
    *
-   * @param connectorId Connector ID whose jobs should be fetched
+   * @param connectorName Connector ID whose jobs should be fetched
    * @param conn JDBC link for querying repository
    * @return List of MJobs that use <code>linkID</code>.
    */
-  public abstract List<MJob> findJobsForConnector(long c, Connection conn);
+  public abstract List<MJob> findJobsForConnector(String connectorName, Connection conn);
 
   /**
    * Upgrade the connector with the new data supplied in the <tt>newConnector</tt>.
index 1f7bcfd..03989a3 100644 (file)
@@ -266,18 +266,18 @@ public abstract class Repository {
    * Retrieve jobs which use the given link deriving structure entirely from
    * the repository (rather than the connector itself).
    *
-   * @param connectorId Connector ID whose jobs should be fetched
+   * @param connectorName Connector ID whose jobs should be fetched
    * @return List of MJobs that use <code>linkID</code>.
    */
-  public abstract List<MJob> findJobsForConnectorUpgrade(long connectorId);
+  public abstract List<MJob> findJobsForConnectorUpgrade(String connectorName);
 
   /**
    * Retrieve jobs which use the given link.
    *
-   * @param connectorId Connector ID whose jobs should be fetched
+   * @param connectorName Connector ID whose jobs should be fetched
    * @return List of MJobs that use <code>linkID</code>.
    */
-  public abstract List<MJob> findJobsForConnector(long connectorId);
+  public abstract List<MJob> findJobsForConnector(String connectorName);
 
   /**
    * Create new submission record in repository.
@@ -437,7 +437,7 @@ public abstract class Repository {
       // 2. Get all links associated with the connector.
       List<MLink> existingLinksByConnector = findLinksForConnectorUpgrade(oldConnectorName);
       // 3. Get all jobs associated with the connector.
-      List<MJob> existingJobsByConnector = findJobsForConnectorUpgrade(connectorId);
+      List<MJob> existingJobsByConnector = findJobsForConnectorUpgrade(oldConnectorName);
       // -- BEGIN TXN --
       tx = getTransaction();
       tx.begin();
index d26ce71..2812de6 100644 (file)
@@ -234,7 +234,7 @@ public class TestJdbcRepository {
 
     // mock necessary methods for upgradeConnector() procedure
     doReturn(linkList).when(repoSpy).findLinksForConnectorUpgrade(anyString());
-    doReturn(jobList).when(repoSpy).findJobsForConnectorUpgrade(anyLong());
+    doReturn(jobList).when(repoSpy).findJobsForConnectorUpgrade(anyString());
     doNothing().when(repoSpy).updateLink(any(MLink.class), any(RepositoryTransaction.class));
     doNothing().when(repoSpy).updateJob(any(MJob.class), any(RepositoryTransaction.class));
     doNothing().when(repoSpy).upgradeConnectorAndConfigs(any(MConnector.class), any(RepositoryTransaction.class));
@@ -246,7 +246,7 @@ public class TestJdbcRepository {
     InOrder upgraderOrder = inOrder(connectorUpgraderMock);
 
     repoOrder.verify(repoSpy, times(1)).findLinksForConnectorUpgrade(anyString());
-    repoOrder.verify(repoSpy, times(1)).findJobsForConnectorUpgrade(anyLong());
+    repoOrder.verify(repoSpy, times(1)).findJobsForConnectorUpgrade(anyString());
     repoOrder.verify(repoSpy, times(1)).getTransaction();
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs("JA", repoTransactionMock);
     repoOrder.verify(repoSpy, times(1)).deleteJobInputs("JB", repoTransactionMock);
@@ -398,15 +398,14 @@ public class TestJdbcRepository {
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "find jobs for connector error.");
-    doThrow(exception).when(repoHandlerMock).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
+    doThrow(exception).when(repoHandlerMock).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
 
     try {
       repoSpy.upgradeConnector(oldConnector, newConnector);
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyLong()
-        , any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
     }
@@ -431,8 +430,7 @@ public class TestJdbcRepository {
     List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", "linkName1", "linkName1"),
             job(2, "JB", "fromConnectorName", "toConnectorName", "linkName2", "linkName1"));
     doReturn(linkList).when(repoHandlerMock).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-    doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade
-      (anyLong(), any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
         "delete job inputs for connector error.");
@@ -443,8 +441,7 @@ public class TestJdbcRepository {
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyLong()
-        , any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(1)).deleteJobInputs(anyString(), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
       return ;
@@ -470,7 +467,7 @@ public class TestJdbcRepository {
     List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", "linkName1", "linkName1"),
             job(2, "JB", "fromConnectorName", "toConnectorName", "linkName2", "linkName1"));
     doReturn(linkList).when(repoHandlerMock).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-    doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
 
     SqoopException exception = new SqoopException(RepositoryError.JDBCREPO_0000,
@@ -482,7 +479,7 @@ public class TestJdbcRepository {
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(1)).deleteLinkInputs(anyString(), any(Connection.class));
       verifyNoMoreInteractions(repoHandlerMock);
@@ -509,7 +506,7 @@ public class TestJdbcRepository {
     List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", "linkName1", "linkName1"),
             job(2, "JB", "fromConnectorName", "toConnectorName", "linkName2", "linkName1"));
     doReturn(linkList).when(repoHandlerMock).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-    doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteLinkInputs(anyString(), any(Connection.class));
 
@@ -522,7 +519,7 @@ public class TestJdbcRepository {
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
@@ -552,7 +549,7 @@ public class TestJdbcRepository {
     List<MJob> jobList = jobs(job(1, "JA", "fromConnectorName", "toConnectorName", "linkName1", "linkName1"),
             job(2, "JB", "fromConnectorName", "toConnectorName", "linkName2", "linkName1"));
     doReturn(linkList).when(repoHandlerMock).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-    doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteLinkInputs(anyString(), any(Connection.class));
     doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
@@ -566,7 +563,7 @@ public class TestJdbcRepository {
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
@@ -595,7 +592,7 @@ public class TestJdbcRepository {
     List<MJob> jobList = jobs(job(1, "JA", "A1", "A1", "linkName1", "linkName1"),
             job(2, "JB", "A1", "A1", "linkName2", "linkName1"));
     doReturn(linkList).when(repoHandlerMock).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-    doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
+    doReturn(jobList).when(repoHandlerMock).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteJobInputs(anyString(), any(Connection.class));
     doNothing().when(repoHandlerMock).deleteLinkInputs(anyString(), any(Connection.class));
     doNothing().when(repoHandlerMock).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
@@ -611,7 +608,7 @@ public class TestJdbcRepository {
     } catch (SqoopException ex) {
       assertEquals(ex.getMessage(), exception.getMessage());
       verify(repoHandlerMock, times(1)).findLinksForConnectorUpgrade(anyString(), any(Connection.class));
-      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyLong(), any(Connection.class));
+      verify(repoHandlerMock, times(1)).findJobsForConnectorUpgrade(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteJobInputs(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(2)).deleteLinkInputs(anyString(), any(Connection.class));
       verify(repoHandlerMock, times(1)).upgradeConnectorAndConfigs(any(MConnector.class), any(Connection.class));
index 5490324..15cc41b 100644 (file)
@@ -186,15 +186,15 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public List<MJob> findJobsForConnectorUpgrade(long connectorId, Connection conn) {
+  public List<MJob> findJobsForConnectorUpgrade(String connectorName, Connection conn) {
     try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectAllJobsForConnectorConfigurable())) {
 
-      stmt.setLong(1, connectorId);
-      stmt.setLong(2, connectorId);
+      stmt.setString(1, connectorName);
+      stmt.setString(2, connectorName);
       return loadJobsForUpgrade(stmt, conn);
 
     } catch (SQLException ex) {
-      logException(ex, connectorId);
+      logException(ex, connectorName);
       throw new SqoopException(CommonRepositoryError.COMMON_0028, ex);
     }
   }
@@ -203,15 +203,15 @@ public abstract class CommonRepositoryHandler extends JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
-  public List<MJob> findJobsForConnector(long connectorId, Connection conn) {
+  public List<MJob> findJobsForConnector(String connectorName, Connection conn) {
     try (PreparedStatement stmt = conn.prepareStatement(crudQueries.getStmtSelectAllJobsForConnectorConfigurable())) {
 
-      stmt.setLong(1, connectorId);
-      stmt.setLong(2, connectorId);
+      stmt.setString(1, connectorName);
+      stmt.setString(2, connectorName);
       return loadJobs(stmt, conn);
 
     } catch (SQLException ex) {
-      logException(ex, connectorId);
+      logException(ex, connectorName);
       throw new SqoopException(CommonRepositoryError.COMMON_0028, ex);
     }
   }
index 58404d7..ae16b85 100644 (file)
@@ -490,8 +490,8 @@ public class CommonRepositoryInsertUpdateDeleteSelectQuery {
   // DML: Select all jobs for a Connector
   private static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR_CONFIGURABLE =
       STMT_SELECT_JOB_ALL +
-          " WHERE FROM_CONNECTOR." + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNK_CONFIGURABLE) + " = ?" +
-          " OR TO_CONNECTOR." + CommonRepoUtils.escapeColumnName(COLUMN_SQ_LNK_CONFIGURABLE) + " = ?";
+          " WHERE FROM_CONF_NAME." + CommonRepoUtils.escapeColumnName(COLUMN_SQC_NAME) + " = ?" +
+          " OR TO_CONF_NAME." + CommonRepoUtils.escapeColumnName(COLUMN_SQC_NAME) + " = ?";
 
   /**
    * *******SUBMISSION TABLE *************
index ad8946b..2fd2334 100644 (file)
@@ -120,7 +120,7 @@ public class TestJobHandling extends DerbyTestCase {
     loadJobsForLatestVersion();
 
     // Load all 4 jobs on loaded repository
-    list = handler.findJobsForConnector(1, derbyConnection);
+    list = handler.findJobsForConnector("A", derbyConnection);
     assertEquals(4, list.size());
 
     assertEquals("JA0", list.get(0).getName());
@@ -138,7 +138,7 @@ public class TestJobHandling extends DerbyTestCase {
     loadJobsForLatestVersion();
 
     // Load all 4 jobs on loaded repository
-    list = handler.findJobsForConnectorUpgrade(1, derbyConnection);
+    list = handler.findJobsForConnectorUpgrade("A", derbyConnection);
     assertEquals(4, list.size());
 
     assertEquals("JA0", list.get(0).getName());
@@ -155,7 +155,7 @@ public class TestJobHandling extends DerbyTestCase {
     assertEquals(0, list.size());
     loadJobsForLatestVersion();
 
-    list = handler.findJobsForConnectorUpgrade(11, derbyConnection);
+    list = handler.findJobsForConnectorUpgrade("NONEXISTING", derbyConnection);
     assertEquals(0, list.size());
   }
 
index 9109212..7da0b1b 100644 (file)
@@ -142,10 +142,7 @@ public class TestJobHandling extends MySqlTestCase {
 
   @Test
   public void testFindJobsByConnector() throws Exception {
-    List<MJob> list = handler
-        .findJobsForConnectorUpgrade(
-          handler.findConnector("A", provider.getConnection())
-            .getPersistenceId(), provider.getConnection());
+    List<MJob> list = handler.findJobsForConnectorUpgrade("A", provider.getConnection());
     List<String> jobNames = new ArrayList<String>();
     for (MJob job : list) {
       jobNames.add(job.getName());
@@ -157,8 +154,7 @@ public class TestJobHandling extends MySqlTestCase {
 
   @Test
   public void testFindJobsForNonExistingConnector() throws Exception {
-    List<MJob> list = handler
-        .findJobsForConnectorUpgrade(11, provider.getConnection());
+    List<MJob> list = handler.findJobsForConnectorUpgrade("NONEXISTING", provider.getConnection());
     assertEquals(0, list.size());
   }
 
index 6636bd3..242684c 100644 (file)
@@ -137,9 +137,7 @@ public class TestJobHandling extends PostgresqlTestCase {
 
   @Test
   public void testFindJobsByConnector() throws Exception {
-    List<MJob> list = handler.findJobsForConnectorUpgrade(
-      handler.findConnector("A", provider.getConnection()).getPersistenceId(),
-      provider.getConnection());
+    List<MJob> list = handler.findJobsForConnectorUpgrade("A", provider.getConnection());
     assertEquals(2, list.size());
     assertEquals(JOB_A_NAME, list.get(0).getName());
     assertEquals(JOB_B_NAME, list.get(1).getName());
@@ -147,8 +145,7 @@ public class TestJobHandling extends PostgresqlTestCase {
 
   @Test
   public void testFindJobsForNonExistingConnector() throws Exception {
-    List<MJob> list = handler.findJobsForConnectorUpgrade(11, provider
-      .getConnection());
+    List<MJob> list = handler.findJobsForConnectorUpgrade("NONEXISTING", provider.getConnection());
     assertEquals(0, list.size());
   }
 
index 9dfcb0b..9d259e1 100644 (file)
@@ -272,8 +272,7 @@ public class JobRequestHandler implements RequestHandler {
       connectorIdentifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
       AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
           ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", connectorIdentifier);
-      MConnector mConnector = HandlerUtils.getConnectorFromConnectorName(connectorIdentifier);
-      List<MJob> jobList = repository.findJobsForConnector(mConnector.getPersistenceId());
+      List<MJob> jobList = repository.findJobsForConnector(connectorIdentifier);
 
       // Authorization check
       jobList = AuthorizationEngine.filterResource(ctx.getUserName(), MResource.TYPE.JOB, jobList);